mirror of
				https://github.com/MariaDB/server.git
				synced 2025-10-31 02:46:29 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			472 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			472 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|    Copyright (c) 2005, 2013, Oracle and/or its affiliates.
 | |
|    Copyright (c) 2022, MariaDB Corporation.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or modify
 | |
|    it under the terms of the GNU General Public License as published by
 | |
|    the Free Software Foundation; version 2 of the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1335  USA */
 | |
| 
 | |
| #include "mariadb.h"
 | |
| #include "sql_priv.h"
 | |
| #include "sql_binlog.h"
 | |
| #include "sql_parse.h"
 | |
| #include "sql_acl.h"
 | |
| #include "rpl_rli.h"
 | |
| #include "rpl_mi.h"
 | |
| #include "slave.h"
 | |
| #include "log_event.h"
 | |
| 
 | |
| 
 | |
| /**
 | |
|   Check if the event type is allowed in a BINLOG statement.
 | |
| 
 | |
|   @retval 0 if the event type is ok.
 | |
|   @retval 1 if the event type is not ok.
 | |
| */
 | |
| static int check_event_type(int type, Relay_log_info *rli)
 | |
| {
 | |
|   Format_description_log_event *fd_event=
 | |
|     rli->relay_log.description_event_for_exec;
 | |
| 
 | |
|   /*
 | |
|     Convert event type id of certain old versions (see comment in
 | |
|     Format_description_log_event::Format_description_log_event(char*,...)).
 | |
|   */
 | |
|   if (fd_event && fd_event->event_type_permutation)
 | |
|   {
 | |
| #ifdef DBUG_TRACE
 | |
|     int new_type= fd_event->event_type_permutation[type];
 | |
|     DBUG_PRINT("info",
 | |
|                ("converting event type %d to %d (%s)",
 | |
|                 type, new_type,
 | |
|                 Log_event::get_type_str((Log_event_type)new_type)));
 | |
| #endif
 | |
|     type= fd_event->event_type_permutation[type];
 | |
|   }
 | |
| 
 | |
|   switch (type)
 | |
|   {
 | |
|   case START_EVENT_V3:
 | |
|   case FORMAT_DESCRIPTION_EVENT:
 | |
|     /*
 | |
|       We need a preliminary FD event in order to parse the FD event,
 | |
|       if we don't already have one.
 | |
|     */
 | |
|     if (!fd_event)
 | |
|       if (!(rli->relay_log.description_event_for_exec=
 | |
|             new Format_description_log_event(4)))
 | |
|       {
 | |
|         my_error(ER_OUTOFMEMORY, MYF(0), 1);
 | |
|         return 1;
 | |
|       }
 | |
| 
 | |
|     /* It is always allowed to execute FD events. */
 | |
|     return 0;
 | |
| 
 | |
|   case QUERY_EVENT:
 | |
|   case TABLE_MAP_EVENT:
 | |
|   case WRITE_ROWS_EVENT_V1:
 | |
|   case UPDATE_ROWS_EVENT_V1:
 | |
|   case DELETE_ROWS_EVENT_V1:
 | |
|   case WRITE_ROWS_EVENT:
 | |
|   case UPDATE_ROWS_EVENT:
 | |
|   case DELETE_ROWS_EVENT:
 | |
|   case PRE_GA_WRITE_ROWS_EVENT:
 | |
|   case PRE_GA_UPDATE_ROWS_EVENT:
 | |
|   case PRE_GA_DELETE_ROWS_EVENT:
 | |
|     /*
 | |
|       Row events are only allowed if a Format_description_event has
 | |
|       already been seen.
 | |
|     */
 | |
|     if (fd_event)
 | |
|       return 0;
 | |
|     else
 | |
|     {
 | |
|       my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT,
 | |
|                MYF(0), Log_event::get_type_str((Log_event_type)type));
 | |
|       return 1;
 | |
|     }
 | |
|     break;
 | |
| 
 | |
|   default:
 | |
|     /*
 | |
|       It is not meaningful to execute other events than row-events and
 | |
|       FD events. It would even be dangerous to execute Stop_log_event
 | |
|       and Rotate_log_event since they call Relay_log_info::flush(), which
 | |
|       is not allowed to call by other threads than the slave SQL
 | |
|       thread when the slave SQL thread is running.
 | |
|     */
 | |
|     my_error(ER_ONLY_FD_AND_RBR_EVENTS_ALLOWED_IN_BINLOG_STATEMENT,
 | |
|              MYF(0), Log_event::get_type_str((Log_event_type)type));
 | |
|     return 1;
 | |
|   }
 | |
| }
 | |
| 
 | |
| /**
 | |
|   Copy fragments into the standard placeholder thd->lex->comment.str.
 | |
| 
 | |
|   Compute the size of the (still) encoded total,
 | |
|   allocate and then copy fragments one after another.
 | |
|   The size can exceed max(max_allowed_packet) which is not a
 | |
|   problem as no String instance is created off this char array.
 | |
| 
 | |
|   @param thd  THD handle
 | |
|   @return
 | |
|      0        at success,
 | |
|     -1        otherwise.
 | |
| */
 | |
| int binlog_defragment(THD *thd)
 | |
| {
 | |
|   user_var_entry *entry[2];
 | |
|   LEX_CSTRING name[2]= { thd->lex->comment, thd->lex->ident };
 | |
| 
 | |
|   /* compute the total size */
 | |
|   thd->lex->comment.str= NULL;
 | |
|   thd->lex->comment.length= 0;
 | |
|   for (uint k= 0; k < 2; k++)
 | |
|   {
 | |
|     entry[k]=
 | |
|       (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str,
 | |
|                                        name[k].length);
 | |
|     if (!entry[k] || entry[k]->type_handler()->result_type() != STRING_RESULT)
 | |
|     {
 | |
|       my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str);
 | |
|       return -1;
 | |
|     }
 | |
|     thd->lex->comment.length += entry[k]->length;
 | |
|   }
 | |
| 
 | |
|   thd->lex->comment.str=                            // to be freed by the caller
 | |
|     (char *) my_malloc(PSI_INSTRUMENT_ME, thd->lex->comment.length, MYF(MY_WME));
 | |
|   if (!thd->lex->comment.str)
 | |
|   {
 | |
|     my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1);
 | |
|     return -1;
 | |
|   }
 | |
| 
 | |
|   /* fragments are merged into allocated buf while the user var:s get reset */
 | |
|   size_t gathered_length= 0;
 | |
|   for (uint k=0; k < 2; k++)
 | |
|   {
 | |
|     memcpy(const_cast<char*>(thd->lex->comment.str) + gathered_length, entry[k]->value,
 | |
|            entry[k]->length);
 | |
|     gathered_length += entry[k]->length;
 | |
|   }
 | |
|   for (uint k=0; k < 2; k++)
 | |
|     update_hash(entry[k], true, NULL, 0,
 | |
|                 &type_handler_long_blob, &my_charset_bin);
 | |
| 
 | |
|   DBUG_ASSERT(gathered_length == thd->lex->comment.length);
 | |
| 
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| /**
 | |
|   Wraps Log_event::apply_event to save and restore
 | |
|   session context in case of Query_log_event.
 | |
| 
 | |
|   @param ev   replication event
 | |
|   @param rgi  execution context for the event
 | |
| 
 | |
|   @return
 | |
|     0         on success,
 | |
|     non-zero  otherwise.
 | |
| */
 | |
| #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 | |
| int save_restore_context_apply_event(Log_event *ev, rpl_group_info *rgi)
 | |
| {
 | |
|   if (ev->get_type_code() != QUERY_EVENT)
 | |
|     return ev->apply_event(rgi);
 | |
| 
 | |
|   THD *thd= rgi->thd;
 | |
|   Relay_log_info *rli= thd->rli_fake;
 | |
|   DBUG_ASSERT(!rli->mi);
 | |
|   LEX_CSTRING connection_name= { STRING_WITH_LEN("BINLOG_BASE64_EVENT") };
 | |
| 
 | |
|   if (!(rli->mi= new Master_info(&connection_name, false)))
 | |
|   {
 | |
|     my_error(ER_OUT_OF_RESOURCES, MYF(0));
 | |
|     return -1;
 | |
|   }
 | |
| 
 | |
|   sql_digest_state *m_digest= thd->m_digest;
 | |
|   PSI_statement_locker *m_statement_psi= thd->m_statement_psi;;
 | |
|   LEX_CSTRING save_db= thd->db;
 | |
|   my_thread_id m_thread_id= thd->variables.pseudo_thread_id;
 | |
| 
 | |
|   thd->system_thread_info.rpl_sql_info= NULL;
 | |
|   thd->reset_db(&null_clex_str);
 | |
| 
 | |
|   thd->m_digest= NULL;
 | |
|   thd->m_statement_psi= NULL;
 | |
| 
 | |
|   int err= ev->apply_event(rgi);
 | |
| 
 | |
|   thd->m_digest= m_digest;
 | |
|   thd->m_statement_psi= m_statement_psi;
 | |
|   thd->variables.pseudo_thread_id= m_thread_id;
 | |
|   thd->reset_db(&save_db);
 | |
|   delete rli->mi;
 | |
|   rli->mi= NULL;
 | |
| 
 | |
|   return err;
 | |
| }
 | |
| #endif
 | |
| 
 | |
| /**
 | |
|   Execute a BINLOG statement.
 | |
| 
 | |
|   To execute the BINLOG command properly the server needs to know
 | |
|   which format the BINLOG command's event is in.  Therefore, the first
 | |
|   BINLOG statement seen must be a base64 encoding of the
 | |
|   Format_description_log_event, as outputted by mysqlbinlog.  This
 | |
|   Format_description_log_event is cached in
 | |
|   rli->description_event_for_exec.
 | |
| 
 | |
|   @param thd Pointer to THD object for the client thread executing the
 | |
|   statement.
 | |
| */
 | |
| 
 | |
| void mysql_client_binlog_statement(THD* thd)
 | |
| {
 | |
|   DBUG_ENTER("mysql_client_binlog_statement");
 | |
|   DBUG_PRINT("info",("binlog base64: '%*s'",
 | |
|                      (int) (thd->lex->comment.length < 2048 ?
 | |
|                             thd->lex->comment.length : 2048),
 | |
|                      thd->lex->comment.str));
 | |
| 
 | |
|   if (check_global_access(thd, PRIV_STMT_BINLOG))
 | |
|     DBUG_VOID_RETURN;
 | |
| 
 | |
|   /*
 | |
|     option_bits will be changed when applying the event. But we don't expect
 | |
|     it be changed permanently after BINLOG statement, so backup it first.
 | |
|     It will be restored at the end of this function.
 | |
|   */
 | |
|   ulonglong thd_options= thd->variables.option_bits;
 | |
| 
 | |
|   /*
 | |
|     Allocation
 | |
|   */
 | |
| 
 | |
|   int err;
 | |
|   Relay_log_info *rli;
 | |
|   rpl_group_info *rgi;
 | |
|   uchar *buf= NULL;
 | |
|   size_t coded_len= 0, decoded_len= 0;
 | |
| 
 | |
|   rli= thd->rli_fake;
 | |
|   if (!rli && (rli= thd->rli_fake= new Relay_log_info(FALSE, "BINLOG_BASE64_EVENT")))
 | |
|     rli->sql_driver_thd= thd;
 | |
|   if (!(rgi= thd->rgi_fake))
 | |
|     rgi= thd->rgi_fake= new rpl_group_info(rli);
 | |
|   rgi->thd= thd;
 | |
|   const char *error= 0;
 | |
|   Log_event *ev = 0;
 | |
|   my_bool is_fragmented= FALSE;
 | |
|   /*
 | |
|     Out of memory check
 | |
|   */
 | |
|   if (!(rli))
 | |
|   {
 | |
|     my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1);  /* needed 1 bytes */
 | |
|     goto end;
 | |
|   }
 | |
| 
 | |
|   DBUG_ASSERT(rli->belongs_to_client());
 | |
| 
 | |
|   if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str))
 | |
|     if (binlog_defragment(thd))
 | |
|       goto end;
 | |
| 
 | |
|   if (!(coded_len= thd->lex->comment.length))
 | |
|   {
 | |
|     my_error(ER_SYNTAX_ERROR, MYF(0));
 | |
|     goto end;
 | |
|   }
 | |
| 
 | |
|   decoded_len= my_base64_needed_decoded_length((int)coded_len);
 | |
|   if (!(buf= (uchar *) my_malloc(key_memory_binlog_statement_buffer,
 | |
|                                 decoded_len, MYF(MY_WME))))
 | |
|   {
 | |
|     my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1);
 | |
|     goto end;
 | |
|   }
 | |
| 
 | |
|   for (char const *strptr= thd->lex->comment.str ;
 | |
|        strptr < thd->lex->comment.str + thd->lex->comment.length ; )
 | |
|   {
 | |
|     char const *endptr= 0;
 | |
|     int bytes_decoded= my_base64_decode(strptr, coded_len, buf, &endptr,
 | |
|                                      MY_BASE64_DECODE_ALLOW_MULTIPLE_CHUNKS);
 | |
| 
 | |
| #ifndef HAVE_valgrind
 | |
|       /*
 | |
|         This debug printout should not be used for valgrind builds
 | |
|         since it will read from unassigned memory.
 | |
|       */
 | |
|     DBUG_PRINT("info",
 | |
|                ("bytes_decoded: %d  strptr: %p  endptr: %p ('%c':%d)",
 | |
|                 bytes_decoded, strptr, endptr, *endptr,
 | |
|                 *endptr));
 | |
| #endif
 | |
| 
 | |
|     if (bytes_decoded < 0)
 | |
|     {
 | |
|       my_error(ER_BASE64_DECODE_ERROR, MYF(0));
 | |
|       goto end;
 | |
|     }
 | |
|     else if (bytes_decoded == 0)
 | |
|       break; // If no bytes where read, the string contained only whitespace
 | |
| 
 | |
|     DBUG_ASSERT(bytes_decoded > 0);
 | |
|     DBUG_ASSERT(endptr > strptr);
 | |
|     coded_len-= endptr - strptr;
 | |
|     strptr= endptr;
 | |
| 
 | |
|     /*
 | |
|       Now we have one or more events stored in the buffer. The size of
 | |
|       the buffer is computed based on how much base64-encoded data
 | |
|       there were, so there should be ample space for the data (maybe
 | |
|       even too much, since a statement can consist of a considerable
 | |
|       number of events).
 | |
| 
 | |
|       TODO: Switch to use a stream-based base64 encoder/decoder in
 | |
|       order to be able to read exactly what is necessary.
 | |
|     */
 | |
| 
 | |
|     DBUG_PRINT("info",("binlog base64 decoded_len: %lu  bytes_decoded: %d",
 | |
|                        (ulong) decoded_len, bytes_decoded));
 | |
| 
 | |
|     /*
 | |
|       Now we start to read events of the buffer, until there are no
 | |
|       more.
 | |
|     */
 | |
|     for (uchar *bufptr= buf ; bytes_decoded > 0 ; )
 | |
|     {
 | |
|       /*
 | |
|         Checking that the first event in the buffer is not truncated.
 | |
|       */
 | |
|       ulong event_len;
 | |
|       if (bytes_decoded < EVENT_LEN_OFFSET + 4 || 
 | |
|           (event_len= uint4korr(bufptr + EVENT_LEN_OFFSET)) > 
 | |
|            (uint) bytes_decoded)
 | |
|       {
 | |
|         my_error(ER_SYNTAX_ERROR, MYF(0));
 | |
|         goto end;
 | |
|       }
 | |
|       DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%d",
 | |
|                           event_len, bytes_decoded));
 | |
| 
 | |
|       if (check_event_type(bufptr[EVENT_TYPE_OFFSET], rli))
 | |
|         goto end;
 | |
| 
 | |
|       ev= Log_event::read_log_event(bufptr, event_len, &error,
 | |
|                                     rli->relay_log.description_event_for_exec,
 | |
|                                     0);
 | |
| 
 | |
|       DBUG_PRINT("info",("binlog base64 err=%s", error));
 | |
|       if (!ev)
 | |
|       {
 | |
|         /*
 | |
|           This could actually be an out-of-memory, but it is more likely
 | |
|           caused by a bad statement
 | |
|         */
 | |
|         my_error(ER_SYNTAX_ERROR, MYF(0));
 | |
|         goto end;
 | |
|       }
 | |
| 
 | |
|       bytes_decoded -= event_len;
 | |
|       bufptr += event_len;
 | |
| 
 | |
|       DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code()));
 | |
|       ev->thd= thd;
 | |
|       /*
 | |
|         We go directly to the application phase, since we don't need
 | |
|         to check if the event shall be skipped or not.
 | |
| 
 | |
|         Neither do we have to update the log positions, since that is
 | |
|         not used at all: the rli_fake instance is used only for error
 | |
|         reporting.
 | |
|       */
 | |
| #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 | |
|       ulonglong save_skip_replication=
 | |
|                         thd->variables.option_bits & OPTION_SKIP_REPLICATION;
 | |
|       thd->variables.option_bits=
 | |
|         (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
 | |
|         (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ?
 | |
|          OPTION_SKIP_REPLICATION : 0);
 | |
| 
 | |
|       {
 | |
|         /*
 | |
|           For conventional statements thd->lex points to thd->main_lex, that is
 | |
|           thd->lex == &thd->main_lex. On the other hand, for prepared statement
 | |
|           thd->lex points to the LEX object explicitly allocated for execution
 | |
|           of the prepared statement and in this case thd->lex != &thd->main_lex.
 | |
|           On handling the BINLOG statement, invocation of ev->apply_event(rgi)
 | |
|           initiates the following sequence of calls
 | |
|             Rows_log_event::do_apply_event -> THD::reset_for_next_command
 | |
|           Since the method THD::reset_for_next_command() contains assert
 | |
|             DBUG_ASSERT(lex == &main_lex)
 | |
|           this sequence of calls results in crash when a binlog event is
 | |
|           applied in PS mode. So, reset the current lex temporary to point to
 | |
|           thd->main_lex before running ev->apply_event() and restore its
 | |
|           original value on return.
 | |
|         */
 | |
|         LEX *backup_lex;
 | |
| 
 | |
|         thd->backup_and_reset_current_lex(&backup_lex);
 | |
|         err= save_restore_context_apply_event(ev, rgi);
 | |
|         thd->restore_current_lex(backup_lex);
 | |
|       }
 | |
|       thd->variables.option_bits=
 | |
|         (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
 | |
|         save_skip_replication;
 | |
| #else
 | |
|       err= 0;
 | |
| #endif
 | |
|       /*
 | |
|         Format_description_log_event should not be deleted because it
 | |
|         will be used to read info about the relay log's format; it
 | |
|         will be deleted when the SQL thread does not need it,
 | |
|         i.e. when this thread terminates.
 | |
|       */
 | |
|       if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
 | |
|         delete ev;
 | |
|       ev= 0;
 | |
|       if (err)
 | |
|       {
 | |
|         /*
 | |
|           TODO: Maybe a better error message since the BINLOG statement
 | |
|           now contains several events.
 | |
|         */
 | |
|         if (!thd->is_error())
 | |
|           my_error(ER_UNKNOWN_ERROR, MYF(0));
 | |
|         goto end;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
| 
 | |
|   DBUG_PRINT("info",("binlog base64 execution finished successfully"));
 | |
|   my_ok(thd);
 | |
| 
 | |
| end:
 | |
|   if (unlikely(is_fragmented))
 | |
|     my_free(const_cast<char*>(thd->lex->comment.str));
 | |
|   thd->variables.option_bits= thd_options;
 | |
|   rgi->slave_close_thread_tables(thd);
 | |
|   my_free(buf);
 | |
|   delete rgi;
 | |
|   rgi= thd->rgi_fake= NULL;
 | |
|   DBUG_VOID_RETURN;
 | |
| }
 | 
