From abb87914ecb4caa1becce4fc4d30c110a6b2c041 Mon Sep 17 00:00:00 2001 From: unknown Date: Fri, 12 Feb 2010 15:12:28 +0200 Subject: [PATCH] Group commit for maria engine. mysql-test/suite/maria/r/group_commit.result: Test of group commit. mysql-test/suite/maria/r/maria3.result: The new variables added. mysql-test/suite/maria/t/group_commit.test: Test of group commit. randgen: Random query generator tests. randgen/conf: Random query generator tests. randgen/conf/maria_group_commit.yy: Random query generator test for group commit. storage/maria/ha_maria.cc: New variables and support procedures for group commit added. storage/maria/ma_init.c: Correct shutdown of group commit service thread and group commit. storage/maria/ma_loghandler.c: Group commit added. Initialization of variables for embedded server added. storage/maria/ma_loghandler.h: Group commit types and routines. --- mysql-test/suite/maria/r/group_commit.result | 17 + mysql-test/suite/maria/r/maria3.result | 3 + mysql-test/suite/maria/t/group_commit.test | 71 ++ randgen/conf/maria_group_commit.yy | 181 +++++ storage/maria/ha_maria.cc | 127 +++- storage/maria/ma_init.c | 5 + storage/maria/ma_loghandler.c | 743 ++++++++++++++++--- storage/maria/ma_loghandler.h | 16 + 8 files changed, 1045 insertions(+), 118 deletions(-) create mode 100644 mysql-test/suite/maria/r/group_commit.result create mode 100644 mysql-test/suite/maria/t/group_commit.test create mode 100644 randgen/conf/maria_group_commit.yy diff --git a/mysql-test/suite/maria/r/group_commit.result b/mysql-test/suite/maria/r/group_commit.result new file mode 100644 index 00000000000..75d7318340a --- /dev/null +++ b/mysql-test/suite/maria/r/group_commit.result @@ -0,0 +1,17 @@ +drop table if exists t1; +create table t1 (a int); +SET GLOBAL maria_group_commit="NONE"; +SET GLOBAL maria_group_commit_interval= 0; +SET GLOBAL maria_group_commit="NONE"; +SET GLOBAL maria_group_commit_interval= 100; +SET GLOBAL maria_group_commit="HARD"; +SET GLOBAL 
maria_group_commit_interval= 0; +SET GLOBAL maria_group_commit="HARD"; +SET GLOBAL maria_group_commit_interval= 100; +SET GLOBAL maria_group_commit="SOFT"; +SET GLOBAL maria_group_commit_interval= 0; +SET GLOBAL maria_group_commit="SOFT"; +SET GLOBAL maria_group_commit_interval= 100; +SET GLOBAL maria_group_commit="NONE"; +SET GLOBAL maria_group_commit_interval= 0; +drop table t1; diff --git a/mysql-test/suite/maria/r/maria3.result b/mysql-test/suite/maria/r/maria3.result index a49b25beb33..a2a6a7214fa 100644 --- a/mysql-test/suite/maria/r/maria3.result +++ b/mysql-test/suite/maria/r/maria3.result @@ -306,6 +306,8 @@ Variable_name Value maria_block_size 8192 maria_checkpoint_interval 30 maria_force_start_after_recovery_failures 0 +maria_group_commit none +maria_group_commit_interval 0 maria_log_file_size 4294959104 maria_log_purge_type immediate maria_max_sort_file_size 9223372036853727232 @@ -328,6 +330,7 @@ Maria_pagecache_read_requests # Maria_pagecache_reads # Maria_pagecache_write_requests # Maria_pagecache_writes # +Maria_transaction_log_syncs # create table t1 (b char(0)); insert into t1 values(NULL),(""); select length(b) from t1; diff --git a/mysql-test/suite/maria/t/group_commit.test b/mysql-test/suite/maria/t/group_commit.test new file mode 100644 index 00000000000..abc79270919 --- /dev/null +++ b/mysql-test/suite/maria/t/group_commit.test @@ -0,0 +1,71 @@ +# Test different ways of syncing (mostly syntax) + +--disable_warnings +drop table if exists t1; +--enable_warnings + +create table t1 (a int); + +SET GLOBAL maria_group_commit="NONE"; +SET GLOBAL maria_group_commit_interval= 0; +--disable_query_log +let $num = 5000; +while ($num) +{ + insert into t1 values (1); + dec $num; +} +--enable_query_log +SET GLOBAL maria_group_commit="NONE"; +SET GLOBAL maria_group_commit_interval= 100; +--disable_query_log +let $num = 5000; +while ($num) +{ + insert into t1 values (1); + dec $num; +} +--enable_query_log +SET GLOBAL maria_group_commit="HARD"; +SET GLOBAL 
maria_group_commit_interval= 0; +--disable_query_log +let $num = 5000; +while ($num) +{ + insert into t1 values (1); + dec $num; +} +--enable_query_log +SET GLOBAL maria_group_commit="HARD"; +SET GLOBAL maria_group_commit_interval= 100; +--disable_query_log +let $num = 5000; +while ($num) +{ + insert into t1 values (1); + dec $num; +} +--enable_query_log +SET GLOBAL maria_group_commit="SOFT"; +SET GLOBAL maria_group_commit_interval= 0; +--disable_query_log +let $num = 5000; +while ($num) +{ + insert into t1 values (1); + dec $num; +} +--enable_query_log +SET GLOBAL maria_group_commit="SOFT"; +SET GLOBAL maria_group_commit_interval= 100; +--disable_query_log +let $num = 5000; +while ($num) +{ + insert into t1 values (1); + dec $num; +} +--enable_query_log +SET GLOBAL maria_group_commit="NONE"; +SET GLOBAL maria_group_commit_interval= 0; +drop table t1; diff --git a/randgen/conf/maria_group_commit.yy b/randgen/conf/maria_group_commit.yy new file mode 100644 index 00000000000..e136cbf7b07 --- /dev/null +++ b/randgen/conf/maria_group_commit.yy @@ -0,0 +1,181 @@ +# test of group commit switching + +query: + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| 
delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| 
delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + select | insert | update| delete | + change_group_commit | change_interval; + + +select: + SELECT select_item FROM join where order_by limit; + +select_item: + * | X . _field ; + +join: + _table AS X | + _table AS X LEFT JOIN _table AS Y ON ( X . _field = Y . _field ) ; + +where: + | + WHERE X . _field < value | + WHERE X . _field > value | + WHERE X . 
_field = value ; + +where_delete: + | + WHERE _field < value | + WHERE _field > value | + WHERE _field = value ; + +order_by: + | ORDER BY X . _field ; + +limit: + | LIMIT _digit ; + +insert: + INSERT INTO _table ( _field , _field ) VALUES ( value , value ) ; + +update: + UPDATE _table AS X SET _field = value where order_by limit ; + +delete: + DELETE FROM _table where_delete LIMIT _digit ; + +value: + ' _letter ' | _digit | _date | _datetime | _time | _english ; + +change_group_commit: + SET GLOBAL MARIA_GROUP_COMMIT=none_soft_hard; + +none_soft_hard: + NONE | SOFT | HARD; + +change_interval: + set_interval | set_interval | set_interval | set_interval | + drop_interval; + +set_interval: + SET GLOBAL MARIA_GROUP_COMMIT_INTERVAL=_tinyint_unsigned; + +drop_interval: + SET GLOBAL MARIA_GROUP_COMMIT_INTERVAL=0; diff --git a/storage/maria/ha_maria.cc b/storage/maria/ha_maria.cc index 5ec0a6b748e..4da650a5f00 100644 --- a/storage/maria/ha_maria.cc +++ b/storage/maria/ha_maria.cc @@ -102,22 +102,40 @@ TYPELIB maria_translog_purge_type_typelib= array_elements(maria_translog_purge_type_names) - 1, "", maria_translog_purge_type_names, NULL }; + +/* transactional log directory sync */ const char *maria_sync_log_dir_names[]= { "NEVER", "NEWFILE", "ALWAYS", NullS }; - TYPELIB maria_sync_log_dir_typelib= { array_elements(maria_sync_log_dir_names) - 1, "", maria_sync_log_dir_names, NULL }; +/* transactional log group commit */ +const char *maria_group_commit_names[]= +{ + "none", "hard", "soft", NullS +}; +TYPELIB maria_group_commit_typelib= +{ + array_elements(maria_group_commit_names) - 1, "", + maria_group_commit_names, NULL +}; + /** Interval between background checkpoints in seconds */ static ulong checkpoint_interval; static void update_checkpoint_interval(MYSQL_THD thd, struct st_mysql_sys_var *var, void *var_ptr, const void *save); +static void update_maria_group_commit(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save); +static void 
update_maria_group_commit_interval(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save); /** After that many consecutive recovery failures, remove logs */ static ulong force_start_after_recovery_failures; static void update_log_file_size(MYSQL_THD thd, @@ -164,6 +182,24 @@ static MYSQL_SYSVAR_ULONG(log_file_size, log_file_size, NULL, update_log_file_size, TRANSLOG_FILE_SIZE, TRANSLOG_MIN_FILE_SIZE, 0xffffffffL, TRANSLOG_PAGE_SIZE); +static MYSQL_SYSVAR_ENUM(group_commit, maria_group_commit, + PLUGIN_VAR_RQCMDARG, + "Specifies maria group commit mode. " + "Possible values are \"none\" (no group commit), " + "\"hard\" (with waiting to actual commit), " + "\"soft\" (no wait for commit (DANGEROUS!!!))", + NULL, update_maria_group_commit, + TRANSLOG_GCOMMIT_NONE, &maria_group_commit_typelib); + +static MYSQL_SYSVAR_ULONG(group_commit_interval, maria_group_commit_interval, + PLUGIN_VAR_RQCMDARG, + "Interval between commite in microseconds (1/1000000c)." + " 0 stands for no waiting" + " for other threads to come and do a commit in \"hard\" mode and no" + " sync()/commit at all in \"soft\" mode. Option has only an effect" + " if maria_group_commit is used", + NULL, update_maria_group_commit_interval, 0, 0, UINT_MAX, 1); + static MYSQL_SYSVAR_ENUM(log_purge_type, log_purge_type, PLUGIN_VAR_RQCMDARG, "Specifies how maria transactional log will be purged. 
" @@ -3278,6 +3314,8 @@ static struct st_mysql_sys_var* system_variables[]= { MYSQL_SYSVAR(block_size), MYSQL_SYSVAR(checkpoint_interval), MYSQL_SYSVAR(force_start_after_recovery_failures), + MYSQL_SYSVAR(group_commit), + MYSQL_SYSVAR(group_commit_interval), MYSQL_SYSVAR(page_checksum), MYSQL_SYSVAR(log_dir_path), MYSQL_SYSVAR(log_file_size), @@ -3308,6 +3346,92 @@ static void update_checkpoint_interval(MYSQL_THD thd, ma_checkpoint_init(*(ulong *)var_ptr= (ulong)(*(long *)save)); } +/** + @brief Updates group commit mode +*/ + +static void update_maria_group_commit(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save) +{ + ulong value= (ulong)*((long *)var_ptr); + DBUG_ENTER("update_maria_group_commit"); + DBUG_PRINT("enter", ("old value: %lu new value %lu rate %lu", + value, (ulong)(*(long *)save), + maria_group_commit_interval)); + /* old value */ + switch (value) { + case TRANSLOG_GCOMMIT_NONE: + break; + case TRANSLOG_GCOMMIT_HARD: + translog_hard_group_commit(FALSE); + break; + case TRANSLOG_GCOMMIT_SOFT: + translog_soft_sync(FALSE); + if (maria_group_commit_interval) + translog_soft_sync_end(); + break; + default: + DBUG_ASSERT(0); /* impossible */ + } + value= *(ulong *)var_ptr= (ulong)(*(long *)save); + translog_sync(); + /* new value */ + switch (value) { + case TRANSLOG_GCOMMIT_NONE: + break; + case TRANSLOG_GCOMMIT_HARD: + translog_hard_group_commit(TRUE); + break; + case TRANSLOG_GCOMMIT_SOFT: + translog_soft_sync(TRUE); + /* variable change made under global lock so we can just read it */ + if (maria_group_commit_interval) + translog_soft_sync_start(); + break; + default: + DBUG_ASSERT(0); /* impossible */ + } + DBUG_VOID_RETURN; +} + +/** + @brief Updates group commit interval +*/ + +static void update_maria_group_commit_interval(MYSQL_THD thd, + struct st_mysql_sys_var *var, + void *var_ptr, const void *save) +{ + ulong new_value= (ulong)*((long *)save); + ulong *value_ptr= (ulong*) var_ptr; + 
DBUG_ENTER("update_maria_group_commit_interval"); + DBUG_PRINT("enter", ("old value: %lu new value %lu group commit %lu", + *value_ptr, new_value, maria_group_commit)); + + /* variable change made under global lock so we can just read it */ + switch (maria_group_commit) { + case TRANSLOG_GCOMMIT_NONE: + *value_ptr= new_value; + translog_set_group_commit_interval(new_value); + break; + case TRANSLOG_GCOMMIT_HARD: + *value_ptr= new_value; + translog_set_group_commit_interval(new_value); + break; + case TRANSLOG_GCOMMIT_SOFT: + if (*value_ptr) + translog_soft_sync_end(); + translog_set_group_commit_interval(new_value); + if ((*value_ptr= new_value)) + translog_soft_sync_start(); + break; + default: + DBUG_ASSERT(0); /* impossible */ + } + DBUG_VOID_RETURN; +} + /** @brief Updates the transaction log file limit. */ @@ -3330,6 +3454,7 @@ static SHOW_VAR status_variables[]= { {"Maria_pagecache_reads", (char*) &maria_pagecache_var.global_cache_read, SHOW_LONGLONG}, {"Maria_pagecache_write_requests", (char*) &maria_pagecache_var.global_cache_w_requests, SHOW_LONGLONG}, {"Maria_pagecache_writes", (char*) &maria_pagecache_var.global_cache_write, SHOW_LONGLONG}, + {"Maria_transaction_log_syncs", (char*) &translog_syncs, SHOW_LONGLONG}, {NullS, NullS, SHOW_LONG} }; diff --git a/storage/maria/ma_init.c b/storage/maria/ma_init.c index 1f2eddd7e30..552b0767bec 100644 --- a/storage/maria/ma_init.c +++ b/storage/maria/ma_init.c @@ -82,6 +82,11 @@ void maria_end(void) maria_inited= maria_multi_threaded= FALSE; ft_free_stopwords(); ma_checkpoint_end(); + if (translog_status == TRANSLOG_OK) + { + translog_soft_sync_end(); + translog_sync(); + } if ((trid= trnman_get_max_trid()) > max_trid_in_control_file) { /* diff --git a/storage/maria/ma_loghandler.c b/storage/maria/ma_loghandler.c index 536336d795d..73a2737b223 100644 --- a/storage/maria/ma_loghandler.c +++ b/storage/maria/ma_loghandler.c @@ -18,6 +18,7 @@ #include "ma_blockrec.h" /* for some constants and in-write hooks */ 
#include "ma_key_recover.h" /* For some in-write hooks */ #include "ma_checkpoint.h" +#include "ma_servicethread.h" /* On Windows, neither my_open() nor my_sync() work for directories. @@ -47,6 +48,15 @@ #include #endif +/** @brief mutex of the soft sync service thread control (soft_sync_control) */ +static pthread_mutex_t LOCK_soft_sync; +/** @brief condition variable of the soft sync service thread control */ +static pthread_cond_t COND_soft_sync; +/** @brief control structure for the soft sync service thread */ +static MA_SERVICE_THREAD_CONTROL soft_sync_control= + {THREAD_DEAD, FALSE, &LOCK_soft_sync, &COND_soft_sync}; + + /* transaction log file descriptor */ typedef struct st_translog_file { @@ -123,11 +133,25 @@ struct st_translog_buffer TRANSLOG_ADDRESS next_buffer_offset; /* Previous buffer offset to detect it flush finish */ TRANSLOG_ADDRESS prev_buffer_offset; + /* + If the buffer was forced to close, it saves the value of its horizon; + otherwise LSN_IMPOSSIBLE + */ + TRANSLOG_ADDRESS pre_force_close_horizon; /* How much is written (or will be written when copy_to_buffer_in_progress become 0) to this buffer */ translog_size_t size; + /* + When moving from one log buffer to another, we write the last of the + previous buffer to file and then move to start using the new log + buffer. In the case of a partly filled last page, this page is not moved + to the start of the new buffer but instead we set the 'skipped_data' + variable to tell us how much data at the beginning of the buffer is not + relevant. 
+ */ + uint skipped_data; /* File handler for this buffer */ TRANSLOG_FILE *file; /* Threads which are waiting for buffer filling/freeing */ @@ -304,6 +328,7 @@ struct st_translog_descriptor */ pthread_mutex_t log_flush_lock; pthread_cond_t log_flush_cond; + pthread_cond_t new_goal_cond; /* Protects changing of headers of finished files (max_lsn) */ pthread_mutex_t file_header_lock; @@ -344,13 +369,39 @@ static struct st_translog_descriptor log_descriptor; ulong log_purge_type= TRANSLOG_PURGE_IMMIDIATE; ulong log_file_size= TRANSLOG_FILE_SIZE; +/* sync() of log files directory mode */ ulong sync_log_dir= TRANSLOG_SYNC_DIR_NEWFILE; +ulong maria_group_commit= TRANSLOG_GCOMMIT_NONE; +ulong maria_group_commit_interval= 0; /* Marker for end of log */ static uchar end_of_log= 0; #define END_OF_LOG &end_of_log +/** + Switch for "soft" sync (no real sync() but periodical sync by service + thread) +*/ +static volatile my_bool soft_sync= FALSE; +/** + Switch for "hard" group commit mode +*/ +static volatile my_bool hard_group_commit= FALSE; +/** + File numbers interval which have to be sync() +*/ +static uint32 soft_sync_min= 0; +static uint32 soft_sync_max= 0; +static uint32 soft_need_sync= 1; +/** + stores interval in microseconds +*/ +static uint32 group_commit_wait= 0; enum enum_translog_status translog_status= TRANSLOG_UNINITED; +ulonglong translog_syncs= 0; /* Number of sync()s */ + +/* time of last flush */ +static ulonglong flush_start= 0; /* chunk types */ #define TRANSLOG_CHUNK_LSN 0x00 /* 0 chunk refer as LSN (head or tail */ @@ -980,12 +1031,17 @@ static TRANSLOG_FILE *get_logfile_by_number(uint32 file_no) static TRANSLOG_FILE *get_current_logfile() { TRANSLOG_FILE *file; + DBUG_ENTER("get_current_logfile"); rw_rdlock(&log_descriptor.open_files_lock); + DBUG_PRINT("info", ("max_file: %lu min_file: %lu open_files: %lu", + (ulong) log_descriptor.max_file, + (ulong) log_descriptor.min_file, + (ulong) log_descriptor.open_files.elements)); 
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 == log_descriptor.open_files.elements); file= *dynamic_element(&log_descriptor.open_files, 0, TRANSLOG_FILE **); rw_unlock(&log_descriptor.open_files_lock); - return (file); + DBUG_RETURN(file); } uchar NEAR maria_trans_file_magic[]= @@ -1069,6 +1125,7 @@ static my_bool translog_write_file_header() static my_bool translog_max_lsn_to_header(File file, LSN lsn) { uchar lsn_buff[LSN_STORE_SIZE]; + my_bool rc; DBUG_ENTER("translog_max_lsn_to_header"); DBUG_PRINT("enter", ("File descriptor: %ld " "lsn: (%lu,0x%lx)", @@ -1077,11 +1134,17 @@ static my_bool translog_max_lsn_to_header(File file, LSN lsn) lsn_store(lsn_buff, lsn); - DBUG_RETURN(my_pwrite(file, lsn_buff, - LSN_STORE_SIZE, - (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE), - log_write_flags) != 0 || - my_sync(file, MYF(MY_WME)) != 0); + rc= (my_pwrite(file, lsn_buff, + LSN_STORE_SIZE, + (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE), + log_write_flags) != 0 || + my_sync(file, MYF(MY_WME)) != 0); + /* + We should not increase counter in case of error above, but it is so + unlikely that we can ignore this case + */ + translog_syncs++; + DBUG_RETURN(rc); } @@ -1423,7 +1486,9 @@ LSN translog_get_file_max_lsn_stored(uint32 file) static my_bool translog_buffer_init(struct st_translog_buffer *buffer, int num) { DBUG_ENTER("translog_buffer_init"); - buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE; + buffer->pre_force_close_horizon= + buffer->prev_last_lsn= buffer->last_lsn= + LSN_IMPOSSIBLE; DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx", (ulong) buffer)); @@ -1435,6 +1500,7 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer, int num) memset(buffer->buffer, TRANSLOG_FILLER, TRANSLOG_WRITE_BUFFER); /* Buffer size */ buffer->size= 0; + buffer->skipped_data= 0; /* cond of thread which is waiting for buffer filling */ if (pthread_cond_init(&buffer->waiting_filling_buffer, 0)) DBUG_RETURN(1); @@ -1489,7 +1555,10 @@ 
static my_bool translog_close_log_file(TRANSLOG_FILE *file) TODO: sync only we have changed the log */ if (!file->is_sync) + { rc= my_sync(file->handler.file, MYF(MY_WME)); + translog_syncs++; + } rc|= my_close(file->handler.file, MYF(MY_WME)); my_free(file, MYF(0)); return test(rc); @@ -2044,7 +2113,8 @@ static void translog_start_buffer(struct st_translog_buffer *buffer, (ulong) LSN_OFFSET(log_descriptor.horizon), (ulong) LSN_OFFSET(log_descriptor.horizon))); DBUG_ASSERT(buffer_no == buffer->buffer_no); - buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE; + buffer->pre_force_close_horizon= + buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE; DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx", (ulong) buffer)); buffer->offset= log_descriptor.horizon; @@ -2052,6 +2122,7 @@ static void translog_start_buffer(struct st_translog_buffer *buffer, buffer->file= get_current_logfile(); buffer->overlay= 0; buffer->size= 0; + buffer->skipped_data= 0; translog_cursor_init(cursor, buffer, buffer_no); DBUG_PRINT("info", ("file: #%ld (%d) init cursor #%u: 0x%lx " "chaser: %d Size: %lu (%lu)", @@ -2523,6 +2594,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) TRANSLOG_ADDRESS offset= buffer->offset; TRANSLOG_FILE *file= buffer->file; uint8 ver= buffer->ver; + uint skipped_data; DBUG_ENTER("translog_buffer_flush"); DBUG_PRINT("enter", ("Buffer: #%u 0x%lx file: %d offset: (%lu,0x%lx) size: %lu", @@ -2557,6 +2629,8 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) disk */ file= buffer->file; + skipped_data= buffer->skipped_data; + DBUG_ASSERT(skipped_data < TRANSLOG_PAGE_SIZE); for (i= 0, pg= LSN_OFFSET(buffer->offset) / TRANSLOG_PAGE_SIZE; i < buffer->size; i+= TRANSLOG_PAGE_SIZE, pg++) @@ -2573,13 +2647,16 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size); if (translog_status != TRANSLOG_OK && translog_status != 
TRANSLOG_SHUTDOWN) DBUG_RETURN(1); - if (pagecache_inject(log_descriptor.pagecache, + if (pagecache_write_part(log_descriptor.pagecache, &file->handler, pg, 3, buffer->buffer + i, PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_LEFT_UNLOCKED, - PAGECACHE_PIN_LEFT_UNPINNED, 0, - LSN_IMPOSSIBLE)) + PAGECACHE_PIN_LEFT_UNPINNED, + PAGECACHE_WRITE_DONE, 0, + LSN_IMPOSSIBLE, + skipped_data, + TRANSLOG_PAGE_SIZE - skipped_data)) { DBUG_PRINT("error", ("Can't write page (%lu,0x%lx) to pagecache, error: %d", @@ -2589,10 +2666,12 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) translog_stop_writing(); DBUG_RETURN(1); } + skipped_data= 0; } file->is_sync= 0; - if (my_pwrite(file->handler.file, buffer->buffer, - buffer->size, LSN_OFFSET(buffer->offset), + if (my_pwrite(file->handler.file, buffer->buffer + buffer->skipped_data, + buffer->size - buffer->skipped_data, + LSN_OFFSET(buffer->offset) + buffer->skipped_data, log_write_flags)) { DBUG_PRINT("error", ("Can't write buffer (%lu,0x%lx) size %lu " @@ -2985,6 +3064,7 @@ restart: uchar *from, *table= NULL; int is_last_unfinished_page; uint last_protected_sector= 0; + uint skipped_data= curr_buffer->skipped_data; TRANSLOG_FILE file_copy; uint8 ver= curr_buffer->ver; translog_wait_for_writers(curr_buffer); @@ -2997,7 +3077,38 @@ restart: } DBUG_ASSERT(LSN_FILE_NO(addr) == LSN_FILE_NO(curr_buffer->offset)); from= curr_buffer->buffer + (addr - curr_buffer->offset); - memcpy(buffer, from, TRANSLOG_PAGE_SIZE); + if (skipped_data && addr == curr_buffer->offset) + { + /* + We read page part of which is not present in buffer, + so we should read absent part from file (page cache actually) + */ + file= get_logfile_by_number(file_no); + DBUG_ASSERT(file != NULL); + /* + it's ok to not lock the page because: + - The log handler has it's own page cache. 
+ - There is only one thread that can access the log + cache at a time + */ + if (!(buffer= pagecache_read(log_descriptor.pagecache, + &file->handler, + LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE, + 3, buffer, + PAGECACHE_PLAIN_PAGE, + PAGECACHE_LOCK_LEFT_UNLOCKED, + NULL))) + DBUG_RETURN(NULL); + } + else + skipped_data= 0; /* Read after skipped in buffer data */ + /* + Now we have correct data in buffer up to 'skipped_data'. The + following memcpy() will move the data from the internal buffer + that was not yet on disk. + */ + memcpy(buffer + skipped_data, from + skipped_data, + TRANSLOG_PAGE_SIZE - skipped_data); /* We can use copy then in translog_page_validator() because it do not put it permanently somewhere. @@ -3291,6 +3402,7 @@ static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr) uint32 next_page_offset, page_rest; uint32 i; File fd; + int rc; TRANSLOG_VALIDATOR_DATA data; char path[FN_REFLEN]; uchar page_buff[TRANSLOG_PAGE_SIZE]; @@ -3316,14 +3428,19 @@ static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr) TRANSLOG_PAGE_SIZE); page_rest= next_page_offset - LSN_OFFSET(addr); memset(page_buff, TRANSLOG_FILLER, page_rest); - if ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 || - ((my_chsize(fd, next_page_offset, TRANSLOG_FILLER, MYF(MY_WME)) || - (page_rest && my_pwrite(fd, page_buff, page_rest, LSN_OFFSET(addr), - log_write_flags)) || - my_sync(fd, MYF(MY_WME))) | - my_close(fd, MYF(MY_WME))) || - (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS && - sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD)))) + rc= ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 || + ((my_chsize(fd, next_page_offset, TRANSLOG_FILLER, MYF(MY_WME)) || + (page_rest && my_pwrite(fd, page_buff, page_rest, LSN_OFFSET(addr), + log_write_flags)) || + my_sync(fd, MYF(MY_WME))))); + translog_syncs++; + rc|= (fd > 0 && my_close(fd, MYF(MY_WME))); + if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS) + { + rc|= sync_dir(log_descriptor.directory_fd, 
MYF(MY_WME | MY_IGNORE_BADFD)); + translog_syncs++; + } + if (rc) DBUG_RETURN(1); /* fix the horizon */ @@ -3483,7 +3600,10 @@ my_bool translog_init_with_table(const char *directory, my_bool version_changed= 0; DBUG_ENTER("translog_init_with_table"); + translog_syncs= 0; + flush_start= 0; id_to_share= NULL; + log_descriptor.directory_fd= -1; log_descriptor.is_everything_flushed= 1; log_descriptor.flush_in_progress= 0; @@ -3511,6 +3631,7 @@ my_bool translog_init_with_table(const char *directory, pthread_mutex_init(&log_descriptor.dirty_buffer_mask_lock, MY_MUTEX_INIT_FAST) || pthread_cond_init(&log_descriptor.log_flush_cond, 0) || + pthread_cond_init(&log_descriptor.new_goal_cond, 0) || my_rwlock_init(&log_descriptor.open_files_lock, NULL) || my_init_dynamic_array(&log_descriptor.open_files, @@ -3912,7 +4033,6 @@ my_bool translog_init_with_table(const char *directory, log_descriptor.flushed= log_descriptor.horizon; log_descriptor.in_buffers_only= log_descriptor.bc.buffer->offset; log_descriptor.max_lsn= LSN_IMPOSSIBLE; /* set to 0 */ - log_descriptor.previous_flush_horizon= log_descriptor.horizon; /* Now 'flushed' is set to 'horizon' value, but 'horizon' is (potentially) address of the next LSN and we want indicate that all LSNs that are @@ -3995,6 +4115,10 @@ my_bool translog_init_with_table(const char *directory, It is beginning of the log => there is no LSNs in the log => There is no harm in leaving it "as-is". */ + log_descriptor.previous_flush_horizon= log_descriptor.horizon; + DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)", + LSN_IN_PARTS(log_descriptor. 
+ previous_flush_horizon))); DBUG_RETURN(0); } file_no--; @@ -4070,6 +4194,9 @@ my_bool translog_init_with_table(const char *directory, translog_free_record_header(&rec); } } + log_descriptor.previous_flush_horizon= log_descriptor.horizon; + DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)", + LSN_IN_PARTS(log_descriptor.previous_flush_horizon))); DBUG_RETURN(0); err: ma_message_no_user(0, "log initialization failed"); @@ -4157,6 +4284,7 @@ void translog_destroy() pthread_mutex_destroy(&log_descriptor.log_flush_lock); pthread_mutex_destroy(&log_descriptor.dirty_buffer_mask_lock); pthread_cond_destroy(&log_descriptor.log_flush_cond); + pthread_cond_destroy(&log_descriptor.new_goal_cond); rwlock_destroy(&log_descriptor.open_files_lock); delete_dynamic(&log_descriptor.open_files); delete_dynamic(&log_descriptor.unfinished_files); @@ -6885,11 +7013,11 @@ int translog_read_record_header_from_buffer(uchar *page, { translog_size_t res; DBUG_ENTER("translog_read_record_header_from_buffer"); + DBUG_PRINT("info", ("page byte: 0x%x offset: %u", + (uint) page[page_offset], (uint) page_offset)); DBUG_ASSERT(translog_is_LSN_chunk(page[page_offset])); DBUG_ASSERT(translog_status == TRANSLOG_OK || translog_status == TRANSLOG_READONLY); - DBUG_PRINT("info", ("page byte: 0x%x offset: %u", - (uint) page[page_offset], (uint) page_offset)); buff->type= (page[page_offset] & TRANSLOG_REC_TYPE); buff->short_trid= uint2korr(page + page_offset + 1); DBUG_PRINT("info", ("Type %u, Short TrID %u, LSN (%lu,0x%lx)", @@ -7356,27 +7484,27 @@ static void translog_force_current_buffer_to_finish() "Buffer addr: (%lu,0x%lx) " "Page addr: (%lu,0x%lx) " "size: %lu (%lu) Pg: %u left: %u in progress %u", - (uint) log_descriptor.bc.buffer_no, - (ulong) log_descriptor.bc.buffer, - LSN_IN_PARTS(log_descriptor.bc.buffer->offset), + (uint) old_buffer_no, + (ulong) old_buffer, + LSN_IN_PARTS(old_buffer->offset), (ulong) LSN_FILE_NO(log_descriptor.horizon), (ulong) (LSN_OFFSET(log_descriptor.horizon) - 
log_descriptor.bc.current_page_fill), - (ulong) log_descriptor.bc.buffer->size, + (ulong) old_buffer->size, (ulong) (log_descriptor.bc.ptr -log_descriptor.bc. buffer->buffer), (uint) log_descriptor.bc.current_page_fill, (uint) left, - (uint) log_descriptor.bc.buffer-> + (uint) old_buffer-> copy_to_buffer_in_progress)); translog_lock_assert_owner(); LINT_INIT(current_page_fill); - new_buff_beginning= log_descriptor.bc.buffer->offset; - new_buff_beginning+= log_descriptor.bc.buffer->size; /* increase offset */ + new_buff_beginning= old_buffer->offset; + new_buff_beginning+= old_buffer->size; /* increase offset */ DBUG_ASSERT(log_descriptor.bc.ptr !=NULL); DBUG_ASSERT(LSN_FILE_NO(log_descriptor.horizon) == - LSN_FILE_NO(log_descriptor.bc.buffer->offset)); + LSN_FILE_NO(old_buffer->offset)); translog_check_cursor(&log_descriptor.bc); DBUG_ASSERT(left < TRANSLOG_PAGE_SIZE); if (left) @@ -7387,18 +7515,20 @@ static void translog_force_current_buffer_to_finish() */ DBUG_PRINT("info", ("left: %u", (uint) left)); + old_buffer->pre_force_close_horizon= + old_buffer->offset + old_buffer->size; /* decrease offset */ new_buff_beginning-= log_descriptor.bc.current_page_fill; current_page_fill= log_descriptor.bc.current_page_fill; memset(log_descriptor.bc.ptr, TRANSLOG_FILLER, left); - log_descriptor.bc.buffer->size+= left; + old_buffer->size+= left; DBUG_PRINT("info", ("Finish Page buffer #%u: 0x%lx " "Size: %lu", - (uint) log_descriptor.bc.buffer->buffer_no, - (ulong) log_descriptor.bc.buffer, - (ulong) log_descriptor.bc.buffer->size)); - DBUG_ASSERT(log_descriptor.bc.buffer->buffer_no == + (uint) old_buffer->buffer_no, + (ulong) old_buffer, + (ulong) old_buffer->size)); + DBUG_ASSERT(old_buffer->buffer_no == log_descriptor.bc.buffer_no); } else @@ -7509,11 +7639,21 @@ static void translog_force_current_buffer_to_finish() if (left) { - /* - TODO: do not copy beginning of the page if we have no CRC or sector - checks on - */ - memcpy(new_buffer->buffer, data, current_page_fill); 
+ if (log_descriptor.flags & + (TRANSLOG_PAGE_CRC | TRANSLOG_SECTOR_PROTECTION)) + memcpy(new_buffer->buffer, data, current_page_fill); + else + { + /* + This page header does not change if we add more data to the page so + we can skip copying it and will not overwrite it later + */ + new_buffer->skipped_data= current_page_fill; +#ifndef DBUG_OFF + memset(new_buffer->buffer, 0xa5, current_page_fill); +#endif + DBUG_ASSERT(new_buffer->skipped_data < TRANSLOG_PAGE_SIZE); + } } old_buffer->next_buffer_offset= new_buffer->offset; translog_buffer_lock(new_buffer); @@ -7561,6 +7701,7 @@ void translog_flush_set_new_goal_and_wait(TRANSLOG_ADDRESS lsn) { log_descriptor.next_pass_max_lsn= lsn; log_descriptor.max_lsn_requester= pthread_self(); + pthread_cond_broadcast(&log_descriptor.new_goal_cond); } while (flush_no == log_descriptor.flush_no) { @@ -7572,67 +7713,79 @@ void translog_flush_set_new_goal_and_wait(TRANSLOG_ADDRESS lsn) /** - @brief Flush the log up to given LSN (included) + @brief sync() range of files (inclusive) and directory (by request) - @param lsn log record serial number up to which (inclusive) - the log has to be flushed + @param min min internal file number to flush + @param max max internal file number to flush + @param sync_dir need sync directory - @return Operation status + @return Operation status @retval 0 OK @retval 1 Error - */ -my_bool translog_flush(TRANSLOG_ADDRESS lsn) +static my_bool translog_sync_files(uint32 min, uint32 max, + my_bool sync_dir) { - LSN sent_to_disk= LSN_IMPOSSIBLE; - TRANSLOG_ADDRESS flush_horizon; - uint fn, i; - dirty_buffer_mask_t dirty_buffer_mask; - uint8 last_buffer_no, start_buffer_no; + uint fn; my_bool rc= 0; - DBUG_ENTER("translog_flush"); - DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn))); - DBUG_ASSERT(translog_status == TRANSLOG_OK || - translog_status == TRANSLOG_READONLY); - LINT_INIT(sent_to_disk); + ulonglong flush_interval; + DBUG_ENTER("translog_sync_files"); + DBUG_PRINT("info", ("min: 
%lu max: %lu sync dir: %d", + (ulong) min, (ulong) max, (int) sync_dir)); + DBUG_ASSERT(min <= max); - pthread_mutex_lock(&log_descriptor.log_flush_lock); - DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)", - LSN_IN_PARTS(log_descriptor.flushed))); - if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0) + flush_interval= group_commit_wait; + if (flush_interval) + flush_start= my_micro_time(); + for (fn= min; fn <= max; fn++) { - pthread_mutex_unlock(&log_descriptor.log_flush_lock); - DBUG_RETURN(0); - } - if (log_descriptor.flush_in_progress) - { - translog_flush_set_new_goal_and_wait(lsn); - if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self())) + TRANSLOG_FILE *file= get_logfile_by_number(fn); + DBUG_ASSERT(file != NULL); + if (!file->is_sync) { - /* fix lsn if it was horizon */ - if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0) - lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer); - translog_flush_wait_for_end(lsn); - pthread_mutex_unlock(&log_descriptor.log_flush_lock); - DBUG_RETURN(0); + if (my_sync(file->handler.file, MYF(MY_WME))) + { + rc= 1; + translog_stop_writing(); + DBUG_RETURN(rc); + } + translog_syncs++; + file->is_sync= 1; } - log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE; } - log_descriptor.flush_in_progress= 1; - flush_horizon= log_descriptor.previous_flush_horizon; - DBUG_PRINT("info", ("flush_in_progress is set")); - pthread_mutex_unlock(&log_descriptor.log_flush_lock); - translog_lock(); - if (log_descriptor.is_everything_flushed) + if (sync_dir) { - DBUG_PRINT("info", ("everything is flushed")); - rc= (translog_status == TRANSLOG_READONLY); - translog_unlock(); - goto out; + if (!(rc= sync_dir(log_descriptor.directory_fd, + MYF(MY_WME | MY_IGNORE_BADFD)))) + translog_syncs++; } + DBUG_RETURN(rc); +} + + +/* + @brief Flushes buffers with LSNs in them less or equal address + + @param lsn address up to which all LSNs should be flushed, + can be reset to real last LSN address + @param sent_to_disk 
returns 'sent to disk' position + @param flush_horizon returns horizon of the flush + + @note About terminology see comment to translog_flush(). +*/ + +void translog_flush_buffers(TRANSLOG_ADDRESS *lsn, + TRANSLOG_ADDRESS *sent_to_disk, + TRANSLOG_ADDRESS *flush_horizon) +{ + dirty_buffer_mask_t dirty_buffer_mask; + uint i; + uint8 last_buffer_no, start_buffer_no; + DBUG_ENTER("translog_flush_buffers"); + /* We will recheck information when will lock buffers one by one so we can use unprotected read here (this is just for @@ -7656,15 +7809,15 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn) /* if LSN up to which we have to flush bigger then maximum LSN of previous buffer and at least one LSN was saved in the current buffer (last_lsn != - LSN_IMPOSSIBLE) then we better finish the current buffer. + LSN_IMPOSSIBLE) then we have to close the current buffer. */ - if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 && + if (cmp_translog_addr(*lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 && log_descriptor.bc.buffer->last_lsn != LSN_IMPOSSIBLE) { struct st_translog_buffer *buffer= log_descriptor.bc.buffer; - lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */ + *lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */ DBUG_PRINT("info", ("LSN to flush fixed to last lsn: (%lu,0x%lx)", - LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn))); + LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn))); last_buffer_no= log_descriptor.bc.buffer_no; log_descriptor.is_everything_flushed= 1; translog_force_current_buffer_to_finish(); @@ -7676,8 +7829,10 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn) TRANSLOG_BUFFERS_NO); translog_unlock(); } - sent_to_disk= translog_get_sent_to_disk(); - if (cmp_translog_addr(lsn, sent_to_disk) > 0) + + /* flush buffers */ + *sent_to_disk= translog_get_sent_to_disk(); + if (cmp_translog_addr(*lsn, *sent_to_disk) > 0) { DBUG_PRINT("info", ("Start buffer #: %u last buffer #: %u", @@ -7697,53 
+7852,238 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn) LSN_IN_PARTS(buffer->last_lsn), (buffer->file ? "dirty" : "closed"))); - if (buffer->prev_last_lsn <= lsn && + if (buffer->prev_last_lsn <= *lsn && buffer->file != NULL) { - DBUG_ASSERT(flush_horizon <= buffer->offset + buffer->size); - flush_horizon= buffer->offset + buffer->size; + DBUG_ASSERT(*flush_horizon <= buffer->offset + buffer->size); + *flush_horizon= (buffer->pre_force_close_horizon != LSN_IMPOSSIBLE ? + buffer->pre_force_close_horizon : + buffer->offset + buffer->size); + /* pre_force_close_horizon is reset during new buffer start */ + DBUG_PRINT("info", ("flush_horizon: (%lu,0x%lx)", + LSN_IN_PARTS(*flush_horizon))); + DBUG_ASSERT(*flush_horizon <= log_descriptor.horizon); + translog_buffer_flush(buffer); } translog_buffer_unlock(buffer); i= (i + 1) % TRANSLOG_BUFFERS_NO; } while (i != last_buffer_no); - sent_to_disk= translog_get_sent_to_disk(); + *sent_to_disk= translog_get_sent_to_disk(); } - /* sync files from previous flush till current one */ - for (fn= LSN_FILE_NO(log_descriptor.flushed); fn <= LSN_FILE_NO(lsn); fn++) + DBUG_VOID_RETURN; +} + +/** + @brief Flush the log up to given LSN (included) + + @param lsn log record serial number up to which (inclusive) + the log has to be flushed + + @return Operation status + @retval 0 OK + @retval 1 Error + + @note + + - Non group commit logic: Commits are made in passes. The thread which started + the flush first performs the actual flush; other threads set a new goal (LSN) + for the next pass (if it is the maximum) and wait for the pass to end, or just + wait for the pass to end. + + - If hard group commit is enabled and the rate is set to zero: + The first thread sends all changed buffers to disk. This is repeated + as long as there are new LSNs added. The process can not loop + forever because we have a limited number of threads and they will wait + for the data to be synced. 
+ Pseudo code: + + do + send changed buffers to disk + while new_goal + sync + + - If hard group commit is switched ON and less than rate microseconds have + passed since the last sync, then after buffers have been sent to disk + wait until rate microseconds have passed since the last sync, do sync and return. + This ensures that if we call sync infrequently we don't do any waits. + + - If soft group commit is enabled everything works as with 'non group commit' + but the thread doesn't do any real sync(). If rate is not zero the + sync() will be performed by a service thread with the given rate + when needed (new LSN appears). + + @note Terminology: + 'sent to disk' means written to disk but not sync()ed, + 'flushed' means sent to disk and sync()ed. +*/ + +my_bool translog_flush(TRANSLOG_ADDRESS lsn) +{ + struct timespec abstime; + ulonglong flush_interval; + ulonglong time_spent; + LSN sent_to_disk= LSN_IMPOSSIBLE; + TRANSLOG_ADDRESS flush_horizon; + my_bool rc= 0; + my_bool hgroup_commit_at_start; + DBUG_ENTER("translog_flush"); + DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn))); + DBUG_ASSERT(translog_status == TRANSLOG_OK || + translog_status == TRANSLOG_READONLY); + LINT_INIT(sent_to_disk); + + pthread_mutex_lock(&log_descriptor.log_flush_lock); + DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)", + LSN_IN_PARTS(log_descriptor.flushed))); + if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0) { - TRANSLOG_FILE *file= get_logfile_by_number(fn); - DBUG_ASSERT(file != NULL); - if (!file->is_sync) + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + DBUG_RETURN(0); + } + if (log_descriptor.flush_in_progress) + { + translog_lock(); + /* fix lsn if it was horizon */ + if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0) + lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer); + translog_unlock(); + translog_flush_set_new_goal_and_wait(lsn); + if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self())) { - if 
(my_sync(file->handler.file, MYF(MY_WME))) + /* + translog_flush_wait_for_end() releases log_flush_lock while it is + waiting, then acquires it again + */ + translog_flush_wait_for_end(lsn); + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + DBUG_RETURN(0); + } + log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE; + } + log_descriptor.flush_in_progress= 1; + flush_horizon= log_descriptor.previous_flush_horizon; + DBUG_PRINT("info", ("flush_in_progress is set, flush_horizon: (%lu,0x%lx)", + LSN_IN_PARTS(flush_horizon))); + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + + hgroup_commit_at_start= hard_group_commit; + if (hgroup_commit_at_start) + flush_interval= group_commit_wait; + + translog_lock(); + if (log_descriptor.is_everything_flushed) + { + DBUG_PRINT("info", ("everything is flushed")); + translog_unlock(); + pthread_mutex_lock(&log_descriptor.log_flush_lock); + goto out; + } + + for (;;) + { + /* The following function flushes buffers and calls translog_unlock() */ + translog_flush_buffers(&lsn, &sent_to_disk, &flush_horizon); + + if (!hgroup_commit_at_start) + break; /* flush pass is ended */ + +retest: + /* + We do not check time here because pthread_mutex_lock rarely takes + a lot of time so we can sacrifice a bit of precision for performance + (taking into account that my_micro_time() might be an expensive call). 
+ */ + if (flush_interval == 0) + break; /* flush pass is ended */ + + pthread_mutex_lock(&log_descriptor.log_flush_lock); + if (log_descriptor.next_pass_max_lsn == LSN_IMPOSSIBLE) + { + if (flush_interval == 0 || + (time_spent= (my_micro_time() - flush_start)) >= flush_interval) { - rc= 1; - translog_stop_writing(); - sent_to_disk= LSN_IMPOSSIBLE; - goto out; + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + break; } - file->is_sync= 1; + DBUG_PRINT("info", ("flush waits: %llu interval: %llu spent: %llu", + flush_interval - time_spent, + flush_interval, time_spent)); + /* wait time or next goal */ + set_timespec_nsec(abstime, flush_interval - time_spent); + pthread_cond_timedwait(&log_descriptor.new_goal_cond, + &log_descriptor.log_flush_lock, + &abstime); + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + DBUG_PRINT("info", ("retest conditions")); + goto retest; + } + + /* take next goal */ + lsn= log_descriptor.next_pass_max_lsn; + log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE; + /* prevent the other thread from continuing */ + log_descriptor.max_lsn_requester= pthread_self(); + DBUG_PRINT("info", ("flush took next goal: (%lu,0x%lx)", + LSN_IN_PARTS(lsn))); + pthread_mutex_unlock(&log_descriptor.log_flush_lock); + + /* next flush pass */ + DBUG_PRINT("info", ("next flush pass")); + translog_lock(); + } + + /* + sync() files from previous flush till current one + */ + if (!soft_sync || hgroup_commit_at_start) + { + if ((rc= + translog_sync_files(LSN_FILE_NO(log_descriptor.flushed), + LSN_FILE_NO(lsn), + sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS && + (LSN_FILE_NO(log_descriptor. 
+ previous_flush_horizon) / + TRANSLOG_PAGE_SIZE) != + (LSN_OFFSET(flush_horizon) / + TRANSLOG_PAGE_SIZE))))) + { + sent_to_disk= LSN_IMPOSSIBLE; + pthread_mutex_lock(&log_descriptor.log_flush_lock); + goto out; + } + /* keep values for soft sync() and forced sync() actual */ + { + uint32 fileno= LSN_FILE_NO(lsn); + my_atomic_rwlock_wrlock(&soft_sync_rwl); + my_atomic_store32(&soft_sync_min, fileno); + my_atomic_store32(&soft_sync_max, fileno); + my_atomic_rwlock_wrunlock(&soft_sync_rwl); } } + else + { + my_atomic_rwlock_wrlock(&soft_sync_rwl); + my_atomic_store32(&soft_sync_max, LSN_FILE_NO(lsn)); + my_atomic_store32(&soft_need_sync, 1); + my_atomic_rwlock_wrunlock(&soft_sync_rwl); + } - if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS && - (LSN_FILE_NO(log_descriptor.previous_flush_horizon) != - LSN_FILE_NO(flush_horizon) || - ((LSN_OFFSET(log_descriptor.previous_flush_horizon) - 1) / - TRANSLOG_PAGE_SIZE) != - ((LSN_OFFSET(flush_horizon) - 1) / TRANSLOG_PAGE_SIZE))) - rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD)); + DBUG_ASSERT(flush_horizon <= log_descriptor.horizon); + + pthread_mutex_lock(&log_descriptor.log_flush_lock); log_descriptor.previous_flush_horizon= flush_horizon; out: - pthread_mutex_lock(&log_descriptor.log_flush_lock); if (sent_to_disk != LSN_IMPOSSIBLE) log_descriptor.flushed= sent_to_disk; log_descriptor.flush_in_progress= 0; log_descriptor.flush_no++; DBUG_PRINT("info", ("flush_in_progress is dropped")); - pthread_mutex_unlock(&log_descriptor.log_flush_lock);\ + pthread_mutex_unlock(&log_descriptor.log_flush_lock); pthread_cond_broadcast(&log_descriptor.log_flush_cond); DBUG_RETURN(rc); } @@ -8113,6 +8453,8 @@ LSN translog_first_theoretical_lsn() my_bool translog_purge(TRANSLOG_ADDRESS low) { uint32 last_need_file= LSN_FILE_NO(low); + uint32 min_unsync; + int soft; TRANSLOG_ADDRESS horizon= translog_get_horizon(); int rc= 0; DBUG_ENTER("translog_purge"); @@ -8120,12 +8462,26 @@ my_bool translog_purge(TRANSLOG_ADDRESS 
low) DBUG_ASSERT(translog_status == TRANSLOG_OK || translog_status == TRANSLOG_READONLY); + soft= soft_sync; + my_atomic_rwlock_wrlock(&soft_sync_rwl); + min_unsync= my_atomic_load32(&soft_sync_min); + my_atomic_rwlock_wrunlock(&soft_sync_rwl); + DBUG_PRINT("info", ("min_unsync: %lu", (ulong) min_unsync)); + if (soft && min_unsync < last_need_file) + { + last_need_file= min_unsync; + DBUG_PRINT("info", ("last_need_file set to %lu", (ulong)last_need_file)); + } + pthread_mutex_lock(&log_descriptor.purger_lock); + DBUG_PRINT("info", ("last_lsn_checked file: %lu:", + (ulong) log_descriptor.last_lsn_checked)); if (LSN_FILE_NO(log_descriptor.last_lsn_checked) < last_need_file) { uint32 i; uint32 min_file= translog_first_file(horizon, 1); DBUG_ASSERT(min_file != 0); /* log is already started */ + DBUG_PRINT("info", ("min_file: %lu:",(ulong) min_file)); for(i= min_file; i < last_need_file && rc == 0; i++) { LSN lsn= translog_get_file_max_lsn_stored(i); @@ -8356,6 +8712,159 @@ my_bool translog_log_debug_info(TRN *trn __attribute__((unused)), } + +/** + Sets soft sync mode + + @param mode TRUE if we need switch soft sync on else off +*/ + +void translog_soft_sync(my_bool mode) +{ + soft_sync= mode; +} + + +/** + Sets hard group commit + + @param mode TRUE if we need switch hard group commit on else off +*/ + +void translog_hard_group_commit(my_bool mode) +{ + hard_group_commit= mode; +} + + +/** + @brief forced log sync (used when we are switching modes) +*/ + +void translog_sync() +{ + uint32 max= get_current_logfile()->number; + uint32 min; + DBUG_ENTER("ma_translog_sync"); + + my_atomic_rwlock_rdlock(&soft_sync_rwl); + min= my_atomic_load32(&soft_sync_min); + my_atomic_rwlock_rdunlock(&soft_sync_rwl); + if (!min) + min= max; + + translog_sync_files(min, max, sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS); + + DBUG_VOID_RETURN; +} + + +/** + @brief set rate for group commit + + @param interval interval to set. 
+ + @note We use this function with additional variable because have to + restart service thread with new value which we can't make inside changing + variable routine (update_maria_group_commit_interval) +*/ + +void translog_set_group_commit_interval(uint32 interval) +{ + DBUG_ENTER("translog_set_group_commit_interval"); + group_commit_wait= interval; + DBUG_PRINT("info", ("wait: %llu", + (ulonglong)group_commit_wait)); + DBUG_VOID_RETURN; +} + + +/** + @brief syncing service thread +*/ + +static pthread_handler_t +ma_soft_sync_background( void *arg __attribute__((unused))) +{ + + my_thread_init(); + { + DBUG_ENTER("ma_soft_sync_background"); + for(;;) + { + ulonglong prev_loop= my_micro_time(); + ulonglong time, sleep; + uint32 min, max, sync_request; + my_atomic_rwlock_rdlock(&soft_sync_rwl); + min= my_atomic_load32(&soft_sync_min); + max= my_atomic_load32(&soft_sync_max); + sync_request= my_atomic_load32(&soft_need_sync); + my_atomic_store32(&soft_sync_min, max); + my_atomic_store32(&soft_need_sync, 0); + my_atomic_rwlock_rdunlock(&soft_sync_rwl); + + sleep= group_commit_wait; + if (sync_request) + translog_sync_files(min, max, FALSE); + time= my_micro_time() - prev_loop; + if (time > sleep) + sleep= 0; + else + sleep-= time; + if (my_service_thread_sleep(&soft_sync_control, sleep)) + break; + } + my_service_thread_signal_end(&soft_sync_control); + my_thread_end(); + DBUG_RETURN(0); + } +} + + +/** + @brief Starts syncing thread +*/ + +int translog_soft_sync_start(void) +{ + pthread_t th; + int res= 0; + uint32 min, max; + DBUG_ENTER("translog_soft_sync_start"); + + /* check and init variables */ + my_atomic_rwlock_rdlock(&soft_sync_rwl); + min= my_atomic_load32(&soft_sync_min); + max= my_atomic_load32(&soft_sync_max); + if (!max) + my_atomic_store32(&soft_sync_max, (max= get_current_logfile()->number)); + if (!min) + my_atomic_store32(&soft_sync_min, max); + my_atomic_store32(&soft_need_sync, 1); + my_atomic_rwlock_rdunlock(&soft_sync_rwl); + + if (!(res= 
ma_service_thread_control_init(&soft_sync_control))) + if (!(res= pthread_create(&th, NULL, ma_soft_sync_background, NULL))) + soft_sync_control.status= THREAD_RUNNING; + DBUG_RETURN(res); +} + + +/** + @brief Stops syncing thread +*/ + +void translog_soft_sync_end(void) +{ + DBUG_ENTER("translog_soft_sync_end"); + if (soft_sync_control.inited) + { + ma_service_thread_control_end(&soft_sync_control); + } + DBUG_VOID_RETURN; +} + + #ifdef MARIA_DUMP_LOG #include extern void translog_example_table_init(); diff --git a/storage/maria/ma_loghandler.h b/storage/maria/ma_loghandler.h index dba6283e303..224d93fb24b 100644 --- a/storage/maria/ma_loghandler.h +++ b/storage/maria/ma_loghandler.h @@ -342,6 +342,14 @@ enum enum_translog_status TRANSLOG_SHUTDOWN /* going to shutdown the loghandler */ }; extern enum enum_translog_status translog_status; +extern ulonglong translog_syncs; /* Number of sync()s */ + +void translog_soft_sync(my_bool mode); +void translog_hard_group_commit(my_bool mode); +int translog_soft_sync_start(void); +void translog_soft_sync_end(void); +void translog_sync(); +void translog_set_group_commit_interval(uint32 interval); /* all the rest added because of recovery; should we make @@ -439,6 +447,14 @@ typedef struct st_log_record_type_descriptor extern LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES]; +typedef enum +{ + TRANSLOG_GCOMMIT_NONE, + TRANSLOG_GCOMMIT_HARD, + TRANSLOG_GCOMMIT_SOFT +} enum_maria_group_commit; +extern ulong maria_group_commit; +extern ulong maria_group_commit_interval; typedef enum { TRANSLOG_PURGE_IMMIDIATE,