References: lp:1144911 - merged fix for prepared statement processing from upstream.

Merged fix is revision 3853 in lp:~codership/codership-mysql/5.5-23
This commit is contained in:
Seppo Jaakola 2013-03-05 00:01:20 +02:00
parent 4174f1a474
commit 972acdb164
2 changed files with 105 additions and 74 deletions

View file

@ -6052,6 +6052,89 @@ void mysql_init_multi_delete(LEX *lex)
}
#ifdef WITH_WSREP
void wsrep_replay_transaction(THD *thd)
{
/* checking if BF trx must be replayed */
if (thd->wsrep_conflict_state== MUST_REPLAY)
{
if (thd->wsrep_exec_mode!= REPL_RECV)
{
if (thd->stmt_da->is_sent)
{
WSREP_ERROR("replay issue, thd has reported status already");
}
thd->stmt_da->reset_diagnostics_area();
thd->wsrep_conflict_state= REPLAYING;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
mysql_reset_thd_for_next_command(thd, opt_userstat_running);
thd->killed= NOT_KILLED;
close_thread_tables(thd);
if (thd->locked_tables_mode && thd->lock)
{
WSREP_DEBUG("releasing table lock for replaying (%ld)",
thd->thread_id);
thd->locked_tables_list.unlock_locked_tables(thd);
thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
}
thd->mdl_context.release_transactional_locks();
thd_proc_info(thd, "wsrep replaying trx");
WSREP_DEBUG("replay trx: %s %lld",
thd->query() ? thd->query() : "void",
(long long)thd->wsrep_trx_seqno);
struct wsrep_thd_shadow shadow;
wsrep_prepare_bf_thd(thd, &shadow);
int rcode = wsrep->replay_trx(wsrep,
&thd->wsrep_trx_handle,
(void *)thd);
wsrep_return_from_bf_mode(thd, &shadow);
if (thd->wsrep_conflict_state!= REPLAYING)
WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
switch (rcode)
{
case WSREP_OK:
thd->wsrep_conflict_state= NO_CONFLICT;
wsrep->post_commit(wsrep, &thd->wsrep_trx_handle);
WSREP_DEBUG("trx_replay successful for: %ld %llu",
thd->thread_id, (long long)thd->real_id);
break;
case WSREP_TRX_FAIL:
if (thd->stmt_da->is_sent)
{
WSREP_ERROR("replay failed, thd has reported status");
}
else
{
WSREP_DEBUG("replay failed, rolling back");
my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
}
thd->wsrep_conflict_state= ABORTED;
thd->wsrep_bf_thd = NULL;
wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle);
break;
default:
WSREP_ERROR("trx_replay failed for: %d, query: %s",
rcode, thd->query() ? thd->query() : "void");
/* we're now in inconsistent state, must abort */
unireg_abort(1);
break;
}
mysql_mutex_lock(&LOCK_wsrep_replaying);
wsrep_replaying--;
WSREP_DEBUG("replaying decreased: %d, thd: %lu",
wsrep_replaying, thd->thread_id);
mysql_cond_broadcast(&COND_wsrep_replaying);
mysql_mutex_unlock(&LOCK_wsrep_replaying);
}
}
}
static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
Parser_state *parser_state)
{
@ -6078,79 +6161,9 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
WSREP_DEBUG("abort in exec query state, avoiding autocommit");
}
/* checking if BF trx must be replayed */
if (thd->wsrep_conflict_state== MUST_REPLAY) {
if (thd->wsrep_exec_mode!= REPL_RECV) {
if (thd->stmt_da->is_sent) {
WSREP_ERROR("replay issue, thd has reported status already");
}
thd->stmt_da->reset_diagnostics_area();
thd->wsrep_conflict_state= REPLAYING;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
mysql_reset_thd_for_next_command(thd, opt_userstat_running);
thd->killed= NOT_KILLED;
close_thread_tables(thd);
if (thd->locked_tables_mode && thd->lock)
{
WSREP_DEBUG("releasing table lock for replaying (%ld)",
thd->thread_id);
thd->locked_tables_list.unlock_locked_tables(thd);
thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
}
thd->mdl_context.release_transactional_locks();
thd_proc_info(thd, "wsrep replaying trx");
WSREP_DEBUG("replay trx: %s %lld",
thd->query() ? thd->query() : "void",
(long long)thd->wsrep_trx_seqno);
struct wsrep_thd_shadow shadow;
wsrep_prepare_bf_thd(thd, &shadow);
int rcode = wsrep->replay_trx(wsrep,
&thd->wsrep_trx_handle,
(void *)thd);
wsrep_return_from_bf_mode(thd, &shadow);
if (thd->wsrep_conflict_state!= REPLAYING)
WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
switch (rcode) {
case WSREP_OK:
thd->wsrep_conflict_state= NO_CONFLICT;
wsrep->post_commit(wsrep, &thd->wsrep_trx_handle);
WSREP_DEBUG("trx_replay successful for: %ld %llu",
thd->thread_id, (long long)thd->real_id);
break;
case WSREP_TRX_FAIL:
if (thd->stmt_da->is_sent) {
WSREP_ERROR("replay failed, thd has reported status");
}
else
{
WSREP_DEBUG("replay failed, rolling back");
my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
}
thd->wsrep_conflict_state= ABORTED;
thd->wsrep_bf_thd = NULL;
wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle);
break;
default:
WSREP_ERROR("trx_replay failed for: %d, query: %s",
rcode, thd->query() ? thd->query() : "void");
/* we're now in inconsistent state, must abort */
unireg_abort(1);
break;
}
mysql_mutex_lock(&LOCK_wsrep_replaying);
wsrep_replaying--;
WSREP_DEBUG("replaying decreased: %d, thd: %lu",
wsrep_replaying, thd->thread_id);
mysql_cond_broadcast(&COND_wsrep_replaying);
mysql_mutex_unlock(&LOCK_wsrep_replaying);
}
if (thd->wsrep_conflict_state== MUST_REPLAY)
{
wsrep_replay_transaction(thd);
}
/* setting error code for BF aborted trxs */

View file

@ -3483,7 +3483,9 @@ Prepared_statement::set_parameters(String *expanded_query,
return res;
}
#ifdef WITH_WSREP
void wsrep_replay_transaction(THD *thd);
#endif /* WITH_WSREP */
/**
Execute a prepared statement. Re-prepare it a limited number
of times if necessary.
@ -3563,6 +3565,22 @@ reexecute:
error= execute(expanded_query, open_cursor) || thd->is_error();
thd->m_reprepare_observer= NULL;
#ifdef WITH_WSREP
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
switch (thd->wsrep_conflict_state)
{
case CERT_FAILURE:
WSREP_DEBUG("PS execute fail for CERT_FAILURE: thd: %ld err: %d",
thd->thread_id, thd->stmt_da->sql_errno() );
thd->wsrep_conflict_state = NO_CONFLICT;
break;
case MUST_REPLAY:
(void)wsrep_replay_transaction(thd);
default: break;
}
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
#endif /* WITH_WSREP */
if (error && !thd->is_fatal_error && !thd->killed &&
reprepare_observer.is_invalidated() &&