mirror of
https://github.com/MariaDB/server.git
synced 2025-01-25 00:04:33 +01:00
b181f610e1
ndb/tools/restore/consumer_restore.cpp: 1) Fix bug when blobs and multiple databases bug#7379 2) Fix bug #7346 ndb/tools/restore/consumer_restore.hpp: Moved tuple handle into callback object instead of having separate array Declared m_transactions volatile
660 lines
15 KiB
C++
660 lines
15 KiB
C++
/* Copyright (C) 2003 MySQL AB
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; either version 2 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
|
|
|
#include "consumer_restore.hpp"
|
|
#include <NdbSleep.h>
|
|
|
|
extern FilteredNdbOut err;
|
|
extern FilteredNdbOut info;
|
|
extern FilteredNdbOut debug;
|
|
|
|
static void callback(int, NdbConnection*, void*);
|
|
|
|
bool
|
|
BackupRestore::init()
|
|
{
|
|
release();
|
|
|
|
if (!m_restore && !m_restore_meta)
|
|
return true;
|
|
|
|
m_ndb = new Ndb();
|
|
|
|
if (m_ndb == NULL)
|
|
return false;
|
|
|
|
m_ndb->init(1024);
|
|
if (m_ndb->waitUntilReady(30) != 0)
|
|
{
|
|
err << "Failed to connect to ndb!!" << endl;
|
|
return false;
|
|
}
|
|
info << "Connected to ndb!!" << endl;
|
|
|
|
m_callback = new restore_callback_t[m_parallelism];
|
|
|
|
if (m_callback == 0)
|
|
{
|
|
err << "Failed to allocate callback structs" << endl;
|
|
return false;
|
|
}
|
|
|
|
m_free_callback= m_callback;
|
|
for (Uint32 i= 0; i < m_parallelism; i++) {
|
|
m_callback[i].restore= this;
|
|
m_callback[i].connection= 0;
|
|
if (i > 0)
|
|
m_callback[i-1].next= &(m_callback[i]);
|
|
}
|
|
m_callback[m_parallelism-1].next = 0;
|
|
|
|
return true;
|
|
}
|
|
|
|
void BackupRestore::release()
|
|
{
|
|
if (m_ndb)
|
|
{
|
|
delete m_ndb;
|
|
m_ndb= 0;
|
|
}
|
|
|
|
if (m_callback)
|
|
{
|
|
delete [] m_callback;
|
|
m_callback= 0;
|
|
}
|
|
}
|
|
|
|
BackupRestore::~BackupRestore()
|
|
{
|
|
release();
|
|
}
|
|
|
|
static
|
|
int
|
|
match_blob(const char * name){
|
|
int cnt, id1, id2;
|
|
char buf[256];
|
|
if((cnt = sscanf(name, "%[^/]/%[^/]/NDB$BLOB_%d_%d", buf, buf, &id1, &id2)) == 4){
|
|
return id1;
|
|
}
|
|
|
|
return -1;
|
|
}
|
|
|
|
const NdbDictionary::Table*
|
|
BackupRestore::get_table(const NdbDictionary::Table* tab){
|
|
if(m_cache.m_old_table == tab)
|
|
return m_cache.m_new_table;
|
|
m_cache.m_old_table = tab;
|
|
|
|
int cnt, id1, id2;
|
|
char db[256], schema[256];
|
|
if((cnt = sscanf(tab->getName(), "%[^/]/%[^/]/NDB$BLOB_%d_%d",
|
|
db, schema, &id1, &id2)) == 4){
|
|
m_ndb->setDatabaseName(db);
|
|
m_ndb->setSchemaName(schema);
|
|
|
|
BaseString::snprintf(db, sizeof(db), "NDB$BLOB_%d_%d",
|
|
m_new_tables[id1]->getTableId(), id2);
|
|
|
|
m_cache.m_new_table = m_ndb->getDictionary()->getTable(db);
|
|
|
|
} else {
|
|
m_cache.m_new_table = m_new_tables[tab->getTableId()];
|
|
}
|
|
assert(m_cache.m_new_table);
|
|
return m_cache.m_new_table;
|
|
}
|
|
|
|
bool
|
|
BackupRestore::finalize_table(const TableS & table){
|
|
bool ret= true;
|
|
if (!m_restore && !m_restore_meta)
|
|
return ret;
|
|
if (table.have_auto_inc())
|
|
{
|
|
Uint64 max_val= table.get_max_auto_val();
|
|
Uint64 auto_val= m_ndb->readAutoIncrementValue(get_table(table.m_dictTable));
|
|
if (max_val+1 > auto_val || auto_val == ~(Uint64)0)
|
|
ret= m_ndb->setAutoIncrementValue(get_table(table.m_dictTable), max_val+1, false);
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
bool
|
|
BackupRestore::table(const TableS & table){
|
|
if (!m_restore && !m_restore_meta)
|
|
return true;
|
|
|
|
const char * name = table.getTableName();
|
|
|
|
/**
|
|
* Ignore blob tables
|
|
*/
|
|
if(match_blob(name) >= 0)
|
|
return true;
|
|
|
|
const NdbTableImpl & tmptab = NdbTableImpl::getImpl(* table.m_dictTable);
|
|
if(tmptab.m_indexType != NdbDictionary::Index::Undefined){
|
|
m_indexes.push_back(table.m_dictTable);
|
|
return true;
|
|
}
|
|
|
|
BaseString tmp(name);
|
|
Vector<BaseString> split;
|
|
if(tmp.split(split, "/") != 3){
|
|
err << "Invalid table name format " << name << endl;
|
|
return false;
|
|
}
|
|
|
|
m_ndb->setDatabaseName(split[0].c_str());
|
|
m_ndb->setSchemaName(split[1].c_str());
|
|
|
|
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
|
|
if(m_restore_meta){
|
|
NdbDictionary::Table copy(*table.m_dictTable);
|
|
|
|
copy.setName(split[2].c_str());
|
|
|
|
if (dict->createTable(copy) == -1)
|
|
{
|
|
err << "Create table " << table.getTableName() << " failed: "
|
|
<< dict->getNdbError() << endl;
|
|
return false;
|
|
}
|
|
info << "Successfully restored table " << table.getTableName()<< endl ;
|
|
}
|
|
|
|
const NdbDictionary::Table* tab = dict->getTable(split[2].c_str());
|
|
if(tab == 0){
|
|
err << "Unable to find table: " << split[2].c_str() << endl;
|
|
return false;
|
|
}
|
|
if(m_restore_meta){
|
|
m_ndb->setAutoIncrementValue(tab, ~(Uint64)0, false);
|
|
}
|
|
const NdbDictionary::Table* null = 0;
|
|
m_new_tables.fill(table.m_dictTable->getTableId(), null);
|
|
m_new_tables[table.m_dictTable->getTableId()] = tab;
|
|
return true;
|
|
}
|
|
|
|
bool
|
|
BackupRestore::endOfTables(){
|
|
if(!m_restore_meta)
|
|
return true;
|
|
|
|
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
|
|
for(size_t i = 0; i<m_indexes.size(); i++){
|
|
const NdbTableImpl & indtab = NdbTableImpl::getImpl(* m_indexes[i]);
|
|
|
|
BaseString tmp(indtab.m_primaryTable.c_str());
|
|
Vector<BaseString> split;
|
|
if(tmp.split(split, "/") != 3){
|
|
err << "Invalid table name format " << indtab.m_primaryTable.c_str()
|
|
<< endl;
|
|
return false;
|
|
}
|
|
|
|
m_ndb->setDatabaseName(split[0].c_str());
|
|
m_ndb->setSchemaName(split[1].c_str());
|
|
|
|
const NdbDictionary::Table * prim = dict->getTable(split[2].c_str());
|
|
if(prim == 0){
|
|
err << "Unable to find base table \"" << split[2].c_str()
|
|
<< "\" for index "
|
|
<< indtab.getName() << endl;
|
|
return false;
|
|
}
|
|
NdbTableImpl& base = NdbTableImpl::getImpl(*prim);
|
|
NdbIndexImpl* idx;
|
|
int id;
|
|
char idxName[255], buf[255];
|
|
if(sscanf(indtab.getName(), "%[^/]/%[^/]/%d/%s",
|
|
buf, buf, &id, idxName) != 4){
|
|
err << "Invalid index name format " << indtab.getName() << endl;
|
|
return false;
|
|
}
|
|
if(NdbDictInterface::create_index_obj_from_table(&idx, &indtab, &base))
|
|
{
|
|
err << "Failed to create index " << idxName
|
|
<< " on " << split[2].c_str() << endl;
|
|
return false;
|
|
}
|
|
idx->setName(idxName);
|
|
if(dict->createIndex(* idx) != 0)
|
|
{
|
|
delete idx;
|
|
err << "Failed to create index " << idxName
|
|
<< " on " << split[2].c_str() << endl
|
|
<< dict->getNdbError() << endl;
|
|
|
|
return false;
|
|
}
|
|
delete idx;
|
|
info << "Successfully created index " << idxName
|
|
<< " on " << split[2].c_str() << endl;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
void BackupRestore::tuple(const TupleS & tup)
|
|
{
|
|
if (!m_restore)
|
|
return;
|
|
|
|
while (m_free_callback == 0)
|
|
{
|
|
assert(m_transactions == m_parallelism);
|
|
// send-poll all transactions
|
|
// close transaction is done in callback
|
|
m_ndb->sendPollNdb(3000, 1);
|
|
}
|
|
|
|
restore_callback_t * cb = m_free_callback;
|
|
|
|
if (cb == 0)
|
|
assert(false);
|
|
|
|
m_free_callback = cb->next;
|
|
cb->retries = 0;
|
|
cb->tup = tup; // must do copy!
|
|
tuple_a(cb);
|
|
|
|
}
|
|
|
|
void BackupRestore::tuple_a(restore_callback_t *cb)
|
|
{
|
|
while (cb->retries < 10)
|
|
{
|
|
/**
|
|
* start transactions
|
|
*/
|
|
cb->connection = m_ndb->startTransaction();
|
|
if (cb->connection == NULL)
|
|
{
|
|
/*
|
|
if (errorHandler(cb))
|
|
{
|
|
continue;
|
|
}
|
|
*/
|
|
exitHandler();
|
|
} // if
|
|
|
|
const TupleS &tup = cb->tup;
|
|
const NdbDictionary::Table * table = get_table(tup.getTable()->m_dictTable);
|
|
|
|
NdbOperation * op = cb->connection->getNdbOperation(table);
|
|
|
|
if (op == NULL)
|
|
{
|
|
if (errorHandler(cb))
|
|
continue;
|
|
exitHandler();
|
|
} // if
|
|
|
|
if (op->writeTuple() == -1)
|
|
{
|
|
if (errorHandler(cb))
|
|
continue;
|
|
exitHandler();
|
|
} // if
|
|
|
|
int ret = 0;
|
|
for (int j = 0; j < 2; j++)
|
|
{
|
|
for (int i = 0; i < tup.getNoOfAttributes(); i++)
|
|
{
|
|
const AttributeDesc * attr_desc = tup.getDesc(i);
|
|
const AttributeData * attr_data = tup.getData(i);
|
|
int size = attr_desc->size;
|
|
int arraySize = attr_desc->arraySize;
|
|
char * dataPtr = attr_data->string_value;
|
|
Uint32 length = (size * arraySize) / 8;
|
|
|
|
if (j == 0 && tup.getTable()->have_auto_inc(i))
|
|
tup.getTable()->update_max_auto_val(dataPtr,size);
|
|
|
|
if (attr_desc->m_column->getPrimaryKey())
|
|
{
|
|
if (j == 1) continue;
|
|
ret = op->equal(i, dataPtr, length);
|
|
}
|
|
else
|
|
{
|
|
if (j == 0) continue;
|
|
if (attr_data->null)
|
|
ret = op->setValue(i, NULL, 0);
|
|
else
|
|
ret = op->setValue(i, dataPtr, length);
|
|
}
|
|
if (ret < 0) {
|
|
ndbout_c("Column: %d type %d %d %d %d",i,
|
|
attr_desc->m_column->getType(),
|
|
size, arraySize, attr_data->size);
|
|
break;
|
|
}
|
|
}
|
|
if (ret < 0)
|
|
break;
|
|
}
|
|
if (ret < 0)
|
|
{
|
|
if (errorHandler(cb))
|
|
continue;
|
|
exitHandler();
|
|
}
|
|
|
|
// Prepare transaction (the transaction is NOT yet sent to NDB)
|
|
cb->connection->executeAsynchPrepare(Commit, &callback, cb);
|
|
m_transactions++;
|
|
return;
|
|
}
|
|
err << "Retried transaction " << cb->retries << " times.\nLast error"
|
|
<< m_ndb->getNdbError(cb->error_code) << endl
|
|
<< "...Unable to recover from errors. Exiting..." << endl;
|
|
exitHandler();
|
|
}
|
|
|
|
void BackupRestore::cback(int result, restore_callback_t *cb)
|
|
{
|
|
m_transactions--;
|
|
|
|
if (result < 0)
|
|
{
|
|
/**
|
|
* Error. temporary or permanent?
|
|
*/
|
|
if (errorHandler(cb))
|
|
tuple_a(cb); // retry
|
|
else
|
|
{
|
|
err << "Restore: Failed to restore data due to a unrecoverable error. Exiting..." << endl;
|
|
exitHandler();
|
|
}
|
|
}
|
|
else
|
|
{
|
|
/**
|
|
* OK! close transaction
|
|
*/
|
|
m_ndb->closeTransaction(cb->connection);
|
|
cb->connection= 0;
|
|
cb->next= m_free_callback;
|
|
m_free_callback= cb;
|
|
m_dataCount++;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* returns true if is recoverable,
|
|
* Error handling based on hugo
|
|
* false if it is an error that generates an abort.
|
|
*/
|
|
bool BackupRestore::errorHandler(restore_callback_t *cb)
|
|
{
|
|
NdbError error= cb->connection->getNdbError();
|
|
m_ndb->closeTransaction(cb->connection);
|
|
cb->connection= 0;
|
|
|
|
Uint32 sleepTime = 100 + cb->retries * 300;
|
|
|
|
cb->retries++;
|
|
cb->error_code = error.code;
|
|
|
|
switch(error.status)
|
|
{
|
|
case NdbError::Success:
|
|
return false;
|
|
// ERROR!
|
|
break;
|
|
|
|
case NdbError::TemporaryError:
|
|
NdbSleep_MilliSleep(sleepTime);
|
|
return true;
|
|
// RETRY
|
|
break;
|
|
|
|
case NdbError::UnknownResult:
|
|
err << error << endl;
|
|
return false;
|
|
// ERROR!
|
|
break;
|
|
|
|
default:
|
|
case NdbError::PermanentError:
|
|
//ERROR
|
|
err << error << endl;
|
|
return false;
|
|
break;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
void BackupRestore::exitHandler()
|
|
{
|
|
release();
|
|
exit(-1);
|
|
}
|
|
|
|
|
|
void
|
|
BackupRestore::tuple_free()
|
|
{
|
|
if (!m_restore)
|
|
return;
|
|
|
|
// Poll all transactions
|
|
while (m_transactions)
|
|
{
|
|
m_ndb->sendPollNdb(3000);
|
|
}
|
|
}
|
|
|
|
void
|
|
BackupRestore::endOfTuples()
|
|
{
|
|
tuple_free();
|
|
}
|
|
|
|
void
|
|
BackupRestore::logEntry(const LogEntry & tup)
|
|
{
|
|
if (!m_restore)
|
|
return;
|
|
|
|
NdbConnection * trans = m_ndb->startTransaction();
|
|
if (trans == NULL)
|
|
{
|
|
// Deep shit, TODO: handle the error
|
|
err << "Cannot start transaction" << endl;
|
|
exit(-1);
|
|
} // if
|
|
|
|
const NdbDictionary::Table * table = get_table(tup.m_table->m_dictTable);
|
|
NdbOperation * op = trans->getNdbOperation(table);
|
|
if (op == NULL)
|
|
{
|
|
err << "Cannot get operation: " << trans->getNdbError() << endl;
|
|
exit(-1);
|
|
} // if
|
|
|
|
int check = 0;
|
|
switch(tup.m_type)
|
|
{
|
|
case LogEntry::LE_INSERT:
|
|
check = op->insertTuple();
|
|
break;
|
|
case LogEntry::LE_UPDATE:
|
|
check = op->updateTuple();
|
|
break;
|
|
case LogEntry::LE_DELETE:
|
|
check = op->deleteTuple();
|
|
break;
|
|
default:
|
|
err << "Log entry has wrong operation type."
|
|
<< " Exiting...";
|
|
exit(-1);
|
|
}
|
|
|
|
for (Uint32 i= 0; i < tup.size(); i++)
|
|
{
|
|
const AttributeS * attr = tup[i];
|
|
int size = attr->Desc->size;
|
|
int arraySize = attr->Desc->arraySize;
|
|
const char * dataPtr = attr->Data.string_value;
|
|
|
|
if (tup.m_table->have_auto_inc(attr->Desc->attrId))
|
|
tup.m_table->update_max_auto_val(dataPtr,size);
|
|
|
|
const Uint32 length = (size / 8) * arraySize;
|
|
if (attr->Desc->m_column->getPrimaryKey())
|
|
op->equal(attr->Desc->attrId, dataPtr, length);
|
|
else
|
|
op->setValue(attr->Desc->attrId, dataPtr, length);
|
|
}
|
|
|
|
const int ret = trans->execute(Commit);
|
|
if (ret != 0)
|
|
{
|
|
// Both insert update and delete can fail during log running
|
|
// and it's ok
|
|
// TODO: check that the error is either tuple exists or tuple does not exist?
|
|
switch(tup.m_type)
|
|
{
|
|
case LogEntry::LE_INSERT:
|
|
break;
|
|
case LogEntry::LE_UPDATE:
|
|
break;
|
|
case LogEntry::LE_DELETE:
|
|
break;
|
|
}
|
|
if (false)
|
|
{
|
|
err << "execute failed: " << trans->getNdbError() << endl;
|
|
exit(-1);
|
|
}
|
|
}
|
|
|
|
m_ndb->closeTransaction(trans);
|
|
m_logCount++;
|
|
}
|
|
|
|
void
|
|
BackupRestore::endOfLogEntrys()
|
|
{
|
|
if (!m_restore)
|
|
return;
|
|
|
|
info << "Restored " << m_dataCount << " tuples and "
|
|
<< m_logCount << " log entries" << endl;
|
|
}
|
|
|
|
/*
|
|
* callback : This is called when the transaction is polled
|
|
*
|
|
* (This function must have three arguments:
|
|
* - The result of the transaction,
|
|
* - The NdbConnection object, and
|
|
* - A pointer to an arbitrary object.)
|
|
*/
|
|
|
|
static void
|
|
callback(int result, NdbConnection* trans, void* aObject)
|
|
{
|
|
restore_callback_t *cb = (restore_callback_t *)aObject;
|
|
(cb->restore)->cback(result, cb);
|
|
}
|
|
|
|
#if 0 // old tuple impl
|
|
void
|
|
BackupRestore::tuple(const TupleS & tup)
|
|
{
|
|
if (!m_restore)
|
|
return;
|
|
while (1)
|
|
{
|
|
NdbConnection * trans = m_ndb->startTransaction();
|
|
if (trans == NULL)
|
|
{
|
|
// Deep shit, TODO: handle the error
|
|
ndbout << "Cannot start transaction" << endl;
|
|
exit(-1);
|
|
} // if
|
|
|
|
const TableS * table = tup.getTable();
|
|
NdbOperation * op = trans->getNdbOperation(table->getTableName());
|
|
if (op == NULL)
|
|
{
|
|
ndbout << "Cannot get operation: ";
|
|
ndbout << trans->getNdbError() << endl;
|
|
exit(-1);
|
|
} // if
|
|
|
|
// TODO: check return value and handle error
|
|
if (op->writeTuple() == -1)
|
|
{
|
|
ndbout << "writeTuple call failed: ";
|
|
ndbout << trans->getNdbError() << endl;
|
|
exit(-1);
|
|
} // if
|
|
|
|
for (int i = 0; i < tup.getNoOfAttributes(); i++)
|
|
{
|
|
const AttributeS * attr = tup[i];
|
|
int size = attr->Desc->size;
|
|
int arraySize = attr->Desc->arraySize;
|
|
const char * dataPtr = attr->Data.string_value;
|
|
|
|
const Uint32 length = (size * arraySize) / 8;
|
|
if (attr->Desc->m_column->getPrimaryKey())
|
|
op->equal(i, dataPtr, length);
|
|
}
|
|
|
|
for (int i = 0; i < tup.getNoOfAttributes(); i++)
|
|
{
|
|
const AttributeS * attr = tup[i];
|
|
int size = attr->Desc->size;
|
|
int arraySize = attr->Desc->arraySize;
|
|
const char * dataPtr = attr->Data.string_value;
|
|
|
|
const Uint32 length = (size * arraySize) / 8;
|
|
if (!attr->Desc->m_column->getPrimaryKey())
|
|
if (attr->Data.null)
|
|
op->setValue(i, NULL, 0);
|
|
else
|
|
op->setValue(i, dataPtr, length);
|
|
}
|
|
int ret = trans->execute(Commit);
|
|
if (ret != 0)
|
|
{
|
|
ndbout << "execute failed: ";
|
|
ndbout << trans->getNdbError() << endl;
|
|
exit(-1);
|
|
}
|
|
m_ndb->closeTransaction(trans);
|
|
if (ret == 0)
|
|
break;
|
|
}
|
|
m_dataCount++;
|
|
}
|
|
#endif
|
|
|
|
template class Vector<NdbDictionary::Table*>;
|
|
template class Vector<const NdbDictionary::Table*>;
|