mirror of
				https://github.com/MariaDB/server.git
				synced 2025-10-31 10:56:12 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			402 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			402 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2013 Codership Oy <info@codership.com>
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or modify
 | |
|    it under the terms of the GNU General Public License as published by
 | |
|    the Free Software Foundation; version 2 of the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License along
 | |
|    with this program; if not, write to the Free Software Foundation, Inc.,
 | |
|    51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */
 | |
| 
 | |
| #include "mariadb.h"
 | |
| #include "mysql/service_wsrep.h"
 | |
| #include "wsrep_binlog.h"
 | |
| #include "log.h"
 | |
| #include "slave.h"
 | |
| #include "log_event.h"
 | |
| #include "wsrep_applier.h"
 | |
| #include "wsrep_mysqld.h"
 | |
| 
 | |
| #include "transaction.h"
 | |
| 
 | |
/*
  Binary-log handlerton, defined elsewhere in the server.
  NOTE(review): not referenced anywhere else in this file; the declaration
  may be unnecessary here — confirm before removing.
*/
extern handlerton *binlog_hton;
 | |
| /*
 | |
|   Write the contents of a cache to a memory buffer.
 | |
| 
 | |
|   This function quite the same as MYSQL_BIN_LOG::write_cache(),
 | |
|   with the exception that here we write in buffer instead of log file.
 | |
|  */
 | |
| int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len)
 | |
| {
 | |
|   *buf= NULL;
 | |
|   *buf_len= 0;
 | |
|   my_off_t const saved_pos(my_b_tell(cache));
 | |
|   DBUG_ENTER("wsrep_write_cache_buf");
 | |
| 
 | |
|   if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
 | |
|   {
 | |
|     WSREP_ERROR("failed to initialize io-cache");
 | |
|     DBUG_RETURN(ER_ERROR_ON_WRITE);
 | |
|   }
 | |
| 
 | |
|   uint length= my_b_bytes_in_cache(cache);
 | |
|   if (unlikely(0 == length)) length= my_b_fill(cache);
 | |
| 
 | |
|   size_t total_length= 0;
 | |
| 
 | |
|   if (likely(length > 0)) do
 | |
|   {
 | |
|       total_length += length;
 | |
|       /*
 | |
|         Bail out if buffer grows too large.
 | |
|         A temporary fix to avoid allocating indefinitely large buffer,
 | |
|         not a real limit on a writeset size which includes other things
 | |
|         like header and keys.
 | |
|       */
 | |
|       if (total_length > wsrep_max_ws_size)
 | |
|       {
 | |
|           WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
 | |
|                      wsrep_max_ws_size, total_length);
 | |
|           goto error;
 | |
|       }
 | |
|       uchar* tmp= (uchar *)my_realloc(PSI_INSTRUMENT_ME, *buf, total_length,
 | |
|                                        MYF(MY_ALLOW_ZERO_PTR));
 | |
|       if (!tmp)
 | |
|       {
 | |
|           WSREP_ERROR("could not (re)allocate buffer: %zu + %u",
 | |
|                       *buf_len, length);
 | |
|           goto error;
 | |
|       }
 | |
|       *buf= tmp;
 | |
| 
 | |
|       memcpy(*buf + *buf_len, cache->read_pos, length);
 | |
|       *buf_len= total_length;
 | |
| 
 | |
|       if (cache->file < 0)
 | |
|       {
 | |
|         cache->read_pos= cache->read_end;
 | |
|         break;
 | |
|       }
 | |
|   } while ((length= my_b_fill(cache)));
 | |
| 
 | |
|   if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
 | |
|   {
 | |
|     WSREP_WARN("failed to initialize io-cache");
 | |
|     goto cleanup;
 | |
|   }
 | |
| 
 | |
|   DBUG_RETURN(0);
 | |
| 
 | |
| error:
 | |
|   if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
 | |
|   {
 | |
|     WSREP_WARN("failed to initialize io-cache");
 | |
|   }
 | |
| cleanup:
 | |
|   my_free(*buf);
 | |
|   *buf= NULL;
 | |
|   *buf_len= 0;
 | |
|   DBUG_RETURN(ER_ERROR_ON_WRITE);
 | |
| }
 | |
| 
 | |
/*
  4 KiB, intended as the size of a buffer preallocated on the stack: many
  transactions would fit in there, so there would be no need to reach for
  the heap.
  NOTE(review): STACK_SIZE is not referenced anywhere else in this file and
  may be dead — confirm before relying on or removing it.
*/
#define STACK_SIZE 4096
 | |
| 
 | |
| /*
 | |
|   Write the contents of a cache to wsrep provider.
 | |
| 
 | |
|   This function quite the same as MYSQL_BIN_LOG::write_cache(),
 | |
|   with the exception that here we write in buffer instead of log file.
 | |
| 
 | |
|   This version uses incremental data appending as it reads it from cache.
 | |
|  */
 | |
| static int wsrep_write_cache_inc(THD*      const thd,
 | |
|                                  IO_CACHE* const cache,
 | |
|                                  size_t*   const len)
 | |
| {
 | |
|   DBUG_ENTER("wsrep_write_cache_inc");
 | |
|   my_off_t const saved_pos(my_b_tell(cache));
 | |
| 
 | |
|   if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().log_position(), 0, 0))
 | |
|   {
 | |
|     WSREP_ERROR("failed to initialize io-cache");
 | |
|     DBUG_RETURN(1);;
 | |
|   }
 | |
| 
 | |
|   int ret= 0;
 | |
|   size_t total_length(0);
 | |
| 
 | |
|   uint length(my_b_bytes_in_cache(cache));
 | |
|   if (unlikely(0 == length)) length= my_b_fill(cache);
 | |
| 
 | |
|   if (likely(length > 0))
 | |
|   {
 | |
|     do
 | |
|     {
 | |
|       total_length += length;
 | |
|       /* bail out if buffer grows too large
 | |
|          not a real limit on a writeset size which includes other things
 | |
|          like header and keys.
 | |
|       */
 | |
|       if (unlikely(total_length > wsrep_max_ws_size))
 | |
|       {
 | |
|         WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
 | |
|                    wsrep_max_ws_size, total_length);
 | |
|         ret= 1;
 | |
|         goto cleanup;
 | |
|       }
 | |
|       if (thd->wsrep_cs().append_data(wsrep::const_buffer(cache->read_pos, length)))
 | |
|         goto cleanup;
 | |
|       cache->read_pos= cache->read_end;
 | |
|     } while ((cache->file >= 0) && (length= my_b_fill(cache)));
 | |
|     if (ret == 0)
 | |
|     {
 | |
|       assert(total_length + thd->wsrep_sr().log_position() == saved_pos);
 | |
|     }
 | |
|   }
 | |
| 
 | |
| cleanup:
 | |
|   *len= total_length;
 | |
|   if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
 | |
|   {
 | |
|     WSREP_ERROR("failed to reinitialize io-cache");
 | |
|   }
 | |
|   DBUG_RETURN(ret);
 | |
| }
 | |
| 
 | |
| /*
 | |
|   Write the contents of a cache to wsrep provider.
 | |
| 
 | |
|   This function quite the same as MYSQL_BIN_LOG::write_cache(),
 | |
|   with the exception that here we write in buffer instead of log file.
 | |
|  */
 | |
| int wsrep_write_cache(THD*      const thd,
 | |
|                       IO_CACHE* const cache,
 | |
|                       size_t*   const len)
 | |
| {
 | |
|   return wsrep_write_cache_inc(thd, cache, len);
 | |
| }
 | |
| 
 | |
| void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len)
 | |
| {
 | |
|   int len= snprintf(NULL, 0, "%s/GRA_%lld_%lld.log",
 | |
|                     wsrep_data_home_dir, (longlong) thd->thread_id,
 | |
|                     (longlong) wsrep_thd_trx_seqno(thd));
 | |
|   if (len < 0)
 | |
|   {
 | |
|     WSREP_ERROR("snprintf error: %d, skipping dump.", len);
 | |
|     return;
 | |
|   }
 | |
|   /*
 | |
|     len doesn't count the \0 end-of-string. Use len+1 below
 | |
|     to alloc and pass as an argument to snprintf.
 | |
|   */
 | |
| 
 | |
|   char *filename= (char *) my_malloc(key_memory_WSREP, len+1, 0);
 | |
|   int len1= snprintf(filename, len+1, "%s/GRA_%lld_%lld.log",
 | |
|                     wsrep_data_home_dir, (longlong) thd->thread_id,
 | |
|                     (long long)wsrep_thd_trx_seqno(thd));
 | |
| 
 | |
|   if (len > len1)
 | |
|   {
 | |
|     WSREP_ERROR("RBR dump path truncated: %d, skipping dump.", len);
 | |
|     my_free(filename);
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   FILE *of= fopen(filename, "wb");
 | |
| 
 | |
|   if (of)
 | |
|   {
 | |
|     if (fwrite(rbr_buf, buf_len, 1, of) == 0)
 | |
|        WSREP_ERROR("Failed to write buffer of length %llu to '%s'",
 | |
|                    (unsigned long long)buf_len, filename);
 | |
| 
 | |
|     fclose(of);
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     WSREP_ERROR("Failed to open file '%s': %d (%s)",
 | |
|                 filename, errno, strerror(errno));
 | |
|   }
 | |
|   my_free(filename);
 | |
| }
 | |
| 
 | |
| /* Dump replication buffer along with header to a file. */
 | |
| void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
 | |
|                                     size_t buf_len)
 | |
| {
 | |
|   DBUG_ENTER("wsrep_dump_rbr_buf_with_header");
 | |
| 
 | |
|   File file;
 | |
|   IO_CACHE cache;
 | |
|   enum_binlog_checksum_alg checksum_alg=
 | |
|     (enum_binlog_checksum_alg) binlog_checksum_options;
 | |
|   Log_event_writer writer(&cache, 0, checksum_alg, NULL);
 | |
|   Format_description_log_event *ev= 0;
 | |
| 
 | |
|   longlong thd_trx_seqno= (long long)wsrep_thd_trx_seqno(thd);
 | |
|   int len= snprintf(NULL, 0, "%s/GRA_%lld_%lld_v2.log",
 | |
|                     wsrep_data_home_dir, (longlong)thd->thread_id,
 | |
|                     thd_trx_seqno);
 | |
|   /*
 | |
|     len doesn't count the \0 end-of-string. Use len+1 below
 | |
|     to alloc and pass as an argument to snprintf.
 | |
|   */
 | |
|   char *filename;
 | |
|   if (len < 0 || !(filename= (char*) my_malloc(key_memory_WSREP, len+1, 0)))
 | |
|   {
 | |
|     WSREP_ERROR("snprintf error: %d, skipping dump.", len);
 | |
|     DBUG_VOID_RETURN;
 | |
|   }
 | |
| 
 | |
|   int len1= snprintf(filename, len+1, "%s/GRA_%lld_%lld_v2.log",
 | |
|                      wsrep_data_home_dir, (longlong) thd->thread_id,
 | |
|                      thd_trx_seqno);
 | |
| 
 | |
|   if (len > len1)
 | |
|   {
 | |
|     WSREP_ERROR("RBR dump path truncated: %d, skipping dump.", len);
 | |
|     my_free(filename);
 | |
|     DBUG_VOID_RETURN;
 | |
|   }
 | |
| 
 | |
|   if ((file= mysql_file_open(key_file_wsrep_gra_log, filename,
 | |
|                              O_RDWR | O_CREAT | O_BINARY, MYF(MY_WME))) < 0)
 | |
|   {
 | |
|     WSREP_ERROR("Failed to open file '%s' : %d (%s)",
 | |
|                 filename, errno, strerror(errno));
 | |
|     goto cleanup1;
 | |
|   }
 | |
| 
 | |
|   if (init_io_cache(&cache, file, 0, WRITE_CACHE, 0, 0, MYF(MY_WME | MY_NABP)))
 | |
|   {
 | |
|     goto cleanup2;
 | |
|   }
 | |
| 
 | |
|   if (my_b_safe_write(&cache, BINLOG_MAGIC, BIN_LOG_HEADER_SIZE))
 | |
|   {
 | |
|     goto cleanup2;
 | |
|   }
 | |
| 
 | |
|   /*
 | |
|     Instantiate an FDLE object for non-wsrep threads (to be written
 | |
|     to the dump file).
 | |
|   */
 | |
|   ev= (thd->wsrep_applier) ? wsrep_get_apply_format(thd) :
 | |
|     (new Format_description_log_event(4, NULL, checksum_alg));
 | |
| 
 | |
|   if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) ||
 | |
|       flush_io_cache(&cache))
 | |
|   {
 | |
|     WSREP_ERROR("Failed to write to '%s'.", filename);
 | |
|     goto cleanup2;
 | |
|   }
 | |
| 
 | |
| cleanup2:
 | |
|   end_io_cache(&cache);
 | |
| 
 | |
| cleanup1:
 | |
|   my_free(filename);
 | |
|   mysql_file_close(file, MYF(MY_WME));
 | |
| 
 | |
|   if (!thd->wsrep_applier) delete ev;
 | |
| 
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| int wsrep_write_skip_event(THD* thd)
 | |
| {
 | |
|   DBUG_ENTER("wsrep_write_skip_event");
 | |
|   Ignorable_log_event skip_event(thd);
 | |
|   int ret= mysql_bin_log.write_event(&skip_event);
 | |
|   if (ret)
 | |
|   {
 | |
|     WSREP_WARN("wsrep_write_skip_event: write to binlog failed: %d", ret);
 | |
|   }
 | |
|   if (!ret && (ret= trans_commit_stmt(thd)))
 | |
|   {
 | |
|     WSREP_WARN("wsrep_write_skip_event: statt commit failed");
 | |
|   }
 | |
|   DBUG_RETURN(ret);
 | |
| }
 | |
| 
 | |
| int wsrep_write_dummy_event_low(THD *thd, const char *msg)
 | |
| {
 | |
|   ::abort();
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| int wsrep_write_dummy_event(THD *orig_thd, const char *msg)
 | |
| {
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| bool wsrep_commit_will_write_binlog(THD *thd)
 | |
| {
 | |
|   return (!wsrep_emulate_bin_log && /* binlog enabled*/
 | |
|           (wsrep_thd_is_local(thd) || /* local thd*/
 | |
|            (thd->wsrep_applier_service && /* applier and log-slave-updates */
 | |
|             opt_log_slave_updates)));
 | |
| }
 | |
| 
 | |
/*
  Tail of the wsrep commit-order queue: the wait_for_commit object of the
  THD that most recently called wsrep_register_for_group_commit(), or NULL.
  wsrep_unregister_from_group_commit() resets it to NULL when the tail
  itself unregisters. Within this file it is only read or written while
  holding LOCK_wsrep_group_commit.
*/
static wait_for_commit *commit_order_tail= NULL;
 | |
| 
 | |
| void wsrep_register_for_group_commit(THD *thd)
 | |
| {
 | |
|   DBUG_ENTER("wsrep_register_for_group_commit");
 | |
|   if (wsrep_emulate_bin_log)
 | |
|   {
 | |
|     /* Binlog is off, no need to maintain group commit queue */
 | |
|     DBUG_VOID_RETURN;
 | |
|   }
 | |
| 
 | |
|   DBUG_ASSERT(thd->wsrep_trx().ordered());
 | |
| 
 | |
|   wait_for_commit *wfc= thd->wait_for_commit_ptr= &thd->wsrep_wfc;
 | |
| 
 | |
|   mysql_mutex_lock(&LOCK_wsrep_group_commit);
 | |
|   if (commit_order_tail)
 | |
|   {
 | |
|     wfc->register_wait_for_prior_commit(commit_order_tail);
 | |
|   }
 | |
|   commit_order_tail= thd->wait_for_commit_ptr;
 | |
|   mysql_mutex_unlock(&LOCK_wsrep_group_commit);
 | |
| 
 | |
|   /*
 | |
|     Now we have queued for group commit. If the commit will go
 | |
|     through TC log_and_order(), the commit ordering is done
 | |
|     by TC group commit. Otherwise the wait for prior
 | |
|     commits to complete is done in ha_commit_one_phase().
 | |
|   */
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | |
| 
 | |
| void wsrep_unregister_from_group_commit(THD *thd)
 | |
| {
 | |
|   DBUG_ASSERT(thd->wsrep_trx().ordered());
 | |
|   wait_for_commit *wfc= thd->wait_for_commit_ptr;
 | |
| 
 | |
|   if (wfc)
 | |
|   {
 | |
|     mysql_mutex_lock(&LOCK_wsrep_group_commit);
 | |
|     wfc->unregister_wait_for_prior_commit();
 | |
|     thd->wakeup_subsequent_commits(0);
 | |
| 
 | |
|     /* The last one queued for group commit has completed commit, it is
 | |
|        safe to set tail to NULL. */
 | |
|     if (wfc == commit_order_tail)
 | |
|       commit_order_tail= NULL;
 | |
|     mysql_mutex_unlock(&LOCK_wsrep_group_commit);
 | |
|     thd->wait_for_commit_ptr= NULL;
 | |
|   }
 | |
| }
 | 
