Merge whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb

into  whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb-bj


sql/ha_ndbcluster.cc:
  Auto merged
This commit is contained in:
unknown 2007-09-12 13:55:45 +02:00
commit 85b4a64cdf
60 changed files with 1582 additions and 169 deletions

View file

@ -329,6 +329,7 @@ Thd_ndb::Thd_ndb()
all= NULL;
stmt= NULL;
m_error= FALSE;
m_error_code= 0;
query_state&= NDB_QUERY_NORMAL;
options= 0;
(void) hash_init(&open_tables, &my_charset_bin, 5, 0, 0,
@ -364,6 +365,7 @@ Thd_ndb::init_open_tables()
{
count= 0;
m_error= FALSE;
m_error_code= 0;
my_hash_reset(&open_tables);
}
@ -487,6 +489,7 @@ void ha_ndbcluster::no_uncommitted_rows_execute_failure()
return;
DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_execute_failure");
get_thd_ndb(current_thd)->m_error= TRUE;
get_thd_ndb(current_thd)->m_error_code= 0;
DBUG_VOID_RETURN;
}
@ -2084,9 +2087,15 @@ int ha_ndbcluster::unique_index_read(const uchar *key,
if (execute_no_commit_ie(this,trans,FALSE) != 0 ||
op->getNdbError().code)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
int err= ndb_err(trans);
if(err==HA_ERR_KEY_NOT_FOUND)
table->status= STATUS_NOT_FOUND;
else
table->status= STATUS_GARBAGE;
DBUG_RETURN(err);
}
// The value have now been fetched from NDB
unpack_record(buf);
table->status= 0;
@ -8068,9 +8077,9 @@ int handle_trailing_share(NDB_SHARE *share)
}
}
sql_print_error("NDB_SHARE: %s already exists use_count=%d."
" Moving away for safety, but possible memleak.",
share->key, share->use_count);
sql_print_warning("NDB_SHARE: %s already exists use_count=%d."
" Moving away for safety, but possible memleak.",
share->key, share->use_count);
dbug_print_open_tables();
/*
@ -8266,7 +8275,15 @@ NDB_SHARE *ndbcluster_get_share(const char *key, TABLE *table,
share->table_name= share->db + strlen(share->db) + 1;
ha_ndbcluster::set_tabname(key, share->table_name);
#ifdef HAVE_NDB_BINLOG
ndbcluster_binlog_init_share(share, table);
if (ndbcluster_binlog_init_share(share, table))
{
DBUG_PRINT("error", ("get_share: %s could not init share", key));
ndbcluster_real_free_share(&share);
*root_ptr= old_root;
if (!have_lock)
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0);
}
#endif
*root_ptr= old_root;
}

View file

@ -264,12 +264,13 @@ static void run_query(THD *thd, char *buf, char *end,
int i;
Thd_ndb *thd_ndb= get_thd_ndb(thd);
for (i= 0; no_print_error[i]; i++)
if (thd_ndb->m_error == no_print_error[i])
if ((thd_ndb->m_error_code == no_print_error[i]) ||
(thd->net.last_errno == (unsigned)no_print_error[i]))
break;
if (!no_print_error[i])
sql_print_error("NDB: %s: error %s %d(ndb: %d) %d %d",
buf, thd->net.last_error, thd->net.last_errno,
thd_ndb->m_error,
thd_ndb->m_error_code,
thd->net.report_error, thd->query_error);
}
@ -324,18 +325,14 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
share->key);
if ((error= open_table_def(thd, table_share, 0)))
{
sql_print_error("Unable to get table share for %s, error=%d",
share->key, error);
DBUG_PRINT("error", ("open_table_def failed %d", error));
DBUG_PRINT("error", ("open_table_def failed: %d my_errno: %d", error, my_errno));
free_table_share(table_share);
DBUG_RETURN(error);
}
if ((error= open_table_from_share(thd, table_share, "", 0 /* fon't allocate buffers */,
(uint) READ_ALL, 0, table, FALSE)))
{
sql_print_error("Unable to open table for %s, error=%d(%d)",
share->key, error, my_errno);
DBUG_PRINT("error", ("open_table_from_share failed %d", error));
DBUG_PRINT("error", ("open_table_from_share failed %d my_errno: %d", error, my_errno));
free_table_share(table_share);
DBUG_RETURN(error);
}
@ -381,11 +378,12 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
/*
Initialize the binlog part of the NDB_SHARE
*/
void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
int ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
{
THD *thd= current_thd;
MEM_ROOT *mem_root= &share->mem_root;
int do_event_op= ndb_binlog_running;
int error= 0;
DBUG_ENTER("ndbcluster_binlog_init_share");
share->connect_count= g_ndb_cluster_connection->get_connect_count();
@ -428,7 +426,7 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
{
share->flags|= NSF_NO_BINLOG;
}
DBUG_VOID_RETURN;
DBUG_RETURN(error);
}
while (1)
{
@ -455,7 +453,7 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
share->flags|= NSF_BLOB_FLAG;
break;
}
DBUG_VOID_RETURN;
DBUG_RETURN(error);
}
/*****************************************************************
@ -779,7 +777,10 @@ static int ndbcluster_create_ndb_apply_status_table(THD *thd)
" end_pos BIGINT UNSIGNED NOT NULL, "
" PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB");
const int no_print_error[3]= {701, 4009, 0}; // do not print error 701 etc
const int no_print_error[4]= {ER_TABLE_EXISTS_ERROR,
701,
4009,
0}; // do not print error 701 etc
run_query(thd, buf, end, no_print_error, TRUE);
DBUG_RETURN(0);
@ -836,7 +837,10 @@ static int ndbcluster_create_schema_table(THD *thd)
" type INT UNSIGNED NOT NULL,"
" PRIMARY KEY USING HASH (db,name) ) ENGINE=NDB");
const int no_print_error[3]= {701, 4009, 0}; // do not print error 701 etc
const int no_print_error[4]= {ER_TABLE_EXISTS_ERROR,
701,
4009,
0}; // do not print error 701 etc
run_query(thd, buf, end, no_print_error, TRUE);
DBUG_RETURN(0);
@ -3078,7 +3082,7 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
/* ToDo; handle error? */
if (share && share->op &&
share->op->getState() == NdbEventOperation::EO_EXECUTING &&
dict->getNdbError().code != 4009)
dict->getNdbError().mysql_code != HA_ERR_NO_CONNECTION)
{
DBUG_ASSERT(FALSE);
DBUG_RETURN(-1);
@ -3205,11 +3209,6 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
NDBEVENT::TableEvent type= pOp->getEventType();
/* make sure to flush any pending events as they can be dependent
on one of the tables being changed below
*/
thd->binlog_flush_pending_rows_event(TRUE);
switch (type)
{
case NDBEVENT::TE_CLUSTER_FAILURE:

View file

@ -124,7 +124,7 @@ void ndbcluster_binlog_init_handlerton();
/*
Initialize the binlog part of the NDB_SHARE
*/
void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *table);
int ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *table);
bool ndbcluster_check_if_local_table(const char *dbname, const char *tabname);
bool ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname);

View file

@ -132,6 +132,11 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
const char* table_name, bool overwrite);
static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi);
static Log_event* next_event(RELAY_LOG_INFO* rli);
static int terminate_slave_thread(THD *thd,
pthread_mutex_t* term_lock,
pthread_cond_t* term_cond,
volatile uint *slave_running,
bool skip_lock);
/*
Find out which replications threads are running
@ -312,35 +317,26 @@ int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
DBUG_RETURN(0); /* successfully do nothing */
int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
pthread_mutex_t *sql_cond_lock,*io_cond_lock;
sql_cond_lock=sql_lock;
io_cond_lock=io_lock;
if (skip_lock)
{
sql_lock = io_lock = 0;
}
if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) && mi->slave_running)
if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
{
DBUG_PRINT("info",("Terminating IO thread"));
mi->abort_slave=1;
if ((error=terminate_slave_thread(mi->io_thd,io_lock,
io_cond_lock,
&mi->stop_cond,
&mi->slave_running)) &&
&mi->slave_running,
skip_lock)) &&
!force_all)
DBUG_RETURN(error);
}
if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) && mi->rli.slave_running)
if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
{
DBUG_PRINT("info",("Terminating SQL thread"));
DBUG_ASSERT(mi->rli.sql_thd != 0) ;
mi->rli.abort_slave=1;
if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
sql_cond_lock,
&mi->rli.stop_cond,
&mi->rli.slave_running)) &&
&mi->rli.slave_running,
skip_lock)) &&
!force_all)
DBUG_RETURN(error);
}
@ -348,23 +344,60 @@ int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
}
int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
pthread_mutex_t *cond_lock,
pthread_cond_t* term_cond,
volatile uint *slave_running)
/**
Wait for a slave thread to terminate.
This function is called after requesting the thread to terminate
(by setting @c abort_slave member of @c Relay_log_info or @c
Master_info structure to 1). Termination of the thread is
controlled with the the predicate <code>*slave_running</code>.
Function will acquire @c term_lock before waiting on the condition
unless @c skip_lock is true in which case the mutex should be owned
by the caller of this function and will remain acquired after
return from the function.
@param term_lock
Associated lock to use when waiting for @c term_cond
@param term_cond
Condition that is signalled when the thread has terminated
@param slave_running
Pointer to predicate to check for slave thread termination
@param skip_lock
If @c true the lock will not be acquired before waiting on
the condition. In this case, it is assumed that the calling
function acquires the lock before calling this function.
@retval 0 All OK
*/
static int
terminate_slave_thread(THD *thd,
pthread_mutex_t* term_lock,
pthread_cond_t* term_cond,
volatile uint *slave_running,
bool skip_lock)
{
int error;
DBUG_ENTER("terminate_slave_thread");
if (term_lock)
{
if (!skip_lock)
pthread_mutex_lock(term_lock);
if (!*slave_running)
{
safe_mutex_assert_owner(term_lock);
if (!*slave_running)
{
if (!skip_lock)
pthread_mutex_unlock(term_lock);
DBUG_RETURN(ER_SLAVE_NOT_RUNNING);
}
DBUG_RETURN(ER_SLAVE_NOT_RUNNING);
}
DBUG_ASSERT(thd != 0);
THD_CHECK_SENTRY(thd);
/*
Is is critical to test if the slave is running. Otherwise, we might
be referening freed memory trying to kick it
@ -380,9 +413,13 @@ int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
*/
struct timespec abstime;
set_timespec(abstime,2);
pthread_cond_timedwait(term_cond, cond_lock, &abstime);
error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
DBUG_ASSERT(error == ETIMEDOUT || error == 0);
}
if (term_lock)
DBUG_ASSERT(*slave_running == 0);
if (!skip_lock)
pthread_mutex_unlock(term_lock);
DBUG_RETURN(0);
}

View file

@ -133,10 +133,6 @@ bool flush_relay_log_info(RELAY_LOG_INFO* rli);
int register_slave_on_master(MYSQL* mysql);
int terminate_slave_threads(MASTER_INFO* mi, int thread_mask,
bool skip_lock = 0);
int terminate_slave_thread(THD* thd, pthread_mutex_t* term_mutex,
pthread_mutex_t* cond_lock,
pthread_cond_t* term_cond,
volatile uint* slave_running);
int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
MASTER_INFO* mi, const char* master_info_fname,
const char* slave_info_fname, int thread_mask);