diff --git a/sql/ha_innodb.cc b/sql/ha_innodb.cc
index 3f592e36219..e68a85bdac9 100644
--- a/sql/ha_innodb.cc
+++ b/sql/ha_innodb.cc
@@ -45,10 +45,58 @@ have disables the InnoDB inlining in this file. */
 
 #include "ha_innodb.h"
 
-pthread_mutex_t innobase_share_mutex, // to protect innobase_open_files
-                prepare_commit_mutex; // to force correct commit order in binlog
+pthread_mutex_t innobase_share_mutex,	/* to protect innobase_open_files */
+		prepare_commit_mutex;	/* to force correct commit order in
+					binlog */
 bool innodb_inited= 0;
 
+/*-----------------------------------------------------------------*/
+/* These variables are used to implement (semi-)synchronous MySQL binlog
+replication for InnoDB tables. */
+
+pthread_cond_t	innobase_repl_cond;	/* Posix cond variable;
+					this variable is signaled
+					when enough binlog has been
+					sent to slave, so that a
+					waiting trx can return the
+					'ok' message to the client
+					for a commit */
+pthread_mutex_t	innobase_repl_cond_mutex;	/* Posix cond variable mutex
+					that also protects the next
+					innobase_repl_... variables */
+uint	innobase_repl_state;		/* 1 if synchronous replication
+					is switched on and is working
+					ok; else 0 */
+uint	innobase_repl_file_name_inited	= 0;	/* This is set to 1 when
+					innobase_repl_file_name
+					contains meaningful data */
+char*	innobase_repl_file_name;	/* The binlog name up to which
+					we have sent some binlog to
+					the slave */
+my_off_t innobase_repl_pos;		/* The position in that file
+					up to which we have sent the
+					binlog to the slave */
+uint	innobase_repl_n_wait_threads	= 0;	/* This tells how many
+					transactions currently are
+					waiting for the binlog to be
+					sent to the slave */
+uint	innobase_repl_wait_file_name_inited = 0;	/* This is set to 1
+					when we know the 'smallest'
+					wait position */
+char*	innobase_repl_wait_file_name;	/* NULL, or the 'smallest'
+					innobase_repl_file_name that
+					a transaction is waiting for */
+my_off_t innobase_repl_wait_pos;	/* The smallest position in
+					that file that a trx is
+					waiting for: the trx can
+					proceed and send an 'ok' to
+					the client when MySQL has sent
+					the binlog up to this position
+					to the slave */
+/*-----------------------------------------------------------------*/
+
+
+
 /* Store MySQL definition of 'byte': in Linux it is char while InnoDB
 uses unsigned char; the header univ.i which we include next defines
 'byte' as a macro which expands to 'unsigned char' */
@@ -97,7 +145,7 @@ long innobase_mirrored_log_groups, innobase_log_files_in_group,
      innobase_log_file_size, innobase_log_buffer_size,
      innobase_buffer_pool_awe_mem_mb,
      innobase_buffer_pool_size, innobase_additional_mem_pool_size,
-     innobase_file_io_threads, innobase_lock_wait_timeout,
+     innobase_file_io_threads, innobase_lock_wait_timeout,
      innobase_thread_concurrency, innobase_force_recovery,
      innobase_open_files;
 
@@ -1531,10 +1579,10 @@ innobase_commit(
 	DBUG_RETURN(0);
 }
 
-/* The following defined-out code will be enabled later when we put the
+/* TODO: put the
 MySQL-4.1 functionality back to 5.0. This is needed to get InnoDB Hot Backup
 to work. */
-#if 0
+
 /*********************************************************************
 This is called when MySQL writes the binlog entry for the current
 transaction. Writes to the InnoDB tablespace info which tells where the
@@ -1563,6 +1611,24 @@ innobase_report_binlog_offset_and_commit(
 	trx->mysql_log_file_name = log_file_name;
 	trx->mysql_log_offset = (ib_longlong)end_offset;
 
+	if (thd->variables.sync_replication) {
+		/* Let us store the binlog file name and the position, so that
+		we know how long to wait for the binlog to be replicated to
+		the slave in synchronous replication. */
+
+		if (trx->repl_wait_binlog_name == NULL) {
+
+			trx->repl_wait_binlog_name =
+				(char*)mem_alloc(FN_REFLEN + 100);
+		}
+
+		ut_a(strlen(log_file_name) < FN_REFLEN + 100); /* incl. NUL */
+
+		strcpy(trx->repl_wait_binlog_name, log_file_name);
+
+		trx->repl_wait_binlog_pos = (ib_longlong)end_offset;
+	}
+
 	trx->flush_log_later = TRUE;
 
 	innobase_commit(thd, trx_handle);
@@ -1572,6 +1638,7 @@ innobase_report_binlog_offset_and_commit(
 	return(0);
 }
 
+#if 0
 /***********************************************************************
 This function stores the binlog offset and flushes logs. 
*/
 
@@ -1602,7 +1669,6 @@ innobase_store_binlog_offset_and_flush_log(
 	/* Syncronous flush of the log buffer to disk */
 	log_buffer_flush_to_disk();
 }
-
 #endif
 
 /*********************************************************************
@@ -1615,7 +1681,10 @@ innobase_commit_complete(
 				/* out: 0 */
 	THD*	thd)	/* in: user thread */
 {
+	struct timespec abstime;
 	trx_t*	trx;
+	int	cmp;
+	int	ret;
 
 	trx = (trx_t*) thd->ha_data[innobase_hton.slot];
 
@@ -1631,9 +1700,215 @@ innobase_commit_complete(
 		trx_commit_complete_for_mysql(trx);
 	}
 
+	printf("Wait binlog name %s, repl state %lu\n",
+	trx->repl_wait_binlog_name ? trx->repl_wait_binlog_name : "(null)",
+		(ulong)innobase_repl_state);
+
+	if (thd->variables.sync_replication
+	    && trx->repl_wait_binlog_name
+	    && innobase_repl_state != 0) {
+
+		/* In synchronous replication, let us wait until the MySQL
+		replication has sent the relevant binlog segment to the
+		replication slave. */
+
+/* TODO: Make sure MySQL uses some way (TCP_NODELAY?) to ensure that the data
+has been received in the slave! */
+
+		pthread_mutex_lock(&innobase_repl_cond_mutex);
+try_again:
+		if (innobase_repl_state == 0) {
+
+			pthread_mutex_unlock(&innobase_repl_cond_mutex);
+
+			return(0);
+		}
+
+		cmp = strcmp(innobase_repl_file_name,
+					trx->repl_wait_binlog_name);
+		if (cmp > 0
+		    || (cmp == 0 && innobase_repl_pos
+				 >= (my_off_t)trx->repl_wait_binlog_pos)) {
+			/* We have already sent the relevant binlog to the
+			slave: no need to wait here */
+
+			pthread_mutex_unlock(&innobase_repl_cond_mutex);
+
+/*			printf("Binlog now sent\n"); */
+
+			return(0);
+		}
+
+		/* Let us update the info about the minimum binlog position
+		of waiting threads in the innobase_repl_... variables */
+
+		if (innobase_repl_wait_file_name_inited != 0) {
+			cmp = strcmp(trx->repl_wait_binlog_name,
+					innobase_repl_wait_file_name);
+			if (cmp < 0
+			    || (cmp == 0 && (my_off_t)trx->repl_wait_binlog_pos
+						<= innobase_repl_wait_pos)) {
+				/* This thd has an even lower position, let
+				us update the minimum info */
+
+				strcpy(innobase_repl_wait_file_name,
+					trx->repl_wait_binlog_name);
+
+				innobase_repl_wait_pos =
+						trx->repl_wait_binlog_pos;
+			}
+		} else {
+			strcpy(innobase_repl_wait_file_name,
+						trx->repl_wait_binlog_name);
+
+			innobase_repl_wait_pos = trx->repl_wait_binlog_pos;
+
+			innobase_repl_wait_file_name_inited = 1;
+		}
+		set_timespec(abstime, thd->variables.sync_replication_timeout);
+
+		/* Let us suspend this thread to wait on the condition;
+		when replication has progressed far enough, we will release
+		these waiting threads. The following call
+		pthread_cond_timedwait also atomically unlocks
+		innobase_repl_cond_mutex. */
+
+		innobase_repl_n_wait_threads++;
+
+/*		printf("Waiting for binlog to be sent\n"); */
+
+		ret = pthread_cond_timedwait(&innobase_repl_cond,
+					&innobase_repl_cond_mutex, &abstime);
+		innobase_repl_n_wait_threads--;
+
+		if (ret != 0) {
+			ut_print_timestamp(stderr);
+
+			fprintf(stderr,
+" InnoDB: Error: MySQL synchronous replication\n"
+"InnoDB: was not able to send the binlog to the slave within the\n"
+"InnoDB: timeout %lu. We assume that the slave has become inaccessible,\n"
+"InnoDB: and switch off synchronous replication until the communication.\n"
+"InnoDB: to the slave works again.\n",
+				thd->variables.sync_replication_timeout);
+			fprintf(stderr,
+"InnoDB: MySQL synchronous replication has sent binlog\n"
+"InnoDB: to the slave up to file %s, position %lu\n", innobase_repl_file_name,
+					(ulong)innobase_repl_pos);
+			fprintf(stderr,
+"InnoDB: This transaction needs it to be sent up to\n"
+"InnoDB: file %s, position %lu\n", trx->repl_wait_binlog_name,
+					(ulong)trx->repl_wait_binlog_pos);
+
+			innobase_repl_state = 0;
+
+			pthread_mutex_unlock(&innobase_repl_cond_mutex);
+
+			return(0);
+		}
+
+		goto try_again;
+	}
+
 	return(0);
 }
 
+/*********************************************************************
+In synchronous replication, reports to InnoDB up to which binlog position
+we have sent the binlog to the slave. Note that replication is synchronous
+for one slave only. For other slaves, we do nothing in this function. This
+function is used in a replication master. 
*/
+
+int
+innobase_repl_report_sent_binlog(
+/*=============================*/
+				/* out: always 0 */
+	THD*	thd,		/* in: thread doing the binlog communication to
+				the slave */
+	char*	log_file_name,	/* in: binlog file name */
+	my_off_t end_offset)	/* in: the offset in the binlog file up to
+				which we sent the contents to the slave */
+{
+	int	cmp;
+	ibool	can_release_threads	= 0;
+
+	/* If synchronous replication is not switched on, or this thd is
+	sending binlog to a slave where we do not need synchronous replication,
+	then return immediately */
+
+	if (thd->server_id != thd->variables.sync_replication_slave_id) {
+
+		/* Do nothing */
+
+		return(0);
+	}
+
+	pthread_mutex_lock(&innobase_repl_cond_mutex);
+
+	if (innobase_repl_state == 0) {
+
+		ut_print_timestamp(stderr);
+		fprintf(stderr,
+" InnoDB: Switching MySQL synchronous replication on again at\n"
+"InnoDB: binlog file %s, position %lu\n", log_file_name, (ulong)end_offset);
+
+		innobase_repl_state = 1;
+	}
+
+	/* The position should increase monotonically, since just one thread
+	is sending the binlog to the slave for which we want synchronous
+	replication. Let us check this, and print an error to the .err log
+	if that is not the case. */
+
+	if (innobase_repl_file_name_inited) {
+		cmp = strcmp(log_file_name, innobase_repl_file_name);
+
+		if (cmp < 0
+		    || (cmp == 0 && end_offset < innobase_repl_pos)) {
+
+			ut_print_timestamp(stderr);
+			fprintf(stderr,
+" InnoDB: Error: MySQL synchronous replication has sent binlog\n"
+"InnoDB: to the slave up to file %s, position %lu\n", innobase_repl_file_name,
+					(ulong)innobase_repl_pos);
+			fprintf(stderr,
+"InnoDB: but now MySQL reports that it sent the binlog only up to\n"
+"InnoDB: file %s, position %lu\n", log_file_name, (ulong)end_offset);
+		}
+	}
+
+	strcpy(innobase_repl_file_name, log_file_name);
+	innobase_repl_pos = end_offset;
+	innobase_repl_file_name_inited = 1;
+
+	if (innobase_repl_n_wait_threads > 0) {
+		/* Let us check if some of the waiting threads doing a trx
+		commit can now proceed */
+
+		cmp = strcmp(innobase_repl_file_name,
+					innobase_repl_wait_file_name);
+		if (cmp > 0
+		    || (cmp == 0 && innobase_repl_pos
+					>= innobase_repl_wait_pos)) {
+
+			/* Yes, at least one waiting thread can now proceed:
+			let us release all waiting threads with a broadcast */
+
+			can_release_threads = 1;
+
+			innobase_repl_wait_file_name_inited = 0;
+		}
+	}
+
+	pthread_mutex_unlock(&innobase_repl_cond_mutex);
+
+	if (can_release_threads) {
+
+		pthread_cond_broadcast(&innobase_repl_cond);
+	}
+	return(0);
+}
+
 /*********************************************************************
 Rolls back a transaction or the latest SQL statement. 
*/ diff --git a/sql/ha_innodb.h b/sql/ha_innodb.h index 35f95ead757..6c412a889b2 100644 --- a/sql/ha_innodb.h +++ b/sql/ha_innodb.h @@ -321,3 +321,5 @@ int innobase_rollback_by_xid( int innobase_xa_end(THD *thd); +int innobase_repl_report_sent_binlog(THD *thd, char *log_file_name, + my_off_t end_offset); diff --git a/sql/handler.cc b/sql/handler.cc index 542efaba2bf..95fd4d97616 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -2411,3 +2411,60 @@ TYPELIB *ha_known_exts(void) } return &known_extensions; } + +/* + Reports to table handlers up to which position we have sent the binlog + to a slave in replication + + SYNOPSIS + ha_repl_report_sent_binlog() + + NOTES + Only works for InnoDB at the moment + + RETURN VALUE + Result of innobase_repl_report_sent_binlog() if HAVE_INNOBASE_DB is defined, otherwise 0 + + PARAMETERS + THD *thd in: thread doing the binlog communication to + the slave + char *log_file_name in: binlog file name + my_off_t end_offset in: the offset in the binlog file up to + which we sent the contents to the slave +*/ + +int ha_repl_report_sent_binlog(THD *thd, char *log_file_name, + my_off_t end_offset) +{ +#ifdef HAVE_INNOBASE_DB + return innobase_repl_report_sent_binlog(thd,log_file_name,end_offset); +#else + /* remove warnings about unused parameters */ + thd=thd; log_file_name=log_file_name; end_offset=end_offset; + return 0; +#endif +} + +/* + Reports to table handlers that we stop replication to a specific slave + + SYNOPSIS + ha_repl_report_replication_stop() + + NOTES + Does nothing at the moment + + RETURN VALUE + Always 0 (= success) + + PARAMETERS + THD *thd in: thread doing the binlog communication to + the slave +*/ + +int ha_repl_report_replication_stop(THD *thd) +{ + thd = thd; /* thd is otherwise unused; presumably silences the unused-parameter warning, as in ha_repl_report_sent_binlog() */ + + return 0; +} diff --git a/sql/handler.h b/sql/handler.h index 4c06fe8299d..5e25f038c36 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -843,7 +843,7 @@ int ha_change_key_cache_param(KEY_CACHE *key_cache); int ha_change_key_cache(KEY_CACHE *old_key_cache, KEY_CACHE *new_key_cache); int ha_end_key_cache(KEY_CACHE 
*key_cache); -/* weird stuff */ +/* report to InnoDB that control passes to the client */ int ha_release_temporary_latches(THD *thd); /* transactions: interface to handlerton functions */ @@ -875,3 +875,7 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht); #define trans_need_2pc(thd, all) ((total_ha_2pc > 1) && \ !((all ? &thd->transaction.all : &thd->transaction.stmt)->no_2pc)) +/* semi-synchronous replication */ +int ha_repl_report_sent_binlog(THD *thd, char *log_file_name, + my_off_t end_offset); +int ha_repl_report_replication_stop(THD *thd); diff --git a/sql/mysqld.cc b/sql/mysqld.cc index e96870d4088..f1efe0330db 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -5495,7 +5495,6 @@ The minimum value for this variable is 4096.", {"sync-frm", OPT_SYNC_FRM, "Sync .frm to disk on create. Enabled by default.", (gptr*) &opt_sync_frm, (gptr*) &opt_sync_frm, 0, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0}, -#ifdef DOES_NOTHING_YET {"sync-replication", OPT_SYNC_REPLICATION, "Enable synchronous replication.", (gptr*) &global_system_variables.sync_replication, @@ -5511,7 +5510,6 @@ The minimum value for this variable is 4096.", (gptr*) &global_system_variables.sync_replication_timeout, (gptr*) &global_system_variables.sync_replication_timeout, 0, GET_ULONG, REQUIRED_ARG, 10, 0, ~0L, 0, 1, 0}, -#endif {"table_cache", OPT_TABLE_CACHE, "The number of open tables for all threads.", (gptr*) &table_cache_size, (gptr*) &table_cache_size, 0, GET_ULONG, REQUIRED_ARG, 64, 1, 512*1024L, diff --git a/sql/set_var.cc b/sql/set_var.cc index bb3db177936..0e6e36f63a2 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -956,11 +956,9 @@ struct show_var_st init_vars[]= { {"sql_warnings", (char*) &sys_sql_warnings, SHOW_BOOL}, #ifdef HAVE_REPLICATION {sys_sync_binlog_period.name,(char*) &sys_sync_binlog_period, SHOW_SYS}, -#ifdef DOES_NOTHING_YET {sys_sync_replication.name, (char*) &sys_sync_replication, SHOW_SYS}, {sys_sync_replication_slave_id.name, (char*) 
&sys_sync_replication_slave_id,SHOW_SYS}, {sys_sync_replication_timeout.name, (char*) &sys_sync_replication_timeout,SHOW_SYS}, -#endif #endif {sys_sync_frm.name, (char*) &sys_sync_frm, SHOW_SYS}, #ifdef HAVE_TZNAME diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 634b6ab0995..72470c487a3 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -385,6 +385,11 @@ impossible position"; goto err; } + printf("Binlog file name %s\n", log_file_name); + + if (thd->variables.sync_replication) + ha_repl_report_sent_binlog(thd, log_file_name, pos); + /* We need to start a packet with something other than 255 to distinguish it from error @@ -470,6 +475,10 @@ impossible position"; my_errno= ER_UNKNOWN_ERROR; goto err; } + + if (thd->variables.sync_replication) + ha_repl_report_sent_binlog(thd, log_file_name, my_b_tell(&log)); + /* No need to save this event. We are only doing simple reads (no real parsing of the events) so we don't need it. And so @@ -527,6 +536,13 @@ impossible position"; my_errno= ER_UNKNOWN_ERROR; goto err; } + + printf("Dump loop: %s: Current log position %lu\n", log_file_name, + (ulong)my_b_tell(&log)); + + if (thd->variables.sync_replication) + ha_repl_report_sent_binlog(thd, log_file_name, my_b_tell(&log)); + DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] )); if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) @@ -640,6 +656,12 @@ impossible position"; goto err; } + printf("Second loop: %s: Current log position %lu\n", log_file_name, + (ulong)my_b_tell(&log)); + + if (thd->variables.sync_replication) + ha_repl_report_sent_binlog(thd, log_file_name, my_b_tell(&log)); + if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) { if (send_file(thd)) @@ -704,12 +726,22 @@ impossible position"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; goto err; } + + if (thd->variables.sync_replication) + ha_repl_report_sent_binlog(thd, log_file_name, 0); + + printf("Binlog file name of a new binlog %s\n", log_file_name); + packet->length(0); 
packet->append('\0'); } } end: + printf("Ending replication\n"); + if (thd->variables.sync_replication) + ha_repl_report_replication_stop(thd); + end_io_cache(&log); (void)my_close(file, MYF(MY_WME)); @@ -721,6 +753,11 @@ end: DBUG_VOID_RETURN; err: + if (thd->variables.sync_replication) + ha_repl_report_replication_stop(thd); + + printf("Ending replication in error %s\n", errmsg); + thd->proc_info = "Waiting to finalize termination"; end_io_cache(&log); /*