mirror of
https://github.com/MariaDB/server.git
synced 2025-01-20 22:12:30 +01:00
Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-5.1-new
into mysql.com:/space/pekka/ndb/version/my51
This commit is contained in:
commit
81141285c0
5 changed files with 366 additions and 119 deletions
4
mysql-test/include/show_binlog_events.inc
Normal file
4
mysql-test/include/show_binlog_events.inc
Normal file
|
@ -0,0 +1,4 @@
|
|||
--let $binlog_start=102
|
||||
--replace_result $binlog_start <binlog_start>
|
||||
--replace_column 2 # 4 # 5 #
|
||||
--eval show binlog events from $binlog_start
|
|
@ -1,7 +1,7 @@
|
|||
drop database if exists mysqltest;
|
||||
drop table if exists t1,t2;
|
||||
drop table if exists t1,t2,t3;
|
||||
drop database if exists mysqltest;
|
||||
drop table if exists t1,t2;
|
||||
drop table if exists t1,t2,t3;
|
||||
reset master;
|
||||
reset master;
|
||||
create database mysqltest;
|
||||
|
@ -123,3 +123,68 @@ master-bin1.000001 # Table_map # # cluster.apply_status
|
|||
master-bin1.000001 # Write_rows # #
|
||||
master-bin1.000001 # Query # # COMMIT
|
||||
master-bin1.000001 # Query # # use `test`; drop table `t1`
|
||||
reset master;
|
||||
show tables;
|
||||
Tables_in_test
|
||||
reset master;
|
||||
show tables;
|
||||
Tables_in_test
|
||||
create table t1 (a int key) engine=ndb;
|
||||
create table t2 (a int key) engine=ndb;
|
||||
create table t3 (a int key) engine=ndb;
|
||||
rename table t3 to t4, t2 to t3, t1 to t2, t4 to t1;
|
||||
show binlog events from <binlog_start>;
|
||||
Log_name Pos Event_type Server_id End_log_pos Info
|
||||
master-bin1.000001 # Query # # use `test`; create table t1 (a int key) engine=ndb
|
||||
master-bin1.000001 # Query # # use `test`; create table t2 (a int key) engine=ndb
|
||||
master-bin1.000001 # Query # # use `test`; create table t3 (a int key) engine=ndb
|
||||
master-bin1.000001 # Query # # BEGIN
|
||||
master-bin1.000001 # Table_map # # cluster.apply_status
|
||||
master-bin1.000001 # Write_rows # #
|
||||
master-bin1.000001 # Query # # COMMIT
|
||||
master-bin1.000001 # Query # # use `test`; rename table `test.t3` to `test.t4`
|
||||
master-bin1.000001 # Query # # BEGIN
|
||||
master-bin1.000001 # Table_map # # cluster.apply_status
|
||||
master-bin1.000001 # Write_rows # #
|
||||
master-bin1.000001 # Query # # COMMIT
|
||||
master-bin1.000001 # Query # # use `test`; rename table `test.t2` to `test.t3`
|
||||
master-bin1.000001 # Query # # BEGIN
|
||||
master-bin1.000001 # Table_map # # cluster.apply_status
|
||||
master-bin1.000001 # Write_rows # #
|
||||
master-bin1.000001 # Query # # COMMIT
|
||||
master-bin1.000001 # Query # # use `test`; rename table `test.t1` to `test.t2`
|
||||
master-bin1.000001 # Query # # BEGIN
|
||||
master-bin1.000001 # Table_map # # cluster.apply_status
|
||||
master-bin1.000001 # Write_rows # #
|
||||
master-bin1.000001 # Query # # COMMIT
|
||||
master-bin1.000001 # Query # # use `test`; rename table `test.t4` to `test.t1`
|
||||
drop table t1;
|
||||
drop table t2;
|
||||
drop table t3;
|
||||
reset master;
|
||||
show tables;
|
||||
Tables_in_test
|
||||
reset master;
|
||||
show tables;
|
||||
Tables_in_test
|
||||
create table t1 (a int key) engine=ndb;
|
||||
insert into t1 values(1);
|
||||
rename table t1 to t2;
|
||||
insert into t2 values(2);
|
||||
show binlog events from <binlog_start>;
|
||||
Log_name Pos Event_type Server_id End_log_pos Info
|
||||
master-bin1.000001 # Query # # use `test`; create table t1 (a int key) engine=ndb
|
||||
master-bin1.000001 # Query # # BEGIN
|
||||
master-bin1.000001 # Table_map # # cluster.apply_status
|
||||
master-bin1.000001 # Write_rows # #
|
||||
master-bin1.000001 # Table_map # # test.t1
|
||||
master-bin1.000001 # Write_rows # #
|
||||
master-bin1.000001 # Query # # COMMIT
|
||||
master-bin1.000001 # Query # # use `test`; rename table `test.t1` to `test.t2`
|
||||
master-bin1.000001 # Query # # BEGIN
|
||||
master-bin1.000001 # Table_map # # cluster.apply_status
|
||||
master-bin1.000001 # Write_rows # #
|
||||
master-bin1.000001 # Table_map # # test.t2
|
||||
master-bin1.000001 # Write_rows # #
|
||||
master-bin1.000001 # Query # # COMMIT
|
||||
drop table t2;
|
||||
|
|
|
@ -5,18 +5,16 @@
|
|||
--disable_warnings
|
||||
connection server2;
|
||||
drop database if exists mysqltest;
|
||||
drop table if exists t1,t2;
|
||||
drop table if exists t1,t2,t3;
|
||||
connection server1;
|
||||
drop database if exists mysqltest;
|
||||
drop table if exists t1,t2;
|
||||
drop table if exists t1,t2,t3;
|
||||
--connection server1
|
||||
reset master;
|
||||
--connection server2
|
||||
reset master;
|
||||
--enable_warnings
|
||||
|
||||
--let $binlog_start=102
|
||||
|
||||
#
|
||||
# basic test to see if ddl distribution works across
|
||||
# multiple binlogs
|
||||
|
@ -33,15 +31,10 @@ create table t1 (a int primary key) engine=ndb;
|
|||
|
||||
--connection server2
|
||||
create table t2 (a int primary key) engine=ndb;
|
||||
--replace_result $binlog_start <binlog_start>
|
||||
--replace_column 2 # 4 # 5 #
|
||||
--eval show binlog events from $binlog_start
|
||||
--source include/show_binlog_events.inc
|
||||
|
||||
--connection server1
|
||||
--replace_result $binlog_start <binlog_start>
|
||||
--replace_column 2 # 4 # 5 #
|
||||
--eval show binlog events from $binlog_start
|
||||
|
||||
-- source include/show_binlog_events.inc
|
||||
|
||||
# alter table
|
||||
--connection server1
|
||||
|
@ -53,9 +46,7 @@ reset master;
|
|||
alter table t2 add column (b int);
|
||||
|
||||
--connections server1
|
||||
--replace_result $binlog_start <binlog_start>
|
||||
--replace_column 2 # 4 # 5 #
|
||||
--eval show binlog events from $binlog_start
|
||||
--source include/show_binlog_events.inc
|
||||
|
||||
|
||||
# alter database
|
||||
|
@ -91,9 +82,7 @@ drop database mysqltest;
|
|||
create table t1 (a int primary key) engine=ndb;
|
||||
|
||||
--connection server2
|
||||
--replace_result $binlog_start <binlog_start>
|
||||
--replace_column 2 # 4 # 5 #
|
||||
--eval show binlog events from $binlog_start
|
||||
--source include/show_binlog_events.inc
|
||||
|
||||
--connection server2
|
||||
drop table t2;
|
||||
|
@ -144,6 +133,51 @@ ENGINE =NDB;
|
|||
drop table t1;
|
||||
|
||||
--connection server2
|
||||
--replace_result $binlog_start <binlog_start>
|
||||
--replace_column 2 # 4 # 5 #
|
||||
--eval show binlog events from $binlog_start
|
||||
--source include/show_binlog_events.inc
|
||||
|
||||
#
|
||||
# Bug #17827 cluster: rename of several tables in one statement,
|
||||
# gets multiply logged
|
||||
#
|
||||
--connection server1
|
||||
reset master;
|
||||
show tables;
|
||||
--connection server2
|
||||
reset master;
|
||||
show tables;
|
||||
|
||||
--connection server1
|
||||
create table t1 (a int key) engine=ndb;
|
||||
create table t2 (a int key) engine=ndb;
|
||||
create table t3 (a int key) engine=ndb;
|
||||
rename table t3 to t4, t2 to t3, t1 to t2, t4 to t1;
|
||||
--connection server2
|
||||
--source include/show_binlog_events.inc
|
||||
|
||||
drop table t1;
|
||||
drop table t2;
|
||||
drop table t3;
|
||||
|
||||
#
|
||||
# Bug #17838 binlog not setup on seconday master after rename
|
||||
#
|
||||
#
|
||||
--connection server1
|
||||
reset master;
|
||||
show tables;
|
||||
--connection server2
|
||||
reset master;
|
||||
show tables;
|
||||
|
||||
--connection server1
|
||||
create table t1 (a int key) engine=ndb;
|
||||
insert into t1 values(1);
|
||||
rename table t1 to t2;
|
||||
insert into t2 values(2);
|
||||
|
||||
# now we should see data in table t1 _and_ t2
|
||||
# prior to bug fix, data was missing for t2
|
||||
--connection server2
|
||||
--source include/show_binlog_events.inc
|
||||
|
||||
drop table t2;
|
||||
|
|
|
@ -230,12 +230,25 @@ static void run_query(THD *thd, char *buf, char *end,
|
|||
}
|
||||
}
|
||||
|
||||
int
|
||||
static void
|
||||
ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share)
|
||||
{
|
||||
DBUG_ENTER("ndbcluster_binlog_close_table");
|
||||
if (share->table_share)
|
||||
{
|
||||
free_table_share(share->table_share);
|
||||
share->table_share= 0;
|
||||
share->table= 0;
|
||||
}
|
||||
DBUG_ASSERT(share->table == 0);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
static int
|
||||
ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
|
||||
TABLE_SHARE *table_share, TABLE *table)
|
||||
{
|
||||
int error;
|
||||
MEM_ROOT *mem_root= &share->mem_root;
|
||||
DBUG_ENTER("ndbcluster_binlog_open_table");
|
||||
|
||||
init_tmp_table_share(table_share, share->db, 0, share->table_name,
|
||||
|
@ -274,22 +287,13 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
|
|||
table->s->table_name.str= share->table_name;
|
||||
table->s->table_name.length= strlen(share->table_name);
|
||||
|
||||
DBUG_ASSERT(share->table_share == 0);
|
||||
share->table_share= table_share;
|
||||
DBUG_ASSERT(share->table == 0);
|
||||
share->table= table;
|
||||
#ifndef DBUG_OFF
|
||||
dbug_print_table("table", table);
|
||||
#endif
|
||||
/*
|
||||
! do not touch the contents of the table
|
||||
it may be in use by the injector thread
|
||||
*/
|
||||
share->ndb_value[0]= (NdbValue*)
|
||||
alloc_root(mem_root, sizeof(NdbValue) *
|
||||
(table->s->fields + 2 /*extra for hidden key and part key*/));
|
||||
share->ndb_value[1]= (NdbValue*)
|
||||
alloc_root(mem_root, sizeof(NdbValue) *
|
||||
(table->s->fields + 2 /*extra for hidden key and part key*/));
|
||||
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
@ -351,6 +355,18 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
|
|||
TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME));
|
||||
if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table)))
|
||||
break;
|
||||
/*
|
||||
! do not touch the contents of the table
|
||||
it may be in use by the injector thread
|
||||
*/
|
||||
MEM_ROOT *mem_root= &share->mem_root;
|
||||
share->ndb_value[0]= (NdbValue*)
|
||||
alloc_root(mem_root, sizeof(NdbValue) *
|
||||
(table->s->fields + 2 /*extra for hidden key and part key*/));
|
||||
share->ndb_value[1]= (NdbValue*)
|
||||
alloc_root(mem_root, sizeof(NdbValue) *
|
||||
(table->s->fields + 2 /*extra for hidden key and part key*/));
|
||||
|
||||
if (table->s->primary_key == MAX_KEY)
|
||||
share->flags|= NSF_HIDDEN_PK;
|
||||
if (table->s->blob_fields != 0)
|
||||
|
@ -1156,8 +1172,11 @@ end:
|
|||
(void) pthread_mutex_unlock(&share->mutex);
|
||||
}
|
||||
|
||||
if (get_a_share)
|
||||
if (get_a_share && share)
|
||||
{
|
||||
free_share(&share);
|
||||
share= 0;
|
||||
}
|
||||
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
@ -1314,22 +1333,35 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
|
|||
NDB_SHARE *share)
|
||||
{
|
||||
DBUG_ENTER("ndb_handle_schema_change");
|
||||
int remote_drop_table= 0, do_close_cached_tables= 0;
|
||||
const char *dbname= share->table->s->db.str;
|
||||
const char *tabname= share->table->s->table_name.str;
|
||||
bool online_alter_table= (pOp->getEventType() == NDBEVENT::TE_ALTER &&
|
||||
pOp->tableFrmChanged());
|
||||
bool do_close_cached_tables= FALSE;
|
||||
bool is_online_alter_table= FALSE;
|
||||
bool is_rename_table= FALSE;
|
||||
bool is_remote_change=
|
||||
(uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id();
|
||||
|
||||
if (pOp->getEventType() != NDBEVENT::TE_CLUSTER_FAILURE &&
|
||||
(uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id())
|
||||
if (pOp->getEventType() == NDBEVENT::TE_ALTER)
|
||||
{
|
||||
if (pOp->tableFrmChanged())
|
||||
{
|
||||
is_online_alter_table= TRUE;
|
||||
}
|
||||
else
|
||||
{
|
||||
DBUG_ASSERT(pOp->tableNameChanged());
|
||||
is_rename_table= TRUE;
|
||||
}
|
||||
}
|
||||
|
||||
if (is_remote_change) /* includes CLUSTER_FAILURE */
|
||||
{
|
||||
TABLE_SHARE *table_share= share->table->s;
|
||||
TABLE* table= share->table;
|
||||
TABLE_SHARE *table_share= table->s;
|
||||
const char *dbname= table_share->db.str;
|
||||
|
||||
/*
|
||||
Invalidate table and all it's indexes
|
||||
*/
|
||||
ndb->setDatabaseName(share->table->s->db.str);
|
||||
ndb->setDatabaseName(dbname);
|
||||
Thd_ndb *thd_ndb= get_thd_ndb(thd);
|
||||
DBUG_ASSERT(thd_ndb != NULL);
|
||||
Ndb* old_ndb= thd_ndb->ndb;
|
||||
|
@ -1341,8 +1373,9 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
|
|||
table_handler.invalidate_dictionary_cache(TRUE);
|
||||
thd_ndb->ndb= old_ndb;
|
||||
|
||||
if (online_alter_table)
|
||||
{
|
||||
if (is_online_alter_table)
|
||||
{
|
||||
const char *tabname= table_share->table_name.str;
|
||||
char key[FN_REFLEN];
|
||||
const void *data= 0, *pack_data= 0;
|
||||
uint length, pack_length;
|
||||
|
@ -1364,17 +1397,18 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
|
|||
DBUG_DUMP("frm", (char*)altered_table->getFrmData(),
|
||||
altered_table->getFrmLength());
|
||||
pthread_mutex_lock(&LOCK_open);
|
||||
const NDBTAB *old= dict->getTable(tabname);
|
||||
const NDBTAB *old= dict->getTable(tabname);
|
||||
if (!old &&
|
||||
old->getObjectVersion() != altered_table->getObjectVersion())
|
||||
dict->putTable(altered_table);
|
||||
|
||||
|
||||
if ((error= unpackfrm(&data, &length, altered_table->getFrmData())) ||
|
||||
(error= writefrm(key, data, length)))
|
||||
{
|
||||
sql_print_information("NDB: Failed write frm for %s.%s, error %d",
|
||||
dbname, tabname, error);
|
||||
}
|
||||
ndbcluster_binlog_close_table(thd, share);
|
||||
close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE);
|
||||
if ((error= ndbcluster_binlog_open_table(thd, share,
|
||||
table_share, table)))
|
||||
|
@ -1383,11 +1417,10 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
|
|||
pthread_mutex_unlock(&LOCK_open);
|
||||
}
|
||||
}
|
||||
remote_drop_table= 1;
|
||||
}
|
||||
|
||||
// If only frm was changed continue replicating
|
||||
if (online_alter_table)
|
||||
if (is_online_alter_table)
|
||||
{
|
||||
/* Signal ha_ndbcluster::alter_table that drop is done */
|
||||
(void) pthread_cond_signal(&injector_cond);
|
||||
|
@ -1395,6 +1428,22 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
|
|||
}
|
||||
|
||||
(void) pthread_mutex_lock(&share->mutex);
|
||||
if (is_rename_table && !is_remote_change)
|
||||
{
|
||||
DBUG_PRINT("info", ("Detected name change of table %s.%s",
|
||||
share->db, share->table_name));
|
||||
/* ToDo: remove printout */
|
||||
if (ndb_extra_logging)
|
||||
sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
|
||||
share_prefix, share->table->s->db.str,
|
||||
share->table->s->table_name.str,
|
||||
share->key);
|
||||
/* do the rename of the table in the share */
|
||||
share->table->s->db.str= share->db;
|
||||
share->table->s->db.length= strlen(share->db);
|
||||
share->table->s->table_name.str= share->table_name;
|
||||
share->table->s->table_name.length= strlen(share->table_name);
|
||||
}
|
||||
DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
|
||||
if (share->op_old == pOp)
|
||||
share->op_old= 0;
|
||||
|
@ -1408,11 +1457,11 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
|
|||
|
||||
pthread_mutex_lock(&ndbcluster_mutex);
|
||||
free_share(&share, TRUE);
|
||||
if (remote_drop_table && share && share->state != NSS_DROPPED)
|
||||
if (is_remote_change && share && share->state != NSS_DROPPED)
|
||||
{
|
||||
DBUG_PRINT("info", ("remote drop table"));
|
||||
DBUG_PRINT("info", ("remote change"));
|
||||
if (share->use_count != 1)
|
||||
do_close_cached_tables= 1;
|
||||
do_close_cached_tables= TRUE;
|
||||
share->state= NSS_DROPPED;
|
||||
free_share(&share, TRUE);
|
||||
}
|
||||
|
@ -1464,24 +1513,45 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
|
|||
int log_query= 0;
|
||||
DBUG_PRINT("info", ("log query_length: %d query: '%s'",
|
||||
schema->query_length, schema->query));
|
||||
char key[FN_REFLEN];
|
||||
build_table_filename(key, sizeof(key), schema->db, schema->name, "");
|
||||
NDB_SHARE *share= get_share(key, 0, false, false);
|
||||
|
||||
switch ((enum SCHEMA_OP_TYPE)schema->type)
|
||||
{
|
||||
case SOT_DROP_TABLE:
|
||||
/* binlog dropping table after any table operations */
|
||||
if (ndb_binlog_running)
|
||||
if (share && share->op)
|
||||
post_epoch_log_list->push_back(schema, mem_root);
|
||||
log_query= 0;
|
||||
break;
|
||||
case SOT_RENAME_TABLE:
|
||||
/* fall through */
|
||||
case SOT_ALTER_TABLE:
|
||||
if (ndb_binlog_running)
|
||||
if (share && share->op)
|
||||
{
|
||||
log_query= 1;
|
||||
log_query= 0;
|
||||
post_epoch_log_list->push_back(schema, mem_root);
|
||||
break; /* discovery will be handled by binlog */
|
||||
}
|
||||
/* fall through */
|
||||
goto sot_create_table;
|
||||
case SOT_ALTER_TABLE:
|
||||
if (share && share->op)
|
||||
{
|
||||
log_query= 0;
|
||||
post_epoch_log_list->push_back(schema, mem_root);
|
||||
break; /* discovery will be handled by binlog */
|
||||
}
|
||||
goto sot_create_table;
|
||||
case SOT_CREATE_TABLE:
|
||||
sot_create_table:
|
||||
/*
|
||||
we need to free any share here as command below
|
||||
may need to call handle_trailing_share
|
||||
*/
|
||||
if (share)
|
||||
{
|
||||
free_share(&share);
|
||||
share= 0;
|
||||
}
|
||||
pthread_mutex_lock(&LOCK_open);
|
||||
if (ndb_create_table_from_engine(thd, schema->db, schema->name))
|
||||
{
|
||||
|
@ -1514,10 +1584,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
|
|||
break;
|
||||
case SOT_CLEAR_SLOCK:
|
||||
{
|
||||
char key[FN_REFLEN];
|
||||
build_table_filename(key, sizeof(key),
|
||||
schema->db, schema->name, "");
|
||||
NDB_SHARE *share= get_share(key, 0, false, false);
|
||||
if (share)
|
||||
{
|
||||
pthread_mutex_lock(&share->mutex);
|
||||
|
@ -1528,6 +1594,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
|
|||
pthread_mutex_unlock(&share->mutex);
|
||||
pthread_cond_signal(&injector_cond);
|
||||
free_share(&share);
|
||||
share= 0;
|
||||
}
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
@ -1536,7 +1603,11 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
|
|||
log_query= 1;
|
||||
break;
|
||||
}
|
||||
|
||||
if (share)
|
||||
{
|
||||
free_share(&share);
|
||||
share= 0;
|
||||
}
|
||||
/* signal that schema operation has been handled */
|
||||
if ((enum SCHEMA_OP_TYPE)schema->type != SOT_CLEAR_SLOCK)
|
||||
{
|
||||
|
@ -1571,23 +1642,12 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
|
|||
case NDBEVENT::TE_DELETE:
|
||||
// skip
|
||||
break;
|
||||
case NDBEVENT::TE_ALTER:
|
||||
if (pOp->tableNameChanged())
|
||||
{
|
||||
DBUG_PRINT("info", ("Detected name change of table %s.%s",
|
||||
share->db, share->table_name));
|
||||
/* do the rename of the table in the share */
|
||||
share->table->s->db.str= share->db;
|
||||
share->table->s->db.length= strlen(share->db);
|
||||
share->table->s->table_name.str= share->table_name;
|
||||
share->table->s->table_name.length= strlen(share->table_name);
|
||||
}
|
||||
ndb_handle_schema_change(thd, ndb, pOp, share);
|
||||
break;
|
||||
case NDBEVENT::TE_CLUSTER_FAILURE:
|
||||
case NDBEVENT::TE_DROP:
|
||||
free_share(&schema_share);
|
||||
schema_share= 0;
|
||||
// fall through
|
||||
case NDBEVENT::TE_ALTER:
|
||||
ndb_handle_schema_change(thd, ndb, pOp, share);
|
||||
break;
|
||||
case NDBEVENT::TE_NODE_FAILURE:
|
||||
|
@ -1659,6 +1719,72 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
|
|||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
/*
|
||||
process any operations that should be done after
|
||||
the epoch is complete
|
||||
*/
|
||||
static void
|
||||
ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
|
||||
List<Cluster_replication_schema>
|
||||
*post_epoch_log_list,
|
||||
List<Cluster_replication_schema>
|
||||
*post_epoch_unlock_list)
|
||||
{
|
||||
DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
|
||||
Cluster_replication_schema *schema;
|
||||
while ((schema= post_epoch_log_list->pop()))
|
||||
{
|
||||
DBUG_PRINT("info", ("log query_length: %d query: '%s'",
|
||||
schema->query_length, schema->query));
|
||||
{
|
||||
char key[FN_REFLEN];
|
||||
build_table_filename(key, sizeof(key), schema->db, schema->name, "");
|
||||
NDB_SHARE *share= get_share(key, 0, false, false);
|
||||
switch ((enum SCHEMA_OP_TYPE)schema->type)
|
||||
{
|
||||
case SOT_DROP_DB:
|
||||
case SOT_DROP_TABLE:
|
||||
break;
|
||||
case SOT_RENAME_TABLE:
|
||||
case SOT_ALTER_TABLE:
|
||||
if (share && share->op)
|
||||
{
|
||||
break; /* discovery handled by binlog */
|
||||
}
|
||||
pthread_mutex_lock(&LOCK_open);
|
||||
if (ndb_create_table_from_engine(thd, schema->db, schema->name))
|
||||
{
|
||||
sql_print_error("Could not discover table '%s.%s' from "
|
||||
"binlog schema event '%s' from node %d",
|
||||
schema->db, schema->name, schema->query,
|
||||
schema->node_id);
|
||||
}
|
||||
pthread_mutex_unlock(&LOCK_open);
|
||||
default:
|
||||
DBUG_ASSERT(false);
|
||||
}
|
||||
if (share)
|
||||
{
|
||||
free_share(&share);
|
||||
share= 0;
|
||||
}
|
||||
}
|
||||
{
|
||||
char *thd_db_save= thd->db;
|
||||
thd->db= schema->db;
|
||||
thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
|
||||
schema->query_length, FALSE,
|
||||
schema->name[0] == 0);
|
||||
thd->db= thd_db_save;
|
||||
}
|
||||
}
|
||||
while ((schema= post_epoch_unlock_list->pop()))
|
||||
{
|
||||
ndbcluster_update_slock(thd, schema->db, schema->name);
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
/*
|
||||
Timer class for doing performance measurements
|
||||
*/
|
||||
|
@ -2206,6 +2332,10 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
|
|||
if (share->flags & NSF_BLOB_FLAG)
|
||||
op->mergeEvents(true); // currently not inherited from event
|
||||
|
||||
DBUG_PRINT("info", ("share->ndb_value[0]: 0x%x",
|
||||
share->ndb_value[0]));
|
||||
DBUG_PRINT("info", ("share->ndb_value[1]: 0x%x",
|
||||
share->ndb_value[1]));
|
||||
int n_columns= ndbtab->getNoOfColumns();
|
||||
int n_fields= table ? table->s->fields : 0; // XXX ???
|
||||
for (int j= 0; j < n_columns; j++)
|
||||
|
@ -2258,6 +2388,12 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
|
|||
}
|
||||
share->ndb_value[0][j].ptr= attr0.ptr;
|
||||
share->ndb_value[1][j].ptr= attr1.ptr;
|
||||
DBUG_PRINT("info", ("&share->ndb_value[0][%d]: 0x%x "
|
||||
"share->ndb_value[0][%d]: 0x%x",
|
||||
j, &share->ndb_value[0][j], j, attr0.ptr));
|
||||
DBUG_PRINT("info", ("&share->ndb_value[1][%d]: 0x%x "
|
||||
"share->ndb_value[1][%d]: 0x%x",
|
||||
j, &share->ndb_value[0][j], j, attr1.ptr));
|
||||
}
|
||||
op->setCustomData((void *) share); // set before execute
|
||||
share->op= op; // assign op in NDB_SHARE
|
||||
|
@ -2468,24 +2604,6 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
|
|||
"op_old: %lx",
|
||||
share->key, share, pOp, share->op, share->op_old));
|
||||
break;
|
||||
case NDBEVENT::TE_ALTER:
|
||||
if (pOp->tableNameChanged())
|
||||
{
|
||||
DBUG_PRINT("info", ("Detected name change of table %s.%s",
|
||||
share->db, share->table_name));
|
||||
/* ToDo: remove printout */
|
||||
if (ndb_extra_logging)
|
||||
sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
|
||||
share_prefix, share->table->s->db.str,
|
||||
share->table->s->table_name.str,
|
||||
share->key);
|
||||
/* do the rename of the table in the share */
|
||||
share->table->s->db.str= share->db;
|
||||
share->table->s->db.length= strlen(share->db);
|
||||
share->table->s->table_name.str= share->table_name;
|
||||
share->table->s->table_name.length= strlen(share->table_name);
|
||||
}
|
||||
goto drop_alter_common;
|
||||
case NDBEVENT::TE_DROP:
|
||||
if (apply_status_share == share)
|
||||
{
|
||||
|
@ -2495,7 +2613,8 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
|
|||
/* ToDo: remove printout */
|
||||
if (ndb_extra_logging)
|
||||
sql_print_information("NDB Binlog: drop table %s.", share->key);
|
||||
drop_alter_common:
|
||||
// fall through
|
||||
case NDBEVENT::TE_ALTER:
|
||||
row.n_schemaops++;
|
||||
DBUG_PRINT("info", ("TABLE %s EVENT: %s received share: 0x%lx op: %lx "
|
||||
"share op: %lx op_old: %lx",
|
||||
|
@ -3075,26 +3194,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
process any operations that should be done after
|
||||
the epoch is complete
|
||||
*/
|
||||
{
|
||||
Cluster_replication_schema *schema;
|
||||
while ((schema= post_epoch_unlock_list.pop()))
|
||||
{
|
||||
ndbcluster_update_slock(thd, schema->db, schema->name);
|
||||
}
|
||||
while ((schema= post_epoch_log_list.pop()))
|
||||
{
|
||||
char *thd_db_save= thd->db;
|
||||
thd->db= schema->db;
|
||||
thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
|
||||
schema->query_length, FALSE,
|
||||
schema->name[0] == 0);
|
||||
thd->db= thd_db_save;
|
||||
}
|
||||
}
|
||||
ndb_binlog_thread_handle_schema_event_post_epoch(thd,
|
||||
&post_epoch_log_list,
|
||||
&post_epoch_unlock_list);
|
||||
free_root(&mem_root, MYF(0));
|
||||
*root_ptr= old_root;
|
||||
ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
|
||||
|
@ -3110,9 +3212,15 @@ err:
|
|||
sql_print_information("Stopping Cluster Binlog");
|
||||
|
||||
if (apply_status_share)
|
||||
{
|
||||
free_share(&apply_status_share);
|
||||
apply_status_share= 0;
|
||||
}
|
||||
if (schema_share)
|
||||
{
|
||||
free_share(&schema_share);
|
||||
schema_share= 0;
|
||||
}
|
||||
|
||||
/* remove all event operations */
|
||||
if (ndb)
|
||||
|
|
|
@ -689,9 +689,45 @@ NdbEventOperationImpl::receive_event()
|
|||
error.code));
|
||||
DBUG_RETURN_EVENT(1);
|
||||
}
|
||||
if ( m_eventImpl->m_tableImpl)
|
||||
delete m_eventImpl->m_tableImpl;
|
||||
|
||||
NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl;
|
||||
m_eventImpl->m_tableImpl = at;
|
||||
|
||||
DBUG_PRINT("info", ("switching table impl 0x%x -> 0x%x",
|
||||
tmp_table_impl, at));
|
||||
|
||||
// change the rec attrs to refer to the new table object
|
||||
int i;
|
||||
for (i = 0; i < 2; i++)
|
||||
{
|
||||
NdbRecAttr *p = theFirstPkAttrs[i];
|
||||
while (p)
|
||||
{
|
||||
int no = p->getColumn()->getColumnNo();
|
||||
NdbColumnImpl *tAttrInfo = at->getColumn(no);
|
||||
DBUG_PRINT("info", ("rec_attr: 0x%x "
|
||||
"switching column impl 0x%x -> 0x%x",
|
||||
p, p->m_column, tAttrInfo));
|
||||
p->m_column = tAttrInfo;
|
||||
p = p->next();
|
||||
}
|
||||
}
|
||||
for (i = 0; i < 2; i++)
|
||||
{
|
||||
NdbRecAttr *p = theFirstDataAttrs[i];
|
||||
while (p)
|
||||
{
|
||||
int no = p->getColumn()->getColumnNo();
|
||||
NdbColumnImpl *tAttrInfo = at->getColumn(no);
|
||||
DBUG_PRINT("info", ("rec_attr: 0x%x "
|
||||
"switching column impl 0x%x -> 0x%x",
|
||||
p, p->m_column, tAttrInfo));
|
||||
p->m_column = tAttrInfo;
|
||||
p = p->next();
|
||||
}
|
||||
}
|
||||
if (tmp_table_impl)
|
||||
delete tmp_table_impl;
|
||||
}
|
||||
|
||||
if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
|
||||
|
|
Loading…
Reference in a new issue