From 0caba84b46fa28cddab1cfdc257f2600fbd3791a Mon Sep 17 00:00:00 2001 From: "pekka@mysql.com" <> Date: Wed, 15 Sep 2004 17:44:13 +0200 Subject: [PATCH] ndb charsets: DICT --- ndb/include/kernel/signaldata/DictTabInfo.hpp | 4 +- ndb/include/ndbapi/NdbDictionary.hpp | 10 ++ ndb/include/util/NdbSqlUtil.hpp | 7 ++ ndb/src/common/util/NdbSqlUtil.cpp | 77 ++++++++++++++ ndb/src/kernel/blocks/dbdict/Dbdict.cpp | 2 + ndb/src/ndbapi/NdbDictionary.cpp | 20 +++- ndb/src/ndbapi/NdbDictionaryImpl.cpp | 100 ++++++++++-------- ndb/src/ndbapi/NdbDictionaryImpl.hpp | 10 ++ ndb/src/ndbapi/ndberror.c | 3 + sql/ha_ndbcluster.cc | 26 +++-- 10 files changed, 202 insertions(+), 57 deletions(-) diff --git a/ndb/include/kernel/signaldata/DictTabInfo.hpp b/ndb/include/kernel/signaldata/DictTabInfo.hpp index dec7145c897..a9a50f19fbc 100644 --- a/ndb/include/kernel/signaldata/DictTabInfo.hpp +++ b/ndb/include/kernel/signaldata/DictTabInfo.hpp @@ -438,8 +438,8 @@ public: case DictTabInfo::ExtText: AttributeType = DictTabInfo::StringType; AttributeSize = DictTabInfo::an8Bit; - // head + inline part [ attr precision ] - AttributeArraySize = (NDB_BLOB_HEAD_SIZE << 2) + AttributeExtPrecision; + // head + inline part [ attr precision lower half ] + AttributeArraySize = (NDB_BLOB_HEAD_SIZE << 2) + (AttributeExtPrecision & 0xFFFF); return true; }; return false; diff --git a/ndb/include/ndbapi/NdbDictionary.hpp b/ndb/include/ndbapi/NdbDictionary.hpp index 3257133bd82..51a6895648f 100644 --- a/ndb/include/ndbapi/NdbDictionary.hpp +++ b/ndb/include/ndbapi/NdbDictionary.hpp @@ -32,6 +32,8 @@ #include class Ndb; +struct charset_info_st; +typedef struct charset_info_st CHARSET_INFO; /** * @class NdbDictionary @@ -305,6 +307,14 @@ public: */ int getLength() const; + /** + * For Char or Varchar or Text, set or get MySQL CHARSET_INFO. This + * specifies both character set and collation. See get_charset() + * etc in MySQL. (The cs is not "const" in MySQL). + */ + void setCharset(CHARSET_INFO* cs); + CHARSET_INFO* getCharset() const; + /** * For blob, set or get "inline size" i.e. number of initial bytes * to store in table's blob attribute. This part is normally in diff --git a/ndb/include/util/NdbSqlUtil.hpp b/ndb/include/util/NdbSqlUtil.hpp index 1d3e96d5c7e..df1cb716f93 100644 --- a/ndb/include/util/NdbSqlUtil.hpp +++ b/ndb/include/util/NdbSqlUtil.hpp @@ -90,6 +90,13 @@ public: */ static const Type& getType(Uint32 typeId); + /** + * Check character set. + */ + static bool usable_in_pk(Uint32 typeId, const void* cs); + static bool usable_in_hash_index(Uint32 typeId, const void* cs); + static bool usable_in_ordered_index(Uint32 typeId, const void* cs); + private: /** * List of all types. Must match Type::Enum. diff --git a/ndb/src/common/util/NdbSqlUtil.cpp b/ndb/src/common/util/NdbSqlUtil.cpp index 84a6f6e6c21..afb9bcfff62 100644 --- a/ndb/src/common/util/NdbSqlUtil.cpp +++ b/ndb/src/common/util/NdbSqlUtil.cpp @@ -529,6 +529,83 @@ NdbSqlUtil::cmpText(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size return CmpUnknown; } +// check charset + +bool +NdbSqlUtil::usable_in_pk(Uint32 typeId, const void* info) +{ + const Type& type = getType(typeId); + switch (type.m_typeId) { + case Type::Undefined: + break; + case Type::Char: + { + const CHARSET_INFO *cs = (const CHARSET_INFO*)info; + return + cs != 0 && + cs->cset != 0 && + cs->coll != 0 && + cs->coll->strnxfrm != 0 && + cs->strxfrm_multiply == 1; // current limitation + } + break; + case Type::Varchar: + return true; // Varchar not used via MySQL + case Type::Blob: + case Type::Text: + break; + default: + return true; + } + return false; +} + +bool +NdbSqlUtil::usable_in_hash_index(Uint32 typeId, const void* info) +{ + return usable_in_pk(typeId, info); +} + +bool +NdbSqlUtil::usable_in_ordered_index(Uint32 typeId, const void* info) +{ + const Type& type = getType(typeId); + switch (type.m_typeId) { + case Type::Undefined: + break; + case Type::Char: + { + const CHARSET_INFO *cs = (const CHARSET_INFO*)info; + return + cs != 0 && + cs->cset != 0 && + cs->coll != 0 && + cs->coll->strnxfrm != 0 && + cs->coll->strnncollsp != 0 && + cs->strxfrm_multiply == 1; // current limitation + } + break; + case Type::Varchar: + return true; // Varchar not used via MySQL + case Type::Text: + { + const CHARSET_INFO *cs = (const CHARSET_INFO*)info; + return + cs != 0 && + cs->mbmaxlen == 1 && // extra limitation + cs->cset != 0 && + cs->coll != 0 && + cs->coll->strnxfrm != 0 && + cs->coll->strnncollsp != 0 && + cs->strxfrm_multiply == 1; // current limitation + } + break; + default: + return true; + } + return false; +} + #ifdef NDB_SQL_UTIL_TEST #include diff --git a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp index 7126842459e..d82083684b7 100644 --- a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp +++ b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp @@ -6317,6 +6317,8 @@ Dbdict::createIndex_toCreateTable(Signal* signal, OpCreateIndexPtr opPtr) w.add(DictTabInfo::AttributeStoredInd, (Uint32)DictTabInfo::Stored); // ext type overrides w.add(DictTabInfo::AttributeExtType, aRec->extType); + w.add(DictTabInfo::AttributeExtPrecision, aRec->extPrecision); + w.add(DictTabInfo::AttributeExtScale, aRec->extScale); w.add(DictTabInfo::AttributeExtLength, aRec->extLength); w.add(DictTabInfo::AttributeEnd, (Uint32)true); } diff --git a/ndb/src/ndbapi/NdbDictionary.cpp b/ndb/src/ndbapi/NdbDictionary.cpp index add1fa4cc91..6396cb6bb1d 100644 --- a/ndb/src/ndbapi/NdbDictionary.cpp +++ b/ndb/src/ndbapi/NdbDictionary.cpp @@ -109,6 +109,18 @@ NdbDictionary::Column::setInlineSize(int size) m_impl.m_precision = size; } +void +NdbDictionary::Column::setCharset(CHARSET_INFO* cs) +{ + m_impl.m_cs = cs; +} + +CHARSET_INFO* +NdbDictionary::Column::getCharset() const +{ + return m_impl.m_cs; +} + int NdbDictionary::Column::getInlineSize() const { @@ -856,6 +868,8 @@ NdbDictionary::Dictionary::getNdbError() const { NdbOut& operator<<(NdbOut& out, const NdbDictionary::Column& col) { + const CHARSET_INFO *cs = col.getCharset(); + const char *csname = cs ? cs->name : "?"; out << col.getName() << " "; switch (col.getType()) { case NdbDictionary::Column::Tinyint: @@ -898,10 +912,10 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col) out << "Decimal(" << col.getScale() << "," << col.getPrecision() << ")"; break; case NdbDictionary::Column::Char: - out << "Char(" << col.getLength() << ")"; + out << "Char(" << col.getLength() << ";" << csname << ")"; break; case NdbDictionary::Column::Varchar: - out << "Varchar(" << col.getLength() << ")"; + out << "Varchar(" << col.getLength() << ";" << csname << ")"; break; case NdbDictionary::Column::Binary: out << "Binary(" << col.getLength() << ")"; @@ -921,7 +935,7 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col) break; case NdbDictionary::Column::Text: out << "Text(" << col.getInlineSize() << "," << col.getPartSize() - << ";" << col.getStripeSize() << ")"; + << ";" << col.getStripeSize() << ";" << csname << ")"; break; case NdbDictionary::Column::Undefined: out << "Undefined"; diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp index 64f349be53e..c2c987f3bdb 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -36,6 +36,7 @@ #include "NdbEventOperationImpl.hpp" #include "NdbBlob.hpp" #include +#include #define DEBUG_PRINT 0 #define INCOMPATIBLE_VERSION -2 @@ -64,6 +65,7 @@ NdbColumnImpl::operator=(const NdbColumnImpl& col) m_name = col.m_name; m_type = col.m_type; m_precision = col.m_precision; + m_cs = col.m_cs; m_scale = col.m_scale; m_length = col.m_length; m_pk = col.m_pk; @@ -89,6 +91,9 @@ NdbColumnImpl::operator=(const NdbColumnImpl& col) void NdbColumnImpl::init(Type t) { + // do not use default_charset_info as it may not be initialized yet + // use binary collation until NDB tests can handle charsets + CHARSET_INFO* default_cs = &my_charset_latin1_bin; m_attrId = -1; m_type = t; switch (m_type) { @@ -107,17 +112,20 @@ NdbColumnImpl::init(Type t) m_precision = 0; m_scale = 0; m_length = 1; + m_cs = NULL; break; case Decimal: m_precision = 10; m_scale = 0; m_length = 1; + m_cs = NULL; break; case Char: case Varchar: m_precision = 0; m_scale = 0; m_length = 1; + m_cs = default_cs; break; case Binary: case Varbinary: @@ -126,16 +134,19 @@ NdbColumnImpl::init(Type t) m_precision = 0; m_scale = 0; m_length = 1; + m_cs = NULL; break; case Blob: m_precision = 256; m_scale = 8000; m_length = 4; + m_cs = NULL; break; case Text: m_precision = 256; m_scale = 8000; m_length = 4; + m_cs = default_cs; break; } m_pk = false; @@ -191,52 +202,12 @@ NdbColumnImpl::equal(const NdbColumnImpl& col) const return false; } } - if(m_length != col.m_length){ + if (m_precision != col.m_precision || + m_scale != col.m_scale || + m_length != col.m_length || + m_cs != col.m_cs) { return false; } - - switch(m_type){ - case NdbDictionary::Column::Undefined: - break; - case NdbDictionary::Column::Tinyint: - case NdbDictionary::Column::Tinyunsigned: - case NdbDictionary::Column::Smallint: - case NdbDictionary::Column::Smallunsigned: - case NdbDictionary::Column::Mediumint: - case NdbDictionary::Column::Mediumunsigned: - case NdbDictionary::Column::Int: - case NdbDictionary::Column::Unsigned: - case NdbDictionary::Column::Float: - break; - case NdbDictionary::Column::Decimal: - if(m_scale != col.m_scale || - m_precision != col.m_precision){ - return false; - } - break; - case NdbDictionary::Column::Char: - case NdbDictionary::Column::Varchar: - case NdbDictionary::Column::Binary: - case NdbDictionary::Column::Varbinary: - if(m_length != col.m_length){ - return false; - } - break; - case NdbDictionary::Column::Bigint: - case NdbDictionary::Column::Bigunsigned: - case NdbDictionary::Column::Double: - case NdbDictionary::Column::Datetime: - case NdbDictionary::Column::Timespec: - break; - case NdbDictionary::Column::Blob: - case NdbDictionary::Column::Text: - if (m_precision != col.m_precision || - m_scale != col.m_scale || - m_length != col.m_length) { - return false; - } - break; - } if (m_autoIncrement != col.m_autoIncrement){ return false; } @@ -1176,6 +1147,7 @@ indexTypeMapping[] = { { -1, -1 } }; +// TODO: remove, api-kernel type codes must match now static const ApiKernelMapping columnTypeMapping[] = { @@ -1282,9 +1254,23 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, return 703; } col->m_extType = attrDesc.AttributeExtType; - col->m_precision = attrDesc.AttributeExtPrecision; + col->m_precision = (attrDesc.AttributeExtPrecision & 0xFFFF); col->m_scale = attrDesc.AttributeExtScale; col->m_length = attrDesc.AttributeExtLength; + // charset in upper half of precision + unsigned cs_number = (attrDesc.AttributeExtPrecision >> 16); + // charset is defined exactly for char types + if (col->getCharType() != (cs_number != 0)) { + delete impl; + return 703; + } + if (col->getCharType()) { + col->m_cs = get_charset(cs_number, MYF(0)); + if (col->m_cs == NULL) { + delete impl; + return 743; + } + } // translate to old kernel types and sizes if (! attrDesc.translateExtType()) { @@ -1535,9 +1521,23 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb, getKernelConstant(col->m_type, columnTypeMapping, DictTabInfo::ExtUndefined); - tmpAttr.AttributeExtPrecision = col->m_precision; + tmpAttr.AttributeExtPrecision = ((unsigned)col->m_precision & 0xFFFF); tmpAttr.AttributeExtScale = col->m_scale; tmpAttr.AttributeExtLength = col->m_length; + // charset is defined exactly for char types + if (col->getCharType() != (col->m_cs != NULL)) { + m_error.code = 703; + return -1; + } + // primary key type check + if (col->m_pk && ! NdbSqlUtil::usable_in_pk(col->m_type, col->m_cs)) { + m_error.code = 743; + return -1; + } + // charset in upper half of precision + if (col->getCharType()) { + tmpAttr.AttributeExtPrecision |= (col->m_cs->number << 16); + } // DICT will ignore and recompute this (void)tmpAttr.translateExtType(); @@ -1999,6 +1999,14 @@ NdbDictInterface::createIndex(Ndb & ndb, m_error.code = 4245; return -1; } + // index key type check + if (it == DictTabInfo::UniqueHashIndex && + ! NdbSqlUtil::usable_in_hash_index(col->m_type, col->m_cs) || + it == DictTabInfo::OrderedIndex && + ! NdbSqlUtil::usable_in_ordered_index(col->m_type, col->m_cs)) { + m_error.code = 743; + return -1; + } attributeList.id[i] = col->m_attrId; } if (it == DictTabInfo::UniqueHashIndex) { diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.hpp b/ndb/src/ndbapi/NdbDictionaryImpl.hpp index d77cc4d44bc..cf659c71397 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.hpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.hpp @@ -60,6 +60,7 @@ public: int m_precision; int m_scale; int m_length; + CHARSET_INFO * m_cs; // not const in MySQL bool m_pk; bool m_tupleKey; @@ -82,6 +83,7 @@ public: Uint32 m_keyInfoPos; Uint32 m_extType; // used by restore (kernel type in versin v2x) bool getInterpretableType() const ; + bool getCharType() const; bool getBlobType() const; /** @@ -446,6 +448,14 @@ NdbColumnImpl::getInterpretableType() const { m_type == NdbDictionary::Column::Bigunsigned); } +inline +bool +NdbColumnImpl::getCharType() const { + return (m_type == NdbDictionary::Column::Char || + m_type == NdbDictionary::Column::Varchar || + m_type == NdbDictionary::Column::Text); +} + inline bool NdbColumnImpl::getBlobType() const { diff --git a/ndb/src/ndbapi/ndberror.c b/ndb/src/ndbapi/ndberror.c index 7991004e3d0..2ebcf4be444 100644 --- a/ndb/src/ndbapi/ndberror.c +++ b/ndb/src/ndbapi/ndberror.c @@ -280,6 +280,9 @@ ErrorBundle ErrorCodes[] = { { 739, SE, "Unsupported primary key length" }, { 740, SE, "Nullable primary key not supported" }, { 741, SE, "Unsupported alter table" }, + { 742, SE, "Unsupported attribute type in index" }, + { 743, SE, "Unsupported character set in table or index" }, + { 744, SE, "Character conversion error" }, { 241, SE, "Invalid schema object version" }, { 283, SE, "Table is being dropped" }, { 284, SE, "Table not defined in transaction coordinator" }, diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index bb3e54e74d5..95247063e31 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -2897,6 +2897,8 @@ static int create_ndb_column(NDBCOL &col, { // Set name col.setName(field->field_name); + // Get char set + CHARSET_INFO *cs= field->charset(); // Set type and sizes const enum enum_field_types mysql_type= field->real_type(); switch (mysql_type) { @@ -2968,15 +2970,19 @@ static int create_ndb_column(NDBCOL &col, case MYSQL_TYPE_STRING: if (field->flags & BINARY_FLAG) col.setType(NDBCOL::Binary); - else + else { col.setType(NDBCOL::Char); + col.setCharset(cs); + } col.setLength(field->pack_length()); break; case MYSQL_TYPE_VAR_STRING: if (field->flags & BINARY_FLAG) col.setType(NDBCOL::Varbinary); - else + else { col.setType(NDBCOL::Varchar); + col.setCharset(cs); + } col.setLength(field->pack_length()); break; // Blob types (all come in as MYSQL_TYPE_BLOB) @@ -2984,8 +2990,10 @@ static int create_ndb_column(NDBCOL &col, case MYSQL_TYPE_TINY_BLOB: if (field->flags & BINARY_FLAG) col.setType(NDBCOL::Blob); - else + else { col.setType(NDBCOL::Text); + col.setCharset(cs); + } col.setInlineSize(256); // No parts col.setPartSize(0); @@ -2995,8 +3003,10 @@ static int create_ndb_column(NDBCOL &col, case MYSQL_TYPE_BLOB: if (field->flags & BINARY_FLAG) col.setType(NDBCOL::Blob); - else + else { col.setType(NDBCOL::Text); + col.setCharset(cs); + } // Use "<=" even if "<" is the exact condition if (field->max_length() <= (1 << 8)) goto mysql_type_tiny_blob; @@ -3015,8 +3025,10 @@ static int create_ndb_column(NDBCOL &col, case MYSQL_TYPE_MEDIUM_BLOB: if (field->flags & BINARY_FLAG) col.setType(NDBCOL::Blob); - else + else { col.setType(NDBCOL::Text); + col.setCharset(cs); + } col.setInlineSize(256); col.setPartSize(4000); col.setStripeSize(8); @@ -3025,8 +3037,10 @@ static int create_ndb_column(NDBCOL &col, case MYSQL_TYPE_LONG_BLOB: if (field->flags & BINARY_FLAG) col.setType(NDBCOL::Blob); - else + else { col.setType(NDBCOL::Text); + col.setCharset(cs); + } col.setInlineSize(256); col.setPartSize(8000); col.setStripeSize(4);