mirror of
https://github.com/MariaDB/server.git
synced 2025-01-27 01:04:19 +01:00
653 lines
14 KiB
C++
653 lines
14 KiB
C++
/* Copyright (C) 2003 MySQL AB
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; either version 2 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
|
|
|
#include "consumer_restore.hpp"
|
|
#include <NdbSleep.h>
|
|
|
|
extern FilteredNdbOut err;
|
|
extern FilteredNdbOut info;
|
|
extern FilteredNdbOut debug;
|
|
|
|
static bool asynchErrorHandler(NdbTransaction * trans, Ndb * ndb);
|
|
static void callback(int result, NdbTransaction* trans, void* aObject);
|
|
|
|
bool
|
|
BackupRestore::init()
|
|
{
|
|
|
|
if (!m_restore && !m_restore_meta)
|
|
return true;
|
|
|
|
m_ndb = new Ndb();
|
|
|
|
if (m_ndb == NULL)
|
|
return false;
|
|
|
|
// Turn off table name completion
|
|
m_ndb->useFullyQualifiedNames(false);
|
|
|
|
m_ndb->init(1024);
|
|
if (m_ndb->waitUntilReady(30) != 0)
|
|
{
|
|
ndbout << "Failed to connect to ndb!!" << endl;
|
|
return false;
|
|
}
|
|
ndbout << "Connected to ndb!!" << endl;
|
|
|
|
#if USE_MYSQL
|
|
if(use_mysql)
|
|
{
|
|
if ( mysql_thread_safe() == 0 )
|
|
{
|
|
ndbout << "Not thread safe mysql library..." << endl;
|
|
exit(-1);
|
|
}
|
|
|
|
ndbout << "Connecting to MySQL..." <<endl;
|
|
|
|
/**
|
|
* nwe param:
|
|
* port
|
|
* host
|
|
* user
|
|
*/
|
|
bool returnValue = true;
|
|
mysql_init(&mysql);
|
|
{
|
|
int portNo = 3306;
|
|
if ( mysql_real_connect(&mysql,
|
|
ga_host,
|
|
ga_user,
|
|
ga_password,
|
|
ga_database,
|
|
ga_port,
|
|
:: ga_socket,
|
|
0) == NULL )
|
|
{
|
|
ndbout_c("Connect failed: %s", mysql_error(&mysql));
|
|
returnValue = false;
|
|
}
|
|
mysql.reconnect= 1;
|
|
ndbout << "Connected to MySQL!!!" <<endl;
|
|
}
|
|
|
|
/* if(returnValue){
|
|
mysql_set_server_option(&mysql, MYSQL_OPTION_MULTI_STATEMENTS_ON);
|
|
}
|
|
*/
|
|
return returnValue;
|
|
}
|
|
#endif
|
|
|
|
if (m_callback) {
|
|
delete [] m_callback;
|
|
m_callback = 0;
|
|
}
|
|
|
|
m_callback = new restore_callback_t[m_parallelism];
|
|
|
|
if (m_callback == 0)
|
|
{
|
|
ndbout << "Failed to allocate callback structs" << endl;
|
|
return false;
|
|
}
|
|
|
|
m_free_callback = m_callback;
|
|
for (int i= 0; i < m_parallelism; i++) {
|
|
m_callback[i].restore = this;
|
|
m_callback[i].connection = 0;
|
|
m_callback[i].retries = 0;
|
|
if (i > 0)
|
|
m_callback[i-1].next = &(m_callback[i]);
|
|
}
|
|
m_callback[m_parallelism-1].next = 0;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
BackupRestore::~BackupRestore()
|
|
{
|
|
if (m_ndb != 0)
|
|
delete m_ndb;
|
|
|
|
if (m_callback)
|
|
delete [] m_callback;
|
|
}
|
|
|
|
#ifdef USE_MYSQL
|
|
bool
|
|
BackupRestore::table(const TableS & table, MYSQL * mysqlp){
|
|
if (!m_restore_meta)
|
|
{
|
|
return true;
|
|
}
|
|
|
|
char tmpTabName[MAX_TAB_NAME_SIZE*2];
|
|
sprintf(tmpTabName, "%s", table.getTableName());
|
|
char * database = strtok(tmpTabName, "/");
|
|
char * schema = strtok( NULL , "/");
|
|
char * tableName = strtok( NULL , "/");
|
|
|
|
/**
|
|
* this means that the user did not specify schema
|
|
* and it is a v2x backup
|
|
*/
|
|
if(database == NULL)
|
|
return false;
|
|
if(schema == NULL)
|
|
return false;
|
|
if(tableName==NULL)
|
|
tableName = schema;
|
|
|
|
char stmtCreateDB[255];
|
|
sprintf(stmtCreateDB,"CREATE DATABASE %s", database);
|
|
|
|
/*ignore return value. mysql_select_db will trap errors anyways*/
|
|
if (mysql_query(mysqlp,stmtCreateDB) == 0)
|
|
{
|
|
//ndbout_c("%s", stmtCreateDB);
|
|
}
|
|
|
|
if (mysql_select_db(&mysql, database) != 0)
|
|
{
|
|
ndbout_c("Error: %s", mysql_error(&mysql));
|
|
return false;
|
|
}
|
|
|
|
char buf [2048];
|
|
/**
|
|
* create table ddl
|
|
*/
|
|
if (create_table_string(table, tableName, buf))
|
|
{
|
|
ndbout_c("Unable to create a table definition since the "
|
|
"backup contains undefined types");
|
|
return false;
|
|
}
|
|
|
|
//ndbout_c("%s", buf);
|
|
|
|
if (mysql_query(mysqlp,buf) != 0)
|
|
{
|
|
ndbout_c("Error: %s", mysql_error(&mysql));
|
|
return false;
|
|
} else
|
|
{
|
|
ndbout_c("Successfully restored table %s into database %s", tableName, database);
|
|
}
|
|
|
|
return true;
|
|
}
|
|
#endif
|
|
|
|
bool
|
|
BackupRestore::table(const TableS & table){
|
|
if (!m_restore_meta)
|
|
{
|
|
return true;
|
|
}
|
|
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
|
|
if (dict->createTable(*table.m_dictTable) == -1)
|
|
{
|
|
err << "Create table " << table.getTableName() << " failed: "
|
|
<< dict->getNdbError() << endl;
|
|
return false;
|
|
}
|
|
info << "Successfully restored table " << table.getTableName()<< endl ;
|
|
return true;
|
|
}
|
|
|
|
void BackupRestore::tuple(const TupleS & tup)
|
|
{
|
|
if (!m_restore)
|
|
{
|
|
delete &tup;
|
|
return;
|
|
}
|
|
|
|
restore_callback_t * cb = m_free_callback;
|
|
|
|
if (cb)
|
|
{
|
|
m_free_callback = cb->next;
|
|
cb->retries = 0;
|
|
cb->tup = &tup;
|
|
tuple_a(cb);
|
|
}
|
|
|
|
if (m_free_callback == 0)
|
|
{
|
|
// send-poll all transactions
|
|
// close transaction is done in callback
|
|
m_ndb->sendPollNdb(3000, 1);
|
|
}
|
|
}
|
|
|
|
void BackupRestore::tuple_a(restore_callback_t *cb)
|
|
{
|
|
while (cb->retries < 10)
|
|
{
|
|
/**
|
|
* start transactions
|
|
*/
|
|
cb->connection = m_ndb->startTransaction();
|
|
if (cb->connection == NULL)
|
|
{
|
|
/*
|
|
if (asynchErrorHandler(cb->connection, m_ndb))
|
|
{
|
|
cb->retries++;
|
|
continue;
|
|
}
|
|
*/
|
|
asynchExitHandler();
|
|
} // if
|
|
|
|
const TupleS &tup = *(cb->tup);
|
|
const TableS * table = tup.getTable();
|
|
NdbOperation * op = cb->connection->getNdbOperation(table->getTableName());
|
|
|
|
if (op == NULL)
|
|
{
|
|
if (asynchErrorHandler(cb->connection, m_ndb))
|
|
{
|
|
cb->retries++;
|
|
continue;
|
|
}
|
|
asynchExitHandler();
|
|
} // if
|
|
|
|
if (op->writeTuple() == -1)
|
|
{
|
|
if (asynchErrorHandler(cb->connection, m_ndb))
|
|
{
|
|
cb->retries++;
|
|
continue;
|
|
}
|
|
asynchExitHandler();
|
|
} // if
|
|
|
|
Uint32 ret = 0;
|
|
for (int i = 0; i < tup.getNoOfAttributes(); i++)
|
|
{
|
|
const AttributeS * attr = tup[i];
|
|
int size = attr->Desc->size;
|
|
int arraySize = attr->Desc->arraySize;
|
|
char * dataPtr = attr->Data.string_value;
|
|
Uint32 length = (size * arraySize) / 8;
|
|
if (attr->Desc->m_column->getPrimaryKey())
|
|
{
|
|
ret = op->equal(i, dataPtr, length);
|
|
}
|
|
else
|
|
{
|
|
if (attr->Data.null)
|
|
ret = op->setValue(i, NULL, 0);
|
|
else
|
|
ret = op->setValue(i, dataPtr, length);
|
|
}
|
|
|
|
if (ret<0)
|
|
{
|
|
ndbout_c("Column: %d type %d",i,
|
|
tup.getTable()->m_dictTable->getColumn(i)->getType());
|
|
if (asynchErrorHandler(cb->connection, m_ndb))
|
|
{
|
|
cb->retries++;
|
|
break;
|
|
}
|
|
asynchExitHandler();
|
|
}
|
|
}
|
|
if (ret < 0)
|
|
continue;
|
|
|
|
// Prepare transaction (the transaction is NOT yet sent to NDB)
|
|
cb->connection->executeAsynchPrepare(Commit, &callback, cb);
|
|
m_transactions++;
|
|
}
|
|
ndbout_c("Unable to recover from errors. Exiting...");
|
|
asynchExitHandler();
|
|
}
|
|
|
|
void BackupRestore::cback(int result, restore_callback_t *cb)
|
|
{
|
|
if (result<0)
|
|
{
|
|
/**
|
|
* Error. temporary or permanent?
|
|
*/
|
|
if (asynchErrorHandler(cb->connection, m_ndb))
|
|
{
|
|
cb->retries++;
|
|
tuple_a(cb);
|
|
}
|
|
else
|
|
{
|
|
ndbout_c("Restore: Failed to restore data "
|
|
"due to a unrecoverable error. Exiting...");
|
|
delete m_ndb;
|
|
delete cb->tup;
|
|
exit(-1);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
/**
|
|
* OK! close transaction
|
|
*/
|
|
m_ndb->closeTransaction(cb->connection);
|
|
delete cb->tup;
|
|
m_transactions--;
|
|
}
|
|
}
|
|
|
|
void BackupRestore::asynchExitHandler()
|
|
{
|
|
if (m_ndb != NULL)
|
|
delete m_ndb;
|
|
exit(-1);
|
|
}
|
|
|
|
#if 0 // old tuple impl
|
|
void
|
|
BackupRestore::tuple(const TupleS & tup)
|
|
{
|
|
if (!m_restore)
|
|
return;
|
|
while (1)
|
|
{
|
|
NdbTransaction * trans = m_ndb->startTransaction();
|
|
if (trans == NULL)
|
|
{
|
|
// Deep shit, TODO: handle the error
|
|
ndbout << "Cannot start transaction" << endl;
|
|
exit(-1);
|
|
} // if
|
|
|
|
const TableS * table = tup.getTable();
|
|
NdbOperation * op = trans->getNdbOperation(table->getTableName());
|
|
if (op == NULL)
|
|
{
|
|
ndbout << "Cannot get operation: ";
|
|
ndbout << trans->getNdbError() << endl;
|
|
exit(-1);
|
|
} // if
|
|
|
|
// TODO: check return value and handle error
|
|
if (op->writeTuple() == -1)
|
|
{
|
|
ndbout << "writeTuple call failed: ";
|
|
ndbout << trans->getNdbError() << endl;
|
|
exit(-1);
|
|
} // if
|
|
|
|
for (int i = 0; i < tup.getNoOfAttributes(); i++)
|
|
{
|
|
const AttributeS * attr = tup[i];
|
|
int size = attr->Desc->size;
|
|
int arraySize = attr->Desc->arraySize;
|
|
const char * dataPtr = attr->Data.string_value;
|
|
|
|
const Uint32 length = (size * arraySize) / 8;
|
|
if (attr->Desc->m_column->getPrimaryKey())
|
|
op->equal(i, dataPtr, length);
|
|
}
|
|
|
|
for (int i = 0; i < tup.getNoOfAttributes(); i++)
|
|
{
|
|
const AttributeS * attr = tup[i];
|
|
int size = attr->Desc->size;
|
|
int arraySize = attr->Desc->arraySize;
|
|
const char * dataPtr = attr->Data.string_value;
|
|
|
|
const Uint32 length = (size * arraySize) / 8;
|
|
if (!attr->Desc->m_column->getPrimaryKey())
|
|
if (attr->Data.null)
|
|
op->setValue(i, NULL, 0);
|
|
else
|
|
op->setValue(i, dataPtr, length);
|
|
}
|
|
int ret = trans->execute(Commit);
|
|
if (ret != 0)
|
|
{
|
|
ndbout << "execute failed: ";
|
|
ndbout << trans->getNdbError() << endl;
|
|
exit(-1);
|
|
}
|
|
m_ndb->closeTransaction(trans);
|
|
if (ret == 0)
|
|
break;
|
|
}
|
|
m_dataCount++;
|
|
}
|
|
#endif
|
|
|
|
void
|
|
BackupRestore::endOfTuples()
|
|
{
|
|
if (!m_restore)
|
|
return;
|
|
|
|
// Send all transactions to NDB
|
|
m_ndb->sendPreparedTransactions(0);
|
|
|
|
// Poll all transactions
|
|
m_ndb->pollNdb(3000, m_transactions);
|
|
|
|
// Close all transactions
|
|
// for (int i = 0; i < nPreparedTransactions; i++)
|
|
// m_ndb->closeTransaction(asynchTrans[i]);
|
|
}
|
|
|
|
void
|
|
BackupRestore::logEntry(const LogEntry & tup)
|
|
{
|
|
if (!m_restore)
|
|
return;
|
|
|
|
NdbTransaction * trans = m_ndb->startTransaction();
|
|
if (trans == NULL)
|
|
{
|
|
// Deep shit, TODO: handle the error
|
|
ndbout << "Cannot start transaction" << endl;
|
|
exit(-1);
|
|
} // if
|
|
|
|
const TableS * table = tup.m_table;
|
|
NdbOperation * op = trans->getNdbOperation(table->getTableName());
|
|
if (op == NULL)
|
|
{
|
|
ndbout << "Cannot get operation: ";
|
|
ndbout << trans->getNdbError() << endl;
|
|
exit(-1);
|
|
} // if
|
|
|
|
int check = 0;
|
|
switch(tup.m_type)
|
|
{
|
|
case LogEntry::LE_INSERT:
|
|
check = op->insertTuple();
|
|
break;
|
|
case LogEntry::LE_UPDATE:
|
|
check = op->updateTuple();
|
|
break;
|
|
case LogEntry::LE_DELETE:
|
|
check = op->deleteTuple();
|
|
break;
|
|
default:
|
|
ndbout << "Log entry has wrong operation type."
|
|
<< " Exiting...";
|
|
exit(-1);
|
|
}
|
|
|
|
for (int i = 0; i < tup.m_values.size(); i++)
|
|
{
|
|
const AttributeS * attr = tup.m_values[i];
|
|
int size = attr->Desc->size;
|
|
int arraySize = attr->Desc->arraySize;
|
|
const char * dataPtr = attr->Data.string_value;
|
|
|
|
const Uint32 length = (size / 8) * arraySize;
|
|
if (attr->Desc->m_column->getPrimaryKey())
|
|
op->equal(attr->Desc->attrId, dataPtr, length);
|
|
else
|
|
op->setValue(attr->Desc->attrId, dataPtr, length);
|
|
}
|
|
|
|
#if 1
|
|
trans->execute(Commit);
|
|
#else
|
|
const int ret = trans->execute(Commit);
|
|
// Both insert update and delete can fail during log running
|
|
// and it's ok
|
|
|
|
if (ret != 0)
|
|
{
|
|
ndbout << "execute failed: ";
|
|
ndbout << trans->getNdbError() << endl;
|
|
exit(-1);
|
|
}
|
|
#endif
|
|
|
|
m_ndb->closeTransaction(trans);
|
|
m_logCount++;
|
|
}
|
|
|
|
void
|
|
BackupRestore::endOfLogEntrys()
|
|
{
|
|
if (m_restore)
|
|
{
|
|
ndbout << "Restored " << m_dataCount << " tuples and "
|
|
<< m_logCount << " log entries" << endl;
|
|
}
|
|
}
|
|
#if 0
|
|
/*****************************************
|
|
*
|
|
* Callback function for asynchronous transactions
|
|
*
|
|
* Idea for error handling: Transaction objects have to be stored globally when
|
|
* they are prepared.
|
|
* In the callback function if the transaction:
|
|
* succeeded: delete the object from global storage
|
|
* failed but can be retried: execute the object that is in global storage
|
|
* failed but fatal: delete the object from global storage
|
|
*
|
|
******************************************/
|
|
static void restoreCallback(int result, // Result for transaction
|
|
NdbTransaction *object, // Transaction object
|
|
void *anything) // Not used
|
|
{
|
|
static Uint32 counter = 0;
|
|
|
|
|
|
debug << "restoreCallback function called " << counter << " time(s)" << endl;
|
|
|
|
++counter;
|
|
|
|
if (result == -1)
|
|
{
|
|
ndbout << " restoreCallback (" << counter;
|
|
if ((counter % 10) == 1)
|
|
{
|
|
ndbout << "st";
|
|
} // if
|
|
else if ((counter % 10) == 2)
|
|
{
|
|
ndbout << "nd";
|
|
} // else if
|
|
else if ((counter % 10 ) ==3)
|
|
{
|
|
ndbout << "rd";
|
|
} // else if
|
|
else
|
|
{
|
|
ndbout << "th";
|
|
} // else
|
|
err << " time: error detected " << object->getNdbError() << endl;
|
|
} // if
|
|
|
|
} // restoreCallback
|
|
#endif
|
|
|
|
|
|
|
|
/*
|
|
* callback : This is called when the transaction is polled
|
|
*
|
|
* (This function must have three arguments:
|
|
* - The result of the transaction,
|
|
* - The NdbTransaction object, and
|
|
* - A pointer to an arbitrary object.)
|
|
*/
|
|
|
|
static void
|
|
callback(int result, NdbTransaction* trans, void* aObject)
|
|
{
|
|
restore_callback_t *cb = (restore_callback_t *)aObject;
|
|
(cb->restore)->cback(result, cb);
|
|
}
|
|
|
|
/**
|
|
* returns true if is recoverable,
|
|
* Error handling based on hugo
|
|
* false if it is an error that generates an abort.
|
|
*/
|
|
static
|
|
bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb)
|
|
{
|
|
NdbError error = trans->getNdbError();
|
|
ndb->closeTransaction(trans);
|
|
switch(error.status)
|
|
{
|
|
case NdbError::Success:
|
|
return false;
|
|
// ERROR!
|
|
break;
|
|
|
|
case NdbError::TemporaryError:
|
|
NdbSleep_MilliSleep(10);
|
|
return true;
|
|
// RETRY
|
|
break;
|
|
|
|
case NdbError::UnknownResult:
|
|
ndbout << error << endl;
|
|
return false;
|
|
// ERROR!
|
|
break;
|
|
|
|
default:
|
|
case NdbError::PermanentError:
|
|
switch (error.code)
|
|
{
|
|
case 499:
|
|
case 250:
|
|
NdbSleep_MilliSleep(10);
|
|
return true; //temp errors?
|
|
default:
|
|
break;
|
|
}
|
|
//ERROR
|
|
ndbout << error << endl;
|
|
return false;
|
|
break;
|
|
}
|
|
return false;
|
|
}
|