Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new
This commit is contained in:
tomas@poseidon.ndb.mysql.com 2006-04-19 14:58:42 +02:00
commit 0ebba697d8
18 changed files with 371 additions and 166 deletions

View file

@ -6,13 +6,6 @@ attr1 INT NOT NULL,
attr2 INT,
attr3 VARCHAR(10)
) ENGINE=ndbcluster;
drop table t1;
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL,
attr2 INT,
attr3 VARCHAR(10)
) ENGINE=ndbcluster;
SHOW INDEX FROM t1;
Table Non_unique Key_name Seq_in_index Column_name Collation Cardinality Sub_part Packed Null Index_type Comment
t1 0 PRIMARY 1 pk1 A 0 NULL NULL BTREE

View file

@ -122,3 +122,28 @@ select * from t1 order by nid;
nid nom prenom
1 DEAD ABC1
DROP TABLE t1;
CREATE TABLE t1 (c1 INT KEY) ENGINE=NDB;
INSERT INTO t1 VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
ALTER TABLE t1 ADD c2 INT;
SELECT * FROM t1 ORDER BY c1;
c1 c2
1 NULL
2 NULL
3 NULL
4 NULL
5 NULL
6 NULL
7 NULL
8 NULL
9 NULL
10 NULL
ALTER TABLE t1 CHANGE c2 c2 TEXT CHARACTER SET utf8;
ALTER TABLE t1 CHANGE c2 c2 BLOB;
SELECT * FROM t1 ORDER BY c1 LIMIT 5;
c1 c2
1 NULL
2 NULL
3 NULL
4 NULL
5 NULL
DROP TABLE t1;

View file

@ -25,9 +25,9 @@ rpl_ndb_2innodb : BUG#19004 2006-03-22 tomas ndb: partition by range an
rpl_ndb_2myisam : BUG#19004 2006-03-22 tomas ndb: partition by range and update hangs
rpl_ndb_auto_inc : BUG#17086 2006-02-16 jmiller CR: auto_increment_increment and auto_increment_offset produce duplicate key er
rpl_ndb_ddl : result file needs update + test needs to checked
rpl_ndb_innodb2ndb : BUG#18094 2006-03-16 mats Slave caches invalid table definition after atlters causes select failure
rpl_ndb_innodb2ndb : BUG#17400 2006-04-19 tomas Cluster Replication: delete & update of rows in table without pk fails on slave.
rpl_ndb_log : BUG#18947 2006-03-21 tomas CRBR: order in binlog of create table and insert (on different table) not determ
rpl_ndb_myisam2ndb : BUG#18094 2006-03-16 mats Slave caches invalid table definition after atlters causes select failure
rpl_ndb_myisam2ndb : BUG#17400 2006-04-19 tomas Cluster Replication: delete & update of rows in table without pk fails on slave.
rpl_ndb_relay_space : BUG#16993 2006-02-16 jmiller RBR: ALTER TABLE ZEROFILL AUTO_INCREMENT is not replicated correctly
rpl_switch_stm_row_mixed : BUG#18590 2006-03-28 brian
rpl_row_basic_7ndb : BUG#17400 2006-04-09 brian Cluster Replication: delete & update of rows in table without pk fails on slave.

View file

@ -6,17 +6,6 @@ DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7;
drop database if exists mysqltest;
--enable_warnings
# workaround for bug#16445
# remove to reproduce bug and run tests drom ndb start
# and with ndb_autodiscover disabled
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL,
attr2 INT,
attr3 VARCHAR(10)
) ENGINE=ndbcluster;
drop table t1;
#
# Basic test to show that the NDB
# table handler is working

View file

@ -143,6 +143,37 @@ COMMIT;
--connection slave
select * from t1 order by nid;
# cleanup
--connection master
DROP TABLE t1;
#
# BUG#18094
# Slave caches invalid table definition after atlters causes select failure
#
--connection master
CREATE TABLE t1 (c1 INT KEY) ENGINE=NDB;
INSERT INTO t1 VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
ALTER TABLE t1 ADD c2 INT;
--sync_slave_with_master
connection slave;
SELECT * FROM t1 ORDER BY c1;
connection master;
ALTER TABLE t1 CHANGE c2 c2 TEXT CHARACTER SET utf8;
ALTER TABLE t1 CHANGE c2 c2 BLOB;
--sync_slave_with_master
connection slave;
# here we would get error 1412 prior to bug
SELECT * FROM t1 ORDER BY c1 LIMIT 5;
# cleanup
--connection master
DROP TABLE t1;

View file

@ -5989,7 +5989,7 @@ int Field_str::store(double nr)
uint Field::is_equal(create_field *new_field)
{
return (new_field->sql_type == type());
return (new_field->sql_type == real_type());
}
@ -6001,7 +6001,7 @@ uint Field_str::is_equal(create_field *new_field)
(flags & (BINCMP_FLAG | BINARY_FLAG))))
return 0; /* One of the fields is binary and the other one isn't */
return ((new_field->sql_type == type()) &&
return ((new_field->sql_type == real_type()) &&
new_field->charset == field_charset &&
new_field->length == max_length());
}
@ -6798,7 +6798,7 @@ Field *Field_varstring::new_key_field(MEM_ROOT *root,
uint Field_varstring::is_equal(create_field *new_field)
{
if (new_field->sql_type == type() &&
if (new_field->sql_type == real_type() &&
new_field->charset == field_charset)
{
if (new_field->length == max_length())
@ -7957,12 +7957,12 @@ bool Field_num::eq_def(Field *field)
uint Field_num::is_equal(create_field *new_field)
{
return ((new_field->sql_type == type()) &&
return ((new_field->sql_type == real_type()) &&
((new_field->flags & UNSIGNED_FLAG) == (uint) (flags &
UNSIGNED_FLAG)) &&
((new_field->flags & AUTO_INCREMENT_FLAG) ==
(uint) (flags & AUTO_INCREMENT_FLAG)) &&
(new_field->length >= max_length()));
(new_field->length <= max_length()));
}

View file

@ -466,7 +466,7 @@ void ha_ndbcluster::no_uncommitted_rows_reset(THD *thd)
# The mapped error code
*/
int ha_ndbcluster::invalidate_dictionary_cache(bool global)
int ha_ndbcluster::invalidate_dictionary_cache(bool global, const NDBTAB *ndbtab)
{
NDBDICT *dict= get_ndb()->getDictionary();
DBUG_ENTER("invalidate_dictionary_cache");
@ -494,20 +494,17 @@ int ha_ndbcluster::invalidate_dictionary_cache(bool global)
DBUG_PRINT("info", ("Released ndbcluster mutex"));
}
#endif
const NDBTAB *tab= dict->getTable(m_tabname);
if (!tab)
DBUG_RETURN(1);
if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
if (!ndbtab)
{
// Global cache has already been invalidated
dict->removeCachedTable(m_tabname);
global= FALSE;
DBUG_PRINT("info", ("global: %d", global));
ndbtab= dict->getTable(m_tabname);
if (!ndbtab)
DBUG_RETURN(1);
}
else
dict->invalidateTable(m_tabname);
dict->invalidateTable(ndbtab);
table_share->version= 0L; /* Free when thread is ready */
}
else if (ndbtab)
dict->removeCachedTable(ndbtab);
else
dict->removeCachedTable(m_tabname);
@ -564,7 +561,7 @@ int ha_ndbcluster::ndb_err(NdbTransaction *trans)
table_list.alias= table_list.table_name= m_tabname;
close_cached_tables(current_thd, 0, &table_list);
invalidate_dictionary_cache(TRUE);
invalidate_dictionary_cache(TRUE, m_table);
if (err.code==284)
{
@ -1041,7 +1038,7 @@ int ha_ndbcluster::get_metadata(const char *path)
// Check if thread has stale local cache
if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
{
invalidate_dictionary_cache(FALSE);
invalidate_dictionary_cache(FALSE, tab);
if (!(tab= dict->getTable(m_tabname)))
ERR_RETURN(dict->getNdbError());
DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
@ -1064,7 +1061,7 @@ int ha_ndbcluster::get_metadata(const char *path)
if (!invalidating_ndb_table)
{
DBUG_PRINT("info", ("Invalidating table"));
invalidate_dictionary_cache(TRUE);
invalidate_dictionary_cache(TRUE, tab);
invalidating_ndb_table= TRUE;
}
else
@ -1091,7 +1088,7 @@ int ha_ndbcluster::get_metadata(const char *path)
DBUG_RETURN(error);
m_table_version= tab->getObjectVersion();
m_table= (void *)tab;
m_table= tab;
m_table_info= NULL; // Set in external lock
DBUG_RETURN(open_indexes(ndb, table, FALSE));
@ -1150,7 +1147,7 @@ int ha_ndbcluster::table_changed(const void *pack_frm_data, uint pack_frm_len)
// Check if thread has stale local cache
if (orig_tab->getObjectStatus() == NdbDictionary::Object::Invalid)
{
dict->removeCachedTable(m_tabname);
dict->removeCachedTable(orig_tab);
if (!(orig_tab= dict->getTable(m_tabname)))
ERR_RETURN(dict->getNdbError());
}
@ -1219,13 +1216,31 @@ int ha_ndbcluster::add_index_handle(THD *thd, NDBDICT *dict, KEY *key_info,
int error= 0;
NDB_INDEX_TYPE idx_type= get_index_type_from_table(index_no);
m_index[index_no].type= idx_type;
DBUG_ENTER("ha_ndbcluster::get_index_handle");
DBUG_ENTER("ha_ndbcluster::add_index_handle");
DBUG_PRINT("enter", ("table %s", m_tabname));
if (idx_type != PRIMARY_KEY_INDEX && idx_type != UNIQUE_INDEX)
{
DBUG_PRINT("info", ("Get handle to index %s", index_name));
const NDBINDEX *index= dict->getIndex(index_name, m_tabname);
if (!index) ERR_RETURN(dict->getNdbError());
const NDBINDEX *index;
do
{
index= dict->getIndex(index_name, m_tabname);
if (!index)
ERR_RETURN(dict->getNdbError());
DBUG_PRINT("info", ("index: 0x%x id: %d version: %d.%d status: %d",
index,
index->getObjectId(),
index->getObjectVersion() & 0xFFFFFF,
index->getObjectVersion() >> 24,
index->getObjectStatus()));
if (index->getObjectStatus() != NdbDictionary::Object::Retrieved)
{
dict->removeCachedIndex(index);
continue;
}
break;
} while (1);
m_index[index_no].index= (void *) index;
// ordered index - add stats
NDB_INDEX_DATA& d=m_index[index_no];
@ -1254,8 +1269,25 @@ int ha_ndbcluster::add_index_handle(THD *thd, NDBDICT *dict, KEY *key_info,
m_has_unique_index= TRUE;
strxnmov(unique_index_name, FN_LEN, index_name, unique_suffix, NullS);
DBUG_PRINT("info", ("Get handle to unique_index %s", unique_index_name));
const NDBINDEX *index= dict->getIndex(unique_index_name, m_tabname);
if (!index) ERR_RETURN(dict->getNdbError());
const NDBINDEX *index;
do
{
index= dict->getIndex(unique_index_name, m_tabname);
if (!index)
ERR_RETURN(dict->getNdbError());
DBUG_PRINT("info", ("index: 0x%x id: %d version: %d.%d status: %d",
index,
index->getObjectId(),
index->getObjectVersion() & 0xFFFFFF,
index->getObjectVersion() >> 24,
index->getObjectStatus()));
if (index->getObjectStatus() != NdbDictionary::Object::Retrieved)
{
dict->removeCachedIndex(index);
continue;
}
break;
} while (1);
m_index[index_no].unique_index= (void *) index;
error= fix_unique_index_attr_order(m_index[index_no], index, key_info);
}
@ -3954,7 +3986,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
if ((trans && tab->getObjectStatus() != NdbDictionary::Object::Retrieved)
|| tab->getObjectStatus() == NdbDictionary::Object::Invalid)
{
invalidate_dictionary_cache(FALSE);
invalidate_dictionary_cache(FALSE, tab);
if (!(tab= dict->getTable(m_tabname, &tab_info)))
ERR_RETURN(dict->getNdbError());
DBUG_PRINT("info", ("Table schema version: %d",
@ -3970,7 +4002,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
}
if (m_table != (void *)tab)
{
m_table= (void *)tab;
m_table= tab;
m_table_version = tab->getObjectVersion();
if (!(my_errno= open_indexes(ndb, table, FALSE)))
DBUG_RETURN(my_errno);
@ -4990,7 +5022,7 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
// Check if thread has stale local cache
if (orig_tab->getObjectStatus() == NdbDictionary::Object::Invalid)
{
dict->removeCachedTable(m_tabname);
dict->removeCachedTable(orig_tab);
if (!(orig_tab= dict->getTable(m_tabname)))
ERR_RETURN(dict->getNdbError());
}
@ -5002,7 +5034,7 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
DBUG_ASSERT(r == 0);
}
#endif
m_table= (void *)orig_tab;
m_table= orig_tab;
// Change current database to that of target table
set_dbname(to);
ndb->setDatabaseName(m_dbname);
@ -9988,7 +10020,7 @@ bool ha_ndbcluster::get_no_parts(const char *name, uint *no_parts)
// Check if thread has stale local cache
if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
{
invalidate_dictionary_cache(FALSE);
invalidate_dictionary_cache(FALSE, tab);
if (!(tab= dict->getTable(m_tabname)))
ERR_BREAK(dict->getNdbError(), err);
}

View file

@ -778,7 +778,8 @@ private:
void print_results();
ulonglong get_auto_increment();
int invalidate_dictionary_cache(bool global);
int invalidate_dictionary_cache(bool global,
const NdbDictionary::Table *ndbtab);
int ndb_err(NdbTransaction*);
bool uses_blob_value();
@ -816,7 +817,7 @@ private:
NdbTransaction *m_active_trans;
NdbScanOperation *m_active_cursor;
void *m_table;
const NdbDictionary::Table *m_table;
int m_table_version;
void *m_table_info;
char m_dbname[FN_HEADLEN];

View file

@ -1068,20 +1068,27 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
MY_BITMAP schema_subscribers;
uint32 bitbuf[sizeof(ndb_schema_object->slock)/4];
{
int i;
int i, updated= 0;
int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, false);
bitmap_set_all(&schema_subscribers);
(void) pthread_mutex_lock(&schema_share->mutex);
for (i= 0; i < ndb_number_of_storage_nodes; i++)
for (i= 0; i < no_storage_nodes; i++)
{
MY_BITMAP *table_subscribers= &schema_share->subscriber_bitmap[i];
if (!bitmap_is_clear_all(table_subscribers))
{
bitmap_intersect(&schema_subscribers,
table_subscribers);
updated= 1;
}
}
(void) pthread_mutex_unlock(&schema_share->mutex);
bitmap_clear_bit(&schema_subscribers, node_id);
if (updated)
bitmap_clear_bit(&schema_subscribers, node_id);
else
bitmap_clear_all(&schema_subscribers);
if (ndb_schema_object)
{
(void) pthread_mutex_lock(&ndb_schema_object->mutex);
@ -1227,13 +1234,14 @@ end:
{
struct timespec abstime;
int i;
int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
set_timespec(abstime, 1);
int ret= pthread_cond_timedwait(&injector_cond,
&ndb_schema_object->mutex,
&abstime);
(void) pthread_mutex_lock(&schema_share->mutex);
for (i= 0; i < ndb_number_of_storage_nodes; i++)
for (i= 0; i < no_storage_nodes; i++)
{
/* remove any unsubscribed from schema_subscribers */
MY_BITMAP *tmp= &schema_share->subscriber_bitmap[i];
@ -1430,6 +1438,10 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
NDB_SHARE *share)
{
DBUG_ENTER("ndb_handle_schema_change");
TABLE* table= share->table;
TABLE_SHARE *table_share= table->s;
const char *dbname= table_share->db.str;
const char *tabname= table_share->table_name.str;
bool do_close_cached_tables= FALSE;
bool is_online_alter_table= FALSE;
bool is_rename_table= FALSE;
@ -1449,70 +1461,68 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
}
}
if (is_remote_change) /* includes CLUSTER_FAILURE */
/*
Refresh local dictionary cache by
invalidating table and all it's indexes
*/
ndb->setDatabaseName(dbname);
Thd_ndb *thd_ndb= get_thd_ndb(thd);
DBUG_ASSERT(thd_ndb != NULL);
Ndb* old_ndb= thd_ndb->ndb;
thd_ndb->ndb= ndb;
ha_ndbcluster table_handler(table_share);
(void)strxmov(table_handler.m_dbname, dbname, NullS);
(void)strxmov(table_handler.m_tabname, tabname, NullS);
table_handler.open_indexes(ndb, table, TRUE);
table_handler.invalidate_dictionary_cache(TRUE, 0);
thd_ndb->ndb= old_ndb;
/*
Refresh local frm file and dictionary cache if
remote on-line alter table
*/
if (is_remote_change && is_online_alter_table)
{
TABLE* table= share->table;
TABLE_SHARE *table_share= table->s;
const char *dbname= table_share->db.str;
const char *tabname= table_share->table_name.str;
char key[FN_REFLEN];
const void *data= 0, *pack_data= 0;
uint length, pack_length;
int error;
NDBDICT *dict= ndb->getDictionary();
const NDBTAB *altered_table= pOp->getTable();
/*
Invalidate table and all it's indexes
DBUG_PRINT("info", ("Detected frm change of table %s.%s",
dbname, tabname));
build_table_filename(key, FN_LEN-1, dbname, tabname, NullS);
/*
If the frm of the altered table is different than the one on
disk then overwrite it with the new table definition
*/
ndb->setDatabaseName(dbname);
Thd_ndb *thd_ndb= get_thd_ndb(thd);
DBUG_ASSERT(thd_ndb != NULL);
Ndb* old_ndb= thd_ndb->ndb;
thd_ndb->ndb= ndb;
ha_ndbcluster table_handler(table_share);
table_handler.set_dbname(share->key);
table_handler.set_tabname(share->key);
table_handler.open_indexes(ndb, table, TRUE);
table_handler.invalidate_dictionary_cache(TRUE);
thd_ndb->ndb= old_ndb;
if (is_online_alter_table)
if (readfrm(key, &data, &length) == 0 &&
packfrm(data, length, &pack_data, &pack_length) == 0 &&
cmp_frm(altered_table, pack_data, pack_length))
{
const char *tabname= table_share->table_name.str;
char key[FN_REFLEN];
const void *data= 0, *pack_data= 0;
uint length, pack_length;
int error;
NDBDICT *dict= ndb->getDictionary();
const NDBTAB *altered_table= pOp->getTable();
DBUG_PRINT("info", ("Detected frm change of table %s.%s",
dbname, tabname));
build_table_filename(key, FN_LEN-1, dbname, tabname, NullS);
/*
If the frm of the altered table is different than the one on
disk then overwrite it with the new table definition
*/
if (readfrm(key, &data, &length) == 0 &&
packfrm(data, length, &pack_data, &pack_length) == 0 &&
cmp_frm(altered_table, pack_data, pack_length))
DBUG_DUMP("frm", (char*)altered_table->getFrmData(),
altered_table->getFrmLength());
pthread_mutex_lock(&LOCK_open);
const NDBTAB *old= dict->getTable(tabname);
if (!old &&
old->getObjectVersion() != altered_table->getObjectVersion())
dict->putTable(altered_table);
if ((error= unpackfrm(&data, &length, altered_table->getFrmData())) ||
(error= writefrm(key, data, length)))
{
DBUG_DUMP("frm", (char*)altered_table->getFrmData(),
altered_table->getFrmLength());
pthread_mutex_lock(&LOCK_open);
const NDBTAB *old= dict->getTable(tabname);
if (!old &&
old->getObjectVersion() != altered_table->getObjectVersion())
dict->putTable(altered_table);
if ((error= unpackfrm(&data, &length, altered_table->getFrmData())) ||
(error= writefrm(key, data, length)))
{
sql_print_information("NDB: Failed write frm for %s.%s, error %d",
dbname, tabname, error);
}
ndbcluster_binlog_close_table(thd, share);
close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE);
if ((error= ndbcluster_binlog_open_table(thd, share,
table_share, table)))
sql_print_information("NDB: Failed to re-open table %s.%s",
dbname, tabname);
pthread_mutex_unlock(&LOCK_open);
sql_print_information("NDB: Failed write frm for %s.%s, error %d",
dbname, tabname, error);
}
ndbcluster_binlog_close_table(thd, share);
close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE);
if ((error= ndbcluster_binlog_open_table(thd, share,
table_share, table)))
sql_print_information("NDB: Failed to re-open table %s.%s",
dbname, tabname);
pthread_mutex_unlock(&LOCK_open);
}
}
@ -1540,6 +1550,21 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
share->table->s->db.length= strlen(share->db);
share->table->s->table_name.str= share->table_name;
share->table->s->table_name.length= strlen(share->table_name);
/*
Refresh local dictionary cache by invalidating any
old table with same name and all it's indexes
*/
ndb->setDatabaseName(dbname);
Thd_ndb *thd_ndb= get_thd_ndb(thd);
DBUG_ASSERT(thd_ndb != NULL);
Ndb* old_ndb= thd_ndb->ndb;
thd_ndb->ndb= ndb;
ha_ndbcluster table_handler(table_share);
table_handler.set_dbname(share->key);
table_handler.set_tabname(share->key);
table_handler.open_indexes(ndb, table, TRUE);
table_handler.invalidate_dictionary_cache(TRUE, 0);
thd_ndb->ndb= old_ndb;
}
DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
if (share->op_old == pOp)

View file

@ -1745,11 +1745,15 @@ public:
const char * tableName);
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
void removeCachedTable(const Table *table);
void removeCachedIndex(const Index *index);
void invalidateTable(const Table *table);
/**
* Invalidate cached index object
*/
void invalidateIndex(const char * indexName,
const char * tableName);
void invalidateIndex(const Index *index);
/**
* Force gcp and wait for gcp complete
*/

View file

@ -5016,12 +5016,13 @@ void Dblqh::packLqhkeyreqLab(Signal* signal)
Uint32 nextNodeId = regTcPtr->nextReplica;
Uint32 nextVersion = getNodeInfo(nextNodeId).m_version;
UintR TAiLen = regTcPtr->reclenAiLqhkey;
UintR TapplAddressIndicator = (regTcPtr->nextSeqNoReplica == 0 ? 0 : 1);
LqhKeyReq::setApplicationAddressFlag(Treqinfo, TapplAddressIndicator);
LqhKeyReq::setInterpretedFlag(Treqinfo, regTcPtr->opExec);
LqhKeyReq::setSeqNoReplica(Treqinfo, regTcPtr->nextSeqNoReplica);
LqhKeyReq::setAIInLqhKeyReq(Treqinfo, regTcPtr->reclenAiLqhkey);
LqhKeyReq::setAIInLqhKeyReq(Treqinfo, TAiLen);
if (unlikely(nextVersion < NDBD_ROWID_VERSION))
{
@ -5124,22 +5125,32 @@ void Dblqh::packLqhkeyreqLab(Signal* signal)
lqhKeyReq->variableData[nextPos + 0] = sig0;
nextPos += LqhKeyReq::getGCIFlag(Treqinfo);
sig0 = regTcPtr->firstAttrinfo[0];
sig1 = regTcPtr->firstAttrinfo[1];
sig2 = regTcPtr->firstAttrinfo[2];
sig3 = regTcPtr->firstAttrinfo[3];
sig4 = regTcPtr->firstAttrinfo[4];
UintR TAiLen = regTcPtr->reclenAiLqhkey;
BlockReference lqhRef = calcLqhBlockRef(regTcPtr->nextReplica);
if (likely(nextPos + TAiLen + LqhKeyReq::FixedSignalLength <= 25))
{
jam();
sig0 = regTcPtr->firstAttrinfo[0];
sig1 = regTcPtr->firstAttrinfo[1];
sig2 = regTcPtr->firstAttrinfo[2];
sig3 = regTcPtr->firstAttrinfo[3];
sig4 = regTcPtr->firstAttrinfo[4];
lqhKeyReq->variableData[nextPos] = sig0;
lqhKeyReq->variableData[nextPos + 1] = sig1;
lqhKeyReq->variableData[nextPos + 2] = sig2;
lqhKeyReq->variableData[nextPos + 3] = sig3;
lqhKeyReq->variableData[nextPos + 4] = sig4;
nextPos += TAiLen;
lqhKeyReq->variableData[nextPos] = sig0;
lqhKeyReq->variableData[nextPos + 1] = sig1;
lqhKeyReq->variableData[nextPos + 2] = sig2;
lqhKeyReq->variableData[nextPos + 3] = sig3;
lqhKeyReq->variableData[nextPos + 4] = sig4;
nextPos += TAiLen;
TAiLen = 0;
}
else
{
Treqinfo &= ~(Uint32)(RI_AI_IN_THIS_MASK << RI_AI_IN_THIS_SHIFT);
lqhKeyReq->requestInfo = Treqinfo;
}
sendSignal(lqhRef, GSN_LQHKEYREQ, signal,
nextPos + LqhKeyReq::FixedSignalLength, JBB);
if (regTcPtr->primKeyLen > 4) {
@ -5165,6 +5176,17 @@ void Dblqh::packLqhkeyreqLab(Signal* signal)
signal->theData[0] = sig0;
signal->theData[1] = sig1;
signal->theData[2] = sig2;
if (unlikely(nextPos + TAiLen + LqhKeyReq::FixedSignalLength > 25))
{
jam();
/**
* 4 replicas...
*/
memcpy(signal->theData+3, regTcPtr->firstAttrinfo, TAiLen << 2);
sendSignal(lqhRef, GSN_ATTRINFO, signal, 3 + TAiLen, JBB);
}
AttrbufPtr regAttrinbufptr;
regAttrinbufptr.i = regTcPtr->firstAttrinbuf;
while (regAttrinbufptr.i != RNIL) {

View file

@ -312,11 +312,12 @@ void AsyncFile::openReq(Request* request)
Uint32 new_flags = 0;
// Convert file open flags from Solaris to Liux
if(flags & FsOpenReq::OM_CREATE){
if (flags & FsOpenReq::OM_CREATE)
{
new_flags |= O_CREAT;
}
if(flags & FsOpenReq::OM_TRUNCATE){
if (flags & FsOpenReq::OM_TRUNCATE){
#if 0
if(Global_unlinkO_CREAT){
unlink(theFileName.c_str());
@ -330,25 +331,25 @@ void AsyncFile::openReq(Request* request)
m_syncFrequency = 1024*1024; // Hard coded to 1M
}
if(flags & FsOpenReq::OM_APPEND){
if (flags & FsOpenReq::OM_APPEND){
new_flags |= O_APPEND;
}
if((flags & FsOpenReq::OM_SYNC) && ! (flags & FsOpenReq::OM_INIT))
if ((flags & FsOpenReq::OM_SYNC) && ! (flags & FsOpenReq::OM_INIT))
{
#ifdef O_SYNC
new_flags |= O_SYNC;
#endif
}
#ifndef NDB_NO_O_DIRECT /* to allow tmpfs */
//#ifndef NDB_NO_O_DIRECT /* to allow tmpfs */
#ifdef O_DIRECT
if (flags & FsOpenReq::OM_DIRECT)
{
new_flags |= O_DIRECT;
}
#endif
#endif
//#endif
switch(flags & 0x3){
case FsOpenReq::OM_READONLY:
@ -370,44 +371,73 @@ void AsyncFile::openReq(Request* request)
const int mode = S_IRUSR | S_IWUSR |
S_IRGRP | S_IWGRP |
S_IROTH | S_IWOTH;
if(flags & FsOpenReq::OM_CREATE_IF_NONE){
if((theFd = ::open(theFileName.c_str(), new_flags, mode)) != -1) {
if (flags & FsOpenReq::OM_CREATE_IF_NONE)
{
Uint32 tmp_flags = new_flags;
#ifdef O_DIRECT
tmp_flags &= ~O_DIRECT;
#endif
if ((theFd = ::open(theFileName.c_str(), tmp_flags, mode)) != -1)
{
close(theFd);
request->error = FsRef::fsErrFileExists;
return;
}
new_flags |= O_CREAT;
}
if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode))) {
no_odirect:
if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode)))
{
PRINT_ERRORANDFLAGS(new_flags);
if( (errno == ENOENT ) && (new_flags & O_CREAT ) ) {
if ((errno == ENOENT) && (new_flags & O_CREAT))
{
createDirectories();
if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode))) {
if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode)))
{
#ifdef O_DIRECT
if (new_flags & O_DIRECT)
{
new_flags &= ~O_DIRECT;
goto no_odirect;
}
#endif
PRINT_ERRORANDFLAGS(new_flags);
request->error = errno;
return;
}
} else {
}
#ifdef O_DIRECT
else if (new_flags & O_DIRECT)
{
new_flags &= ~O_DIRECT;
goto no_odirect;
}
#endif
else
{
request->error = errno;
return;
}
}
if(flags & FsOpenReq::OM_CHECK_SIZE)
if (flags & FsOpenReq::OM_CHECK_SIZE)
{
struct stat buf;
if((fstat(theFd, &buf) == -1))
if ((fstat(theFd, &buf) == -1))
{
request->error = errno;
} else if(buf.st_size != request->par.open.file_size){
}
else if(buf.st_size != request->par.open.file_size)
{
request->error = FsRef::fsErrInvalidFileSize;
}
if(request->error)
if (request->error)
return;
}
if(flags & FsOpenReq::OM_INIT){
if (flags & FsOpenReq::OM_INIT)
{
off_t off = 0;
const off_t sz = request->par.open.file_size;
Uint32 tmp[sizeof(SignalHeader)+25];

View file

@ -772,17 +772,17 @@ NdbDictionary::Index::getLogging() const {
NdbDictionary::Object::Status
NdbDictionary::Index::getObjectStatus() const {
return m_impl.m_status;
return m_impl.m_table->m_status;
}
int
NdbDictionary::Index::getObjectVersion() const {
return m_impl.m_version;
return m_impl.m_table->m_version;
}
int
NdbDictionary::Index::getObjectId() const {
return m_impl.m_id;
return m_impl.m_table->m_id;
}
@ -1395,6 +1395,12 @@ NdbDictionary::Dictionary::invalidateTable(const char * name){
DBUG_VOID_RETURN;
}
void
NdbDictionary::Dictionary::invalidateTable(const Table *table){
NdbTableImpl &t = NdbTableImpl::getImpl(*table);
m_impl.invalidateObject(t);
}
void
NdbDictionary::Dictionary::removeCachedTable(const char * name){
NdbTableImpl * t = m_impl.getTable(name);
@ -1402,6 +1408,12 @@ NdbDictionary::Dictionary::removeCachedTable(const char * name){
m_impl.removeCachedObject(* t);
}
void
NdbDictionary::Dictionary::removeCachedTable(const Table *table){
NdbTableImpl &t = NdbTableImpl::getImpl(*table);
m_impl.removeCachedObject(t);
}
int
NdbDictionary::Dictionary::createIndex(const Index & ind)
{
@ -1425,6 +1437,15 @@ NdbDictionary::Dictionary::getIndex(const char * indexName,
return 0;
}
void
NdbDictionary::Dictionary::invalidateIndex(const Index *index){
DBUG_ENTER("NdbDictionary::Dictionary::invalidateIndex");
NdbIndexImpl &i = NdbIndexImpl::getImpl(*index);
assert(i.m_table != 0);
m_impl.invalidateObject(* i.m_table);
DBUG_VOID_RETURN;
}
void
NdbDictionary::Dictionary::invalidateIndex(const char * indexName,
const char * tableName){
@ -1443,6 +1464,15 @@ NdbDictionary::Dictionary::forceGCPWait()
return m_impl.forceGCPWait();
}
void
NdbDictionary::Dictionary::removeCachedIndex(const Index *index){
DBUG_ENTER("NdbDictionary::Dictionary::removeCachedIndex");
NdbIndexImpl &i = NdbIndexImpl::getImpl(*index);
assert(i.m_table != 0);
m_impl.removeCachedObject(* i.m_table);
DBUG_VOID_RETURN;
}
void
NdbDictionary::Dictionary::removeCachedIndex(const char * indexName,
const char * tableName){

View file

@ -3840,9 +3840,10 @@ NdbDictionaryImpl::dropBlobEvents(const NdbEventImpl& evnt)
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
n--;
char bename[MAX_TAB_NAME_SIZE];
NdbBlob::getBlobEventName(bename, &evnt, &c);
(void)dropEvent(bename);
NdbEventImpl* blob_evnt = getBlobEvent(evnt, i);
if (blob_evnt == NULL)
continue;
(void)dropEvent(*blob_evnt);
}
} else {
// loop over MAX_ATTRIBUTES_IN_TABLE ...

View file

@ -1279,6 +1279,23 @@ find_bucket(Vector<Gci_container> * active, Uint64 gci)
return find_bucket_chained(active,gci);
}
static
void
crash_on_invalid_SUB_GCP_COMPLETE_REP(const Gci_container* bucket,
const SubGcpCompleteRep * const rep,
Uint32 nodes)
{
Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
ndbout_c("INVALID SUB_GCP_COMPLETE_REP");
ndbout_c("gci: %d", rep->gci);
ndbout_c("sender: %x", rep->senderRef);
ndbout_c("count: %d", rep->gcp_complete_rep_count);
ndbout_c("bucket count: %u", old_cnt);
ndbout_c("nodes: %u", nodes);
abort();
}
void
NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep)
{
@ -1317,9 +1334,13 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep)
old_cnt = m_system_nodes;
}
assert(old_cnt >= cnt);
//assert(old_cnt >= cnt);
if (unlikely(! (old_cnt >= cnt)))
{
crash_on_invalid_SUB_GCP_COMPLETE_REP(bucket, rep, m_system_nodes);
}
bucket->m_gcp_complete_rep_count = old_cnt - cnt;
if(old_cnt == cnt)
{
if(likely(gci == m_latestGCI + 1 || m_latestGCI == 0))

View file

@ -1435,8 +1435,7 @@ NdbTransaction::sendTC_COMMIT_ACK(TransporterFacade *tp,
Uint32 * dataPtr = aSignal->getDataPtrSend();
dataPtr[0] = transId1;
dataPtr[1] = transId2;
tp->sendSignal(aSignal, refToNode(aTCRef));
tp->sendSignalUnCond(aSignal, refToNode(aTCRef));
}
int

View file

@ -343,7 +343,7 @@ execute(void * callbackObj, SignalHeader * const header,
Uint32 aNodeId= refToNode(ref);
tSignal.theReceiversBlockNumber= refToBlock(ref);
tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK;
theFacade->sendSignal(&tSignal, aNodeId);
theFacade->sendSignalUnCond(&tSignal, aNodeId);
}
break;
}
@ -987,7 +987,7 @@ TransporterFacade::sendSignal(NdbApiSignal * aSignal, NodeId aNode){
LinearSectionPtr ptr[3];
signalLogger.sendSignal(* aSignal,
1,
aSignal->getDataPtr(),
tDataPtr,
aNode, ptr, 0);
signalLogger.flushSignalLog();
aSignal->theSendersBlockRef = tmp;
@ -1014,6 +1014,7 @@ TransporterFacade::sendSignal(NdbApiSignal * aSignal, NodeId aNode){
int
TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){
Uint32* tDataPtr = aSignal->getDataPtrSend();
#ifdef API_TRACE
if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
Uint32 tmp = aSignal->theSendersBlockRef;
@ -1021,7 +1022,7 @@ TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){
LinearSectionPtr ptr[3];
signalLogger.sendSignal(* aSignal,
0,
aSignal->getDataPtr(),
tDataPtr,
aNode, ptr, 0);
signalLogger.flushSignalLog();
aSignal->theSendersBlockRef = tmp;
@ -1032,7 +1033,7 @@ TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){
(aSignal->theReceiversBlockNumber != 0));
SendStatus ss = theTransporterRegistry->prepareSend(aSignal,
0,
aSignal->getDataPtr(),
tDataPtr,
aNode,
0);

View file

@ -175,7 +175,8 @@ private:
friend class GrepSS;
friend class Ndb;
friend class Ndb_cluster_connection_impl;
friend class NdbTransaction;
int sendSignalUnCond(NdbApiSignal *, NodeId nodeId);
bool isConnected(NodeId aNodeId);