ndb: wl-1732 support all charsets

This commit is contained in:
pekka@mysql.com 2004-12-12 18:37:36 +01:00
parent 40355074d0
commit 31d8a89407
31 changed files with 1642 additions and 1034 deletions

View file

@ -19,6 +19,8 @@
class AttributeDescriptor {
friend class Dbdict;
friend class Dbtc;
friend class Dbacc;
friend class Dbtup;
friend class Dbtux;
@ -36,6 +38,7 @@ private:
static Uint32 getType(const Uint32 &);
static Uint32 getSize(const Uint32 &);
static Uint32 getSizeInBytes(const Uint32 &);
static Uint32 getSizeInWords(const Uint32 &);
static Uint32 getArrayType(const Uint32 &);
static Uint32 getArraySize(const Uint32 &);
@ -79,6 +82,8 @@ private:
#define AD_SIZE_SHIFT (4)
#define AD_SIZE_MASK (7)
#define AD_SIZE_IN_BYTES_SHIFT (3)
#define AD_SIZE_IN_WORDS_OFFSET (31)
#define AD_SIZE_IN_WORDS_SHIFT (5)
@ -185,6 +190,13 @@ AttributeDescriptor::getSize(const Uint32 & desc){
return (desc >> AD_SIZE_SHIFT) & AD_SIZE_MASK;
}
inline
Uint32
AttributeDescriptor::getSizeInBytes(const Uint32 & desc){
return (getArraySize(desc) << getSize(desc))
>> AD_SIZE_IN_BYTES_SHIFT;
}
inline
Uint32
AttributeDescriptor::getSizeInWords(const Uint32 & desc){

View file

@ -118,6 +118,11 @@
*/
#define NDB_BLOB_HEAD_SIZE 2 /* sizeof(NdbBlob::Head) >> 2 */
/*
* Character sets.
*/
#define MAX_XFRM_MULTIPLY 8 /* max expansion when normalizing */
/*
* Long signals
*/

View file

@ -34,7 +34,9 @@ public:
enum ErrorCode {
InvalidAttrInfo = 4110,
InvalidBounds = 4259,
OutOfBuffers = 873
OutOfBuffers = 873,
InvalidCharFormat = 744,
TooMuchAttrInfo = 823
};
STATIC_CONST( SignalLength = 3 );
private:

View file

@ -37,17 +37,23 @@ public:
const char* s2, unsigned n2, bool padded);
/**
* Compare kernel attribute values. Returns -1, 0, +1 for less,
* equal, greater, respectively. Parameters are pointers to values,
* full attribute size in words, and size of available data in words.
* There is also pointer to type specific extra info. Char types
* receive CHARSET_INFO in it.
* Compare attribute values. Returns -1, 0, +1 for less, equal,
* greater, respectively. Parameters are pointers to values and their
* lengths in bytes. The lengths can differ.
*
* If available size is less than full size, CmpUnknown may be
* returned. If a value cannot be parsed, it compares like NULL i.e.
* less than any valid value.
* First value is a full value but second value can be partial. If
* the partial value is not enough to determine the result, CmpUnknown
* will be returned. A shorter second value is not necessarily
* partial. Partial values are allowed only for types where prefix
* comparison is possible (basically, binary types).
*
* First parameter is a pointer to type specific extra info. Char
* types receive CHARSET_INFO in it.
*
* If a value cannot be parsed, it compares like NULL i.e. less than
* any valid value.
*/
typedef int Cmp(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size);
typedef int Cmp(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full);
enum CmpResult {
CmpLess = -1,

View file

@ -70,7 +70,7 @@ NdbSqlUtil::char_like(const char* s1, unsigned n1,
return i1 == n2 && i2 == n2;
}
/**
/*
* Data types.
*/
@ -138,7 +138,7 @@ NdbSqlUtil::m_typeList[] = {
},
{
Type::Varchar,
cmpVarchar
NULL // cmpVarchar
},
{
Type::Binary,
@ -146,23 +146,23 @@ NdbSqlUtil::m_typeList[] = {
},
{
Type::Varbinary,
cmpVarbinary
NULL // cmpVarbinary
},
{
Type::Datetime,
cmpDatetime
NULL // cmpDatetime
},
{
Type::Timespec,
cmpTimespec
NULL // cmpTimespec
},
{
Type::Blob,
cmpBlob
NULL // cmpBlob
},
{
Type::Text,
cmpText
NULL // cmpText
}
};
@ -195,374 +195,299 @@ NdbSqlUtil::getTypeBinary(Uint32 typeId)
return getType(typeId);
}
// compare
/*
* Comparison functions.
*/
int
NdbSqlUtil::cmpTinyint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
NdbSqlUtil::cmpTinyint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
assert(full >= size && size > 0);
union { Uint32 p[1]; Int8 v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
int
NdbSqlUtil::cmpTinyunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { Uint32 p[1]; Uint8 v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
int
NdbSqlUtil::cmpSmallint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { Uint32 p[1]; Int16 v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
int
NdbSqlUtil::cmpSmallunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { Uint32 p[1]; Uint16 v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
int
NdbSqlUtil::cmpMediumint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
Int32 v1 = sint3korr(u1.v);
Int32 v2 = sint3korr(u2.v);
if (v1 < v2)
return -1;
if (v1 > v2)
return +1;
return 0;
}
int
NdbSqlUtil::cmpMediumunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
Uint32 v1 = uint3korr(u1.v);
Uint32 v2 = uint3korr(u2.v);
if (v1 < v2)
return -1;
if (v1 > v2)
return +1;
return 0;
}
int
NdbSqlUtil::cmpInt(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { Uint32 p[1]; Int32 v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
int
NdbSqlUtil::cmpUnsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { Uint32 p[1]; Uint32 v; } u1, u2;
u1.v = p1[0];
u2.v = p2[0];
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
int
NdbSqlUtil::cmpBigint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
if (size >= 2) {
union { Uint32 p[2]; Int64 v; } u1, u2;
u1.p[0] = p1[0];
u1.p[1] = p1[1];
u2.p[0] = p2[0];
u2.p[1] = p2[1];
if (u1.v < u2.v)
if (n2 >= sizeof(Int8)) {
Int8 v1, v2;
memcpy(&v1, p1, sizeof(Int8));
memcpy(&v2, p2, sizeof(Int8));
if (v1 < v2)
return -1;
if (u1.v > u2.v)
if (v1 > v2)
return +1;
return 0;
}
assert(! full);
return CmpUnknown;
}
int
NdbSqlUtil::cmpBigunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
NdbSqlUtil::cmpTinyunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
assert(full >= size && size > 0);
if (size >= 2) {
union { Uint32 p[2]; Uint64 v; } u1, u2;
u1.p[0] = p1[0];
u1.p[1] = p1[1];
u2.p[0] = p2[0];
u2.p[1] = p2[1];
if (u1.v < u2.v)
if (n2 >= sizeof(Uint8)) {
Uint8 v1, v2;
memcpy(&v1, p1, sizeof(Uint8));
memcpy(&v2, p2, sizeof(Uint8));
if (v1 < v2)
return -1;
if (u1.v > u2.v)
if (v1 > v2)
return +1;
return 0;
}
assert(! full);
return CmpUnknown;
}
int
NdbSqlUtil::cmpFloat(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
NdbSqlUtil::cmpSmallint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
assert(full >= size && size > 0);
union { Uint32 p[1]; float v; } u1, u2;
u1.p[0] = p1[0];
u2.p[0] = p2[0];
// no format check
if (u1.v < u2.v)
return -1;
if (u1.v > u2.v)
return +1;
return 0;
}
int
NdbSqlUtil::cmpDouble(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
if (size >= 2) {
union { Uint32 p[2]; double v; } u1, u2;
u1.p[0] = p1[0];
u1.p[1] = p1[1];
u2.p[0] = p2[0];
u2.p[1] = p2[1];
// no format check
if (u1.v < u2.v)
if (n2 >= sizeof(Int16)) {
Int16 v1, v2;
memcpy(&v1, p1, sizeof(Int16));
memcpy(&v2, p2, sizeof(Int16));
if (v1 < v2)
return -1;
if (u1.v > u2.v)
if (v1 > v2)
return +1;
return 0;
}
assert(! full);
return CmpUnknown;
}
int
NdbSqlUtil::cmpDecimal(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
NdbSqlUtil::cmpSmallunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
if (n2 >= sizeof(Uint16)) {
Uint16 v1, v2;
memcpy(&v1, p1, sizeof(Uint16));
memcpy(&v2, p2, sizeof(Uint16));
if (v1 < v2)
return -1;
if (v1 > v2)
return +1;
return 0;
}
assert(! full);
return CmpUnknown;
}
int
NdbSqlUtil::cmpMediumint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
if (n2 >= 3) {
Int32 v1, v2;
v1 = sint3korr((const uchar*)p1);
v2 = sint3korr((const uchar*)p2);
if (v1 < v2)
return -1;
if (v1 > v2)
return +1;
return 0;
}
assert(! full);
return CmpUnknown;
}
int
NdbSqlUtil::cmpMediumunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
if (n2 >= 3) {
Uint32 v1, v2;
v1 = uint3korr((const uchar*)p1);
v2 = uint3korr((const uchar*)p2);
if (v1 < v2)
return -1;
if (v1 > v2)
return +1;
return 0;
}
assert(! full);
return CmpUnknown;
}
int
NdbSqlUtil::cmpInt(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
if (n2 >= sizeof(Int32)) {
Int32 v1, v2;
memcpy(&v1, p1, sizeof(Int32));
memcpy(&v2, p2, sizeof(Int32));
if (v1 < v2)
return -1;
if (v1 > v2)
return +1;
return 0;
}
assert(! full);
return CmpUnknown;
}
int
NdbSqlUtil::cmpUnsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
if (n2 >= sizeof(Uint32)) {
Uint32 v1, v2;
memcpy(&v1, p1, sizeof(Uint32));
memcpy(&v2, p2, sizeof(Uint32));
if (v1 < v2)
return -1;
if (v1 > v2)
return +1;
return 0;
}
assert(! full);
return CmpUnknown;
}
int
NdbSqlUtil::cmpBigint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
if (n2 >= sizeof(Int64)) {
Int64 v1, v2;
memcpy(&v1, p1, sizeof(Int64));
memcpy(&v2, p2, sizeof(Int64));
if (v1 < v2)
return -1;
if (v1 > v2)
return +1;
return 0;
}
assert(! full);
return CmpUnknown;
}
int
NdbSqlUtil::cmpBigunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
if (n2 >= sizeof(Uint64)) {
Uint64 v1, v2;
memcpy(&v1, p1, sizeof(Uint64));
memcpy(&v2, p2, sizeof(Uint64));
if (v1 < v2)
return -1;
if (v1 > v2)
return +1;
return 0;
}
assert(! full);
return CmpUnknown;
}
int
NdbSqlUtil::cmpFloat(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
if (n2 >= sizeof(float)) {
float v1, v2;
memcpy(&v1, p1, sizeof(float));
memcpy(&v2, p2, sizeof(float));
if (v1 < v2)
return -1;
if (v1 > v2)
return +1;
return 0;
}
assert(! full);
return CmpUnknown;
}
int
NdbSqlUtil::cmpDouble(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
if (n2 >= sizeof(double)) {
double v1, v2;
memcpy(&v1, p1, sizeof(double));
memcpy(&v2, p2, sizeof(double));
if (v1 < v2)
return -1;
if (v1 > v2)
return +1;
return 0;
}
assert(! full);
return CmpUnknown;
}
// not used by MySQL or NDB
int
NdbSqlUtil::cmpDecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
assert(full >= size && size > 0);
// not used by MySQL or NDB
assert(false);
return 0;
}
int
NdbSqlUtil::cmpChar(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
NdbSqlUtil::cmpChar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
// collation does not work on prefix for some charsets
assert(full == size && size > 0);
/*
* Char is blank-padded to length and null-padded to word size.
*/
union { const Uint32* p; const uchar* v; } u1, u2;
u1.p = p1;
u2.p = p2;
assert(full);
const uchar* v1 = (const uchar*)p1;
const uchar* v2 = (const uchar*)p2;
// not const in MySQL
CHARSET_INFO* cs = (CHARSET_INFO*)(info);
// length in bytes including null padding to Uint32
uint l1 = (full << 2);
int k = (*cs->coll->strnncollsp)(cs, u1.v, l1, u2.v, l1, 0);
// compare with space padding
int k = (*cs->coll->strnncollsp)(cs, v1, n1, v2, n2, false);
return k < 0 ? -1 : k > 0 ? +1 : 0;
}
// waiting for MySQL and new NDB implementation
int
NdbSqlUtil::cmpVarchar(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
NdbSqlUtil::cmpVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
assert(full >= size && size > 0);
/*
* Varchar is not allowed to contain a null byte and the value is
* null-padded. Therefore comparison does not need to use the length.
*
* Not used before MySQL 5.0. Format is likely to change. Handle
* only binary collation for now.
*/
union { const Uint32* p; const char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
// skip length in first 2 bytes
int k = strncmp(u1.v + 2, u2.v + 2, (size << 2) - 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
assert(false);
return 0;
}
int
NdbSqlUtil::cmpBinary(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
NdbSqlUtil::cmpBinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
assert(full >= size && size > 0);
/*
* Binary data of full length. Compare bytewise.
*/
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
int k = memcmp(u1.v, u2.v, size << 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
}
int
NdbSqlUtil::cmpVarbinary(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
/*
* Binary data of variable length padded with nulls. The comparison
* does not need to use the length.
*
* Not used before MySQL 5.0. Format is likely to change.
*/
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
// skip length in first 2 bytes
int k = memcmp(u1.v + 2, u2.v + 2, (size << 2) - 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
}
int
NdbSqlUtil::cmpDatetime(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
/*
* Datetime is CC YY MM DD hh mm ss \0
*
* Not used via MySQL.
*/
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
// no format check
int k = memcmp(u1.v, u2.v, 4);
if (k != 0)
return k < 0 ? -1 : +1;
if (size >= 2) {
k = memcmp(u1.v + 4, u2.v + 4, 4);
return k < 0 ? -1 : k > 0 ? +1 : 0;
const uchar* v1 = (const uchar*)p1;
const uchar* v2 = (const uchar*)p2;
// compare as binary strings
unsigned n = (n1 <= n2 ? n1 : n2);
int k = memcmp(v1, v2, n);
if (k == 0) {
if (full)
k = (int)n1 - (int)n2;
else
k = (int)n - (int)n2;
}
return CmpUnknown;
return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
}
// waiting for MySQL and new NDB implementation
int
NdbSqlUtil::cmpTimespec(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
NdbSqlUtil::cmpVarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
assert(full >= size && size > 0);
/*
* Timespec is CC YY MM DD hh mm ss \0 NN NN NN NN
*
* Not used via MySQL.
*/
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
u2.p = p2;
// no format check
int k = memcmp(u1.v, u2.v, 4);
if (k != 0)
return k < 0 ? -1 : +1;
if (size >= 2) {
k = memcmp(u1.v + 4, u2.v + 4, 4);
if (k != 0)
return k < 0 ? -1 : +1;
if (size >= 3) {
Uint32 n1 = *(const Uint32*)(u1.v + 8);
Uint32 n2 = *(const Uint32*)(u2.v + 8);
if (n1 < n2)
return -1;
if (n2 > n1)
return +1;
return 0;
}
}
return CmpUnknown;
assert(false);
return 0;
}
// not used by MySQL or NDB
int
NdbSqlUtil::cmpBlob(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
NdbSqlUtil::cmpDatetime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
assert(full >= size && size > 0);
/*
* Blob comparison is on the inline bytes (null padded).
*/
const unsigned head = NDB_BLOB_HEAD_SIZE;
// skip blob head
if (size >= head + 1) {
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1 + head;
u2.p = p2 + head;
int k = memcmp(u1.v, u2.v, (size - head) << 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
}
return CmpUnknown;
assert(false);
return 0;
}
// not used by MySQL or NDB
int
NdbSqlUtil::cmpText(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
NdbSqlUtil::cmpTimespec(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
// collation does not work on prefix for some charsets
assert(full == size && size > 0);
/*
* Text comparison is on the inline bytes (blank padded). Currently
* not supported for multi-byte charsets.
*/
const unsigned head = NDB_BLOB_HEAD_SIZE;
// skip blob head
if (size >= head + 1) {
union { const Uint32* p; const uchar* v; } u1, u2;
u1.p = p1 + head;
u2.p = p2 + head;
// not const in MySQL
CHARSET_INFO* cs = (CHARSET_INFO*)(info);
// length in bytes including null padding to Uint32
uint l1 = (full << 2);
int k = (*cs->coll->strnncollsp)(cs, u1.v, l1, u2.v, l1,0);
return k < 0 ? -1 : k > 0 ? +1 : 0;
}
return CmpUnknown;
assert(false);
return 0;
}
// not supported
int
NdbSqlUtil::cmpBlob(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
assert(false);
return 0;
}
// not supported
int
NdbSqlUtil::cmpText(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
{
assert(false);
return 0;
}
// check charset
@ -572,8 +497,6 @@ NdbSqlUtil::usable_in_pk(Uint32 typeId, const void* info)
{
const Type& type = getType(typeId);
switch (type.m_typeId) {
case Type::Undefined:
break;
case Type::Char:
{
const CHARSET_INFO *cs = (const CHARSET_INFO*)info;
@ -582,11 +505,12 @@ NdbSqlUtil::usable_in_pk(Uint32 typeId, const void* info)
cs->cset != 0 &&
cs->coll != 0 &&
cs->coll->strnxfrm != 0 &&
cs->strxfrm_multiply <= 1; // current limitation
cs->strxfrm_multiply <= MAX_XFRM_MULTIPLY;
}
break;
case Type::Undefined:
case Type::Varchar:
return true; // Varchar not used via MySQL
case Type::Varbinary:
case Type::Blob:
case Type::Text:
break;
@ -606,9 +530,9 @@ bool
NdbSqlUtil::usable_in_ordered_index(Uint32 typeId, const void* info)
{
const Type& type = getType(typeId);
if (type.m_cmp == NULL)
return false;
switch (type.m_typeId) {
case Type::Undefined:
break;
case Type::Char:
{
const CHARSET_INFO *cs = (const CHARSET_INFO*)info;
@ -618,92 +542,17 @@ NdbSqlUtil::usable_in_ordered_index(Uint32 typeId, const void* info)
cs->coll != 0 &&
cs->coll->strnxfrm != 0 &&
cs->coll->strnncollsp != 0 &&
cs->strxfrm_multiply <= 1; // current limitation
cs->strxfrm_multiply <= MAX_XFRM_MULTIPLY;
}
break;
case Type::Undefined:
case Type::Varchar:
return true; // Varchar not used via MySQL
case Type::Varbinary:
case Type::Blob:
case Type::Text:
{
const CHARSET_INFO *cs = (const CHARSET_INFO*)info;
return
cs != 0 &&
cs->mbmaxlen == 1 && // extra limitation
cs->cset != 0 &&
cs->coll != 0 &&
cs->coll->strnxfrm != 0 &&
cs->coll->strnncollsp != 0 &&
cs->strxfrm_multiply <= 1; // current limitation
}
break;
default:
return true;
}
return false;
}
#ifdef NDB_SQL_UTIL_TEST
#include <NdbTick.h>
#include <NdbOut.hpp>
struct Testcase {
int op; // 1=compare 2=like
int res;
const char* s1;
const char* s2;
int pad;
};
const Testcase testcase[] = {
{ 2, 1, "abc", "abc", 0 },
{ 2, 1, "abc", "abc%", 0 },
{ 2, 1, "abcdef", "abc%", 0 },
{ 2, 1, "abcdefabcdefabcdef", "abc%", 0 },
{ 2, 1, "abcdefabcdefabcdef", "abc%f", 0 },
{ 2, 0, "abcdefabcdefabcdef", "abc%z", 0 },
{ 2, 1, "abcdefabcdefabcdef", "%f", 0 },
{ 2, 1, "abcdef", "a%b%c%d%e%f", 0 },
{ 0, 0, 0, 0 }
};
int
main(int argc, char** argv)
{
ndb_init(); // for charsets
unsigned count = argc > 1 ? atoi(argv[1]) : 1000000;
ndbout_c("count = %u", count);
assert(count != 0);
for (const Testcase* t = testcase; t->s1 != 0; t++) {
ndbout_c("%d = '%s' %s '%s' pad=%d",
t->res, t->s1, t->op == 1 ? "comp" : "like", t->s2);
NDB_TICKS x1 = NdbTick_CurrentMillisecond();
unsigned n1 = strlen(t->s1);
unsigned n2 = strlen(t->s2);
for (unsigned i = 0; i < count; i++) {
if (t->op == 1) {
int res = NdbSqlUtil::char_compare(t->s1, n1, t->s2, n2, t->pad);
assert(res == t->res);
continue;
}
if (t->op == 2) {
int res = NdbSqlUtil::char_like(t->s1, n1, t->s2, n2, t->pad);
assert(res == t->res);
continue;
}
assert(false);
}
NDB_TICKS x2 = NdbTick_CurrentMillisecond();
if (x2 < x1)
x2 = x1;
double usec = 1000000.0 * double(x2 - x1) / double(count);
ndbout_c("time %.0f usec per call", usec);
}
// quick check
for (unsigned i = 0; i < sizeof(m_typeList) / sizeof(m_typeList[0]); i++) {
const NdbSqlUtil::Type& t = m_typeList[i];
assert(t.m_typeId == i);
}
return 0;
}
#endif

View file

@ -607,8 +607,7 @@ struct Fragmentrec {
//-----------------------------------------------------------------------------
// elementLength: Length of element in bucket and overflow pages
// keyLength: Length of key (== 0 if long key or variable key length)
// wl-2066 always Length of key
// keyLength: Length of key
//-----------------------------------------------------------------------------
Uint8 elementLength;
Uint16 keyLength;
@ -637,6 +636,11 @@ struct Fragmentrec {
//-----------------------------------------------------------------------------
Uint8 nodetype;
Uint8 stopQueOp;
//-----------------------------------------------------------------------------
// flag to avoid accessing table record if no char attributes
//-----------------------------------------------------------------------------
Uint8 hasCharAttr;
};
typedef Ptr<Fragmentrec> FragmentrecPtr;
@ -719,6 +723,7 @@ struct Operationrec {
State transactionstate;
Uint16 elementContainer;
Uint16 tupkeylen;
Uint32 xfrmtupkeylen;
Uint32 userblockref;
Uint32 scanBits;
Uint8 elementIsDisappeared;
@ -846,6 +851,13 @@ struct Tabrec {
Uint32 fragptrholder[MAX_FRAG_PER_NODE];
Uint32 tabUserPtr;
BlockReference tabUserRef;
Uint8 noOfKeyAttr;
Uint8 hasCharAttr;
struct KeyAttr {
Uint32 attributeDescriptor;
CHARSET_INFO* charsetInfo;
} keyAttr[MAX_ATTRIBUTES_IN_INDEX];
};
typedef Ptr<Tabrec> TabrecPtr;
@ -891,6 +903,7 @@ private:
void execACCKEYREQ(Signal* signal);
void execACCSEIZEREQ(Signal* signal);
void execACCFRAGREQ(Signal* signal);
void execTC_SCHVERREQ(Signal* signal);
void execACC_SRREQ(Signal* signal);
void execNEXT_SCANREQ(Signal* signal);
void execACC_ABORTREQ(Signal* signal);
@ -1016,7 +1029,7 @@ private:
void increaselistcont(Signal* signal);
void seizeLeftlist(Signal* signal);
void seizeRightlist(Signal* signal);
void readTablePk(Uint32 localkey1);
Uint32 readTablePk(Uint32 localkey1);
void getElement(Signal* signal);
void getdirindex(Signal* signal);
void commitdelete(Signal* signal, bool systemRestart);
@ -1123,6 +1136,8 @@ private:
void lcp_write_op_to_undolog(Signal* signal);
void reenable_expand_after_redo_log_exection_complete(Signal*);
// charsets
void xfrmKeyData(Signal* signal);
// Initialisation
void initData();

View file

@ -179,6 +179,7 @@ Dbacc::Dbacc(const class Configuration & conf):
addRecSignal(GSN_ACCKEYREQ, &Dbacc::execACCKEYREQ);
addRecSignal(GSN_ACCSEIZEREQ, &Dbacc::execACCSEIZEREQ);
addRecSignal(GSN_ACCFRAGREQ, &Dbacc::execACCFRAGREQ);
addRecSignal(GSN_TC_SCHVERREQ, &Dbacc::execTC_SCHVERREQ);
addRecSignal(GSN_ACC_SRREQ, &Dbacc::execACC_SRREQ);
addRecSignal(GSN_NEXT_SCANREQ, &Dbacc::execNEXT_SCANREQ);
addRecSignal(GSN_ACC_ABORTREQ, &Dbacc::execACC_ABORTREQ);

View file

@ -16,6 +16,7 @@
#define DBACC_C
#include "Dbacc.hpp"
#include <my_sys.h>
#include <AttributeHeader.hpp>
#include <signaldata/AccFrag.hpp>
@ -27,6 +28,7 @@
#include <signaldata/FsRemoveReq.hpp>
#include <signaldata/DropTab.hpp>
#include <signaldata/DumpStateOrd.hpp>
#include <SectionReader.hpp>
// TO_DO_RONM is a label for comments on what needs to be improved in future versions
// when more time is given.
@ -1033,6 +1035,12 @@ void Dbacc::initialiseTableRec(Signal* signal)
tabptr.p->fragholder[i] = RNIL;
tabptr.p->fragptrholder[i] = RNIL;
}//for
tabptr.p->noOfKeyAttr = 0;
tabptr.p->hasCharAttr = 0;
for (Uint32 k = 0; k < MAX_ATTRIBUTES_IN_INDEX; k++) {
tabptr.p->keyAttr[k].attributeDescriptor = 0;
tabptr.p->keyAttr[k].charsetInfo = 0;
}
}//for
}//Dbacc::initialiseTableRec()
@ -1187,6 +1195,66 @@ void Dbacc::addFragRefuse(Signal* signal, Uint32 errorCode)
return;
}//Dbacc::addFragRefuseEarly()
void
Dbacc::execTC_SCHVERREQ(Signal* signal)
{
jamEntry();
if (! assembleFragments(signal)) {
jam();
return;
}
tabptr.i = signal->theData[0];
ptrCheckGuard(tabptr, ctablesize, tabrec);
Uint32 noOfKeyAttr = signal->theData[6];
ndbrequire(noOfKeyAttr <= MAX_ATTRIBUTES_IN_INDEX);
Uint32 hasCharAttr = 0;
SegmentedSectionPtr s0Ptr;
signal->getSection(s0Ptr, 0);
SectionReader r0(s0Ptr, getSectionSegmentPool());
Uint32 i = 0;
while (i < noOfKeyAttr) {
jam();
Uint32 attributeDescriptor = ~0;
Uint32 csNumber = ~0;
if (! r0.getWord(&attributeDescriptor) ||
! r0.getWord(&csNumber)) {
jam();
break;
}
CHARSET_INFO* cs = 0;
if (csNumber != 0) {
cs = all_charsets[csNumber];
ndbrequire(cs != 0);
hasCharAttr = 1;
}
tabptr.p->keyAttr[i].attributeDescriptor = attributeDescriptor;
tabptr.p->keyAttr[i].charsetInfo = cs;
i++;
}
ndbrequire(i == noOfKeyAttr);
releaseSections(signal);
tabptr.p->noOfKeyAttr = noOfKeyAttr;
tabptr.p->hasCharAttr = hasCharAttr;
// copy char attr flag to each fragment
for (Uint32 i1 = 0; i1 < MAX_FRAG_PER_NODE; i1++) {
jam();
if (tabptr.p->fragptrholder[i1] != RNIL) {
rootfragrecptr.i = tabptr.p->fragptrholder[i1];
ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
for (Uint32 i2 = 0; i2 < 2; i2++) {
fragrecptr.i = rootfragrecptr.p->fragmentptr[i2];
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
fragrecptr.p->hasCharAttr = hasCharAttr;
}
}
}
// no reply to DICT
}
void
Dbacc::execDROP_TAB_REQ(Signal* signal){
jamEntry();
@ -1550,6 +1618,7 @@ void Dbacc::initOpRec(Signal* signal)
operationRecPtr.p->hashValue = signal->theData[3];
operationRecPtr.p->tupkeylen = signal->theData[4];
operationRecPtr.p->xfrmtupkeylen = signal->theData[4];
operationRecPtr.p->transId1 = signal->theData[5];
operationRecPtr.p->transId2 = signal->theData[6];
operationRecPtr.p->transactionstate = ACTIVE;
@ -1664,6 +1733,10 @@ void Dbacc::execACCKEYREQ(Signal* signal)
ndbrequire(operationRecPtr.p->transactionstate == IDLE);
initOpRec(signal);
// normalize key if any char attr
if (! operationRecPtr.p->isAccLockReq && fragrecptr.p->hasCharAttr)
xfrmKeyData(signal);
/*---------------------------------------------------------------*/
/* */
/* WE WILL USE THE HASH VALUE TO LOOK UP THE PROPER MEMORY */
@ -1758,6 +1831,54 @@ void Dbacc::execACCKEYREQ(Signal* signal)
return;
}//Dbacc::execACCKEYREQ()
void
Dbacc::xfrmKeyData(Signal* signal)
{
tabptr.i = fragrecptr.p->myTableId;
ptrCheckGuard(tabptr, ctablesize, tabrec);
Uint32 dst[1024];
Uint32 dstSize = (sizeof(dst) >> 2);
Uint32* src = &signal->theData[7];
const Uint32 noOfKeyAttr = tabptr.p->noOfKeyAttr;
Uint32 dstPos = 0;
Uint32 srcPos = 0;
Uint32 i = 0;
while (i < noOfKeyAttr) {
const Tabrec::KeyAttr& keyAttr = tabptr.p->keyAttr[i];
Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(keyAttr.attributeDescriptor);
Uint32 srcWords = (srcBytes + 3) / 4;
Uint32 dstWords = ~0;
uchar* dstPtr = (uchar*)&dst[dstPos];
const uchar* srcPtr = (const uchar*)&src[srcPos];
CHARSET_INFO* cs = keyAttr.charsetInfo;
if (cs == 0) {
jam();
memcpy(dstPtr, srcPtr, srcWords << 2);
dstWords = srcWords;
} else {
jam();
Uint32 xmul = cs->strxfrm_multiply;
if (xmul == 0)
xmul = 1;
Uint32 dstLen = xmul * srcBytes;
ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
uint n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes);
while ((n & 3) != 0)
dstPtr[n++] = 0;
dstWords = (n >> 2);
}
dstPos += dstWords;
srcPos += srcWords;
i++;
}
memcpy(src, dst, dstPos << 2);
operationRecPtr.p->xfrmtupkeylen = dstPos;
}
void Dbacc::accIsLockedLab(Signal* signal)
{
ndbrequire(csystemRestart == ZFALSE);
@ -1848,6 +1969,7 @@ void Dbacc::insertelementLab(Signal* signal)
}//if
}//if
if (fragrecptr.p->keyLength != operationRecPtr.p->tupkeylen) {
// historical
ndbrequire(fragrecptr.p->keyLength == 0);
}//if
@ -3251,7 +3373,7 @@ void Dbacc::getdirindex(Signal* signal)
ptrCheckGuard(gdiPageptr, cpagesize, page8);
}//Dbacc::getdirindex()
void
Uint32
Dbacc::readTablePk(Uint32 localkey1)
{
Uint32 tableId = fragrecptr.p->myTableId;
@ -3259,10 +3381,11 @@ Dbacc::readTablePk(Uint32 localkey1)
Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS;
Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1);
#ifdef VM_TRACE
memset(ckeys, 0x1f, fragrecptr.p->keyLength << 2);
memset(ckeys, 0x1f, (fragrecptr.p->keyLength * MAX_XFRM_MULTIPLY) << 2);
#endif
int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, ckeys);
ndbrequire(ret == fragrecptr.p->keyLength);
int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, ckeys, true);
ndbrequire(ret > 0);
return ret;
}
/* --------------------------------------------------------------------------------- */
@ -3306,7 +3429,6 @@ void Dbacc::getElement(Signal* signal)
Uint32 tgeNextptrtype;
register Uint32 tgeKeyptr;
register Uint32 tgeRemLen;
register Uint32 tgeCompareLen;
register Uint32 TelemLen = fragrecptr.p->elementLength;
register Uint32* Tkeydata = (Uint32*)&signal->theData[7];
@ -3314,7 +3436,6 @@ void Dbacc::getElement(Signal* signal)
tgePageindex = tgdiPageindex;
gePageptr = gdiPageptr;
tgeResult = ZFALSE;
tgeCompareLen = fragrecptr.p->keyLength;
/*
* The value seached is
* - table key for ACCKEYREQ, stored in TUP
@ -3381,12 +3502,15 @@ void Dbacc::getElement(Signal* signal)
Uint32 localkey2 = 0;
bool found;
if (! searchLocalKey) {
readTablePk(localkey1);
found = (memcmp(Tkeydata, ckeys, fragrecptr.p->keyLength << 2) == 0);
Uint32 len = readTablePk(localkey1);
found = (len == operationRecPtr.p->xfrmtupkeylen) &&
(memcmp(Tkeydata, ckeys, len << 2) == 0);
} else {
jam();
found = (localkey1 == Tkeydata[0]);
}
if (found) {
jam();
tgeLocked = ElementHeader::getLocked(tgeElementHeader);
tgeResult = ZTRUE;
operationRecPtr.p->localdata[0] = localkey1;
@ -7731,6 +7855,7 @@ void Dbacc::initFragGeneral(FragmentrecPtr regFragPtr)
regFragPtr.p->activeDataPage = 0;
regFragPtr.p->createLcp = ZFALSE;
regFragPtr.p->stopQueOp = ZFALSE;
regFragPtr.p->hasCharAttr = ZFALSE;
regFragPtr.p->nextAllocPage = 0;
regFragPtr.p->nrWaitWriteUndoExit = 0;
regFragPtr.p->lastUndoIsStored = ZFALSE;
@ -8680,6 +8805,7 @@ void Dbacc::srDoUndoLab(Signal* signal)
const Uint32 tkeylen = undopageptr.p->undoword[tmpindex];
tmpindex++;
operationRecPtr.p->tupkeylen = tkeylen;
operationRecPtr.p->xfrmtupkeylen = 0; // not used
operationRecPtr.p->fragptr = fragrecptr.i;
ndbrequire(fragrecptr.p->keyLength != 0 &&
@ -9750,6 +9876,7 @@ void Dbacc::initScanOpRec(Signal* signal)
arrGuard(tisoLocalPtr, 2048);
operationRecPtr.p->keydata[0] = isoPageptr.p->word32[tisoLocalPtr];
operationRecPtr.p->tupkeylen = fragrecptr.p->keyLength;
operationRecPtr.p->xfrmtupkeylen = 0; // not used
}//Dbacc::initScanOpRec()
/* --------------------------------------------------------------------------------- */

View file

@ -4318,7 +4318,28 @@ Dbdict::execTAB_COMMITCONF(Signal* signal){
signal->theData[3] = reference();
signal->theData[4] = (Uint32)tabPtr.p->tableType;
signal->theData[5] = createTabPtr.p->key;
sendSignal(DBTC_REF, GSN_TC_SCHVERREQ, signal, 6, JBB);
signal->theData[6] = (Uint32)tabPtr.p->noOfPrimkey;
Uint32 buf[2 * MAX_ATTRIBUTES_IN_INDEX];
Uint32 sz = 0;
Uint32 tAttr = tabPtr.p->firstAttribute;
while (tAttr != RNIL) {
jam();
AttributeRecord* aRec = c_attributeRecordPool.getPtr(tAttr);
if (aRec->tupleKey) {
buf[sz++] = aRec->attributeDescriptor;
buf[sz++] = (aRec->extPrecision >> 16); // charset number
}
tAttr = aRec->nextAttrInTable;
}
ndbrequire(sz == 2 * tabPtr.p->noOfPrimkey);
LinearSectionPtr lsPtr[3];
lsPtr[0].p = buf;
lsPtr[0].sz = sz;
// note: ACC does not reply
sendSignal(DBACC_REF, GSN_TC_SCHVERREQ, signal, 7, JBB, lsPtr, 1);
sendSignal(DBTC_REF, GSN_TC_SCHVERREQ, signal, 7, JBB, lsPtr, 1);
return;
}
@ -4785,12 +4806,18 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it,
// charset in upper half of precision
unsigned csNumber = (attrPtr.p->extPrecision >> 16);
if (csNumber != 0) {
/*
* A new charset is first accessed here on this node.
* TODO use separate thread (e.g. via NDBFS) if need to load from file
*/
CHARSET_INFO* cs = get_charset(csNumber, MYF(0));
if (cs == NULL) {
parseP->errorCode = CreateTableRef::InvalidCharset;
parseP->errorLine = __LINE__;
return;
}
// XXX should be done somewhere in mysql
all_charsets[cs->number] = cs;
unsigned i = 0;
while (i < noOfCharsets) {
if (charsets[i] == csNumber)

View file

@ -8280,7 +8280,7 @@ Dblqh::readPrimaryKeys(ScanRecord *scanP, TcConnectionrec *tcConP, Uint32 *dst)
tableId = tFragPtr.p->tabRef;
}
int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, dst);
int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, dst, false);
if(0)
ndbout_c("readPrimaryKeys(table: %d fragment: %d [ %d %d ] -> %d",
tableId, fragId, fragPageId, pageIndex, ret);

View file

@ -972,6 +972,7 @@ public:
typedef Ptr<HostRecord> HostRecordPtr;
/* *********** TABLE RECORD ********************************************* */
/********************************************************/
/* THIS RECORD CONTAINS THE CURRENT SCHEMA VERSION OF */
/* ALL TABLES IN THE SYSTEM. */
@ -982,13 +983,21 @@ public:
Uint8 dropping;
Uint8 tableType;
Uint8 storedTable;
Uint8 noOfKeyAttr;
Uint8 hasCharAttr;
struct KeyAttr {
Uint32 attributeDescriptor;
CHARSET_INFO* charsetInfo;
} keyAttr[MAX_ATTRIBUTES_IN_INDEX];
bool checkTable(Uint32 schemaVersion) const {
return enabled && !dropping && (schemaVersion == currentSchemaVersion);
}
Uint32 getErrorCode(Uint32 schemaVersion) const;
struct DropTable {
Uint32 senderRef;
Uint32 senderData;
@ -1436,6 +1445,7 @@ private:
void gcpTcfinished(Signal* signal);
void handleGcp(Signal* signal);
void hash(Signal* signal);
Uint32 xfrmKeyData(Signal* signal, Uint32* dst, Uint32 dstSize, const Uint32* src);
void initApiConnect(Signal* signal);
void initApiConnectRec(Signal* signal,
ApiConnectRecord * const regApiPtr,

View file

@ -20,6 +20,7 @@
#include "md5_hash.hpp"
#include <RefConvert.hpp>
#include <ndb_limits.h>
#include <my_sys.h>
#include <signaldata/EventReport.hpp>
#include <signaldata/TcKeyReq.hpp>
@ -63,6 +64,8 @@
#include <signaldata/PackedSignal.hpp>
#include <AttributeHeader.hpp>
#include <signaldata/DictTabInfo.hpp>
#include <AttributeDescriptor.hpp>
#include <SectionReader.hpp>
#include <NdbOut.hpp>
#include <DebuggerNames.hpp>
@ -313,6 +316,10 @@ void Dbtc::execREAD_NODESREF(Signal* signal)
void Dbtc::execTC_SCHVERREQ(Signal* signal)
{
jamEntry();
if (! assembleFragments(signal)) {
jam();
return;
}
tabptr.i = signal->theData[0];
ptrCheckGuard(tabptr, ctabrecFilesize, tableRecord);
tabptr.p->currentSchemaVersion = signal->theData[1];
@ -320,10 +327,41 @@ void Dbtc::execTC_SCHVERREQ(Signal* signal)
BlockReference retRef = signal->theData[3];
tabptr.p->tableType = (Uint8)signal->theData[4];
BlockReference retPtr = signal->theData[5];
Uint32 noOfKeyAttr = signal->theData[6];
ndbrequire(noOfKeyAttr <= MAX_ATTRIBUTES_IN_INDEX);
Uint32 hasCharAttr = 0;
SegmentedSectionPtr s0Ptr;
signal->getSection(s0Ptr, 0);
SectionReader r0(s0Ptr, getSectionSegmentPool());
Uint32 i = 0;
while (i < noOfKeyAttr) {
jam();
Uint32 attributeDescriptor = ~0;
Uint32 csNumber = ~0;
if (! r0.getWord(&attributeDescriptor) ||
! r0.getWord(&csNumber)) {
jam();
break;
}
CHARSET_INFO* cs = 0;
if (csNumber != 0) {
cs = all_charsets[csNumber];
ndbrequire(cs != 0);
hasCharAttr = 1;
}
tabptr.p->keyAttr[i].attributeDescriptor = attributeDescriptor;
tabptr.p->keyAttr[i].charsetInfo = cs;
i++;
}
ndbrequire(i == noOfKeyAttr);
releaseSections(signal);
ndbrequire(tabptr.p->enabled == false);
tabptr.p->enabled = true;
tabptr.p->dropping = false;
tabptr.p->noOfKeyAttr = noOfKeyAttr;
tabptr.p->hasCharAttr = hasCharAttr;
signal->theData[0] = tabptr.i;
signal->theData[1] = retPtr;
@ -2221,6 +2259,7 @@ void Dbtc::hash(Signal* signal)
UintR Tdata3;
UintR* Tdata32;
Uint64 Tdata[512];
Uint64 Txfrmdata[512 * MAX_XFRM_MULTIPLY];
CacheRecord * const regCachePtr = cachePtr.p;
Tdata32 = (UintR*)&Tdata[0];
@ -2250,8 +2289,21 @@ void Dbtc::hash(Signal* signal)
ti += 4;
}//while
}//if
UintR keylen = (UintR)regCachePtr->keylen;
TableRecordPtr tabptrSave = tabptr;
tabptr.i = regCachePtr->tableref; // table or hash index id
ptrCheckGuard(tabptr, ctabrecFilesize, tableRecord);
if (tabptr.p->hasCharAttr) {
jam();
keylen = xfrmKeyData(signal, (Uint32*)Txfrmdata, sizeof(Txfrmdata) >> 2, Tdata32);
Tdata32 = (UintR*)&Txfrmdata[0];
}
tabptr = tabptrSave;
Uint32 tmp[4];
md5_hash(tmp, (Uint64*)&Tdata32[0], (UintR)regCachePtr->keylen);
md5_hash(tmp, (Uint64*)&Tdata32[0], keylen);
thashValue = tmp[0];
if (regCachePtr->distributionKeyIndicator == 1) {
@ -2263,6 +2315,47 @@ void Dbtc::hash(Signal* signal)
}//if
}//Dbtc::hash()
Uint32
Dbtc::xfrmKeyData(Signal* signal, Uint32* dst, Uint32 dstSize, const Uint32* src)
{
const Uint32 noOfKeyAttr = tabptr.p->noOfKeyAttr;
Uint32 dstPos = 0;
Uint32 srcPos = 0;
Uint32 i = 0;
while (i < noOfKeyAttr) {
const TableRecord::KeyAttr& keyAttr = tabptr.p->keyAttr[i];
Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(keyAttr.attributeDescriptor);
Uint32 srcWords = (srcBytes + 3) / 4;
Uint32 dstWords = ~0;
uchar* dstPtr = (uchar*)&dst[dstPos];
const uchar* srcPtr = (const uchar*)&src[srcPos];
CHARSET_INFO* cs = keyAttr.charsetInfo;
if (cs == NULL) {
jam();
memcpy(dstPtr, srcPtr, srcWords << 2);
dstWords = srcWords;
} else {
jam();
Uint32 xmul = cs->strxfrm_multiply;
if (xmul == 0)
xmul = 1;
Uint32 dstLen = xmul * srcBytes;
ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
uint n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes);
while ((n & 3) != 0) {
dstPtr[n++] = 0;
}
dstWords = (n >> 2);
}
dstPos += dstWords;
srcPos += srcWords;
i++;
}
return dstPos;
}
/*
INIT_API_CONNECT_REC
---------------------------
@ -9973,6 +10066,12 @@ void Dbtc::initTable(Signal* signal)
tabptr.p->tableType = 0;
tabptr.p->enabled = false;
tabptr.p->dropping = false;
tabptr.p->noOfKeyAttr = 0;
tabptr.p->hasCharAttr = 0;
for (unsigned k = 0; k < MAX_ATTRIBUTES_IN_INDEX; k++) {
tabptr.p->keyAttr[k].attributeDescriptor = 0;
tabptr.p->keyAttr[k].charsetInfo = 0;
}
}//for
}//Dbtc::initTable()

View file

@ -207,6 +207,8 @@
#define ZTUPLE_DELETED_ERROR 626
#define ZINSERT_ERROR 630
#define ZINVALID_CHAR_FORMAT 744
/* SOME WORD POSITIONS OF FIELDS IN SOME HEADERS */
#define ZPAGE_STATE_POS 0 /* POSITION OF PAGE STATE */
@ -1020,14 +1022,14 @@ public:
* for md5 summing and when returning keyinfo. Returns number of
* words or negative (-terrorCode) on error.
*/
int tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut);
int tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut, bool xfrmFlag);
/*
* ACC reads primary key without headers into an array of words. At
* this point in ACC deconstruction, ACC still uses logical references
* to fragment and tuple.
*/
int accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut);
int accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut, bool xfrmFlag);
/*
* TUX checks if tuple is visible to scan.
@ -1637,20 +1639,6 @@ private:
bool readBitsNotNULL(Uint32* outBuffer, AttributeHeader*, Uint32, Uint32);
bool updateBitsNotNULL(Uint32* inBuffer, Uint32, Uint32);
// *****************************************************************
// Read char routines optionally (tXfrmFlag) apply strxfrm
// *****************************************************************
bool readCharNotNULL(Uint32* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2);
bool readCharNULLable(Uint32* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2);
//------------------------------------------------------------------
//------------------------------------------------------------------
bool nullFlagCheck(Uint32 attrDes2);

View file

@ -173,7 +173,7 @@ Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tu
}
int
Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut)
Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut, bool xfrmFlag)
{
ljamEntry();
// use own variables instead of globals
@ -200,7 +200,7 @@ Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* data
operPtr.i = RNIL;
operPtr.p = NULL;
// do it
int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL, true);
int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL, xfrmFlag);
// restore globals
tabptr = tabptr_old;
fragptr = fragptr_old;
@ -229,7 +229,7 @@ Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* data
}
int
Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut)
Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut, bool xfrmFlag)
{
ljamEntry();
// get table
@ -245,7 +245,7 @@ Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIn
ndbrequire((pageIndex & 0x1) == 0);
Uint32 pageOffset = ZPAGE_HEADER_SIZE + (pageIndex >> 1) * tablePtr.p->tupheadsize;
// use TUX routine - optimize later
int ret = tuxReadPk(fragPtr.i, pageId, pageOffset, dataOut);
int ret = tuxReadPk(fragPtr.i, pageId, pageOffset, dataOut, xfrmFlag);
return ret;
}

View file

@ -364,13 +364,8 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
ndbrequire(false);
}//if
if (csNumber != 0) {
CHARSET_INFO* cs = get_charset(csNumber, MYF(0));
if (cs == NULL) {
ljam();
terrorCode = TupAddAttrRef::InvalidCharset;
addattrrefuseLab(signal, regFragPtr, fragOperPtr, regTabPtr.p, fragId);
return;
}
CHARSET_INFO* cs = all_charsets[csNumber];
ndbrequire(cs != NULL);
Uint32 i = 0;
while (i < fragOperPtr.p->charsetIndex) {
ljam();

View file

@ -59,10 +59,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
} else {
ndbrequire(false);
}//if
// replace read function of char attribute
// replace functions for char attribute
if (AttributeOffset::getCharsetFlag(attrOffset)) {
ljam();
regTabPtr->readFunctionArray[i] = &Dbtup::readCharNotNULL;
regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHManyWordNotNULL;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNotNULL;
}
} else {
if (AttributeDescriptor::getSize(attrDescriptor) == 0){
@ -86,10 +87,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHZeroWordNULLable;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable;
}//if
// replace read function of char attribute
// replace functions for char attribute
if (AttributeOffset::getCharsetFlag(attrOffset)) {
ljam();
regTabPtr->readFunctionArray[i] = &Dbtup::readCharNULLable;
regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHManyWordNULLable;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable;
}
}//if
} else if (AttributeDescriptor::getArrayType(attrDescriptor) == ZVAR_ARRAY) {
@ -337,25 +339,68 @@ Dbtup::readFixedSizeTHManyWordNotNULL(Uint32* outBuffer,
Uint32 attrDes2)
{
Uint32 indexBuf = tOutBufIndex;
Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
Uint32 readOffset = AttributeOffset::getOffset(attrDes2);
Uint32 attrNoOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor);
Uint32 newIndexBuf = indexBuf + attrNoOfWords;
Uint32 maxRead = tMaxRead;
ndbrequire((readOffset + attrNoOfWords - 1) < tCheckOffset);
if (newIndexBuf <= maxRead) {
ljam();
ahOut->setDataSize(attrNoOfWords);
MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
&tTupleHeader[readOffset],
attrNoOfWords);
tOutBufIndex = newIndexBuf;
return true;
if (! charsetFlag || ! tXfrmFlag) {
Uint32 newIndexBuf = indexBuf + attrNoOfWords;
if (newIndexBuf <= maxRead) {
ljam();
ahOut->setDataSize(attrNoOfWords);
MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
&tTupleHeader[readOffset],
attrNoOfWords);
tOutBufIndex = newIndexBuf;
return true;
} else {
ljam();
terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
}//if
} else {
ljam();
terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
return false;
}//if
Tablerec* regTabPtr = tabptr.p;
Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
ndbrequire(i < regTabPtr->noOfCharsets);
CHARSET_INFO* cs = regTabPtr->charsetArray[i];
Uint32 xmul = cs->strxfrm_multiply;
if (xmul == 0)
xmul = 1;
Uint32 dstLen = xmul * srcBytes;
Uint32 maxIndexBuf = indexBuf + (dstLen >> 2);
if (maxIndexBuf <= maxRead) {
ljam();
uchar* dstPtr = (uchar*)&outBuffer[indexBuf];
const uchar* srcPtr = (uchar*)&tTupleHeader[readOffset];
const char* ssrcPtr = (const char*)srcPtr;
// could verify data format optionally
if (true ||
(*cs->cset->well_formed_len)(cs, ssrcPtr, ssrcPtr + srcBytes, ZNIL) == srcBytes) {
ljam();
// normalize
Uint32 n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes);
while ((n & 3) != 0) {
dstPtr[n++] = 0;
}
Uint32 dstWords = (n >> 2);
ahOut->setDataSize(dstWords);
Uint32 newIndexBuf = indexBuf + dstWords;
ndbrequire(newIndexBuf <= maxRead);
tOutBufIndex = newIndexBuf;
return true;
} else {
ljam();
terrorCode = ZTUPLE_CORRUPTED_ERROR;
}
} else {
ljam();
terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
}
}
return false;
}//Dbtup::readFixedSizeTHManyWordNotNULL()
bool
@ -402,7 +447,6 @@ Dbtup::readFixedSizeTHManyWordNULLable(Uint32* outBuffer,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
ljam();
if (!nullFlagCheck(attrDes2)) {
ljam();
return readFixedSizeTHManyWordNotNULL(outBuffer,
@ -563,74 +607,6 @@ Dbtup::readDynSmallVarSize(Uint32* outBuffer,
return false;
}//Dbtup::readDynSmallVarSize()
bool
Dbtup::readCharNotNULL(Uint32* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
Uint32 indexBuf = tOutBufIndex;
Uint32 readOffset = AttributeOffset::getOffset(attrDes2);
Uint32 attrNoOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor);
Uint32 newIndexBuf = indexBuf + attrNoOfWords;
Uint32 maxRead = tMaxRead;
ndbrequire((readOffset + attrNoOfWords - 1) < tCheckOffset);
if (newIndexBuf <= maxRead) {
ljam();
ahOut->setDataSize(attrNoOfWords);
if (! tXfrmFlag) {
MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
&tTupleHeader[readOffset],
attrNoOfWords);
} else {
ljam();
Tablerec* regTabPtr = tabptr.p;
Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
ndbrequire(i < tabptr.p->noOfCharsets);
// not const in MySQL
CHARSET_INFO* cs = tabptr.p->charsetArray[i];
// XXX should strip Uint32 null padding
const unsigned nBytes = attrNoOfWords << 2;
unsigned n =
(*cs->coll->strnxfrm)(cs,
(uchar*)&outBuffer[indexBuf],
nBytes,
(const uchar*)&tTupleHeader[readOffset],
nBytes);
// pad with ascii spaces
while (n < nBytes)
((uchar*)&outBuffer[indexBuf])[n++] = 0x20;
}
tOutBufIndex = newIndexBuf;
return true;
} else {
ljam();
terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
return false;
}
}
bool
Dbtup::readCharNULLable(Uint32* outBuffer,
AttributeHeader* ahOut,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
if (!nullFlagCheck(attrDes2)) {
ljam();
return readCharNotNULL(outBuffer,
ahOut,
attrDescriptor,
attrDes2);
} else {
ljam();
ahOut->setNULL();
return true;
}
}
/* ---------------------------------------------------------------------- */
/* THIS ROUTINE IS USED TO UPDATE A NUMBER OF ATTRIBUTES. IT IS */
/* USED BY THE INSERT ROUTINE, THE UPDATE ROUTINE AND IT CAN BE */
@ -818,6 +794,7 @@ Dbtup::updateFixedSizeTHManyWordNotNULL(Uint32* inBuffer,
Uint32 indexBuf = tInBufIndex;
Uint32 inBufLen = tInBufLen;
Uint32 updateOffset = AttributeOffset::getOffset(attrDes2);
Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
AttributeHeader ahIn(inBuffer[indexBuf]);
Uint32 nullIndicator = ahIn.isNULL();
Uint32 noOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor);
@ -827,6 +804,21 @@ Dbtup::updateFixedSizeTHManyWordNotNULL(Uint32* inBuffer,
if (newIndex <= inBufLen) {
if (!nullIndicator) {
ljam();
if (charsetFlag) {
ljam();
Tablerec* regTabPtr = tabptr.p;
Uint32 bytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
ndbrequire(i < regTabPtr->noOfCharsets);
// not const in MySQL
CHARSET_INFO* cs = regTabPtr->charsetArray[i];
const char* ssrc = (const char*)&inBuffer[tInBufIndex + 1];
if ((*cs->cset->well_formed_len)(cs, ssrc, ssrc + bytes, ZNIL) != bytes) {
ljam();
terrorCode = ZINVALID_CHAR_FORMAT;
return false;
}
}
tInBufIndex = newIndex;
MEMCOPY_NO_WORDS(&tTupleHeader[updateOffset],
&inBuffer[indexBuf + 1],

View file

@ -753,7 +753,7 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
regTabPtr->noOfKeyAttr,
keyBuffer,
ZATTR_BUFFER_SIZE,
true);
false);
ndbrequire(ret != -1);
noPrimKey= ret;
@ -796,7 +796,7 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
numAttrsToRead,
mainBuffer,
ZATTR_BUFFER_SIZE,
true);
false);
ndbrequire(ret != -1);
noMainWords= ret;
} else {
@ -822,7 +822,7 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
numAttrsToRead,
copyBuffer,
ZATTR_BUFFER_SIZE,
true);
false);
ndbrequire(ret != -1);
noCopyWords = ret;

View file

@ -18,24 +18,26 @@
#include "Dbtux.hpp"
/*
* Search key vs node prefix or entry
* Search key vs node prefix or entry.
*
* The comparison starts at given attribute position. The position is
* updated by number of equal initial attributes found. The entry data
* may be partial in which case CmpUnknown may be returned.
*
* The attributes are normalized and have variable size given in words.
*/
int
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen)
{
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// number of words of attribute data left
unsigned len2 = maxlen;
// skip to right position in search key only
for (unsigned i = 0; i < start; i++) {
jam();
searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
}
// number of words of entry data left
unsigned len2 = maxlen;
int ret = 0;
while (start < numAttrs) {
if (len2 <= AttributeHeaderSize) {
@ -47,18 +49,20 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
if (! searchKey.ah().isNULL()) {
if (! entryData.ah().isNULL()) {
jam();
// current attribute
// verify attribute id
const DescAttr& descAttr = descEnt.m_descAttr[start];
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
const unsigned size2 = min(size1, len2);
ndbrequire(searchKey.ah().getAttributeId() == descAttr.m_primaryAttrId);
ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
// sizes
const unsigned size1 = searchKey.ah().getDataSize();
const unsigned size2 = min(entryData.ah().getDataSize(), len2);
len2 -= size2;
// compare
NdbSqlUtil::Cmp* const cmp = c_sqlCmp[start];
const Uint32* const p1 = &searchKey[AttributeHeaderSize];
const Uint32* const p2 = &entryData[AttributeHeaderSize];
ret = (*cmp)(0, p1, p2, size1, size2);
const bool full = (maxlen == MaxAttrDataSize);
ret = (*cmp)(0, p1, size1 << 2, p2, size2 << 2, full);
if (ret != 0) {
jam();
break;
@ -104,6 +108,8 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
* 0 a >= 2 and b > 3 yes +1
* 1 a <= 2 and b <= 3 no +1
* 1 a <= 2 and b < 3 yes -1
*
* The attributes are normalized and have variable size given in words.
*/
int
Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen)
@ -127,21 +133,21 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
if (! boundInfo.ah().isNULL()) {
if (! entryData.ah().isNULL()) {
jam();
// current attribute
const unsigned index = boundInfo.ah().getAttributeId();
// verify attribute id
const Uint32 index = boundInfo.ah().getAttributeId();
ndbrequire(index < frag.m_numAttrs);
const DescAttr& descAttr = descEnt.m_descAttr[index];
ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
// full data size
// sizes
const unsigned size1 = boundInfo.ah().getDataSize();
ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
const unsigned size2 = min(size1, len2);
const unsigned size2 = min(entryData.ah().getDataSize(), len2);
len2 -= size2;
// compare
NdbSqlUtil::Cmp* const cmp = c_sqlCmp[index];
const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = &entryData[AttributeHeaderSize];
int ret = (*cmp)(0, p1, p2, size1, size2);
const bool full = (maxlen == MaxAttrDataSize);
int ret = (*cmp)(0, p1, size1 << 2, p2, size2 << 2, full);
if (ret != 0) {
jam();
return ret;

View file

@ -340,7 +340,7 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan)
out << " [savePointId " << dec << scan.m_savePointId << "]";
out << " [accLockOp " << hex << scan.m_accLockOp << "]";
out << " [accLockOps";
for (unsigned i = 0; i < Dbtux::MaxAccLockOps; i++) {
for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) {
if (scan.m_accLockOps[i] != RNIL)
out << " " << hex << scan.m_accLockOps[i];
}

View file

@ -217,6 +217,7 @@ Dbtux::setKeyAttrs(const Frag& frag)
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
for (unsigned i = 0; i < numAttrs; i++) {
jam();
const DescAttr& descAttr = descEnt.m_descAttr[i];
Uint32 size = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// set attr id and fixed size
@ -244,6 +245,26 @@ Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData)
jamEntry();
// TODO handle error
ndbrequire(ret > 0);
#ifdef VM_TRACE
if (debugFlags & (DebugMaint | DebugScan)) {
debugOut << "readKeyAttrs:" << endl;
ConstData data = keyData;
Uint32 totalSize = 0;
for (Uint32 i = start; i < numAttrs; i++) {
Uint32 attrId = data.ah().getAttributeId();
Uint32 dataSize = data.ah().getDataSize();
debugOut << i << " attrId=" << attrId << " size=" << dataSize;
data += 1;
for (Uint32 j = 0; j < dataSize; j++) {
debugOut << " " << hex << data[0];
data += 1;
}
debugOut << endl;
totalSize += 1 + dataSize;
}
ndbassert(totalSize == ret);
}
#endif
}
void
@ -251,7 +272,7 @@ Dbtux::readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize)
{
const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
const TupLoc tupLoc = ent.m_tupLoc;
int ret = c_tup->tuxReadPk(tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(), pkData);
int ret = c_tup->tuxReadPk(tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(), pkData, true);
jamEntry();
// TODO handle error
ndbrequire(ret > 0);

View file

@ -59,7 +59,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
// get base fragment id and extra bits
const Uint32 fragId = req->fragId & ~1;
const Uint32 fragBit = req->fragId & 1;
// get the fragment
FragPtr fragPtr;
fragPtr.i = RNIL;
@ -71,7 +70,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break;
}
}
ndbrequire(fragPtr.i != RNIL);
Frag& frag = *fragPtr.p;
// set up index keys for this operation

View file

@ -16,6 +16,7 @@
#define DBTUX_META_CPP
#include "Dbtux.hpp"
#include <my_sys.h>
/*
* Create index.
@ -215,17 +216,15 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
errorCode = TuxAddAttrRef::InvalidAttributeType;
break;
}
#ifdef dbtux_uses_charset
if (descAttr.m_charset != 0) {
CHARSET_INFO *cs = get_charset(descAttr.m_charset, MYF(0));
// here use the non-binary type
CHARSET_INFO *cs = all_charsets[descAttr.m_charset];
ndbrequire(cs != 0);
if (! NdbSqlUtil::usable_in_ordered_index(descAttr.m_typeId, cs)) {
jam();
errorCode = TuxAddAttrRef::InvalidCharset;
break;
}
}
#endif
const bool lastAttr = (indexPtr.p->m_numAttrs == fragOpPtr.p->m_numAttrsRecvd);
if (ERROR_INSERTED(12003) && fragOpPtr.p->m_fragNo == 0 && attrId == 0 ||
ERROR_INSERTED(12004) && fragOpPtr.p->m_fragNo == 0 && lastAttr ||

View file

@ -16,6 +16,7 @@
#define DBTUX_SCAN_CPP
#include "Dbtux.hpp"
#include <my_sys.h>
void
Dbtux::execACC_SCANREQ(Signal* signal)
@ -112,50 +113,89 @@ Dbtux::execACC_SCANREQ(Signal* signal)
* keys and that all but possibly last bound is non-strict.
*
* Finally save the sets of lower and upper bounds (i.e. start key and
* end key). Full bound type (< 4) is included but only the strict bit
* is used since lower and upper have now been separated.
* end key). Full bound type is included but only the strict bit is
* used since lower and upper have now been separated.
*/
void
Dbtux::execTUX_BOUND_INFO(Signal* signal)
{
jamEntry();
struct BoundInfo {
int type;
unsigned offset;
unsigned size;
};
TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend();
const TuxBoundInfo reqCopy = *(const TuxBoundInfo*)sig;
const TuxBoundInfo* const req = &reqCopy;
// get records
TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend();
const TuxBoundInfo* const req = (const TuxBoundInfo*)sig;
ScanOp& scan = *c_scanOpPool.getPtr(req->tuxScanPtrI);
Index& index = *c_indexPool.getPtr(scan.m_indexId);
// collect lower and upper bounds
const Index& index = *c_indexPool.getPtr(scan.m_indexId);
const DescEnt& descEnt = getDescEnt(index.m_descPage, index.m_descOff);
// collect normalized lower and upper bounds
struct BoundInfo {
int type2; // with EQ -> LE/GE
Uint32 offset; // offset in xfrmData
Uint32 size;
};
BoundInfo boundInfo[2][MaxIndexAttributes];
const unsigned dstSize = 1024 * MAX_XFRM_MULTIPLY;
Uint32 xfrmData[dstSize];
Uint32 dstPos = 0;
// largest attrId seen plus one
Uint32 maxAttrId[2] = { 0, 0 };
unsigned offset = 0;
const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength;
// walk through entries
const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength;
Uint32 offset = 0;
while (offset + 2 <= req->boundAiLength) {
jam();
const unsigned type = data[offset];
if (type > 4) {
jam();
scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
return;
}
const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
const Uint32 attrId = ah->getAttributeId();
const Uint32 dataSize = ah->getDataSize();
if (attrId >= index.m_numAttrs) {
if (type > 4 || attrId >= index.m_numAttrs || dstPos + 2 + dataSize > dstSize) {
jam();
scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
return;
}
// copy header
xfrmData[dstPos + 0] = data[offset + 0];
xfrmData[dstPos + 1] = data[offset + 1];
// copy bound value
Uint32 dstWords = 0;
if (! ah->isNULL()) {
jam();
const DescAttr& descAttr = descEnt.m_descAttr[attrId];
Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(descAttr.m_attrDesc);
Uint32 srcWords = (srcBytes + 3) / 4;
if (srcWords != dataSize) {
jam();
scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
return;
}
uchar* dstPtr = (uchar*)&xfrmData[dstPos + 2];
const uchar* srcPtr = (const uchar*)&data[offset + 2];
if (descAttr.m_charset == 0) {
memcpy(dstPtr, srcPtr, srcWords << 2);
dstWords = srcWords;
} else {
jam();
CHARSET_INFO* cs = all_charsets[descAttr.m_charset];
Uint32 xmul = cs->strxfrm_multiply;
if (xmul == 0)
xmul = 1;
Uint32 dstLen = xmul * srcBytes;
if (dstLen > ((dstSize - dstPos) << 2)) {
jam();
scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::TooMuchAttrInfo;
return;
}
Uint32 n = (*cs->coll->strnxfrm)(cs, dstPtr, dstLen, srcPtr, srcBytes);
while ((n & 3) != 0) {
dstPtr[n++] = 0;
}
dstWords = n / 4;
}
}
for (unsigned j = 0; j <= 1; j++) {
jam();
// check if lower/upper bit matches
const unsigned luBit = (j << 1);
if ((type & 0x2) != luBit && type != 4)
@ -164,29 +204,35 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
const unsigned type2 = (type & 0x1) | luBit;
// fill in any gap
while (maxAttrId[j] <= attrId) {
jam();
BoundInfo& b = boundInfo[j][maxAttrId[j]++];
b.type = -1;
b.type2 = -1;
}
BoundInfo& b = boundInfo[j][attrId];
if (b.type != -1) {
// compare with previous bound
if (b.type != (int)type2 ||
b.size != 2 + dataSize ||
memcmp(&data[b.offset + 2], &data[offset + 2], dataSize << 2) != 0) {
if (b.type2 != -1) {
// compare with previously defined bound
if (b.type2 != (int)type2 ||
b.size != 2 + dstWords ||
memcmp(&xfrmData[b.offset + 2], &xfrmData[dstPos + 2], dstWords << 2) != 0) {
jam();
scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::InvalidBounds;
return;
}
} else {
// fix length
AttributeHeader* ah = (AttributeHeader*)&xfrmData[dstPos + 1];
ah->setDataSize(dstWords);
// enter new bound
b.type = type2;
b.offset = offset;
b.size = 2 + dataSize;
jam();
b.type2 = type2;
b.offset = dstPos;
b.size = 2 + dstWords;
}
}
// jump to next
offset += 2 + dataSize;
dstPos += 2 + dstWords;
}
if (offset != req->boundAiLength) {
jam();
@ -200,13 +246,13 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
jam();
const BoundInfo& b = boundInfo[j][i];
// check for gap or strict bound before last
if (b.type == -1 || (i + 1 < maxAttrId[j] && (b.type & 0x1))) {
if (b.type2 == -1 || (i + 1 < maxAttrId[j] && (b.type2 & 0x1))) {
jam();
scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::InvalidBounds;
return;
}
bool ok = scan.m_bound[j]->append(&data[b.offset], b.size);
bool ok = scan.m_bound[j]->append(&xfrmData[b.offset], b.size);
if (! ok) {
jam();
scan.m_state = ScanOp::Invalid;

View file

@ -1546,6 +1546,11 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
m_error.code = 743;
return -1;
}
// distribution key not supported for Char attribute
if (col->m_distributionKey && col->m_cs != NULL) {
m_error.code = 745;
return -1;
}
// charset in upper half of precision
if (col->getCharType()) {
tmpAttr.AttributeExtPrecision |= (col->m_cs->number << 16);

View file

@ -520,16 +520,6 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
// Insert Attribute Id into ATTRINFO part.
const Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
CHARSET_INFO* cs = tAttrInfo->m_cs;
// invalid data can crash kernel
if (cs != NULL &&
(*cs->cset->well_formed_len)(cs,
aValue,
aValue + sizeInBytes,
sizeInBytes) != sizeInBytes) {
setErrorCodeAbort(744);
return -1;
}
#if 0
tAttrSize = tAttrInfo->theAttrSize;
tArraySize = tAttrInfo->theArraySize;

View file

@ -62,7 +62,6 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
Uint32 tData;
Uint32 tKeyInfoPosition;
const char* aValue = aValuePassed;
Uint64 xfrmData[512];
Uint64 tempData[512];
if ((theStatus == OperationDefined) &&
@ -140,21 +139,6 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
aValue = (char*)&tempData[0];
}//if
}
const char* aValueToWrite = aValue;
CHARSET_INFO* cs = tAttrInfo->m_cs;
if (cs != 0) {
// current limitation: strxfrm does not increase length
assert(cs->strxfrm_multiply <= 1);
((Uint32*)xfrmData)[sizeInBytes >> 2] = 0;
unsigned n =
(*cs->coll->strnxfrm)(cs,
(uchar*)xfrmData, sizeof(xfrmData),
(const uchar*)aValue, sizeInBytes);
while (n < sizeInBytes)
((uchar*)xfrmData)[n++] = 0x20;
aValue = (char*)xfrmData;
}
Uint32 totalSizeInWords = (sizeInBytes + 3)/4; // Inc. bits in last word
@ -200,13 +184,6 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
*************************************************************************/
if ((tOpType == InsertRequest) ||
(tOpType == WriteRequest)) {
// invalid data can crash kernel
if (cs != NULL &&
(*cs->cset->well_formed_len)(cs,
aValueToWrite,
aValueToWrite + sizeInBytes,
sizeInBytes) != sizeInBytes)
goto equal_error4;
Uint32 ahValue;
const Uint32 sz = totalSizeInWords;
@ -224,7 +201,7 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
}
insertATTRINFO( ahValue );
insertATTRINFOloop((Uint32*)aValueToWrite, sz);
insertATTRINFOloop((Uint32*)aValue, sz);
}//if
/**************************************************************************
@ -321,10 +298,6 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
equal_error3:
setErrorCodeAbort(4209);
return -1;
equal_error4:
setErrorCodeAbort(744);
return -1;
}
/******************************************************************************

View file

@ -1072,29 +1072,6 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
setErrorCodeAbort(4209);
return -1;
}
// normalize char bound
CHARSET_INFO* cs = tAttrInfo->m_cs;
Uint64 xfrmData[1001];
if (cs != NULL && aValue != NULL) {
// current limitation: strxfrm does not increase length
assert(cs->strxfrm_multiply <= 1);
((Uint32*)xfrmData)[len >> 2] = 0;
unsigned n =
(*cs->coll->strnxfrm)(cs,
(uchar*)xfrmData, sizeof(xfrmData),
(const uchar*)aValue, len);
while (n < len)
((uchar*)xfrmData)[n++] = 0x20;
if(len & 3)
{
len += (4 - (len & 3));
}
aValue = (char*)xfrmData;
}
// insert attribute header
Uint32 tIndexAttrId = tAttrInfo->m_attrId;
@ -1117,7 +1094,7 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
theTotalNrOfKeyWordInSignal = currLen + totalLen;
} else {
if(!aligned || !nobytes){
Uint32 *tempData = (Uint32*)xfrmData;
Uint32 tempData[2000];
tempData[0] = type;
tempData[1] = ahValue;
tempData[2 + (len >> 2)] = 0;
@ -1273,10 +1250,10 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
return (r1_null ? -1 : 1);
}
const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4;
Uint32 len = r1->theAttrSize * r1->theArraySize;
if(!r1_null){
const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_extType);
int r = (*sqlType.m_cmp)(col.m_cs, d1, d2, size, size);
int r = (*sqlType.m_cmp)(col.m_cs, d1, len, d2, len, true);
if(r){
assert(r != NdbSqlUtil::CmpUnknown);
return r;

View file

@ -205,6 +205,7 @@ ErrorBundle ErrorCodes[] = {
*/
{ 892, IE, "Inconsistent hash index. The index needs to be dropped and recreated" },
{ 895, IE, "Inconsistent ordered index. The index needs to be dropped and recreated" },
{ 896, IE, "Tuple corrupted - wrong checksum or column data in invalid format" },
{ 202, IE, "202" },
{ 203, IE, "203" },
{ 207, IE, "207" },
@ -311,6 +312,7 @@ ErrorBundle ErrorCodes[] = {
{ 742, SE, "Unsupported attribute type in index" },
{ 743, SE, "Unsupported character set in table or index" },
{ 744, SE, "Character string is invalid for given character set" },
{ 745, SE, "Distribution key not supported for char attribute (use binary attribute)" },
{ 241, SE, "Invalid schema object version" },
{ 283, SE, "Table is being dropped" },
{ 284, SE, "Table not defined in transaction coordinator" },

View file

@ -32,7 +32,7 @@ testTransactions \
testDeadlock \
test_event ndbapi_slow_select testReadPerf testLcp \
testPartitioning \
testBitfield
testBitfield foo
#flexTimedAsynch
#testBlobs
@ -73,6 +73,7 @@ testReadPerf_SOURCES = testReadPerf.cpp
testLcp_SOURCES = testLcp.cpp
testPartitioning_SOURCES = testPartitioning.cpp
testBitfield_SOURCES = testBitfield.cpp
foo_SOURCES = foo.cpp
INCLUDES_LOC = -I$(top_srcdir)/ndb/include/kernel
@ -89,3 +90,4 @@ testBackup_LDADD = $(LDADD) bank/libbank.a
%::SCCS/s.%

File diff suppressed because it is too large Load diff

View file

@ -102,7 +102,7 @@ int main(int argc, char** argv){
unsigned j;
for (j= 0; (int)j < pTab->getNoOfPrimaryKeys(); j++)
{
const NdbDictionary::Column * col = pTab->getColumn(j);
const NdbDictionary::Column * col = pTab->getColumn(pTab->getPrimaryKey(j));
ndbout << col->getName();
if ((int)j < pTab->getNoOfPrimaryKeys()-1)
ndbout << ", ";