mariadb/ndb/tools/restore/consumer_restore.cpp
tomas@poseidon.ndb.mysql.com 0d55af3f0e moved ndb_restore together with rest of the ndb tools
kernel includes needed by ndb_restore
    changed link order
    moved LocalConfig to mgmapi
    Moved NdbConfig to Portlib
    mgmapi to use LocalConfig for connectstring parsing instead of it's own
    enable usage of "NULL" connectstring for default
    new ndbmgmclient lib that can be used by e.g. mysqladmin later
    LocalConfig no longer needed here, now in mgmapi
    Send connectstring to mgmapi instead
    added valid connectstring to be with only host without port i.e.
    valid connectstring =host1,host2,host3
    default port will be added
2004-11-14 11:02:06 +00:00

671 lines
15 KiB
C++

/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "consumer_restore.hpp"
#include <NdbSleep.h>
extern FilteredNdbOut err;
extern FilteredNdbOut info;
extern FilteredNdbOut debug;
static void callback(int, NdbConnection*, void*);
bool
BackupRestore::init()
{
release();
if (!m_restore && !m_restore_meta)
return true;
m_ndb = new Ndb();
if (m_ndb == NULL)
return false;
m_ndb->init(1024);
if (m_ndb->waitUntilReady(30) != 0)
{
err << "Failed to connect to ndb!!" << endl;
return false;
}
info << "Connected to ndb!!" << endl;
m_callback = new restore_callback_t[m_parallelism];
if (m_callback == 0)
{
err << "Failed to allocate callback structs" << endl;
return false;
}
m_tuples = new TupleS[m_parallelism];
if (m_tuples == 0)
{
err << "Failed to allocate tuples" << endl;
return false;
}
m_free_callback= m_callback;
for (Uint32 i= 0; i < m_parallelism; i++) {
m_callback[i].restore= this;
m_callback[i].connection= 0;
m_callback[i].tup= &m_tuples[i];
if (i > 0)
m_callback[i-1].next= &(m_callback[i]);
}
m_callback[m_parallelism-1].next = 0;
return true;
}
void BackupRestore::release()
{
if (m_ndb)
{
delete m_ndb;
m_ndb= 0;
}
if (m_callback)
{
delete [] m_callback;
m_callback= 0;
}
if (m_tuples)
{
delete [] m_tuples;
m_tuples= 0;
}
}
BackupRestore::~BackupRestore()
{
release();
}
static
int
match_blob(const char * name){
int cnt, id1, id2;
char buf[256];
if((cnt = sscanf(name, "%[^/]/%[^/]/NDB$BLOB_%d_%d", buf, buf, &id1, &id2)) == 4){
return id1;
}
return -1;
}
const NdbDictionary::Table*
BackupRestore::get_table(const NdbDictionary::Table* tab){
if(m_cache.m_old_table == tab)
return m_cache.m_new_table;
m_cache.m_old_table = tab;
int cnt, id1, id2;
char buf[256];
if((cnt = sscanf(tab->getName(), "%[^/]/%[^/]/NDB$BLOB_%d_%d", buf, buf, &id1, &id2)) == 4){
BaseString::snprintf(buf, sizeof(buf), "NDB$BLOB_%d_%d", m_new_tables[id1]->getTableId(), id2);
m_cache.m_new_table = m_ndb->getDictionary()->getTable(buf);
} else {
m_cache.m_new_table = m_new_tables[tab->getTableId()];
}
return m_cache.m_new_table;
}
bool
BackupRestore::finalize_table(const TableS & table){
bool ret= true;
if (!m_restore && !m_restore_meta)
return ret;
if (table.have_auto_inc())
{
Uint64 max_val= table.get_max_auto_val();
Uint64 auto_val= m_ndb->readAutoIncrementValue(get_table(table.m_dictTable));
if (max_val+1 > auto_val || auto_val == ~(Uint64)0)
ret= m_ndb->setAutoIncrementValue(get_table(table.m_dictTable), max_val+1, false);
}
return ret;
}
bool
BackupRestore::table(const TableS & table){
if (!m_restore && !m_restore_meta)
return true;
const char * name = table.getTableName();
/**
* Ignore blob tables
*/
if(match_blob(name) >= 0)
return true;
const NdbTableImpl & tmptab = NdbTableImpl::getImpl(* table.m_dictTable);
if(tmptab.m_indexType != NdbDictionary::Index::Undefined){
m_indexes.push_back(table.m_dictTable);
return true;
}
BaseString tmp(name);
Vector<BaseString> split;
if(tmp.split(split, "/") != 3){
err << "Invalid table name format " << name << endl;
return false;
}
m_ndb->setDatabaseName(split[0].c_str());
m_ndb->setSchemaName(split[1].c_str());
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
if(m_restore_meta){
NdbDictionary::Table copy(*table.m_dictTable);
copy.setName(split[2].c_str());
if (dict->createTable(copy) == -1)
{
err << "Create table " << table.getTableName() << " failed: "
<< dict->getNdbError() << endl;
return false;
}
info << "Successfully restored table " << table.getTableName()<< endl ;
}
const NdbDictionary::Table* tab = dict->getTable(split[2].c_str());
if(tab == 0){
err << "Unable to find table: " << split[2].c_str() << endl;
return false;
}
if(m_restore_meta){
m_ndb->setAutoIncrementValue(tab, ~(Uint64)0, false);
}
const NdbDictionary::Table* null = 0;
m_new_tables.fill(table.m_dictTable->getTableId(), null);
m_new_tables[table.m_dictTable->getTableId()] = tab;
return true;
}
bool
BackupRestore::endOfTables(){
if(!m_restore_meta)
return true;
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
for(size_t i = 0; i<m_indexes.size(); i++){
const NdbTableImpl & indtab = NdbTableImpl::getImpl(* m_indexes[i]);
BaseString tmp(indtab.m_primaryTable.c_str());
Vector<BaseString> split;
if(tmp.split(split, "/") != 3){
err << "Invalid table name format " << indtab.m_primaryTable.c_str()
<< endl;
return false;
}
m_ndb->setDatabaseName(split[0].c_str());
m_ndb->setSchemaName(split[1].c_str());
const NdbDictionary::Table * prim = dict->getTable(split[2].c_str());
if(prim == 0){
err << "Unable to find base table \"" << split[2].c_str()
<< "\" for index "
<< indtab.getName() << endl;
return false;
}
NdbTableImpl& base = NdbTableImpl::getImpl(*prim);
NdbIndexImpl* idx;
int id;
char idxName[255], buf[255];
if(sscanf(indtab.getName(), "%[^/]/%[^/]/%d/%s",
buf, buf, &id, idxName) != 4){
err << "Invalid index name format " << indtab.getName() << endl;
return false;
}
if(NdbDictInterface::create_index_obj_from_table(&idx, &indtab, &base))
{
err << "Failed to create index " << idxName
<< " on " << split[2].c_str() << endl;
return false;
}
idx->setName(idxName);
if(dict->createIndex(* idx) != 0)
{
delete idx;
err << "Failed to create index " << idxName
<< " on " << split[2].c_str() << endl
<< dict->getNdbError() << endl;
return false;
}
delete idx;
info << "Successfully created index " << idxName
<< " on " << split[2].c_str() << endl;
}
return true;
}
void BackupRestore::tuple(const TupleS & tup)
{
if (!m_restore)
return;
restore_callback_t * cb = m_free_callback;
if (cb == 0)
assert(false);
m_free_callback = cb->next;
cb->retries = 0;
*(cb->tup) = tup; // must do copy!
tuple_a(cb);
if (m_free_callback == 0)
{
// send-poll all transactions
// close transaction is done in callback
m_ndb->sendPollNdb(3000, 1);
}
}
void BackupRestore::tuple_a(restore_callback_t *cb)
{
while (cb->retries < 10)
{
/**
* start transactions
*/
cb->connection = m_ndb->startTransaction();
if (cb->connection == NULL)
{
/*
if (errorHandler(cb))
{
continue;
}
*/
exitHandler();
} // if
const TupleS &tup = *(cb->tup);
const NdbDictionary::Table * table = get_table(tup.getTable()->m_dictTable);
NdbOperation * op = cb->connection->getNdbOperation(table);
if (op == NULL)
{
if (errorHandler(cb))
continue;
exitHandler();
} // if
if (op->writeTuple() == -1)
{
if (errorHandler(cb))
continue;
exitHandler();
} // if
int ret = 0;
for (int j = 0; j < 2; j++)
{
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeDesc * attr_desc = tup.getDesc(i);
const AttributeData * attr_data = tup.getData(i);
int size = attr_desc->size;
int arraySize = attr_desc->arraySize;
char * dataPtr = attr_data->string_value;
Uint32 length = (size * arraySize) / 8;
if (j == 0 && tup.getTable()->have_auto_inc(i))
tup.getTable()->update_max_auto_val(dataPtr,size);
if (attr_desc->m_column->getPrimaryKey())
{
if (j == 1) continue;
ret = op->equal(i, dataPtr, length);
}
else
{
if (j == 0) continue;
if (attr_data->null)
ret = op->setValue(i, NULL, 0);
else
ret = op->setValue(i, dataPtr, length);
}
if (ret < 0) {
ndbout_c("Column: %d type %d %d %d %d",i,
attr_desc->m_column->getType(),
size, arraySize, attr_data->size);
break;
}
}
if (ret < 0)
break;
}
if (ret < 0)
{
if (errorHandler(cb))
continue;
exitHandler();
}
// Prepare transaction (the transaction is NOT yet sent to NDB)
cb->connection->executeAsynchPrepare(Commit, &callback, cb);
m_transactions++;
return;
}
err << "Unable to recover from errors. Exiting..." << endl;
exitHandler();
}
void BackupRestore::cback(int result, restore_callback_t *cb)
{
m_transactions--;
if (result < 0)
{
/**
* Error. temporary or permanent?
*/
if (errorHandler(cb))
tuple_a(cb); // retry
else
{
err << "Restore: Failed to restore data due to a unrecoverable error. Exiting..." << endl;
exitHandler();
}
}
else
{
/**
* OK! close transaction
*/
m_ndb->closeTransaction(cb->connection);
cb->connection= 0;
cb->next= m_free_callback;
m_free_callback= cb;
m_dataCount++;
}
}
/**
* returns true if is recoverable,
* Error handling based on hugo
* false if it is an error that generates an abort.
*/
bool BackupRestore::errorHandler(restore_callback_t *cb)
{
NdbError error= cb->connection->getNdbError();
m_ndb->closeTransaction(cb->connection);
cb->connection= 0;
cb->retries++;
switch(error.status)
{
case NdbError::Success:
return false;
// ERROR!
break;
case NdbError::TemporaryError:
NdbSleep_MilliSleep(10);
return true;
// RETRY
break;
case NdbError::UnknownResult:
err << error << endl;
return false;
// ERROR!
break;
default:
case NdbError::PermanentError:
switch (error.code)
{
case 499:
case 250:
NdbSleep_MilliSleep(10);
return true; //temp errors?
default:
break;
}
//ERROR
err << error << endl;
return false;
break;
}
return false;
}
void BackupRestore::exitHandler()
{
release();
exit(-1);
}
void
BackupRestore::tuple_free()
{
if (!m_restore)
return;
if (m_transactions > 0) {
// Send all transactions to NDB
m_ndb->sendPreparedTransactions(0);
// Poll all transactions
while (m_transactions > 0)
m_ndb->pollNdb(3000, m_transactions);
}
}
void
BackupRestore::endOfTuples()
{
tuple_free();
}
void
BackupRestore::logEntry(const LogEntry & tup)
{
if (!m_restore)
return;
NdbConnection * trans = m_ndb->startTransaction();
if (trans == NULL)
{
// Deep shit, TODO: handle the error
err << "Cannot start transaction" << endl;
exit(-1);
} // if
const NdbDictionary::Table * table = get_table(tup.m_table->m_dictTable);
NdbOperation * op = trans->getNdbOperation(table);
if (op == NULL)
{
err << "Cannot get operation: " << trans->getNdbError() << endl;
exit(-1);
} // if
int check = 0;
switch(tup.m_type)
{
case LogEntry::LE_INSERT:
check = op->insertTuple();
break;
case LogEntry::LE_UPDATE:
check = op->updateTuple();
break;
case LogEntry::LE_DELETE:
check = op->deleteTuple();
break;
default:
err << "Log entry has wrong operation type."
<< " Exiting...";
exit(-1);
}
for (Uint32 i= 0; i < tup.size(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
if (tup.m_table->have_auto_inc(attr->Desc->attrId))
tup.m_table->update_max_auto_val(dataPtr,size);
const Uint32 length = (size / 8) * arraySize;
if (attr->Desc->m_column->getPrimaryKey())
op->equal(attr->Desc->attrId, dataPtr, length);
else
op->setValue(attr->Desc->attrId, dataPtr, length);
}
const int ret = trans->execute(Commit);
if (ret != 0)
{
// Both insert update and delete can fail during log running
// and it's ok
// TODO: check that the error is either tuple exists or tuple does not exist?
switch(tup.m_type)
{
case LogEntry::LE_INSERT:
break;
case LogEntry::LE_UPDATE:
break;
case LogEntry::LE_DELETE:
break;
}
if (false)
{
err << "execute failed: " << trans->getNdbError() << endl;
exit(-1);
}
}
m_ndb->closeTransaction(trans);
m_logCount++;
}
void
BackupRestore::endOfLogEntrys()
{
if (!m_restore)
return;
info << "Restored " << m_dataCount << " tuples and "
<< m_logCount << " log entries" << endl;
}
/*
* callback : This is called when the transaction is polled
*
* (This function must have three arguments:
* - The result of the transaction,
* - The NdbConnection object, and
* - A pointer to an arbitrary object.)
*/
static void
callback(int result, NdbConnection* trans, void* aObject)
{
restore_callback_t *cb = (restore_callback_t *)aObject;
(cb->restore)->cback(result, cb);
}
#if 0 // old tuple impl
void
BackupRestore::tuple(const TupleS & tup)
{
if (!m_restore)
return;
while (1)
{
NdbConnection * trans = m_ndb->startTransaction();
if (trans == NULL)
{
// Deep shit, TODO: handle the error
ndbout << "Cannot start transaction" << endl;
exit(-1);
} // if
const TableS * table = tup.getTable();
NdbOperation * op = trans->getNdbOperation(table->getTableName());
if (op == NULL)
{
ndbout << "Cannot get operation: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
// TODO: check return value and handle error
if (op->writeTuple() == -1)
{
ndbout << "writeTuple call failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (attr->Desc->m_column->getPrimaryKey())
op->equal(i, dataPtr, length);
}
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (!attr->Desc->m_column->getPrimaryKey())
if (attr->Data.null)
op->setValue(i, NULL, 0);
else
op->setValue(i, dataPtr, length);
}
int ret = trans->execute(Commit);
if (ret != 0)
{
ndbout << "execute failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
}
m_ndb->closeTransaction(trans);
if (ret == 0)
break;
}
m_dataCount++;
}
#endif
template class Vector<NdbDictionary::Table*>;
template class Vector<const NdbDictionary::Table*>;