MDEV-9220 Split filesort.cc:make_sortkey() and filesort.cc::sortlength() into virtual methods in Type_handler

This commit is contained in:
Alexander Barkov 2016-01-12 17:03:29 +04:00
parent 454589b67f
commit 5b9ee3f2ae
7 changed files with 312 additions and 209 deletions

View file

@ -964,6 +964,177 @@ static inline void store_length(uchar *to, uint length, uint pack_length)
}
void
Type_handler_string_result::make_sort_key(uchar *to, Item *item,
const SORT_FIELD_ATTR *sort_field,
Sort_param *param) const
{
CHARSET_INFO *cs= item->collation.collation;
bool maybe_null= item->maybe_null;
if (maybe_null)
*to++= 1;
char *tmp_buffer= param->tmp_buffer ? param->tmp_buffer : (char*) to;
String tmp(tmp_buffer, param->sort_length, cs);
String *res= item->str_result(&tmp);
if (!res)
{
if (maybe_null)
memset(to - 1, 0, sort_field->length + 1);
else
{
/* purecov: begin deadcode */
/*
This should only happen during extreme conditions if we run out
of memory or have an item marked not null when it can be null.
This code is here mainly to avoid a hard crash in this case.
*/
DBUG_ASSERT(0);
DBUG_PRINT("warning",
("Got null on something that shouldn't be null"));
memset(to, 0, sort_field->length); // Avoid crash
/* purecov: end */
}
return;
}
if (use_strnxfrm(cs))
{
uint tmp_length __attribute__((unused));
tmp_length= cs->coll->strnxfrm(cs, to, sort_field->length,
item->max_char_length() *
cs->strxfrm_multiply,
(uchar*) res->ptr(), res->length(),
MY_STRXFRM_PAD_WITH_SPACE |
MY_STRXFRM_PAD_TO_MAXLEN);
DBUG_ASSERT(tmp_length == sort_field->length);
}
else
{
uint diff;
uint sort_field_length= sort_field->length - sort_field->suffix_length;
uint length= res->length();
if (sort_field_length < length)
{
diff= 0;
length= sort_field_length;
}
else
diff= sort_field_length - length;
if (sort_field->suffix_length)
{
/* Store length last in result_string */
store_length(to + sort_field_length, length, sort_field->suffix_length);
}
/* apply cs->sort_order for case-insensitive comparison if needed */
my_strnxfrm(cs,(uchar*)to,length,(const uchar*)res->ptr(),length);
char fill_char= ((cs->state & MY_CS_BINSORT) ? (char) 0 : ' ');
cs->cset->fill(cs, (char *)to+length,diff,fill_char);
}
}
void
Type_handler_int_result::make_sort_key(uchar *to, Item *item,
const SORT_FIELD_ATTR *sort_field,
Sort_param *param) const
{
longlong value= item->val_int_result();
make_sort_key_longlong(to, item->maybe_null, item->null_value,
item->unsigned_flag, value);
}
void
Type_handler_temporal_result::make_sort_key(uchar *to, Item *item,
const SORT_FIELD_ATTR *sort_field,
Sort_param *param) const
{
MYSQL_TIME buf;
if (item->get_date_result(&buf, TIME_INVALID_DATES))
{
DBUG_ASSERT(item->maybe_null);
DBUG_ASSERT(item->null_value);
make_sort_key_longlong(to, item->maybe_null, true,
item->unsigned_flag, 0);
}
else
make_sort_key_longlong(to, item->maybe_null, false,
item->unsigned_flag, pack_time(&buf));
}
void
Type_handler::make_sort_key_longlong(uchar *to,
bool maybe_null,
bool null_value,
bool unsigned_flag,
longlong value) const
{
if (maybe_null)
{
if (null_value)
{
memset(to, 0, 9);
return;
}
*to++= 1;
}
to[7]= (uchar) value;
to[6]= (uchar) (value >> 8);
to[5]= (uchar) (value >> 16);
to[4]= (uchar) (value >> 24);
to[3]= (uchar) (value >> 32);
to[2]= (uchar) (value >> 40);
to[1]= (uchar) (value >> 48);
if (unsigned_flag) /* Fix sign */
to[0]= (uchar) (value >> 56);
else
to[0]= (uchar) (value >> 56) ^ 128; /* Reverse signbit */
}
void
Type_handler_decimal_result::make_sort_key(uchar *to, Item *item,
const SORT_FIELD_ATTR *sort_field,
Sort_param *param) const
{
my_decimal dec_buf, *dec_val= item->val_decimal_result(&dec_buf);
if (item->maybe_null)
{
if (item->null_value)
{
memset(to, 0, sort_field->length + 1);
return;
}
*to++= 1;
}
my_decimal2binary(E_DEC_FATAL_ERROR, dec_val, to,
item->max_length - (item->decimals ? 1 : 0),
item->decimals);
}
void
Type_handler_real_result::make_sort_key(uchar *to, Item *item,
const SORT_FIELD_ATTR *sort_field,
Sort_param *param) const
{
double value= item->val_result();
if (item->maybe_null)
{
if (item->null_value)
{
memset(to, 0, sort_field->length + 1);
return;
}
*to++= 1;
}
change_double_for_sort(value, to);
}
/** Make a sort-key from record. */
static void make_sortkey(register Sort_param *param,
@ -986,161 +1157,9 @@ static void make_sortkey(register Sort_param *param,
}
else
{ // Item
Item *item=sort_field->item;
maybe_null= item->maybe_null;
switch (sort_field->result_type) {
case STRING_RESULT:
{
CHARSET_INFO *cs=item->collation.collation;
char fill_char= ((cs->state & MY_CS_BINSORT) ? (char) 0 : ' ');
if (maybe_null)
*to++=1;
char *tmp_buffer= param->tmp_buffer ? param->tmp_buffer : (char*)to;
String tmp(tmp_buffer, param->sort_length, cs);
String *res= item->str_result(&tmp);
if (!res)
{
if (maybe_null)
memset(to-1, 0, sort_field->length+1);
else
{
/* purecov: begin deadcode */
/*
This should only happen during extreme conditions if we run out
of memory or have an item marked not null when it can be null.
This code is here mainly to avoid a hard crash in this case.
*/
DBUG_ASSERT(0);
DBUG_PRINT("warning",
("Got null on something that shouldn't be null"));
memset(to, 0, sort_field->length); // Avoid crash
/* purecov: end */
}
break;
}
length= res->length();
if (sort_field->need_strxnfrm)
{
uint tmp_length __attribute__((unused));
tmp_length= cs->coll->strnxfrm(cs, to, sort_field->length,
item->max_char_length() *
cs->strxfrm_multiply,
(uchar*) res->ptr(), length,
MY_STRXFRM_PAD_WITH_SPACE |
MY_STRXFRM_PAD_TO_MAXLEN);
DBUG_ASSERT(tmp_length == sort_field->length);
}
else
{
uint diff;
uint sort_field_length= sort_field->length -
sort_field->suffix_length;
if (sort_field_length < length)
{
diff= 0;
length= sort_field_length;
}
else
diff= sort_field_length - length;
if (sort_field->suffix_length)
{
/* Store length last in result_string */
store_length(to + sort_field_length, length,
sort_field->suffix_length);
}
/* apply cs->sort_order for case-insensitive comparison if needed */
my_strnxfrm(cs,(uchar*)to,length,(const uchar*)res->ptr(),length);
cs->cset->fill(cs, (char *)to+length,diff,fill_char);
}
break;
}
case INT_RESULT:
case TIME_RESULT:
{
longlong UNINIT_VAR(value);
if (sort_field->result_type == INT_RESULT)
value= item->val_int_result();
else
{
MYSQL_TIME buf;
if (item->get_date_result(&buf, TIME_INVALID_DATES))
{
DBUG_ASSERT(maybe_null);
DBUG_ASSERT(item->null_value);
}
else
value= pack_time(&buf);
}
if (maybe_null)
{
*to++=1; /* purecov: inspected */
if (item->null_value)
{
if (maybe_null)
memset(to-1, 0, sort_field->length+1);
else
{
DBUG_PRINT("warning",
("Got null on something that shouldn't be null"));
memset(to, 0, sort_field->length);
}
break;
}
}
to[7]= (uchar) value;
to[6]= (uchar) (value >> 8);
to[5]= (uchar) (value >> 16);
to[4]= (uchar) (value >> 24);
to[3]= (uchar) (value >> 32);
to[2]= (uchar) (value >> 40);
to[1]= (uchar) (value >> 48);
if (item->unsigned_flag) /* Fix sign */
to[0]= (uchar) (value >> 56);
else
to[0]= (uchar) (value >> 56) ^ 128; /* Reverse signbit */
break;
}
case DECIMAL_RESULT:
{
my_decimal dec_buf, *dec_val= item->val_decimal_result(&dec_buf);
if (maybe_null)
{
if (item->null_value)
{
memset(to, 0, sort_field->length+1);
to++;
break;
}
*to++=1;
}
my_decimal2binary(E_DEC_FATAL_ERROR, dec_val, to,
item->max_length - (item->decimals ? 1:0),
item->decimals);
break;
}
case REAL_RESULT:
{
double value= item->val_result();
if (maybe_null)
{
if (item->null_value)
{
memset(to, 0, sort_field->length+1);
to++;
break;
}
*to++=1;
}
change_double_for_sort(value,(uchar*) to);
break;
}
case ROW_RESULT:
default:
// This case should never be choosen
DBUG_ASSERT(0);
break;
}
sort_field->item->make_sort_key(to, sort_field->item, sort_field, param);
if ((maybe_null= sort_field->item->maybe_null))
to++;
}
if (sort_field->reverse)
{ /* Revers key */
@ -1842,6 +1861,64 @@ static uint suffix_length(ulong string_length)
}
void
Type_handler_string_result::sortlength(THD *thd,
const Type_std_attributes *item,
SORT_FIELD_ATTR *sortorder) const
{
CHARSET_INFO *cs;
sortorder->length= item->max_length;
set_if_smaller(sortorder->length, thd->variables.max_sort_length);
if (use_strnxfrm((cs= item->collation.collation)))
{
sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
}
else if (cs == &my_charset_bin)
{
/* Store length last to be able to sort blob/varbinary */
sortorder->suffix_length= suffix_length(sortorder->length);
sortorder->length+= sortorder->suffix_length;
}
}
void
Type_handler_temporal_result::sortlength(THD *thd,
const Type_std_attributes *item,
SORT_FIELD_ATTR *sortorder) const
{
sortorder->length= 8; // Sizof intern longlong
}
void
Type_handler_int_result::sortlength(THD *thd,
const Type_std_attributes *item,
SORT_FIELD_ATTR *sortorder) const
{
sortorder->length= 8; // Sizof intern longlong
}
void
Type_handler_real_result::sortlength(THD *thd,
const Type_std_attributes *item,
SORT_FIELD_ATTR *sortorder) const
{
sortorder->length= sizeof(double);
}
void
Type_handler_decimal_result::sortlength(THD *thd,
const Type_std_attributes *item,
SORT_FIELD_ATTR *sortorder) const
{
sortorder->length=
my_decimal_get_binary_size(item->max_length - (item->decimals ? 1 : 0),
item->decimals);
}
/**
Calculate length of sort key.
@ -1854,8 +1931,6 @@ static uint suffix_length(ulong string_length)
@note
sortorder->length is updated for each sort item.
@n
sortorder->need_strxnfrm is set 1 if we have to use strxnfrm
@return
Total length of sort buffer in bytes
@ -1866,23 +1941,19 @@ sortlength(THD *thd, SORT_FIELD *sortorder, uint s_length,
bool *multi_byte_charset)
{
uint length;
CHARSET_INFO *cs;
*multi_byte_charset= 0;
length=0;
for (; s_length-- ; sortorder++)
{
sortorder->need_strxnfrm= 0;
sortorder->suffix_length= 0;
if (sortorder->field)
{
cs= sortorder->field->sort_charset();
CHARSET_INFO *cs= sortorder->field->sort_charset();
sortorder->length= sortorder->field->sort_length();
if (use_strnxfrm((cs=sortorder->field->sort_charset())))
{
sortorder->need_strxnfrm= 1;
*multi_byte_charset= 1;
*multi_byte_charset= true;
sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
}
if (sortorder->field->maybe_null())
@ -1890,42 +1961,10 @@ sortlength(THD *thd, SORT_FIELD *sortorder, uint s_length,
}
else
{
sortorder->result_type= sortorder->item->cmp_type();
switch (sortorder->result_type) {
case STRING_RESULT:
sortorder->length=sortorder->item->max_length;
set_if_smaller(sortorder->length, thd->variables.max_sort_length);
if (use_strnxfrm((cs=sortorder->item->collation.collation)))
{
sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
sortorder->need_strxnfrm= 1;
*multi_byte_charset= 1;
}
else if (cs == &my_charset_bin)
{
/* Store length last to be able to sort blob/varbinary */
sortorder->suffix_length= suffix_length(sortorder->length);
sortorder->length+= sortorder->suffix_length;
}
break;
case TIME_RESULT:
case INT_RESULT:
sortorder->length=8; // Size of intern longlong
break;
case DECIMAL_RESULT:
sortorder->length=
my_decimal_get_binary_size(sortorder->item->max_length -
(sortorder->item->decimals ? 1 : 0),
sortorder->item->decimals);
break;
case REAL_RESULT:
sortorder->length=sizeof(double);
break;
case ROW_RESULT:
default:
// This case should never be choosen
DBUG_ASSERT(0);
break;
sortorder->item->sortlength(thd, sortorder->item, sortorder);
if (use_strnxfrm(sortorder->item->collation.collation))
{
*multi_byte_charset= true;
}
if (sortorder->item->maybe_null)
length++; // Place for NULL marker

View file

@ -24,10 +24,10 @@ class SQL_SELECT;
class SQL_SELECT;
class THD;
struct TABLE;
typedef struct st_sort_field SORT_FIELD;
struct SORT_FIELD;
class Filesort_tracker;
ha_rows filesort(THD *thd, TABLE *table, st_sort_field *sortorder,
ha_rows filesort(THD *thd, TABLE *table, SORT_FIELD *sortorder,
uint s_length, SQL_SELECT *select,
ha_rows max_rows, bool sort_positions,
ha_rows *examined_rows, ha_rows *found_rows,

View file

@ -774,6 +774,17 @@ public:
Item_result result_type() const { return type_handler()->result_type(); }
/* ... while cmp_type() specifies how it should be compared */
Item_result cmp_type() const { return type_handler()->cmp_type(); }
void make_sort_key(uchar *to, Item *item, const SORT_FIELD_ATTR *sort_field,
Sort_param *param) const
{
type_handler()->make_sort_key(to, item, sort_field, param);
}
void sortlength(THD *thd,
const Type_std_attributes *item,
SORT_FIELD_ATTR *attr) const
{
type_handler()->sortlength(thd, item, attr);
}
virtual Item_result cast_to_int_type() const { return cmp_type(); }
enum_field_types string_field_type() const
{

View file

@ -4883,16 +4883,19 @@ public:
/* Structs used when sorting */
struct SORT_FIELD_ATTR
{
uint length; /* Length of sort field */
uint suffix_length; /* Length suffix (0-4) */
};
typedef struct st_sort_field {
struct SORT_FIELD: public SORT_FIELD_ATTR
{
Field *field; /* Field to sort */
Item *item; /* Item if not sorting fields */
uint length; /* Length of sort field */
uint suffix_length; /* Length suffix (0-4) */
Item_result result_type; /* Type of item */
bool reverse; /* if descending sort */
bool need_strxnfrm; /* If we have to use strxnfrm() */
} SORT_FIELD;
};
typedef struct st_sort_buffer {

View file

@ -23,8 +23,8 @@
#include "queues.h"
typedef struct st_buffpek BUFFPEK;
typedef struct st_sort_field SORT_FIELD;
struct SORT_FIELD;
class Field;
struct TABLE;

View file

@ -22,7 +22,7 @@ class JOIN;
struct TABLE_LIST;
typedef class Item COND;
typedef class st_select_lex SELECT_LEX;
typedef struct st_sort_field SORT_FIELD;
struct SORT_FIELD;
#ifndef DBUG_OFF
void print_where(COND *cond,const char *info, enum_query_type query_type);

View file

@ -25,12 +25,19 @@
class Field;
class Item;
class Type_std_attributes;
class Sort_param;
struct TABLE;
struct SORT_FIELD_ATTR;
class Type_handler
{
protected:
const Type_handler *string_type_handler(uint max_octet_length) const;
void make_sort_key_longlong(uchar *to,
bool maybe_null, bool null_value,
bool unsigned_flag,
longlong value) const;
public:
static const Type_handler *get_handler_by_field_type(enum_field_types type);
static const Type_handler *get_handler_by_real_type(enum_field_types type);
@ -79,6 +86,12 @@ public:
virtual Field *make_conversion_table_field(TABLE *TABLE,
uint metadata,
const Field *target) const= 0;
virtual void make_sort_key(uchar *to, Item *item,
const SORT_FIELD_ATTR *sort_field,
Sort_param *param) const= 0;
virtual void sortlength(THD *thd,
const Type_std_attributes *item,
SORT_FIELD_ATTR *attr) const= 0;
};
@ -90,6 +103,11 @@ public:
Item_result result_type() const { return REAL_RESULT; }
Item_result cmp_type() const { return REAL_RESULT; }
virtual ~Type_handler_real_result() {}
void make_sort_key(uchar *to, Item *item, const SORT_FIELD_ATTR *sort_field,
Sort_param *param) const;
void sortlength(THD *thd,
const Type_std_attributes *item,
SORT_FIELD_ATTR *attr) const;
};
@ -100,6 +118,11 @@ public:
Item_result cmp_type() const { return DECIMAL_RESULT; }
virtual ~Type_handler_decimal_result() {};
Field *make_num_distinct_aggregator_field(MEM_ROOT *, const Item *) const;
void make_sort_key(uchar *to, Item *item, const SORT_FIELD_ATTR *sort_field,
Sort_param *param) const;
void sortlength(THD *thd,
const Type_std_attributes *item,
SORT_FIELD_ATTR *attr) const;
};
@ -110,6 +133,11 @@ public:
Item_result cmp_type() const { return INT_RESULT; }
virtual ~Type_handler_int_result() {}
Field *make_num_distinct_aggregator_field(MEM_ROOT *, const Item *) const;
void make_sort_key(uchar *to, Item *item, const SORT_FIELD_ATTR *sort_field,
Sort_param *param) const;
void sortlength(THD *thd,
const Type_std_attributes *item,
SORT_FIELD_ATTR *attr) const;
};
@ -119,6 +147,11 @@ public:
Item_result result_type() const { return STRING_RESULT; }
Item_result cmp_type() const { return TIME_RESULT; }
virtual ~Type_handler_temporal_result() {}
void make_sort_key(uchar *to, Item *item, const SORT_FIELD_ATTR *sort_field,
Sort_param *param) const;
void sortlength(THD *thd,
const Type_std_attributes *item,
SORT_FIELD_ATTR *attr) const;
};
@ -131,6 +164,11 @@ public:
const Type_handler *
type_handler_adjusted_to_max_octet_length(uint max_octet_length,
CHARSET_INFO *cs) const;
void make_sort_key(uchar *to, Item *item, const SORT_FIELD_ATTR *sort_field,
Sort_param *param) const;
void sortlength(THD *thd,
const Type_std_attributes *item,
SORT_FIELD_ATTR *attr) const;
};
@ -526,6 +564,18 @@ public:
{
return m_type_handler->make_conversion_table_field(table, metadata, target);
}
void make_sort_key(uchar *to, Item *item, const SORT_FIELD_ATTR *sort_field,
Sort_param *param) const
{
m_type_handler->make_sort_key(to, item, sort_field, param);
}
void sortlength(THD *thd,
const Type_std_attributes *item,
SORT_FIELD_ATTR *attr) const
{
m_type_handler->sortlength(thd, item, attr);
}
};