mariadb/sql/rpl_injector.cc
unknown f3d010f734 Bug#17095 circular replication for ndb
- test case
- reset server id in injector


client/mysqltest.c:
  save master pos for ndb should look at handled epoch instead of applied
sql/rpl_injector.cc:
  Bug#17095 circular replication for ndb
  - reset server id in injector
mysql-test/r/rpl_ndb_circular.result:
  New BitKeeper file ``mysql-test/r/rpl_ndb_circular.result''
mysql-test/t/rpl_ndb_circular.test:
  New BitKeeper file ``mysql-test/t/rpl_ndb_circular.test''
2007-04-12 16:13:49 +02:00

216 lines
5.7 KiB
C++

/* Copyright (C) 2006 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "mysql_priv.h"
#include "rpl_injector.h"
/*
injector::transaction - member definitions
*/
/*
  Set up a transaction object for 'thd', using 'log' to find out where
  the transaction starts.

  The file name and offset reported by log->get_current_log() are stored
  in m_start_pos; afterwards begin_trans() is invoked on the thread and
  thd->set_current_stmt_binlog_row_based() is called.

  Declared inline since it is called further down in this file.
*/
inline
injector::transaction::transaction(MYSQL_BIN_LOG *log, THD *thd)
  : m_state(START_STATE), m_thd(thd)
{
  /*
    m_start_pos is absent from the initializer list above, so it only
    holds garbage at this point and has to be filled in by hand.
  */
  LOG_INFO current;
  log->get_current_log(&current);

  m_start_pos.m_file_pos= current.pos;
  /*
    binlog_pos does not follow RAII: the string duplicated here is
    released explicitly by ~transaction().
  */
  m_start_pos.m_file_name= my_strdup(current.log_file_name, MYF(0));

  begin_trans(m_thd);
  thd->set_current_stmt_binlog_row_based();
}
/*
  Release the binlog file name that the constructor duplicated into
  m_start_pos.

  Nothing is done when good() reports false, nor when no file name was
  ever allocated.
*/
injector::transaction::~transaction()
{
  if (!good())
    return;

  /* Needed since my_free expects a 'char*' (instead of 'void*'). */
  char* const the_memory= const_cast<char*>(m_start_pos.m_file_name);

  /*
    The constructor calls my_strdup() with MYF(0), which hands back a
    null pointer when the allocation fails. Writing through that pointer
    below would crash, so bail out: there is nothing to scrub or free.
  */
  if (the_memory == 0)
    return;

  /*
    We set the first character to null just to give all the copies of the
    start position a (minimal) chance of seeing that the memory is lost.
    All assuming that my_free does not step over the memory, of course.
  */
  *the_memory= '\0';
  my_free(the_memory, MYF(0));
}
int injector::transaction::commit()
{
DBUG_ENTER("injector::transaction::commit()");
m_thd->binlog_flush_pending_rows_event(true);
end_trans(m_thd, COMMIT);
DBUG_RETURN(0);
}
/*
  Write a table map for 'tbl' through the thread, with the thread's
  server id temporarily replaced by 'sid'.

  RETURN VALUE
    The non-zero result of check_state(TABLE_STATE) if that check fails,
    otherwise whatever binlog_write_table_map() returned.
*/
int injector::transaction::use_table(server_id_type sid, table tbl)
{
  DBUG_ENTER("injector::transaction::use_table");

  int const state_error= check_state(TABLE_STATE);
  if (state_error)
    DBUG_RETURN(state_error);

  /* Swap in the caller's server id only for the duration of the write. */
  server_id_type const original_id= m_thd->server_id;
  m_thd->set_server_id(sid);
  int const map_error=
    m_thd->binlog_write_table_map(tbl.get_table(), tbl.is_transactional());
  m_thd->set_server_id(original_id);

  DBUG_RETURN(map_error);
}
/*
  Log a written row for 'tbl' through the thread, with the thread's
  server id temporarily replaced by 'sid'.

  'cols', 'colcnt' and 'record' are passed unchanged to
  THD::binlog_write_row().

  RETURN VALUE
    The non-zero result of check_state(ROW_STATE) if that check fails,
    otherwise the result of binlog_write_row() (previously discarded,
    so a failed write was reported as success).
*/
int injector::transaction::write_row (server_id_type sid, table tbl,
                                      MY_BITMAP const* cols, size_t colcnt,
                                      record_type record)
{
  DBUG_ENTER("injector::transaction::write_row(...)");

  int error= check_state(ROW_STATE);
  if (error)
    DBUG_RETURN(error);

  /* The original server id is restored whether or not the write worked. */
  server_id_type save_id= m_thd->server_id;
  m_thd->set_server_id(sid);
  error= m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(),
                                 cols, colcnt, record);
  m_thd->set_server_id(save_id);

  DBUG_RETURN(error);
}
/*
  Log a deleted row for 'tbl' through the thread, with the thread's
  server id temporarily replaced by 'sid'.

  'cols', 'colcnt' and 'record' are passed unchanged to
  THD::binlog_delete_row().

  RETURN VALUE
    The non-zero result of check_state(ROW_STATE) if that check fails,
    otherwise the result of binlog_delete_row() (previously discarded,
    so a failed write was reported as success).
*/
int injector::transaction::delete_row(server_id_type sid, table tbl,
                                      MY_BITMAP const* cols, size_t colcnt,
                                      record_type record)
{
  DBUG_ENTER("injector::transaction::delete_row(...)");

  int error= check_state(ROW_STATE);
  if (error)
    DBUG_RETURN(error);

  /* The original server id is restored whether or not the write worked. */
  server_id_type save_id= m_thd->server_id;
  m_thd->set_server_id(sid);
  error= m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(),
                                  cols, colcnt, record);
  m_thd->set_server_id(save_id);

  DBUG_RETURN(error);
}
/*
  Log an updated row for 'tbl' through the thread, with the thread's
  server id temporarily replaced by 'sid'.

  'cols', 'colcnt', 'before' and 'after' are passed unchanged to
  THD::binlog_update_row().

  RETURN VALUE
    The non-zero result of check_state(ROW_STATE) if that check fails,
    otherwise the result of binlog_update_row() (previously discarded,
    so a failed write was reported as success).
*/
int injector::transaction::update_row(server_id_type sid, table tbl,
                                      MY_BITMAP const* cols, size_t colcnt,
                                      record_type before, record_type after)
{
  DBUG_ENTER("injector::transaction::update_row(...)");

  int error= check_state(ROW_STATE);
  if (error)
    DBUG_RETURN(error);

  /* The original server id is restored whether or not the write worked. */
  server_id_type save_id= m_thd->server_id;
  m_thd->set_server_id(sid);
  error= m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(),
                                  cols, colcnt, before, after);
  m_thd->set_server_id(save_id);

  DBUG_RETURN(error);
}
/*
  Return, by value, the start position recorded by the constructor.

  NOTE(review): ~transaction() frees m_start_pos.m_file_name, so a copy
  returned here presumably must not be used after this transaction is
  destroyed - confirm against binlog_pos's copy semantics.
*/
injector::transaction::binlog_pos injector::transaction::start_pos() const
{
  return this->m_start_pos;
}
/*
injector - member definitions
*/
/* This constructor is called below */
/* Constructor with an empty body; injector::instance() uses it. */
inline injector::injector() { }
/* File-local pointer to the one injector; null until instance() creates it. */
static injector *s_injector= 0;

/*
  Return the single injector object, allocating it on the first call.

  NOTE(review): no locking is visible here - confirm that callers
  serialise the first call.
*/
injector *injector::instance()
{
  /* "There can be only one [instance]" */
  if (!s_injector)
    s_injector= new injector;
  return s_injector;
}
void injector::free_instance()
{
injector *inj = s_injector;
if (inj != 0)
{
s_injector= 0;
delete inj;
}
}
/*
  Construct a transaction for 'thd' on the global 'mysql_bin_log' and
  return it by value.
*/
injector::transaction injector::new_trans(THD *thd)
{
DBUG_ENTER("injector::new_trans(THD*)");
/*
Currently, there is no alternative to using 'mysql_bin_log' since that
is hardcoded into the way the handler is using the binary log.
*/
/* The temporary is returned directly; no named local is introduced. */
DBUG_RETURN(transaction(&mysql_bin_log, thd));
}
/*
  Construct a transaction for 'thd' on the global 'mysql_bin_log' and
  swap it into the object 'ptr' points at.

  'ptr' is dereferenced without a null check.
*/
void injector::new_trans(THD *thd, injector::transaction *ptr)
{
  DBUG_ENTER("injector::new_trans(THD *, transaction *)");

  /*
    'mysql_bin_log' is the only choice for now: the handler's use of the
    binary log is hardcoded to it.
  */
  transaction fresh(&mysql_bin_log, thd);

  /*
    After the swap 'fresh' presumably holds what '*ptr' held before
    (swap() is defined elsewhere); it is destroyed when this function
    returns.
  */
  ptr->swap(fresh);

  DBUG_VOID_RETURN;
}
/*
  Write an incident event for 'incident' to 'mysql_bin_log' and, if the
  write succeeded, call rotate_and_purge(RP_FORCE_ROTATE) on the log.

  RETURN VALUE
    0 on success, otherwise the non-zero result of the write (in which
    case rotate_and_purge() is not called).
*/
int injector::record_incident(THD *thd, Incident incident)
{
  Incident_log_event ev(thd, incident);

  int const write_error= mysql_bin_log.write(&ev);
  if (!write_error)
    mysql_bin_log.rotate_and_purge(RP_FORCE_ROTATE);

  return write_error;
}
/*
  Write an incident event for 'incident', carrying 'message', to
  'mysql_bin_log' and, if the write succeeded, call
  rotate_and_purge(RP_FORCE_ROTATE) on the log.

  RETURN VALUE
    0 on success, otherwise the non-zero result of the write (in which
    case rotate_and_purge() is not called).
*/
int injector::record_incident(THD *thd, Incident incident, LEX_STRING const message)
{
  Incident_log_event ev(thd, incident, message);

  int const write_error= mysql_bin_log.write(&ev);
  if (!write_error)
    mysql_bin_log.rotate_and_purge(RP_FORCE_ROTATE);

  return write_error;
}