From d089a424bdc99678e0eebb15ffd2fe44ffb652aa Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 9 Jun 2004 19:24:36 +0000 Subject: [PATCH] re-enginered ndb restore to remove new/deletes and data copy BitKeeper/deleted/.del-Makefile_old~5e1138bd59f6b3aa: Delete: ndb/src/kernel/blocks/backup/restore/Makefile_old --- .../kernel/blocks/backup/restore/Makefile.am | 2 +- .../kernel/blocks/backup/restore/Makefile_old | 20 - .../kernel/blocks/backup/restore/Restore.cpp | 307 +++-- .../kernel/blocks/backup/restore/Restore.hpp | 52 +- .../kernel/blocks/backup/restore/consumer.cpp | 107 ++ .../kernel/blocks/backup/restore/consumer.hpp | 40 + .../backup/restore/consumer_printer.cpp | 96 ++ .../backup/restore/consumer_printer.hpp | 50 + .../backup/restore/consumer_restore.cpp | 508 ++++++++ .../backup/restore/consumer_restore.hpp | 79 ++ .../backup/restore/consumer_restorem.cpp | 652 ++++++++++ ndb/src/kernel/blocks/backup/restore/main.cpp | 1051 +---------------- 12 files changed, 1831 insertions(+), 1133 deletions(-) delete mode 100644 ndb/src/kernel/blocks/backup/restore/Makefile_old create mode 100644 ndb/src/kernel/blocks/backup/restore/consumer.cpp create mode 100644 ndb/src/kernel/blocks/backup/restore/consumer.hpp create mode 100644 ndb/src/kernel/blocks/backup/restore/consumer_printer.cpp create mode 100644 ndb/src/kernel/blocks/backup/restore/consumer_printer.hpp create mode 100644 ndb/src/kernel/blocks/backup/restore/consumer_restore.cpp create mode 100644 ndb/src/kernel/blocks/backup/restore/consumer_restore.hpp create mode 100644 ndb/src/kernel/blocks/backup/restore/consumer_restorem.cpp diff --git a/ndb/src/kernel/blocks/backup/restore/Makefile.am b/ndb/src/kernel/blocks/backup/restore/Makefile.am index 0f380e7f1c8..e0429c60723 100644 --- a/ndb/src/kernel/blocks/backup/restore/Makefile.am +++ b/ndb/src/kernel/blocks/backup/restore/Makefile.am @@ -1,7 +1,7 @@ ndbtools_PROGRAMS = ndb_restore -ndb_restore_SOURCES = main.cpp Restore.cpp +ndb_restore_SOURCES = main.cpp consumer.cpp consumer_restore.cpp consumer_printer.cpp Restore.cpp LDADD_LOC = $(top_builddir)/ndb/src/libndbclient.la diff --git a/ndb/src/kernel/blocks/backup/restore/Makefile_old b/ndb/src/kernel/blocks/backup/restore/Makefile_old deleted file mode 100644 index 4c884525d73..00000000000 --- a/ndb/src/kernel/blocks/backup/restore/Makefile_old +++ /dev/null @@ -1,20 +0,0 @@ -include .defs.mk - -TYPE := * - -BIN_TARGET := restore -BIN_TARGET_LIBS := -BIN_TARGET_ARCHIVES := NDB_API - -CCFLAGS_LOC = -I.. -I$(NDB_TOP)/src/ndbapi -I$(NDB_TOP)/include/ndbapi -I$(NDB_TOP)/include/util -I$(NDB_TOP)/include/portlib -I$(NDB_TOP)/include/kernel - -#ifneq ($(MYSQLCLUSTER_TOP),) -#CCFLAGS_LOC +=-I$(MYSQLCLUSTER_TOP)/include -D USE_MYSQL -#LDFLAGS_LOC += -L$(MYSQLCLUSTER_TOP)/libmysql_r/ -lmysqlclient_r -#endif - -SOURCES = main.cpp Restore.cpp - -include $(NDB_TOP)/Epilogue.mk - - diff --git a/ndb/src/kernel/blocks/backup/restore/Restore.cpp b/ndb/src/kernel/blocks/backup/restore/Restore.cpp index 1b4ea9cf467..3fb2236857b 100644 --- a/ndb/src/kernel/blocks/backup/restore/Restore.cpp +++ b/ndb/src/kernel/blocks/backup/restore/Restore.cpp @@ -33,32 +33,32 @@ Uint32 Twiddle32(Uint32 in); // Byte shift 32-bit data Uint64 Twiddle64(Uint64 in); // Byte shift 64-bit data bool -BackupFile::Twiddle(AttributeS* attr, Uint32 arraySize){ +BackupFile::Twiddle(const AttributeDesc* attr_desc, AttributeData* attr_data, Uint32 arraySize){ if(m_hostByteOrder) return true; if(arraySize == 0){ - arraySize = attr->Desc->arraySize; + arraySize = attr_desc->arraySize; } - switch(attr->Desc->size){ + switch(attr_desc->size){ case 8: return true; case 16: for(unsigned i = 0; iData.u_int16_value[i] = Twiddle16(attr->Data.u_int16_value[i]); + attr_data->u_int16_value[i] = Twiddle16(attr_data->u_int16_value[i]); } return true; case 32: for(unsigned i = 0; iData.u_int32_value[i] = Twiddle32(attr->Data.u_int32_value[i]); + attr_data->u_int32_value[i] = Twiddle32(attr_data->u_int32_value[i]); } return true; case 64: for(unsigned i = 0; iData.u_int64_value[i] = Twiddle64(attr->Data.u_int64_value[i]); + attr_data->u_int64_value[i] = Twiddle64(attr_data->u_int64_value[i]); } return true; default: @@ -208,7 +208,7 @@ TableS::TableS(NdbTableImpl* tableImpl) m_dictTable = tableImpl; m_noOfNullable = m_nullBitmaskSize = 0; - for (Uint32 i = 0; i < tableImpl->getNoOfColumns(); i++) + for (int i = 0; i < tableImpl->getNoOfColumns(); i++) createAttr(tableImpl->getColumn(i)); } @@ -246,56 +246,112 @@ RestoreMetaData::parseTableDescriptor(const Uint32 * data, Uint32 len) } // Constructor -RestoreDataIterator::RestoreDataIterator(const RestoreMetaData & md) - : m_metaData(md) +RestoreDataIterator::RestoreDataIterator(const RestoreMetaData & md, void (* _free_data_callback)()) + : m_metaData(md), free_data_callback(_free_data_callback) { debug << "RestoreDataIterator constructor" << endl; setDataFile(md, 0); + + m_buffer_sz = 64*1024; + m_buffer = malloc(m_buffer_sz); + m_buffer_ptr = m_buffer; + m_buffer_data_left = 0; } -RestoreDataIterator::~RestoreDataIterator(){ +RestoreDataIterator::~RestoreDataIterator() +{ + if (m_buffer) + free(m_buffer); } +TupleS & TupleS::operator=(const TupleS& tuple) +{ + prepareRecord(*tuple.m_currentTable); + + if (allAttrData) { + allAttrData= new AttributeData[getNoOfAttributes()]; + memcpy(allAttrData, tuple.allAttrData, getNoOfAttributes()*sizeof(AttributeData)); + } + + return *this; +}; +int TupleS::getNoOfAttributes() const { + if (m_currentTable == 0) + return 0; + return m_currentTable->getNoOfAttributes(); +}; + +const TableS * TupleS::getTable() const { + return m_currentTable; +}; + +const AttributeDesc * TupleS::getDesc(int i) const { + return m_currentTable->allAttributesDesc[i]; +} + +AttributeData * TupleS::getData(int i) const{ + return &(allAttrData[i]); +}; + bool TupleS::prepareRecord(const TableS & tab){ + if (allAttrData) { + delete [] allAttrData; + m_currentTable= 0; + } + + allAttrData = new AttributeData[tab.getNoOfAttributes()]; + + if (allAttrData == 0) + return false; + m_currentTable = &tab; - for(int i = 0; iDesc = tab[i]; - allAttributes.push_back(a); - } + return true; } -const TupleS * -RestoreDataIterator::getNextTuple(int & res) { - TupleS * tup = new TupleS(); - if(tup == NULL) { - ndbout_c("Restore: Failed to allocate memory"); - res = -1; - return NULL; - } - if(!tup->prepareRecord(* m_currentTable)) { - res =-1; - return NULL; - } - +Uint32 RestoreDataIterator::get_buffer_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb, FILE *stream) +{ + Uint32 sz = size*nmemb; + if (sz > m_buffer_data_left) { + if (free_data_callback) + (*free_data_callback)(); + + memcpy(m_buffer, m_buffer_ptr, m_buffer_data_left); + + size_t r = fread(((char *)m_buffer) + m_buffer_data_left, 1, m_buffer_sz - m_buffer_data_left, stream); + m_buffer_data_left += r; + m_buffer_ptr = m_buffer; + + if (sz > m_buffer_data_left) + sz = size * (m_buffer_data_left / size); + } + + *p_buf_ptr = m_buffer_ptr; + + m_buffer_ptr = ((char*)m_buffer_ptr)+sz; + m_buffer_data_left -= sz; + + return sz/size; +} + +Uint32 RestoreDataIterator::fread_buffer(void *ptr, Uint32 size, Uint32 nmemb, FILE *stream) +{ + void *buf_ptr; + Uint32 r = get_buffer_ptr(&buf_ptr, size, nmemb, stream); + memcpy(ptr, buf_ptr, r*size); + + return r; +} + +const TupleS * +RestoreDataIterator::getNextTuple(int & res) +{ Uint32 dataLength = 0; // Read record length - if (fread(&dataLength, sizeof(dataLength), 1, m_file) != 1){ + if (fread_buffer(&dataLength, sizeof(dataLength), 1, m_file) != 1){ err << "getNextTuple:Error reading length of data part" << endl; - delete tup; res = -1; return NULL; } // if @@ -309,34 +365,34 @@ RestoreDataIterator::getNextTuple(int & res) { // End of this data fragment debug << "End of fragment" << endl; res = 0; - delete tup; return NULL; } // if - - tup->createDataRecord(dataLenBytes); + // Read tuple data - if (fread(tup->getDataRecord(), 1, dataLenBytes, m_file) != dataLenBytes) { + void *_buf_ptr; + if (get_buffer_ptr(&_buf_ptr, 1, dataLenBytes, m_file) != dataLenBytes) { err << "getNextTuple:Read error: " << endl; - delete tup; res = -1; return NULL; } - Uint32 * ptr = tup->getDataRecord(); + Uint32 *buf_ptr = (Uint32*)_buf_ptr, *ptr = buf_ptr; ptr += m_currentTable->m_nullBitmaskSize; for(int i = 0; i < m_currentTable->m_fixedKeys.size(); i++){ - assert(ptr < tup->getDataRecord() + dataLength); - + assert(ptr < buf_ptr + dataLength); + const Uint32 attrId = m_currentTable->m_fixedKeys[i]->attrId; - AttributeS * attr = tup->allAttributes[attrId]; - const Uint32 sz = attr->Desc->getSizeInWords(); + AttributeData * attr_data = m_tuple.getData(attrId); + const AttributeDesc * attr_desc = m_tuple.getDesc(attrId); - attr->Data.null = false; - attr->Data.void_value = ptr; + const Uint32 sz = attr_desc->getSizeInWords(); - if(!Twiddle(attr)) + attr_data->null = false; + attr_data->void_value = ptr; + + if(!Twiddle(attr_desc, attr_data)) { res = -1; return NULL; @@ -345,17 +401,19 @@ RestoreDataIterator::getNextTuple(int & res) { } for(int i = 0; im_fixedAttribs.size(); i++){ - assert(ptr < tup->getDataRecord() + dataLength); + assert(ptr < buf_ptr + dataLength); const Uint32 attrId = m_currentTable->m_fixedAttribs[i]->attrId; - AttributeS * attr = tup->allAttributes[attrId]; - const Uint32 sz = attr->Desc->getSizeInWords(); + AttributeData * attr_data = m_tuple.getData(attrId); + const AttributeDesc * attr_desc = m_tuple.getDesc(attrId); - attr->Data.null = false; - attr->Data.void_value = ptr; + const Uint32 sz = attr_desc->getSizeInWords(); - if(!Twiddle(attr)) + attr_data->null = false; + attr_data->void_value = ptr; + + if(!Twiddle(attr_desc, attr_data)) { res = -1; return NULL; @@ -366,19 +424,21 @@ RestoreDataIterator::getNextTuple(int & res) { for(int i = 0; im_variableAttribs.size(); i++){ const Uint32 attrId = m_currentTable->m_variableAttribs[i]->attrId; - AttributeS * attr = tup->allAttributes[attrId]; + + AttributeData * attr_data = m_tuple.getData(attrId); + const AttributeDesc * attr_desc = m_tuple.getDesc(attrId); - if(attr->Desc->m_column->getNullable()){ - const Uint32 ind = attr->Desc->m_nullBitIndex; + if(attr_desc->m_column->getNullable()){ + const Uint32 ind = attr_desc->m_nullBitIndex; if(BitmaskImpl::get(m_currentTable->m_nullBitmaskSize, - tup->getDataRecord(),ind)){ - attr->Data.null = true; - attr->Data.void_value = NULL; + buf_ptr,ind)){ + attr_data->null = true; + attr_data->void_value = NULL; continue; } } - assert(ptr < tup->getDataRecord() + dataLength); + assert(ptr < buf_ptr + dataLength); typedef BackupFormat::DataFile::VariableData VarData; VarData * data = (VarData *)ptr; @@ -386,15 +446,15 @@ RestoreDataIterator::getNextTuple(int & res) { Uint32 id = ntohl(data->Id); assert(id == attrId); - attr->Data.null = false; - attr->Data.void_value = &data->Data[0]; + attr_data->null = false; + attr_data->void_value = &data->Data[0]; /** * Compute array size */ - const Uint32 arraySize = (4 * sz) / (attr->Desc->size / 8); - assert(arraySize >= attr->Desc->arraySize); - if(!Twiddle(attr, attr->Desc->arraySize)) + const Uint32 arraySize = (4 * sz) / (attr_desc->size / 8); + assert(arraySize >= attr_desc->arraySize); + if(!Twiddle(attr_desc, attr_data, attr_desc->arraySize)) { res = -1; return NULL; @@ -405,7 +465,7 @@ RestoreDataIterator::getNextTuple(int & res) { m_count ++; res = 0; - return tup; + return &m_tuple; } // RestoreDataIterator::getNextTuple BackupFile::BackupFile(){ @@ -558,7 +618,7 @@ RestoreDataIterator::readFragmentHeader(int & ret) debug << "RestoreDataIterator::getNextFragment" << endl; - if (fread(&Header, sizeof(Header), 1, m_file) != 1){ + if (fread_buffer(&Header, sizeof(Header), 1, m_file) != 1){ ret = 0; return false; } // if @@ -581,6 +641,12 @@ RestoreDataIterator::readFragmentHeader(int & ret) return false; } + if(!m_tuple.prepareRecord(*m_currentTable)) + { + ret =-1; + return false; + } + info << "_____________________________________________________" << endl << "Restoring data in table: " << m_currentTable->getTableName() << "(" << Header.TableId << ") fragment " @@ -588,6 +654,7 @@ RestoreDataIterator::readFragmentHeader(int & ret) m_count = 0; ret = 0; + return true; } // RestoreDataIterator::getNextFragment @@ -596,7 +663,7 @@ bool RestoreDataIterator::validateFragmentFooter() { BackupFormat::DataFile::FragmentFooter footer; - if (fread(&footer, sizeof(footer), 1, m_file) != 1){ + if (fread_buffer(&footer, sizeof(footer), 1, m_file) != 1){ err << "getFragmentFooter:Error reading fragment footer" << endl; return false; } @@ -787,14 +854,14 @@ RestoreLogIterator::getNextLogEntry(int & res) { const Uint32 sz = ah->getDataSize(); if(sz == 0){ - attr->Data.null = true; - attr->Data.void_value = NULL; + attr->Data->null = true; + attr->Data->void_value = NULL; } else { - attr->Data.null = false; - attr->Data.void_value = ah->getDataPtr(); + attr->Data->null = false; + attr->Data->void_value = ah->getDataPtr(); } - Twiddle(attr); + Twiddle(attr->Desc, attr->Data); m_logEntry.m_values.push_back(attr); ah = ah->getNext(); @@ -804,3 +871,85 @@ RestoreLogIterator::getNextLogEntry(int & res) { res = 0; return &m_logEntry; } + +NdbOut & +operator<<(NdbOut& ndbout, const AttributeS& attr){ + const AttributeData & data = *(attr.Data); + const AttributeDesc & desc = *(attr.Desc); + + if (data.null) + { + ndbout << ""; + return ndbout; + } + + NdbRecAttr tmprec; + tmprec.setup(desc.m_column, (char *)data.void_value); + ndbout << tmprec; + + return ndbout; +} + +// Print tuple data +NdbOut& +operator<<(NdbOut& ndbout, const TupleS& tuple) +{ + ndbout << tuple.getTable()->getTableName() << "; "; + for (int i = 0; i < tuple.getNoOfAttributes(); i++) + { + AttributeData * attr_data = tuple.getData(i); + const AttributeDesc * attr_desc = tuple.getDesc(i); + const AttributeS attr = {attr_desc, attr_data}; + debug << i << " " << attr_desc->m_column->getName(); + ndbout << attr; + + if (i != (tuple.getNoOfAttributes() - 1)) + ndbout << delimiter << " "; + } // for + return ndbout; +} + +// Print tuple data +NdbOut& +operator<<(NdbOut& ndbout, const LogEntry& logE) +{ + switch(logE.m_type) + { + case LogEntry::LE_INSERT: + ndbout << "INSERT " << logE.m_table->getTableName() << " "; + break; + case LogEntry::LE_DELETE: + ndbout << "DELETE " << logE.m_table->getTableName() << " "; + break; + case LogEntry::LE_UPDATE: + ndbout << "UPDATE " << logE.m_table->getTableName() << " "; + break; + default: + ndbout << "Unknown log entry type (not insert, delete or update)" ; + } + + for (int i = 0; i < logE.m_values.size();i++) + { + const AttributeS * attr = logE.m_values[i]; + ndbout << attr->Desc->m_column->getName() << "="; + ndbout << (* attr); + if (i < (logE.m_values.size() - 1)) + ndbout << ", "; + } + return ndbout; +} + + +NdbOut & +operator<<(NdbOut& ndbout, const TableS & table){ + ndbout << endl << "Table: " << table.getTableName() << endl; + for (int j = 0; j < table.getNoOfAttributes(); j++) + { + const AttributeDesc * desc = table[j]; + ndbout << desc->m_column->getName() << ": " << desc->m_column->getType(); + ndbout << " key: " << desc->m_column->getPrimaryKey(); + ndbout << " array: " << desc->arraySize; + ndbout << " size: " << desc->size << endl; + } // for + return ndbout; +} diff --git a/ndb/src/kernel/blocks/backup/restore/Restore.hpp b/ndb/src/kernel/blocks/backup/restore/Restore.hpp index 08cccee6bd4..227ec60644c 100644 --- a/ndb/src/kernel/blocks/backup/restore/Restore.hpp +++ b/ndb/src/kernel/blocks/backup/restore/Restore.hpp @@ -18,6 +18,7 @@ #define RESTORE_H #include +#include #include #include #include "myVector.hpp" @@ -25,6 +26,8 @@ #include #include +static const char * delimiter = ";"; // Delimiter in file dump + const int FileNameLenC = 256; const int TableNameLenC = 256; const int AttrNameLenC = 256; @@ -82,26 +85,30 @@ public: struct AttributeS { const AttributeDesc * Desc; - AttributeData Data; + AttributeData * Data; }; class TupleS { private: friend class RestoreDataIterator; - const TableS * m_currentTable; - myVector allAttributes; - Uint32 * dataRecord; + const TableS *m_currentTable; + AttributeData *allAttrData; bool prepareRecord(const TableS &); public: - TupleS() {dataRecord = NULL;}; - ~TupleS() {if(dataRecord != NULL) delete [] dataRecord;}; - int getNoOfAttributes() const { return allAttributes.size(); }; - const TableS * getTable() const { return m_currentTable;}; - const AttributeS * operator[](int i) const { return allAttributes[i];}; - Uint32 * getDataRecord() { return dataRecord;}; - void createDataRecord(Uint32 bytes) { dataRecord = new Uint32[bytes];}; + TupleS() {}; + ~TupleS() + { + if (allAttrData) + delete [] allAttrData; + }; + TupleS(const TupleS& tuple); // disable copy constructor + TupleS & operator=(const TupleS& tuple); + int getNoOfAttributes() const; + const TableS * getTable() const; + const AttributeDesc * getDesc(int i) const; + AttributeData * getData(int i) const; }; // class TupleS class TableS { @@ -206,7 +213,7 @@ public: const char * getFilename() const { return m_fileName;} Uint32 getNodeId() const { return m_nodeId;} const BackupFormat::FileHeader & getFileHeader() const { return m_fileHeader;} - bool Twiddle(AttributeS * attr, Uint32 arraySize = 0); + bool Twiddle(const AttributeDesc * attr_desc, AttributeData * attr_data, Uint32 arraySize = 0); }; class RestoreMetaData : public BackupFile { @@ -243,20 +250,28 @@ public: class RestoreDataIterator : public BackupFile { const RestoreMetaData & m_metaData; - Uint32 m_count; - TupleS m_tuple; const TableS* m_currentTable; + TupleS m_tuple; + + void * m_buffer; + void * m_buffer_ptr; + Uint32 m_buffer_sz; + Uint32 m_buffer_data_left; + void (* free_data_callback)(); public: // Constructor - RestoreDataIterator(const RestoreMetaData &); + RestoreDataIterator(const RestoreMetaData &, void (* free_data_callback)()); ~RestoreDataIterator(); // Read data file fragment header bool readFragmentHeader(int & res); bool validateFragmentFooter(); - + + Uint32 get_buffer_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb, FILE *stream); + Uint32 fread_buffer(void *ptr, Uint32 size, Uint32 nmemb, FILE *stream); + const TupleS *getNextTuple(int & res); }; @@ -286,6 +301,11 @@ public: const LogEntry * getNextLogEntry(int & res); }; +NdbOut& operator<<(NdbOut& ndbout, const TableS&); +NdbOut& operator<<(NdbOut& ndbout, const TupleS&); +NdbOut& operator<<(NdbOut& ndbout, const LogEntry&); +NdbOut& operator<<(NdbOut& ndbout, const RestoreMetaData&); + #endif diff --git a/ndb/src/kernel/blocks/backup/restore/consumer.cpp b/ndb/src/kernel/blocks/backup/restore/consumer.cpp new file mode 100644 index 00000000000..e94c31b2666 --- /dev/null +++ b/ndb/src/kernel/blocks/backup/restore/consumer.cpp @@ -0,0 +1,107 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "consumer.hpp" + +#ifdef USE_MYSQL +int +BackupConsumer::create_table_string(const TableS & table, + char * tableName, + char *buf){ + int pos = 0; + int pos2 = 0; + char buf2[2048]; + + pos += sprintf(buf+pos, "%s%s", "CREATE TABLE ", tableName); + pos += sprintf(buf+pos, "%s", "("); + pos2 += sprintf(buf2+pos2, "%s", " primary key("); + + for (int j = 0; j < table.getNoOfAttributes(); j++) + { + const AttributeDesc * desc = table[j]; + // ndbout << desc->name << ": "; + pos += sprintf(buf+pos, "%s%s", desc->m_column->getName()," "); + switch(desc->m_column->getType()){ + case NdbDictionary::Column::Int: + pos += sprintf(buf+pos, "%s", "int"); + break; + case NdbDictionary::Column::Unsigned: + pos += sprintf(buf+pos, "%s", "int unsigned"); + break; + case NdbDictionary::Column::Float: + pos += sprintf(buf+pos, "%s", "float"); + break; + case NdbDictionary::Column::Decimal: + pos += sprintf(buf+pos, "%s", "decimal"); + break; + case NdbDictionary::Column::Char: + pos += sprintf(buf+pos, "%s", "char"); + break; + case NdbDictionary::Column::Varchar: + pos += sprintf(buf+pos, "%s", "varchar"); + break; + case NdbDictionary::Column::Binary: + pos += sprintf(buf+pos, "%s", "binary"); + break; + case NdbDictionary::Column::Varbinary: + pos += sprintf(buf+pos, "%s", "varchar binary"); + break; + case NdbDictionary::Column::Bigint: + pos += sprintf(buf+pos, "%s", "bigint"); + break; + case NdbDictionary::Column::Bigunsigned: + pos += sprintf(buf+pos, "%s", "bigint unsigned"); + break; + case NdbDictionary::Column::Double: + pos += sprintf(buf+pos, "%s", "double"); + break; + case NdbDictionary::Column::Datetime: + pos += sprintf(buf+pos, "%s", "datetime"); + break; + case NdbDictionary::Column::Timespec: + pos += sprintf(buf+pos, "%s", "time"); + break; + case NdbDictionary::Column::Undefined: + // pos += sprintf(buf+pos, "%s", "varchar binary"); + return -1; + break; + default: + //pos += sprintf(buf+pos, "%s", "varchar binary"); + return -1; + } + if (desc->arraySize > 1) { + int attrSize = desc->arraySize; + pos += sprintf(buf+pos, "%s%u%s", + "(", + attrSize, + ")"); + } + if (desc->m_column->getPrimaryKey()) { + pos += sprintf(buf+pos, "%s", " not null"); + pos2 += sprintf(buf2+pos2, "%s%s", desc->m_column->getName(), ","); + } + pos += sprintf(buf+pos, "%s", ","); + } // for + pos2--; // remove trailing comma + pos2 += sprintf(buf2+pos2, "%s", ")"); + // pos--; // remove trailing comma + + pos += sprintf(buf+pos, "%s", buf2); + pos += sprintf(buf+pos, "%s", ") type=ndbcluster"); + return 0; +} + +#endif // USE_MYSQL diff --git a/ndb/src/kernel/blocks/backup/restore/consumer.hpp b/ndb/src/kernel/blocks/backup/restore/consumer.hpp new file mode 100644 index 00000000000..79edd788c57 --- /dev/null +++ b/ndb/src/kernel/blocks/backup/restore/consumer.hpp @@ -0,0 +1,40 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef CONSUMER_HPP +#define CONSUMER_HPP + +#include "Restore.hpp" + +class BackupConsumer { +public: + virtual bool init() { return true;} + virtual bool table(const TableS &){return true;} +#ifdef USE_MYSQL + virtual bool table(const TableS &, MYSQL* mysqlp) {return true;}; +#endif + virtual void tuple(const TupleS &){} + virtual void tuple_free(){} + virtual void endOfTuples(){} + virtual void logEntry(const LogEntry &){} + virtual void endOfLogEntrys(){} +protected: +#ifdef USE_MYSQL + int create_table_string(const TableS & table, char * ,char *); +#endif +}; + +#endif diff --git a/ndb/src/kernel/blocks/backup/restore/consumer_printer.cpp b/ndb/src/kernel/blocks/backup/restore/consumer_printer.cpp new file mode 100644 index 00000000000..b0c9595c65f --- /dev/null +++ b/ndb/src/kernel/blocks/backup/restore/consumer_printer.cpp @@ -0,0 +1,96 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "consumer_printer.hpp" + +bool +BackupPrinter::table(const TableS & tab) +{ + if (m_print || m_print_meta) + { + m_ndbout << tab; + ndbout_c("Successfully printed table: %s", tab.m_dictTable->getName()); + } + return true; +} + +#ifdef USE_MYSQL +bool +BackupPrinter::table(const TableS & tab, MYSQL * mysql) +{ + if (m_print || m_print_meta) + { + + char tmpTabName[MAX_TAB_NAME_SIZE*2]; + sprintf(tmpTabName, "%s", tab.getTableName()); + char * database = strtok(tmpTabName, "/"); + char * schema = strtok( NULL , "/"); + char * tableName = strtok( NULL , "/"); + + /** + * this means that the user did not specify schema + * and it is a v2x backup + */ + if(database == NULL) + return false; + if(schema == NULL) + return false; + if(tableName==NULL) + tableName = schema; + + char stmtCreateDB[255]; + + sprintf(stmtCreateDB,"CREATE DATABASE %s", database); + ndbout_c("%s", stmtCreateDB); + + + char buf [2048]; + create_table_string(tab, tableName, buf); + ndbout_c("%s", buf); + + ndbout_c("Successfully printed table: %s", tab.m_dictTable->getName()); + } + return true; +} + +#endif + +void +BackupPrinter::tuple(const TupleS & tup) +{ + m_dataCount++; + if (m_print || m_print_data) + m_ndbout << tup << endl; +} + +void +BackupPrinter::logEntry(const LogEntry & logE) +{ + if (m_print || m_print_log) + m_ndbout << logE << endl; + m_logCount++; +} + +void +BackupPrinter::endOfLogEntrys() +{ + if (m_print || m_print_log) + { + ndbout << "Printed " << m_dataCount << " tuples and " + << m_logCount << " log entries" + << " to stdout." << endl; + } +} diff --git a/ndb/src/kernel/blocks/backup/restore/consumer_printer.hpp b/ndb/src/kernel/blocks/backup/restore/consumer_printer.hpp new file mode 100644 index 00000000000..7cbc924e364 --- /dev/null +++ b/ndb/src/kernel/blocks/backup/restore/consumer_printer.hpp @@ -0,0 +1,50 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef CONSUMER_PRINTER_HPP +#define CONSUMER_PRINTER_HPP + +#include "consumer.hpp" + +class BackupPrinter : public BackupConsumer +{ + NdbOut & m_ndbout; +public: + BackupPrinter(NdbOut & out = ndbout) : m_ndbout(out) + { + m_print = false; + m_print_log = false; + m_print_data = false; + m_print_meta = false; + } + + virtual bool table(const TableS &); +#ifdef USE_MYSQL + virtual bool table(const TableS &, MYSQL* mysqlp); +#endif + virtual void tuple(const TupleS &); + virtual void logEntry(const LogEntry &); + virtual void endOfTuples() {}; + virtual void endOfLogEntrys(); + bool m_print; + bool m_print_log; + bool m_print_data; + bool m_print_meta; + Uint32 m_logCount; + Uint32 m_dataCount; +}; + +#endif diff --git a/ndb/src/kernel/blocks/backup/restore/consumer_restore.cpp b/ndb/src/kernel/blocks/backup/restore/consumer_restore.cpp new file mode 100644 index 00000000000..e9dab23622d --- /dev/null +++ b/ndb/src/kernel/blocks/backup/restore/consumer_restore.cpp @@ -0,0 +1,508 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "consumer_restore.hpp" +#include + +extern FilteredNdbOut err; +extern FilteredNdbOut info; +extern FilteredNdbOut debug; + +static void callback(int, NdbConnection*, void*); + +bool +BackupRestore::init() +{ + release(); + + if (!m_restore && !m_restore_meta) + return true; + + m_ndb = new Ndb(); + + if (m_ndb == NULL) + return false; + + // Turn off table name completion + m_ndb->useFullyQualifiedNames(false); + + m_ndb->init(1024); + if (m_ndb->waitUntilReady(30) != 0) + { + err << "Failed to connect to ndb!!" << endl; + return false; + } + info << "Connected to ndb!!" << endl; + + m_callback = new restore_callback_t[m_parallelism]; + + if (m_callback == 0) + { + err << "Failed to allocate callback structs" << endl; + return false; + } + + m_tuples = new TupleS[m_parallelism]; + + if (m_tuples == 0) + { + err << "Failed to allocate tuples" << endl; + return false; + } + + m_free_callback= m_callback; + for (Uint32 i= 0; i < m_parallelism; i++) { + m_callback[i].restore= this; + m_callback[i].connection= 0; + m_callback[i].tup= &m_tuples[i]; + if (i > 0) + m_callback[i-1].next= &(m_callback[i]); + } + m_callback[m_parallelism-1].next = 0; + + return true; +} + +void BackupRestore::release() +{ + if (m_ndb) + { + delete m_ndb; + m_ndb= 0; + } + + if (m_callback) + { + delete [] m_callback; + m_callback= 0; + } + + if (m_tuples) + { + delete [] m_tuples; + m_tuples= 0; + } +} + +BackupRestore::~BackupRestore() +{ + release(); +} + +bool +BackupRestore::table(const TableS & table){ + if (!m_restore_meta) + { + return true; + } + NdbDictionary::Dictionary* dict = m_ndb->getDictionary(); + if (dict->createTable(*table.m_dictTable) == -1) + { + err << "Create table " << table.getTableName() << " failed: " + << dict->getNdbError() << endl; + return false; + } + info << "Successfully restored table " << table.getTableName()<< endl ; + return true; +} + +void BackupRestore::tuple(const TupleS & tup) +{ + if (!m_restore) + return; + + restore_callback_t * cb = m_free_callback; + + if (cb == 0) + assert(false); + + m_free_callback = cb->next; + cb->retries = 0; + *(cb->tup) = tup; // must do copy! + tuple_a(cb); + + if (m_free_callback == 0) + { + // send-poll all transactions + // close transaction is done in callback + m_ndb->sendPollNdb(3000, 1); + } +} + +void BackupRestore::tuple_a(restore_callback_t *cb) +{ + while (cb->retries < 10) + { + /** + * start transactions + */ + cb->connection = m_ndb->startTransaction(); + if (cb->connection == NULL) + { + /* + if (errorHandler(cb)) + { + continue; + } + */ + exitHandler(); + } // if + + const TupleS &tup = *(cb->tup); + const TableS * table = tup.getTable(); + NdbOperation * op = cb->connection->getNdbOperation(table->getTableName()); + + if (op == NULL) + { + if (errorHandler(cb)) + continue; + exitHandler(); + } // if + + if (op->writeTuple() == -1) + { + if (errorHandler(cb)) + continue; + exitHandler(); + } // if + + int ret = 0; + for (int j = 0; j < 2; j++) + { + for (int i = 0; i < tup.getNoOfAttributes(); i++) + { + const AttributeDesc * attr_desc = tup.getDesc(i); + const AttributeData * attr_data = tup.getData(i); + int size = attr_desc->size; + int arraySize = attr_desc->arraySize; + char * dataPtr = attr_data->string_value; + Uint32 length = (size * arraySize) / 8; + if (attr_desc->m_column->getPrimaryKey()) + { + if (j == 1) continue; + ret = op->equal(i, dataPtr, length); + } + else + { + if (j == 0) continue; + if (attr_data->null) + ret = op->setValue(i, NULL, 0); + else + ret = op->setValue(i, dataPtr, length); + } + if (ret < 0) { + ndbout_c("Column: %d type %d",i, + attr_desc->m_column->getType()); + break; + } + } + if (ret < 0) + break; + } + if (ret < 0) + { + if (errorHandler(cb)) + continue; + exitHandler(); + } + + // Prepare transaction (the transaction is NOT yet sent to NDB) + cb->connection->executeAsynchPrepare(Commit, &callback, cb); + m_transactions++; + return; + } + err << "Unable to recover from errors. Exiting..." << endl; + exitHandler(); +} + +void BackupRestore::cback(int result, restore_callback_t *cb) +{ + m_transactions--; + + if (result < 0) + { + /** + * Error. temporary or permanent? + */ + if (errorHandler(cb)) + tuple_a(cb); // retry + else + { + err << "Restore: Failed to restore data due to a unrecoverable error. Exiting..." << endl; + exitHandler(); + } + } + else + { + /** + * OK! close transaction + */ + m_ndb->closeTransaction(cb->connection); + cb->connection= 0; + cb->next= m_free_callback; + m_free_callback= cb; + m_dataCount++; + } +} + +/** + * returns true if is recoverable, + * Error handling based on hugo + * false if it is an error that generates an abort. + */ +bool BackupRestore::errorHandler(restore_callback_t *cb) +{ + NdbError error= cb->connection->getNdbError(); + m_ndb->closeTransaction(cb->connection); + cb->connection= 0; + cb->retries++; + switch(error.status) + { + case NdbError::Success: + return false; + // ERROR! + break; + + case NdbError::TemporaryError: + NdbSleep_MilliSleep(10); + return true; + // RETRY + break; + + case NdbError::UnknownResult: + err << error << endl; + return false; + // ERROR! + break; + + default: + case NdbError::PermanentError: + switch (error.code) + { + case 499: + case 250: + NdbSleep_MilliSleep(10); + return true; //temp errors? + default: + break; + } + //ERROR + err << error << endl; + return false; + break; + } + return false; +} + +void BackupRestore::exitHandler() +{ + release(); + exit(-1); +} + + +void +BackupRestore::tuple_free() +{ + if (!m_restore) + return; + + // Send all transactions to NDB + if (m_transactions > 0) + m_ndb->sendPreparedTransactions(0); + + // Poll all transactions + while (m_transactions > 0) + m_ndb->pollNdb(3000, m_transactions); +} + +void +BackupRestore::endOfTuples() +{ + tuple_free(); +} + +void +BackupRestore::logEntry(const LogEntry & tup) +{ + if (!m_restore) + return; + + NdbConnection * trans = m_ndb->startTransaction(); + if (trans == NULL) + { + // Deep shit, TODO: handle the error + err << "Cannot start transaction" << endl; + exit(-1); + } // if + + const TableS * table = tup.m_table; + NdbOperation * op = trans->getNdbOperation(table->getTableName()); + if (op == NULL) + { + err << "Cannot get operation: " << trans->getNdbError() << endl; + exit(-1); + } // if + + int check = 0; + switch(tup.m_type) + { + case LogEntry::LE_INSERT: + check = op->insertTuple(); + break; + case LogEntry::LE_UPDATE: + check = op->updateTuple(); + break; + case LogEntry::LE_DELETE: + check = op->deleteTuple(); + break; + default: + err << "Log entry has wrong operation type." + << " Exiting..."; + exit(-1); + } + + for (int i = 0; i < tup.m_values.size(); i++) + { + const AttributeS * attr = tup.m_values[i]; + int size = attr->Desc->size; + int arraySize = attr->Desc->arraySize; + const char * dataPtr = attr->Data->string_value; + + const Uint32 length = (size / 8) * arraySize; + if (attr->Desc->m_column->getPrimaryKey()) + op->equal(attr->Desc->attrId, dataPtr, length); + else + op->setValue(attr->Desc->attrId, dataPtr, length); + } + +#if 1 + trans->execute(Commit); +#else + const int ret = trans->execute(Commit); + // Both insert update and delete can fail during log running + // and it's ok + + if (ret != 0) + { + err << "execute failed: " << trans->getNdbError() << endl; + exit(-1); + } +#endif + + m_ndb->closeTransaction(trans); + m_logCount++; +} + +void +BackupRestore::endOfLogEntrys() +{ + if (m_restore) + { + info << "Restored " << m_dataCount << " tuples and " + << m_logCount << " log entries" << endl; + } +} + +/* + * callback : This is called when the transaction is polled + * + * (This function must have three arguments: + * - The result of the transaction, + * - The NdbConnection object, and + * - A pointer to an arbitrary object.) + */ + +static void +callback(int result, NdbConnection* trans, void* aObject) +{ + restore_callback_t *cb = (restore_callback_t *)aObject; + (cb->restore)->cback(result, cb); +} + +#if 0 // old tuple impl +void +BackupRestore::tuple(const TupleS & tup) +{ + if (!m_restore) + return; + while (1) + { + NdbConnection * trans = m_ndb->startTransaction(); + if (trans == NULL) + { + // Deep shit, TODO: handle the error + ndbout << "Cannot start transaction" << endl; + exit(-1); + } // if + + const TableS * table = tup.getTable(); + NdbOperation * op = trans->getNdbOperation(table->getTableName()); + if (op == NULL) + { + ndbout << "Cannot get operation: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } // if + + // TODO: check return value and handle error + if (op->writeTuple() == -1) + { + ndbout << "writeTuple call failed: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } // if + + for (int i = 0; i < tup.getNoOfAttributes(); i++) + { + const AttributeS * attr = tup[i]; + int size = attr->Desc->size; + int arraySize = attr->Desc->arraySize; + const char * dataPtr = attr->Data->string_value; + + const Uint32 length = (size * arraySize) / 8; + if (attr->Desc->m_column->getPrimaryKey()) + op->equal(i, dataPtr, length); + } + + for (int i = 0; i < tup.getNoOfAttributes(); i++) + { + const AttributeS * attr = tup[i]; + int size = attr->Desc->size; + int arraySize = attr->Desc->arraySize; + const char * dataPtr = attr->Data->string_value; + + const Uint32 length = (size * arraySize) / 8; + if (!attr->Desc->m_column->getPrimaryKey()) + if (attr->Data->null) + op->setValue(i, NULL, 0); + else + op->setValue(i, dataPtr, length); + } + int ret = trans->execute(Commit); + if (ret != 0) + { + ndbout << "execute failed: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } + m_ndb->closeTransaction(trans); + if (ret == 0) + break; + } + m_dataCount++; +} +#endif diff --git a/ndb/src/kernel/blocks/backup/restore/consumer_restore.hpp b/ndb/src/kernel/blocks/backup/restore/consumer_restore.hpp new file mode 100644 index 00000000000..e34d0060c58 --- /dev/null +++ b/ndb/src/kernel/blocks/backup/restore/consumer_restore.hpp @@ -0,0 +1,79 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef CONSUMER_RESTORE_HPP +#define CONSUMER_RESTORE_HPP + +#include "consumer.hpp" + +struct restore_callback_t { + class BackupRestore *restore; + class TupleS *tup; + class NdbConnection *connection; + int retries; + restore_callback_t *next; +}; + + +class BackupRestore : public BackupConsumer +{ +public: + BackupRestore(Uint32 parallelism=1) + { + m_ndb = 0; + m_logCount = m_dataCount = 0; + m_restore = false; + m_restore_meta = false; + m_parallelism = parallelism; + m_callback = 0; + m_tuples = 0; + m_free_callback = 0; + m_transactions = 0; + } + + virtual ~BackupRestore(); + + virtual bool init(); + virtual void release(); + virtual bool table(const TableS &); +#ifdef USE_MYSQL + virtual bool table(const TableS &, MYSQL* mysqlp); +#endif + virtual void tuple(const TupleS &); + virtual void tuple_free(); + virtual void tuple_a(restore_callback_t *cb); + virtual void cback(int result, restore_callback_t *cb); + virtual bool errorHandler(restore_callback_t *cb); + virtual void exitHandler(); + virtual void endOfTuples(); + virtual void logEntry(const LogEntry &); + virtual void endOfLogEntrys(); + void connectToMysql(); + Ndb * m_ndb; + bool m_restore; + bool m_restore_meta; + Uint32 m_logCount; + Uint32 m_dataCount; + + Uint32 m_parallelism; + Uint32 m_transactions; + + TupleS *m_tuples; + restore_callback_t *m_callback; + restore_callback_t *m_free_callback; +}; + +#endif diff --git a/ndb/src/kernel/blocks/backup/restore/consumer_restorem.cpp b/ndb/src/kernel/blocks/backup/restore/consumer_restorem.cpp new file mode 100644 index 00000000000..6a9ec07148a --- /dev/null +++ b/ndb/src/kernel/blocks/backup/restore/consumer_restorem.cpp @@ -0,0 +1,652 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "consumer_restore.hpp" +#include + +extern FilteredNdbOut err; +extern FilteredNdbOut info; +extern FilteredNdbOut debug; + +static bool asynchErrorHandler(NdbConnection * trans, Ndb * ndb); +static void callback(int result, NdbConnection* trans, void* aObject); + +bool +BackupRestore::init() +{ + + if (!m_restore && !m_restore_meta) + return true; + + m_ndb = new Ndb(); + + if (m_ndb == NULL) + return false; + + // Turn off table name completion + m_ndb->useFullyQualifiedNames(false); + + m_ndb->init(1024); + if (m_ndb->waitUntilReady(30) != 0) + { + ndbout << "Failed to connect to ndb!!" << endl; + return false; + } + ndbout << "Connected to ndb!!" << endl; + +#if USE_MYSQL + if(use_mysql) + { + if ( mysql_thread_safe() == 0 ) + { + ndbout << "Not thread safe mysql library..." << endl; + exit(-1); + } + + ndbout << "Connecting to MySQL..." < 0) + m_callback[i-1].next = &(m_callback[i]); + } + m_callback[m_parallelism-1].next = 0; + + return true; + +} + +BackupRestore::~BackupRestore() +{ + if (m_ndb != 0) + delete m_ndb; + + if (m_callback) + delete [] m_callback; +} + +#ifdef USE_MYSQL +bool +BackupRestore::table(const TableS & table, MYSQL * mysqlp){ + if (!m_restore_meta) + { + return true; + } + + char tmpTabName[MAX_TAB_NAME_SIZE*2]; + sprintf(tmpTabName, "%s", table.getTableName()); + char * database = strtok(tmpTabName, "/"); + char * schema = strtok( NULL , "/"); + char * tableName = strtok( NULL , "/"); + + /** + * this means that the user did not specify schema + * and it is a v2x backup + */ + if(database == NULL) + return false; + if(schema == NULL) + return false; + if(tableName==NULL) + tableName = schema; + + char stmtCreateDB[255]; + sprintf(stmtCreateDB,"CREATE DATABASE %s", database); + + /*ignore return value. mysql_select_db will trap errors anyways*/ + if (mysql_query(mysqlp,stmtCreateDB) == 0) + { + //ndbout_c("%s", stmtCreateDB); + } + + if (mysql_select_db(&mysql, database) != 0) + { + ndbout_c("Error: %s", mysql_error(&mysql)); + return false; + } + + char buf [2048]; + /** + * create table ddl + */ + if (create_table_string(table, tableName, buf)) + { + ndbout_c("Unable to create a table definition since the " + "backup contains undefined types"); + return false; + } + + //ndbout_c("%s", buf); + + if (mysql_query(mysqlp,buf) != 0) + { + ndbout_c("Error: %s", mysql_error(&mysql)); + return false; + } else + { + ndbout_c("Successfully restored table %s into database %s", tableName, database); + } + + return true; +} +#endif + +bool +BackupRestore::table(const TableS & table){ + if (!m_restore_meta) + { + return true; + } + NdbDictionary::Dictionary* dict = m_ndb->getDictionary(); + if (dict->createTable(*table.m_dictTable) == -1) + { + err << "Create table " << table.getTableName() << " failed: " + << dict->getNdbError() << endl; + return false; + } + info << "Successfully restored table " << table.getTableName()<< endl ; + return true; +} + +void BackupRestore::tuple(const TupleS & tup) +{ + if (!m_restore) + { + delete &tup; + return; + } + + restore_callback_t * cb = m_free_callback; + + if (cb) + { + m_free_callback = cb->next; + cb->retries = 0; + cb->tup = &tup; + tuple_a(cb); + } + + if (m_free_callback == 0) + { + // send-poll all transactions + // close transaction is done in callback + m_ndb->sendPollNdb(3000, 1); + } +} + +void BackupRestore::tuple_a(restore_callback_t *cb) +{ + while (cb->retries < 10) + { + /** + * start transactions + */ + cb->connection = m_ndb->startTransaction(); + if (cb->connection == NULL) + { + /* + if (asynchErrorHandler(cb->connection, m_ndb)) + { + cb->retries++; + continue; + } + */ + asynchExitHandler(); + } // if + + const TupleS &tup = *(cb->tup); + const TableS * table = tup.getTable(); + NdbOperation * op = cb->connection->getNdbOperation(table->getTableName()); + + if (op == NULL) + { + if (asynchErrorHandler(cb->connection, m_ndb)) + { + cb->retries++; + continue; + } + asynchExitHandler(); + } // if + + if (op->writeTuple() == -1) + { + if (asynchErrorHandler(cb->connection, m_ndb)) + { + cb->retries++; + continue; + } + asynchExitHandler(); + } // if + + Uint32 ret = 0; + for (int i = 0; i < tup.getNoOfAttributes(); i++) + { + const AttributeS * attr = tup[i]; + int size = attr->Desc->size; + int arraySize = attr->Desc->arraySize; + char * dataPtr = attr->Data.string_value; + Uint32 length = (size * arraySize) / 8; + if (attr->Desc->m_column->getPrimaryKey()) + { + ret = op->equal(i, dataPtr, length); + } + else + { + if (attr->Data.null) + ret = op->setValue(i, NULL, 0); + else + ret = op->setValue(i, dataPtr, length); + } + + if (ret<0) + { + ndbout_c("Column: %d type %d",i, + tup.getTable()->m_dictTable->getColumn(i)->getType()); + if (asynchErrorHandler(cb->connection, m_ndb)) + { + cb->retries++; + break; + } + asynchExitHandler(); + } + } + if (ret < 0) + continue; + + // Prepare transaction (the transaction is NOT yet sent to NDB) + cb->connection->executeAsynchPrepare(Commit, &callback, cb); + m_transactions++; + } + ndbout_c("Unable to recover from errors. Exiting..."); + asynchExitHandler(); +} + +void BackupRestore::cback(int result, restore_callback_t *cb) +{ + if (result<0) + { + /** + * Error. temporary or permanent? + */ + if (asynchErrorHandler(cb->connection, m_ndb)) + { + cb->retries++; + tuple_a(cb); + } + else + { + ndbout_c("Restore: Failed to restore data " + "due to a unrecoverable error. Exiting..."); + delete m_ndb; + delete cb->tup; + exit(-1); + } + } + else + { + /** + * OK! close transaction + */ + m_ndb->closeTransaction(cb->connection); + delete cb->tup; + m_transactions--; + } +} + +void BackupRestore::asynchExitHandler() +{ + if (m_ndb != NULL) + delete m_ndb; + exit(-1); +} + +#if 0 // old tuple impl +void +BackupRestore::tuple(const TupleS & tup) +{ + if (!m_restore) + return; + while (1) + { + NdbConnection * trans = m_ndb->startTransaction(); + if (trans == NULL) + { + // Deep shit, TODO: handle the error + ndbout << "Cannot start transaction" << endl; + exit(-1); + } // if + + const TableS * table = tup.getTable(); + NdbOperation * op = trans->getNdbOperation(table->getTableName()); + if (op == NULL) + { + ndbout << "Cannot get operation: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } // if + + // TODO: check return value and handle error + if (op->writeTuple() == -1) + { + ndbout << "writeTuple call failed: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } // if + + for (int i = 0; i < tup.getNoOfAttributes(); i++) + { + const AttributeS * attr = tup[i]; + int size = attr->Desc->size; + int arraySize = attr->Desc->arraySize; + const char * dataPtr = attr->Data.string_value; + + const Uint32 length = (size * arraySize) / 8; + if (attr->Desc->m_column->getPrimaryKey()) + op->equal(i, dataPtr, length); + } + + for (int i = 0; i < tup.getNoOfAttributes(); i++) + { + const AttributeS * attr = tup[i]; + int size = attr->Desc->size; + int arraySize = attr->Desc->arraySize; + const char * dataPtr = attr->Data.string_value; + + const Uint32 length = (size * arraySize) / 8; + if (!attr->Desc->m_column->getPrimaryKey()) + if (attr->Data.null) + op->setValue(i, NULL, 0); + else + op->setValue(i, dataPtr, length); + } + int ret = trans->execute(Commit); + if (ret != 0) + { + ndbout << "execute failed: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } + m_ndb->closeTransaction(trans); + if (ret == 0) + break; + } + m_dataCount++; +} +#endif + +void +BackupRestore::endOfTuples() +{ + if (!m_restore) + return; + + // Send all transactions to NDB + m_ndb->sendPreparedTransactions(0); + + // Poll all transactions + m_ndb->pollNdb(3000, m_transactions); + + // Close all transactions + // for (int i = 0; i < nPreparedTransactions; i++) + // m_ndb->closeTransaction(asynchTrans[i]); +} + +void +BackupRestore::logEntry(const LogEntry & tup) +{ + if (!m_restore) + return; + + NdbConnection * trans = m_ndb->startTransaction(); + if (trans == NULL) + { + // Deep shit, TODO: handle the error + ndbout << "Cannot start transaction" << endl; + exit(-1); + } // if + + const TableS * table = tup.m_table; + NdbOperation * op = trans->getNdbOperation(table->getTableName()); + if (op == NULL) + { + ndbout << "Cannot get operation: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } // if + + int check = 0; + switch(tup.m_type) + { + case LogEntry::LE_INSERT: + check = op->insertTuple(); + break; + case LogEntry::LE_UPDATE: + check = op->updateTuple(); + break; + case LogEntry::LE_DELETE: + check = op->deleteTuple(); + break; + default: + ndbout << "Log entry has wrong operation type." + << " Exiting..."; + exit(-1); + } + + for (int i = 0; i < tup.m_values.size(); i++) + { + const AttributeS * attr = tup.m_values[i]; + int size = attr->Desc->size; + int arraySize = attr->Desc->arraySize; + const char * dataPtr = attr->Data.string_value; + + const Uint32 length = (size / 8) * arraySize; + if (attr->Desc->m_column->getPrimaryKey()) + op->equal(attr->Desc->attrId, dataPtr, length); + else + op->setValue(attr->Desc->attrId, dataPtr, length); + } + +#if 1 + trans->execute(Commit); +#else + const int ret = trans->execute(Commit); + // Both insert update and delete can fail during log running + // and it's ok + + if (ret != 0) + { + ndbout << "execute failed: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } +#endif + + m_ndb->closeTransaction(trans); + m_logCount++; +} + +void +BackupRestore::endOfLogEntrys() +{ + if (m_restore) + { + ndbout << "Restored " << m_dataCount << " tuples and " + << m_logCount << " log entries" << endl; + } +} +#if 0 +/***************************************** + * + * Callback function for asynchronous transactions + * + * Idea for error handling: Transaction objects have to be stored globally when + * they are prepared. + * In the callback function if the transaction: + * succeeded: delete the object from global storage + * failed but can be retried: execute the object that is in global storage + * failed but fatal: delete the object from global storage + * + ******************************************/ +static void restoreCallback(int result, // Result for transaction + NdbConnection *object, // Transaction object + void *anything) // Not used +{ + static Uint32 counter = 0; + + + debug << "restoreCallback function called " << counter << " time(s)" << endl; + + ++counter; + + if (result == -1) + { + ndbout << " restoreCallback (" << counter; + if ((counter % 10) == 1) + { + ndbout << "st"; + } // if + else if ((counter % 10) == 2) + { + ndbout << "nd"; + } // else if + else if ((counter % 10 ) ==3) + { + ndbout << "rd"; + } // else if + else + { + ndbout << "th"; + } // else + err << " time: error detected " << object->getNdbError() << endl; + } // if + +} // restoreCallback +#endif + + + +/* + * callback : This is called when the transaction is polled + * + * (This function must have three arguments: + * - The result of the transaction, + * - The NdbConnection object, and + * - A pointer to an arbitrary object.) + */ + +static void +callback(int result, NdbConnection* trans, void* aObject) +{ + restore_callback_t *cb = (restore_callback_t *)aObject; + (cb->restore)->cback(result, cb); +} + +/** + * returns true if is recoverable, + * Error handling based on hugo + * false if it is an error that generates an abort. + */ +static +bool asynchErrorHandler(NdbConnection * trans, Ndb* ndb) +{ + NdbError error = trans->getNdbError(); + ndb->closeTransaction(trans); + switch(error.status) + { + case NdbError::Success: + return false; + // ERROR! + break; + + case NdbError::TemporaryError: + NdbSleep_MilliSleep(10); + return true; + // RETRY + break; + + case NdbError::UnknownResult: + ndbout << error << endl; + return false; + // ERROR! + break; + + default: + case NdbError::PermanentError: + switch (error.code) + { + case 499: + case 250: + NdbSleep_MilliSleep(10); + return true; //temp errors? + default: + break; + } + //ERROR + ndbout << error << endl; + return false; + break; + } + return false; +} diff --git a/ndb/src/kernel/blocks/backup/restore/main.cpp b/ndb/src/kernel/blocks/backup/restore/main.cpp index 1c39fb5b5e0..5c6d9c629dd 100644 --- a/ndb/src/kernel/blocks/backup/restore/main.cpp +++ b/ndb/src/kernel/blocks/backup/restore/main.cpp @@ -14,9 +14,7 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include "Restore.hpp" #include -#include #include #include #include @@ -26,18 +24,15 @@ #include -NdbOut& operator<<(NdbOut& ndbout, const TupleS& tuple); -NdbOut& operator<<(NdbOut& ndbout, const LogEntry& logEntry); -NdbOut& operator<<(NdbOut& ndbout, const RestoreMetaData &); +#include "consumer_restore.hpp" +#include "consumer_printer.hpp" extern FilteredNdbOut err; extern FilteredNdbOut info; extern FilteredNdbOut debug; -static const char * delimiter = ";"; // Delimiter in file dump - static int ga_nodeId = 0; -static int ga_nParallelism = 1; +static int ga_nParallelism = 128; static int ga_backupId = 0; static bool ga_dont_ignore_systab_0 = false; static myVector g_consumers; @@ -58,19 +53,11 @@ static MYSQL mysql; #ifdef NDB_WIN32 -static const char* ga_backupPath = ".\\"; +static const char* ga_backupPath = "." DIR_SEPARATOR; #else -static const char* ga_backupPath = "./"; +static const char* ga_backupPath = "." DIR_SEPARATOR; #endif -typedef struct { - void * ndb; - void * restore; - TupleS * tup; - int transaction; - int retries; -} restore_callback_t; - static const char* ga_connect_NDB = NULL; /** @@ -78,102 +65,9 @@ static const char* ga_connect_NDB = NULL; */ static bool ga_restore = false; static bool ga_print = false; - - - -class BackupConsumer { -public: - virtual bool init() { return true;} - virtual bool table(const TableS &){return true;} -#ifdef USE_MYSQL - virtual bool table(const TableS &, MYSQL* mysqlp) {return true;}; -#endif - virtual void tuple(const TupleS &){} - virtual void tupleAsynch(const TupleS &, restore_callback_t * callback) {}; - // virtual bool asynchErrorHandler(NdbConnection * trans){return true;}; - virtual void asynchExitHandler(){}; - virtual void endOfTuples(){} - virtual void logEntry(const LogEntry &){} - virtual void endOfLogEntrys(){} -protected: -#ifdef USE_MYSQL - int create_table_string(const TableS & table, char * ,char *); -#endif -}; - -class BackupPrinter : public BackupConsumer -{ - NdbOut & m_ndbout; -public: - BackupPrinter(NdbOut & out = ndbout) : m_ndbout(out) - { - m_print = false; - m_print_log = false; - m_print_data = false; - m_print_meta = false; - } - - virtual bool table(const TableS &); -#ifdef USE_MYSQL - virtual bool table(const TableS &, MYSQL* mysqlp); -#endif - virtual void tuple(const TupleS &); - virtual void logEntry(const LogEntry &); - virtual void endOfTuples() {}; - virtual void endOfLogEntrys(); - virtual void tupleAsynch(const TupleS &, restore_callback_t * callback); - bool m_print; - bool m_print_log; - bool m_print_data; - bool m_print_meta; - Uint32 m_logCount; - Uint32 m_dataCount; - -}; - -class BackupRestore : public BackupConsumer -{ -public: - BackupRestore() - { - m_ndb = 0; - m_logCount = m_dataCount = 0; - m_restore = false; - m_restore_meta = false; - } - - virtual ~BackupRestore(); - - virtual bool init(); - virtual bool table(const TableS &); -#ifdef USE_MYSQL - virtual bool table(const TableS &, MYSQL* mysqlp); -#endif - virtual void tuple(const TupleS &); - virtual void tupleAsynch(const TupleS &, restore_callback_t * callback); - virtual void asynchExitHandler(); - virtual void endOfTuples(); - virtual void logEntry(const LogEntry &); - virtual void endOfLogEntrys(); - void connectToMysql(); - Ndb * m_ndb; - bool m_restore; - bool m_restore_meta; - Uint32 m_logCount; - Uint32 m_dataCount; -}; bool readArguments(const int argc, const char** argv) { - BackupPrinter* printer = new BackupPrinter(); - if (printer == NULL) - return false; - BackupRestore* restore = new BackupRestore(); - if (restore == NULL) - { - delete printer; - return false; - } int _print = 0; int _print_meta = 0; @@ -238,8 +132,17 @@ readArguments(const int argc, const char** argv) { arg_printusage(args, num_args, argv[0], "\n"); + return false; + } + + BackupPrinter* printer = new BackupPrinter(); + if (printer == NULL) + return false; + + BackupRestore* restore = new BackupRestore(ga_nParallelism); + if (restore == NULL) + { delete printer; - delete restore; return false; } @@ -321,10 +224,7 @@ clearConsumers() g_consumers.clear(); } -static bool asynchErrorHandler(NdbConnection * trans, Ndb * ndb); -static NdbConnection * asynchTrans[1024]; - -bool +static bool checkSysTable(const char *tableName) { return ga_dont_ignore_systab_0 || @@ -335,6 +235,13 @@ checkSysTable(const char *tableName) } +static void +free_data_callback() +{ + for(int i = 0; i < g_consumers.size(); i++) + g_consumers[i]->tuple_free(); +} + int main(int argc, const char** argv) { @@ -343,6 +250,12 @@ main(int argc, const char** argv) return -1; } + if (ga_connect_NDB != NULL) + { + // Use connection string + Ndb::setConnectString(ga_connect_NDB); + } + /** * we must always load meta data, even if we will only print it to stdout */ @@ -414,13 +327,11 @@ main(int argc, const char** argv) } } - - if (ga_restore || ga_print) { if (ga_restore) { - RestoreDataIterator dataIter(metaData); + RestoreDataIterator dataIter(metaData, &free_data_callback); // Read data file header if (!dataIter.readHeader()) @@ -432,17 +343,13 @@ main(int argc, const char** argv) while (dataIter.readFragmentHeader(res)) { - const TupleS* tuple = 0; + const TupleS* tuple; while ((tuple = dataIter.getNextTuple(res)) != NULL) { if (checkSysTable(tuple->getTable()->getTableName())) - { - for(int i = 0; itupleAsynch(* tuple, 0); - } - } - } while (tuple != NULL); + for(int i = 0; i < g_consumers.size(); i++) + g_consumers[i]->tuple(* tuple); + } // while (tuple != NULL); if (res < 0) { @@ -504,893 +411,3 @@ main(int argc, const char** argv) return 1; } // main -NdbOut & -operator<<(NdbOut& ndbout, const AttributeS& attr){ - const AttributeData & data = attr.Data; - const AttributeDesc & desc = *attr.Desc; - - if (data.null) - { - ndbout << ""; - return ndbout; - } - - NdbRecAttr tmprec; - tmprec.setup(desc.m_column, (char *)data.void_value); - ndbout << tmprec; - - return ndbout; -} - -// Print tuple data -NdbOut& -operator<<(NdbOut& ndbout, const TupleS& tuple) -{ - ndbout << tuple.getTable()->getTableName() << "; "; - for (int i = 0; i < tuple.getNoOfAttributes(); i++) - { - const AttributeS * attr = tuple[i]; - debug << i << " " << attr->Desc->m_column->getName(); - ndbout << (* attr); - - if (i != (tuple.getNoOfAttributes() - 1)) - ndbout << delimiter << " "; - } // for - return ndbout; -} - -// Print tuple data -NdbOut& -operator<<(NdbOut& ndbout, const LogEntry& logE) -{ - switch(logE.m_type) - { - case LogEntry::LE_INSERT: - ndbout << "INSERT " << logE.m_table->getTableName() << " "; - break; - case LogEntry::LE_DELETE: - ndbout << "DELETE " << logE.m_table->getTableName() << " "; - break; - case LogEntry::LE_UPDATE: - ndbout << "UPDATE " << logE.m_table->getTableName() << " "; - break; - default: - ndbout << "Unknown log entry type (not insert, delete or update)" ; - } - - for (int i = 0; i < logE.m_values.size();i++) - { - const AttributeS * attr = logE.m_values[i]; - ndbout << attr->Desc->m_column->getName() << "="; - ndbout << (* attr); - if (i < (logE.m_values.size() - 1)) - ndbout << ", "; - } - return ndbout; -} - - -NdbOut & -operator<<(NdbOut& ndbout, const TableS & table){ - ndbout << endl << "Table: " << table.getTableName() << endl; - for (int j = 0; j < table.getNoOfAttributes(); j++) - { - const AttributeDesc * desc = table[j]; - ndbout << desc->m_column->getName() << ": " << desc->m_column->getType(); - ndbout << " key: " << desc->m_column->getPrimaryKey(); - ndbout << " array: " << desc->arraySize; - ndbout << " size: " << desc->size << endl; - } // for - return ndbout; -} - - -#if 0 -/***************************************** - * - * Callback function for asynchronous transactions - * - * Idea for error handling: Transaction objects have to be stored globally when - * they are prepared. - * In the callback function if the transaction: - * succeeded: delete the object from global storage - * failed but can be retried: execute the object that is in global storage - * failed but fatal: delete the object from global storage - * - ******************************************/ -static void restoreCallback(int result, // Result for transaction - NdbConnection *object, // Transaction object - void *anything) // Not used -{ - static Uint32 counter = 0; - - - debug << "restoreCallback function called " << counter << " time(s)" << endl; - - ++counter; - - if (result == -1) - { - ndbout << " restoreCallback (" << counter; - if ((counter % 10) == 1) - { - ndbout << "st"; - } // if - else if ((counter % 10) == 2) - { - ndbout << "nd"; - } // else if - else if ((counter % 10 ) ==3) - { - ndbout << "rd"; - } // else if - else - { - ndbout << "th"; - } // else - err << " time: error detected " << object->getNdbError() << endl; - } // if - -} // restoreCallback -#endif - - - -bool -BackupPrinter::table(const TableS & tab) -{ - if (m_print || m_print_meta) - { - m_ndbout << tab; - ndbout_c("Successfully printed table: %s", tab.m_dictTable->getName()); - } - return true; -} - -#ifdef USE_MYSQL -bool -BackupPrinter::table(const TableS & tab, MYSQL * mysql) -{ - if (m_print || m_print_meta) - { - - char tmpTabName[MAX_TAB_NAME_SIZE*2]; - sprintf(tmpTabName, "%s", tab.getTableName()); - char * database = strtok(tmpTabName, "/"); - char * schema = strtok( NULL , "/"); - char * tableName = strtok( NULL , "/"); - - /** - * this means that the user did not specify schema - * and it is a v2x backup - */ - if(database == NULL) - return false; - if(schema == NULL) - return false; - if(tableName==NULL) - tableName = schema; - - char stmtCreateDB[255]; - - sprintf(stmtCreateDB,"CREATE DATABASE %s", database); - ndbout_c("%s", stmtCreateDB); - - - char buf [2048]; - create_table_string(tab, tableName, buf); - ndbout_c("%s", buf); - - ndbout_c("Successfully printed table: %s", tab.m_dictTable->getName()); - } - return true; -} - -#endif - -void -BackupPrinter::tuple(const TupleS & tup) -{ - if (m_print || m_print_data) - m_ndbout << tup << endl; -} - -void -BackupPrinter::logEntry(const LogEntry & logE) -{ - if (m_print || m_print_log) - m_ndbout << logE << endl; - m_logCount++; -} - -bool -BackupRestore::init() -{ - - if (!m_restore && !m_restore_meta) - return true; - - if (ga_connect_NDB != NULL) - { - // Use connection string - Ndb::setConnectString(ga_connect_NDB); - } - - m_ndb = new Ndb(); - - if (m_ndb == NULL) - return false; - - // Turn off table name completion - m_ndb->useFullyQualifiedNames(false); - - m_ndb->init(1024); - if (m_ndb->waitUntilReady(30) != 0) - { - ndbout << "Failed to connect to ndb!!" << endl; - delete m_ndb; - return false; - } - ndbout << "Connected to ndb!!" << endl; - -#if USE_MYSQL - if(use_mysql) - { - if ( mysql_thread_safe() == 0 ) - { - ndbout << "Not thread safe mysql library..." << endl; - exit(-1); - } - - ndbout << "Connecting to MySQL..." <name << ": "; - pos += sprintf(buf+pos, "%s%s", desc->m_column->getName()," "); - switch(desc->m_column->getType()){ - case NdbDictionary::Column::Int: - pos += sprintf(buf+pos, "%s", "int"); - break; - case NdbDictionary::Column::Unsigned: - pos += sprintf(buf+pos, "%s", "int unsigned"); - break; - case NdbDictionary::Column::Float: - pos += sprintf(buf+pos, "%s", "float"); - break; - case NdbDictionary::Column::Decimal: - pos += sprintf(buf+pos, "%s", "decimal"); - break; - case NdbDictionary::Column::Char: - pos += sprintf(buf+pos, "%s", "char"); - break; - case NdbDictionary::Column::Varchar: - pos += sprintf(buf+pos, "%s", "varchar"); - break; - case NdbDictionary::Column::Binary: - pos += sprintf(buf+pos, "%s", "binary"); - break; - case NdbDictionary::Column::Varbinary: - pos += sprintf(buf+pos, "%s", "varchar binary"); - break; - case NdbDictionary::Column::Bigint: - pos += sprintf(buf+pos, "%s", "bigint"); - break; - case NdbDictionary::Column::Bigunsigned: - pos += sprintf(buf+pos, "%s", "bigint unsigned"); - break; - case NdbDictionary::Column::Double: - pos += sprintf(buf+pos, "%s", "double"); - break; - case NdbDictionary::Column::Datetime: - pos += sprintf(buf+pos, "%s", "datetime"); - break; - case NdbDictionary::Column::Timespec: - pos += sprintf(buf+pos, "%s", "time"); - break; - case NdbDictionary::Column::Undefined: - // pos += sprintf(buf+pos, "%s", "varchar binary"); - return -1; - break; - default: - //pos += sprintf(buf+pos, "%s", "varchar binary"); - return -1; - } - if (desc->arraySize > 1) { - int attrSize = desc->arraySize; - pos += sprintf(buf+pos, "%s%u%s", - "(", - attrSize, - ")"); - } - if (desc->m_column->getPrimaryKey()) { - pos += sprintf(buf+pos, "%s", " not null"); - pos2 += sprintf(buf2+pos2, "%s%s", desc->m_column->getName(), ","); - } - pos += sprintf(buf+pos, "%s", ","); - } // for - pos2--; // remove trailing comma - pos2 += sprintf(buf2+pos2, "%s", ")"); - // pos--; // remove trailing comma - - pos += sprintf(buf+pos, "%s", buf2); - pos += sprintf(buf+pos, "%s", ") type=ndbcluster"); - return 0; -} - -#endif // USE_MYSQL - - -bool -BackupRestore::table(const TableS & table){ - if (!m_restore_meta) - { - return true; - } - NdbDictionary::Dictionary* dict = m_ndb->getDictionary(); - if (dict->createTable(*table.m_dictTable) == -1) - { - err << "Create table " << table.getTableName() << " failed: " - << dict->getNdbError() << endl; - return false; - } - info << "Successfully restored table " << table.getTableName()<< endl ; - return true; -} - - - -/* - * callback : This is called when the transaction is polled - * - * (This function must have three arguments: - * - The result of the transaction, - * - The NdbConnection object, and - * - A pointer to an arbitrary object.) - */ - -static void -callback(int result, NdbConnection* trans, void* aObject) -{ - restore_callback_t * cbData = (restore_callback_t *)aObject; - if (result<0) - { - /** - * Error. temporary or permanent? - */ - if (asynchErrorHandler(trans, (Ndb*)cbData->ndb)) - { - ((Ndb*)cbData->ndb)->closeTransaction(asynchTrans[cbData->transaction]); - cbData->retries++; - ((BackupRestore*)cbData)->tupleAsynch( * (TupleS*)(cbData->tup), cbData); - } - else - { - ndbout_c("Restore: Failed to restore data " - "due to a unrecoverable error. Exiting..."); - delete (Ndb*)cbData->ndb; - delete cbData->tup; - delete cbData; - exit(-1); - } - } - else - { - /** - * OK! close transaction - */ - ((Ndb*)cbData->ndb)->closeTransaction(asynchTrans[cbData->transaction]); - delete cbData->tup; - delete cbData; - } -} - -static int nPreparedTransactions = 0; -void -BackupPrinter::tupleAsynch(const TupleS & tup, restore_callback_t * callback) -{ - m_dataCount++; - if (m_print || m_print_data) - m_ndbout << tup << endl; -} - -void BackupRestore::tupleAsynch(const TupleS & tup, restore_callback_t * cbData) -{ - - if (!m_restore) - { - delete &tup; - return; - } - Uint32 retries; - if (cbData!=0) - retries = cbData->retries; - else - retries = 0; - - while (retries < 10) - { - /** - * start transactions - */ - asynchTrans[nPreparedTransactions] = m_ndb->startTransaction(); - if (asynchTrans[nPreparedTransactions] == NULL) - { - if (asynchErrorHandler(asynchTrans[nPreparedTransactions], m_ndb)) - { - retries++; - continue; - } - asynchExitHandler(); - } // if - - const TableS * table = tup.getTable(); - NdbOperation * op = - asynchTrans[nPreparedTransactions]->getNdbOperation(table->getTableName()); - - if (op == NULL) - { - if (asynchErrorHandler(asynchTrans[nPreparedTransactions], m_ndb)) - { - retries++; - continue; - } - asynchExitHandler(); - } // if - - if (op->writeTuple() == -1) - { - if (asynchErrorHandler(asynchTrans[nPreparedTransactions], m_ndb)) - { - retries++; - continue; - } - asynchExitHandler(); - } // if - - Uint32 ret = 0; - for (int i = 0; i < tup.getNoOfAttributes(); i++) - { - const AttributeS * attr = tup[i]; - int size = attr->Desc->size; - int arraySize = attr->Desc->arraySize; - char * dataPtr = attr->Data.string_value; - Uint32 length = (size * arraySize) / 8; - if (attr->Desc->m_column->getPrimaryKey()) - { - ret = op->equal(i, dataPtr, length); - if (ret<0) - { - ndbout_c("Column: %d type %d",i, - tup.getTable()->m_dictTable->getColumn(i)->getType()); - - if (asynchErrorHandler(asynchTrans[nPreparedTransactions],m_ndb)) - { - retries++; - continue; - } - asynchExitHandler(); - } - } - } - - for (int i = 0; i < tup.getNoOfAttributes(); i++) - { - const AttributeS * attr = tup[i]; - int size = attr->Desc->size; - int arraySize = attr->Desc->arraySize; - char * dataPtr = attr->Data.string_value; - Uint32 length = (size * arraySize) / 8; - - if (!attr->Desc->m_column->getPrimaryKey()) - if (attr->Data.null) - ret = op->setValue(i, NULL, 0); - else - ret = op->setValue(i, dataPtr, length); - - if (ret<0) - { - ndbout_c("Column: %d type %d",i, - tup.getTable()->m_dictTable->getColumn(i)->getType()); - if (asynchErrorHandler(asynchTrans[nPreparedTransactions], m_ndb)) - { - retries++; - continue; - } - asynchExitHandler(); - } - } - restore_callback_t * cb; - if (cbData ==0) - { - cb = new restore_callback_t; - cb->retries = 0; - } - else - cb =cbData; - cb->ndb = m_ndb; - cb->restore = this; - cb->tup = (TupleS*)&tup; - cb->transaction = nPreparedTransactions; - - // Prepare transaction (the transaction is NOT yet sent to NDB) - asynchTrans[nPreparedTransactions]->executeAsynchPrepare(Commit, - &callback, - cb); - if (nPreparedTransactions == ga_nParallelism-1) - { - // send-poll all transactions - // close transaction is done in callback - m_ndb->sendPollNdb(3000, ga_nParallelism); - nPreparedTransactions=0; - } - else - nPreparedTransactions++; - m_dataCount++; - return; - } - ndbout_c("Unable to recover from errors. Exiting..."); - asynchExitHandler(); -} - -void BackupRestore::asynchExitHandler() -{ - if (m_ndb != NULL) - delete m_ndb; - exit(-1); -} -/** - * returns true if is recoverable, - * Error handling based on hugo - * false if it is an error that generates an abort. - */ -static -bool asynchErrorHandler(NdbConnection * trans, Ndb* ndb) -{ - - NdbError error = trans->getNdbError(); - ndb->closeTransaction(trans); - switch(error.status) - { - case NdbError::Success: - return false; - // ERROR! - break; - - case NdbError::TemporaryError: - NdbSleep_MilliSleep(10); - return true; - // RETRY - break; - - case NdbError::UnknownResult: - ndbout << error << endl; - return false; - // ERROR! - break; - - default: - case NdbError::PermanentError: - switch (error.code) - { - case 499: - case 250: - NdbSleep_MilliSleep(10); - return true; //temp errors? - default: - break; - } - //ERROR - ndbout << error << endl; - return false; - break; - } - return false; -} - - - -void -BackupRestore::tuple(const TupleS & tup) -{ - if (!m_restore) - return; - while (1) - { - NdbConnection * trans = m_ndb->startTransaction(); - if (trans == NULL) - { - // Deep shit, TODO: handle the error - ndbout << "Cannot start transaction" << endl; - exit(-1); - } // if - - const TableS * table = tup.getTable(); - NdbOperation * op = trans->getNdbOperation(table->getTableName()); - if (op == NULL) - { - ndbout << "Cannot get operation: "; - ndbout << trans->getNdbError() << endl; - exit(-1); - } // if - - // TODO: check return value and handle error - if (op->writeTuple() == -1) - { - ndbout << "writeTuple call failed: "; - ndbout << trans->getNdbError() << endl; - exit(-1); - } // if - - for (int i = 0; i < tup.getNoOfAttributes(); i++) - { - const AttributeS * attr = tup[i]; - int size = attr->Desc->size; - int arraySize = attr->Desc->arraySize; - const char * dataPtr = attr->Data.string_value; - - const Uint32 length = (size * arraySize) / 8; - if (attr->Desc->m_column->getPrimaryKey()) - op->equal(i, dataPtr, length); - } - - for (int i = 0; i < tup.getNoOfAttributes(); i++) - { - const AttributeS * attr = tup[i]; - int size = attr->Desc->size; - int arraySize = attr->Desc->arraySize; - const char * dataPtr = attr->Data.string_value; - - const Uint32 length = (size * arraySize) / 8; - if (!attr->Desc->m_column->getPrimaryKey()) - if (attr->Data.null) - op->setValue(i, NULL, 0); - else - op->setValue(i, dataPtr, length); - } - int ret = trans->execute(Commit); - if (ret != 0) - { - ndbout << "execute failed: "; - ndbout << trans->getNdbError() << endl; - exit(-1); - } - m_ndb->closeTransaction(trans); - if (ret == 0) - break; - } - m_dataCount++; -} - -void -BackupRestore::endOfTuples() -{ - if (!m_restore) - return; - // Send all transactions to NDB - m_ndb->sendPreparedTransactions(0); - // Poll all transactions - m_ndb->pollNdb(3000, nPreparedTransactions); - // Close all transactions - // for (int i = 0; i < nPreparedTransactions; i++) - // m_ndb->closeTransaction(asynchTrans[i]); - nPreparedTransactions=0; -} - -void -BackupRestore::logEntry(const LogEntry & tup) -{ - if (!m_restore) - return; - - NdbConnection * trans = m_ndb->startTransaction(); - if (trans == NULL) - { - // Deep shit, TODO: handle the error - ndbout << "Cannot start transaction" << endl; - exit(-1); - } // if - - const TableS * table = tup.m_table; - NdbOperation * op = trans->getNdbOperation(table->getTableName()); - if (op == NULL) - { - ndbout << "Cannot get operation: "; - ndbout << trans->getNdbError() << endl; - exit(-1); - } // if - - int check = 0; - switch(tup.m_type) - { - case LogEntry::LE_INSERT: - check = op->insertTuple(); - break; - case LogEntry::LE_UPDATE: - check = op->updateTuple(); - break; - case LogEntry::LE_DELETE: - check = op->deleteTuple(); - break; - default: - ndbout << "Log entry has wrong operation type." - << " Exiting..."; - exit(-1); - } - - for (int i = 0; i < tup.m_values.size(); i++) - { - const AttributeS * attr = tup.m_values[i]; - int size = attr->Desc->size; - int arraySize = attr->Desc->arraySize; - const char * dataPtr = attr->Data.string_value; - - const Uint32 length = (size / 8) * arraySize; - if (attr->Desc->m_column->getPrimaryKey()) - op->equal(attr->Desc->attrId, dataPtr, length); - else - op->setValue(attr->Desc->attrId, dataPtr, length); - } - -#if 1 - trans->execute(Commit); -#else - const int ret = trans->execute(Commit); - // Both insert update and delete can fail during log running - // and it's ok - - if (ret != 0) - { - ndbout << "execute failed: "; - ndbout << trans->getNdbError() << endl; - exit(-1); - } -#endif - - m_ndb->closeTransaction(trans); - m_logCount++; -} - -void -BackupRestore::endOfLogEntrys() -{ - if (ga_restore) - { - ndbout << "Restored " << m_dataCount << " tuples and " - << m_logCount << " log entries" << endl; - } -} - -void -BackupPrinter::endOfLogEntrys() -{ - if (m_print || m_print_log) - { - ndbout << "Printed " << m_dataCount << " tuples and " - << m_logCount << " log entries" - << " to stdout." << endl; - } -} - - - -