From 5a7daab5b77c503e5ad2869602dcadc71beaadbd Mon Sep 17 00:00:00 2001
From: Zardosht Kasheff
Date: Wed, 17 Apr 2013 00:01:45 -0400
Subject: [PATCH] addresses #1219: move clustering keys to main line

git-svn-id: file:///svn/mysql/tokudb-engine/src@7591 c7de825b-a66e-492c-adef-691d508d4ae1
---
 storage/tokudb/ha_tokudb.cc | 475 ++++++++++++++++++++++++++++--------
 storage/tokudb/ha_tokudb.h  |   9 +-
 2 files changed, 375 insertions(+), 109 deletions(-)

diff --git a/storage/tokudb/ha_tokudb.cc b/storage/tokudb/ha_tokudb.cc
index c10f5dfeabc..6701972c77f 100644
--- a/storage/tokudb/ha_tokudb.cc
+++ b/storage/tokudb/ha_tokudb.cc
@@ -678,12 +678,6 @@ ulong ha_tokudb::index_flags(uint idx, uint part, bool all_parts) const {
     DBUG_RETURN(flags);
 }
 
-static int tokudb_cmp_hidden_key(DB * file, const DBT * new_key, const DBT * saved_key) {
-    ulonglong a = uint5korr((char *) new_key->data);
-    ulonglong b = uint5korr((char *) saved_key->data);
-    return a < b ? -1 : (a > b ? 1 : 0);
-}
-
 /*
     Things that are required for ALL data types:
         key_part->field->null_bit
@@ -804,6 +798,13 @@ cleanup:
     return r;
 }
 
+static int tokudb_cmp_hidden_key(DB * file, const DBT * new_key, const DBT * saved_key) {
+    ulonglong a = uint5korr((char *) new_key->data);
+    ulonglong b = uint5korr((char *) saved_key->data);
+    return a < b ? -1 : (a > b ? 1 : 0);
+}
+
+
 static int tokudb_compare_two_keys(KEY *key, const DBT * new_key, const DBT * saved_key, bool cmp_prefix) {
     uchar new_key_inf_val = *(uchar *) new_key->data;
     uchar saved_key_inf_val = *(uchar *) saved_key->data;
@@ -867,9 +868,122 @@ static int tokudb_compare_two_keys(KEY *key, const DBT * new_key, const DBT * sa
     return ret_val;
 }
 
+
+
+//
+// this is copied nearly verbatim from tokudb_compare_two_keys so that the
+// clustered-key comparison stays fast; the duplication is ugly, but deliberate
+//
+static int tokudb_compare_two_clustered_keys(KEY *key, KEY* primary_key, const DBT * new_key, const DBT * saved_key) {
+    uchar new_key_inf_val = *(uchar *) new_key->data;
+    uchar saved_key_inf_val = *(uchar *) saved_key->data;
+    //
+    // first byte is "infinity" byte
+    //
+    uchar *new_key_ptr = (uchar *)(new_key->data) + 1;
+    uchar *saved_key_ptr = (uchar *)(saved_key->data) + 1;
+    KEY_PART_INFO *key_part = key->key_part, *end = key_part + key->key_parts;
+    int ret_val;
+    //
+    // do not include the inf val at the beginning
+    //
+    uint new_key_length = new_key->size - sizeof(uchar);
+    uint saved_key_length = saved_key->size - sizeof(uchar);
+
+    //DBUG_DUMP("key_in_index", saved_key_ptr, saved_key->size);
+    for (; key_part != end && (int) new_key_length > 0 && (int) saved_key_length > 0; key_part++) {
+        int cmp;
+        uint new_key_field_length;
+        uint saved_key_field_length;
+        if (key_part->field->null_bit) {
+            assert(new_key_ptr < (uchar *) new_key->data + new_key->size);
+            assert(saved_key_ptr < (uchar *) saved_key->data + saved_key->size);
+            if (*new_key_ptr != *saved_key_ptr) {
+                return ((int) *new_key_ptr - (int) *saved_key_ptr);
+            }
+            saved_key_ptr++;
+            new_key_length--;
+            saved_key_length--;
+            if (!*new_key_ptr++) { continue; }
+        }
+        new_key_field_length = key_part->field->packed_col_length(new_key_ptr, key_part->length);
+        saved_key_field_length = key_part->field->packed_col_length(saved_key_ptr, key_part->length);
+        assert(new_key_length >= new_key_field_length);
+        assert(saved_key_length >= saved_key_field_length);
+        if ((cmp = key_part->field->pack_cmp(new_key_ptr, saved_key_ptr, key_part->length, 0)))
+            return cmp;
+        new_key_ptr += new_key_field_length;
+        new_key_length -= new_key_field_length;
+        saved_key_ptr += saved_key_field_length;
+        saved_key_length -= saved_key_field_length;
+    }
+    if (new_key_length == 0 && saved_key_length == 0) {
+        ret_val = 0;
+    }
+    else if (new_key_length == 0 && saved_key_length > 0) {
+        ret_val = (new_key_inf_val == COL_POS_INF) ? 1 : -1;
+    }
+    else if (new_key_length > 0 && saved_key_length == 0) {
+        ret_val = (saved_key_inf_val == COL_POS_INF) ? -1 : 1;
+    }
+    //
+    // now we compare the primary key
+    //
+    else {
+        if (primary_key == NULL) {
+            //
+            // primary key hidden
+            //
+            ulonglong a = uint5korr((char *) new_key_ptr);
+            ulonglong b = uint5korr((char *) saved_key_ptr);
+            ret_val = a < b ? -1 : (a > b ? 1 : 0);
+        }
+        else {
+            //
+            // primary key not hidden; this (regrettably) repeats the key-part
+            // loop from above for the primary key's columns
+            //
+            key_part = primary_key->key_part;
+            end = key_part + primary_key->key_parts;
+            for (; key_part != end && (int) new_key_length > 0 && (int) saved_key_length > 0; key_part++) {
+                int cmp;
+                uint new_key_field_length;
+                uint saved_key_field_length;
+                if (key_part->field->null_bit) {
+                    assert(new_key_ptr < (uchar *) new_key->data + new_key->size);
+                    assert(saved_key_ptr < (uchar *) saved_key->data + saved_key->size);
+                    if (*new_key_ptr != *saved_key_ptr) {
+                        return ((int) *new_key_ptr - (int) *saved_key_ptr);
+                    }
+                    saved_key_ptr++;
+                    new_key_length--;
+                    saved_key_length--;
+                    if (!*new_key_ptr++) { continue; }
+                }
+                new_key_field_length = key_part->field->packed_col_length(new_key_ptr, key_part->length);
+                saved_key_field_length = key_part->field->packed_col_length(saved_key_ptr, key_part->length);
+                assert(new_key_length >= new_key_field_length);
+                assert(saved_key_length >= saved_key_field_length);
+                if ((cmp = key_part->field->pack_cmp(new_key_ptr, saved_key_ptr, key_part->length, 0)))
+                    return cmp;
+                new_key_ptr += new_key_field_length;
+                new_key_length -= new_key_field_length;
+                saved_key_ptr += saved_key_field_length;
+                saved_key_length -= saved_key_field_length;
+            }
+            //
+            // at this point we have compared both the secondary key columns and
+            // the primary key columns, so the keys are equal and we return 0
+            //
+            ret_val = 0;
+        }
+    }
+    return ret_val;
+}
+
+
 static int tokudb_cmp_packed_key(DB *file, const DBT *keya, const DBT *keyb) {
     assert(file->app_private != 0);
     KEY *key = (KEY *) file->app_private;
+    KEY *primary_key = (KEY *) file->api_internal;
+    if (key->flags & HA_CLUSTERING) {
+        return tokudb_compare_two_clustered_keys(key, primary_key, keya, keyb);
+    }
     return tokudb_compare_two_keys(key, keya, keyb, false);
 }
 
@@ -947,7 +1061,7 @@ typedef struct smart_dbt_ai_info {
 
 static void smart_dbt_ai_callback (DBT const *key, DBT const *row, void *context) {
     SMART_DBT_AI_INFO info = (SMART_DBT_AI_INFO)context;
-    info->ha->unpack_row(info->buf, row, key);
+    info->ha->unpack_row(info->buf, row, key, true);
     //
     // copy the key to prim_key
     //
@@ -1207,9 +1321,14 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i
     }
     DBUG_PRINT("info", ("Setting DB_DUP+DB_DUPSORT for key %s\n", key_info->name));
-    (*ptr)->set_flags(*ptr, DB_DUP + DB_DUPSORT);
+    //
+    // clustering keys are not DB_DUP, because their keys are unique (they have the PK embedded)
+    //
+    if (!(key_info->flags & HA_CLUSTERING)) {
+        (*ptr)->set_flags(*ptr, DB_DUP + DB_DUPSORT);
+        (*ptr)->set_dup_compare(*ptr, hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_primary_key);
+    }
     (*ptr)->api_internal = share->file->app_private;
-    (*ptr)->set_dup_compare(*ptr, hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_primary_key);
     if ((error = (*ptr)->open(*ptr, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) {
         my_errno = error;
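
For context: the comparison and flag changes above assume a clustered secondary key laid out as one "infinity" byte, then the packed secondary-key columns, then the embedded primary key (or the 5-byte hidden primary key). Below is a minimal standalone sketch of that two-stage compare; PackedKey, compare_bytes, and compare_clustered are invented stand-ins for illustration, not the MySQL/TokuDB DBT and KEY structures.

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <vector>

// Illustrative stand-in for the packed keys the patch compares.
struct PackedKey {
    uint8_t inf_byte;                   // COL_NEG_INF / COL_POS_INF marker
    std::vector<uint8_t> secondary;     // packed secondary-key columns
    std::vector<uint8_t> primary;       // packed PK columns or hidden PK
};

static int compare_bytes(const std::vector<uint8_t> &a, const std::vector<uint8_t> &b) {
    size_t n = std::min(a.size(), b.size());
    int c = n ? std::memcmp(a.data(), b.data(), n) : 0;
    if (c != 0) return c < 0 ? -1 : 1;
    return a.size() < b.size() ? -1 : (a.size() > b.size() ? 1 : 0);
}

// Two-stage compare mirroring tokudb_compare_two_clustered_keys: the secondary
// columns decide first; only on a tie does the embedded primary key decide.
// Because the PK is part of every key, no two entries ever compare equal,
// which is why the clustering dictionary is opened without DB_DUP/DB_DUPSORT.
static int compare_clustered(const PackedKey &x, const PackedKey &y) {
    if (int c = compare_bytes(x.secondary, y.secondary)) return c;
    return compare_bytes(x.primary, y.primary);
}

int main() {
    PackedKey a = {0, {1, 2}, {0, 0, 0, 0, 1}};
    PackedKey b = {0, {1, 2}, {0, 0, 0, 0, 2}};   // same secondary, later PK
    std::printf("%d\n", compare_clustered(a, b)); // -1: tie broken by the PK
}
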
@@ -1269,7 +1388,8 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
     }
 
     /* Need some extra memory in case of packed keys */
     // the "+ 1" is for the first byte that states +/- infinity
-    max_key_length = table_share->max_key_length + MAX_REF_PARTS * 3 + sizeof(uchar);
+    // multiply everything by 2 to account for clustered keys having a key and primary key together
+    max_key_length = 2*(table_share->max_key_length + MAX_REF_PARTS * 3 + sizeof(uchar));
     if (!(alloc_ptr =
               my_multi_malloc(MYF(MY_WME),
                               &key_buff, max_key_length,
@@ -1342,8 +1462,12 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
         TOKUDB_DBUG_RETURN(1);
     }
 
-    if (!hidden_primary_key)
+    if (!hidden_primary_key) {
         share->file->app_private = (void *) (table_share->key_info + table_share->primary_key);
+    }
+    else {
+        share->file->app_private = NULL;
+    }
     if (tokudb_debug & TOKUDB_DEBUG_SAVE_TRACE) {
         bzero((void *) &cmp_byte_stream, sizeof(cmp_byte_stream));
         cmp_byte_stream.flags = DB_DBT_MALLOC;
@@ -1678,7 +1802,7 @@ ulong ha_tokudb::max_row_length(const uchar * buf) {
 //    [in]    record - row in MySQL format
 //
 
-int ha_tokudb::pack_row(DBT * row, const uchar * record) {
+int ha_tokudb::pack_row(DBT * row, const uchar * record, bool strip_pk) {
     uchar *ptr;
     int r = ENOSYS;
     bzero((void *) row, sizeof(*row));
@@ -1692,7 +1816,7 @@ int ha_tokudb::pack_row(DBT * row, const uchar * record) {
     // fixed length row is first below
     //
     if (share->fixed_length_row) {
-        if (hidden_primary_key) {
+        if (hidden_primary_key || !strip_pk) {
             row->data = (void *)record;
             row->size = table_share->reclength;
             r = 0;
@@ -1757,7 +1881,7 @@ int ha_tokudb::pack_row(DBT * row, const uchar * record) {
         // if the primary key is hidden, primary_key_offsets will be NULL and
         // this clause will not execute
         //
-        if (primary_key_offsets) {
+        if (primary_key_offsets && strip_pk) {
             uint curr_skip_offset = primary_key_offsets[curr_skip_index].offset;
             if (curr_skip_offset == curr_field_offset) {
                 //
@@ -1799,13 +1923,13 @@ cleanup:
 //    [out]   record - row in MySQL format
 //    [in]    row - row stored in DBT to be converted
 //
-void ha_tokudb::unpack_row(uchar * record, DBT const *row, DBT const *key) {
+void ha_tokudb::unpack_row(uchar * record, DBT const *row, DBT const *key, bool pk_stripped) {
     //
     // two cases, fixed length row, and variable length row
     // fixed length row is first below
     //
     if (share->fixed_length_row) {
-        if (hidden_primary_key) {
+        if (hidden_primary_key || !pk_stripped) {
             memcpy(record, (void *) row->data, table_share->reclength);
         }
         else {
@@ -1842,7 +1966,7 @@ void ha_tokudb::unpack_row(uchar * record, DBT const *row, DBT const *key) {
         const uchar *ptr = (const uchar *) row->data;
         memcpy(record, ptr, table_share->null_bytes);
         ptr += table_share->null_bytes;
-        if (primary_key_offsets) {
+        if (primary_key_offsets && pk_stripped) {
             //
             // unpack_key will fill in parts of record that are part of the primary key
             //
@@ -1855,7 +1979,7 @@ void ha_tokudb::unpack_row(uchar * record, DBT const *row, DBT const *key) {
         uint curr_skip_index = 0;
         for (Field ** field = table->field; *field; field++) {
             uint curr_field_offset = field_offset(*field);
-            if (primary_key_offsets) {
+            if (primary_key_offsets && pk_stripped) {
                 uint curr_skip_offset = primary_key_offsets[curr_skip_index].offset;
                 if (curr_skip_offset == curr_field_offset) {
                     //
@@ -1887,18 +2011,11 @@ void ha_tokudb::unpack_row(uchar * record, DBT const *row, DBT const *key) {
 }
 
-//
-// Store the key and the primary key into the row
-// Parameters:
-//      [out]   record - key stored in MySQL format
-//      [in]    key - key stored in DBT to be converted
-//              index - index into key_file that represents the DB
-//                  unpacking a key of
-//
-void ha_tokudb::unpack_key(uchar * record, DBT const *key, uint index) {
+
+u_int32_t ha_tokudb::place_key_into_mysql_buff(uchar * record, uchar* data, uint index) {
     KEY *key_info = table->key_info + index;
     KEY_PART_INFO *key_part = key_info->key_part, *end = key_part + key_info->key_parts;
-    uchar *pos = (uchar *) key->data + 1;
+    uchar *pos = data;
 
     for (; key_part != end; key_part++) {
         if (key_part->null_bit) {
@@ -1922,8 +2039,76 @@ void ha_tokudb::unpack_key(uchar * record, DBT const *key, uint index) {
                                                    unpack_length, table->s->db_low_byte_first);
 #endif
         }
     }
+    return pos-data;
 }
 
+//
+// Store the key and the primary key into the row
+// Parameters:
+//      [out]   record - key stored in MySQL format
+//      [in]    key - key stored in DBT to be converted
+//              index - index into key_file that represents the DB
+//                  whose key is being unpacked
+//
+void ha_tokudb::unpack_key(uchar * record, DBT const *key, uint index) {
+    u_int32_t bytes_read;
+    uchar *pos = (uchar *) key->data + 1;
+    bytes_read = place_key_into_mysql_buff(record, pos, index);
+    if ((table->key_info[index].flags & HA_CLUSTERING) && !hidden_primary_key) {
+        //
+        // also unpack primary key
+        //
+        place_key_into_mysql_buff(record, pos+bytes_read, primary_key);
+    }
+}
+
+
+
+
+u_int32_t ha_tokudb::place_key_into_dbt_buff(KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length) {
+    KEY_PART_INFO *key_part = key_info->key_part;
+    KEY_PART_INFO *end = key_part + key_info->key_parts;
+    uchar* curr_buff = buff;
+    *has_null = false;
+    for (; key_part != end && key_length > 0; key_part++) {
+        //
+        // accessing key_part->field->null_bit instead of key_part->null_bit
+        // because key_part->null_bit is not set in add_index
+        // filed ticket 862 to look into this
+        //
+        if (key_part->field->null_bit) {
+            /* Store 0 if the key part is a NULL part */
+            uint null_offset = (uint) ((char*) key_part->field->null_ptr
+                                       - (char*) table->record[0]);
+            if (record[null_offset] & key_part->field->null_bit) {
+                *curr_buff++ = NULL_COL_VAL;
+                *has_null = true;
+                //
+                // the fractal tree does not handle this flag at the moment,
+                // so it is commented out for now
+                //
+                //key->flags |= DB_DBT_DUPOK;
+                continue;
+            }
+            *curr_buff++ = NONNULL_COL_VAL;        // Store NOT NULL marker
+        }
+        //
+        // accessing field_offset(key_part->field) instead of key_part->offset
+        // because key_part->offset is SET INCORRECTLY in add_index
+        // filed ticket 862 to look into this
+        //
+        curr_buff = key_part->field->pack_key(curr_buff, (uchar *) (record + field_offset(key_part->field)),
+#if MYSQL_VERSION_ID < 50123
+                                              key_part->length);
+#else
+                                              key_part->length, table->s->db_low_byte_first);
+#endif
+        key_length -= key_part->length;
+    }
+    return curr_buff - buff;
+}
+
+
 //
 // Create a packed key from a row.  This key will be written as such
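
The refactor above works because place_key_into_mysql_buff reports how many bytes it consumed, letting unpack_key make a second pass over the same buffer for the embedded primary key. A self-contained sketch of that consume-and-continue contract; unpack_fields and the fixed 4-byte field width are invented for illustration:

#include <cstdint>
#include <cstdio>
#include <cstring>

// Hypothetical unpacker in the spirit of place_key_into_mysql_buff: copy
// nfields fixed-width fields out of a packed key buffer into the record and
// return how many bytes were consumed, so the caller can resume after them.
static size_t unpack_fields(uint8_t *record, const uint8_t *data,
                            size_t nfields, size_t width) {
    for (size_t i = 0; i < nfields; i++) {
        std::memcpy(record + i * width, data + i * width, width);
    }
    return nfields * width;
}

int main() {
    // Layout assumed by the sketch: [inf byte][2 secondary fields][1 PK field],
    // every field 4 bytes wide.
    uint8_t key[1 + 3 * 4] = {0};
    uint8_t record[3 * 4] = {0};
    const uint8_t *pos = key + 1;                    // skip the infinity byte
    size_t used = unpack_fields(record, pos, 2, 4);  // the secondary part
    unpack_fields(record + used, pos + used, 1, 4);  // then the embedded PK
    std::printf("secondary part consumed %zu bytes\n", used);
}
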
@@ -1940,8 +2125,8 @@ void ha_tokudb::unpack_key(uchar * record, DBT const *key, uint index) {
 //
 
 DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length) {
-    KEY_PART_INFO *key_part = key_info->key_part;
-    KEY_PART_INFO *end = key_part + key_info->key_parts;
+    u_int32_t size = 0;
+    uchar* tmp_buff = buff;
     my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
 
     key->data = buff;
@@ -1952,45 +2137,28 @@ DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff,
     // from a row, there is no way that columns can be missing, so in practice,
     // this will be meaningless. Might as well put in a value
     //
-    *buff++ = COL_NEG_INF;
-
-    *has_null = false;
-    for (; key_part != end && key_length > 0; key_part++) {
-        //
-        // accessing key_part->field->null_bit instead off key_part->null_bit
-        // because key_part->null_bit is not set in add_index
-        // filed ticket 862 to look into this
-        //
-        if (key_part->field->null_bit) {
-            /* Store 0 if the key part is a NULL part */
-            uint null_offset = (uint) ((char*) key_part->field->null_ptr
-                                       - (char*) table->record[0]);
-            if (record[null_offset] & key_part->field->null_bit) {
-                *buff++ = NULL_COL_VAL;
-                *has_null = true;
-                //
-                // fractal tree does not handle this falg at the moment
-                // so commenting out for now
-                //
-                //key->flags |= DB_DBT_DUPOK;
-                continue;
-            }
-            *buff++ = NONNULL_COL_VAL;        // Store NOT NULL marker
+    *tmp_buff++ = COL_NEG_INF;
+    size++;
+    size += place_key_into_dbt_buff(key_info, tmp_buff, record, has_null, key_length);
+    if (key_info->flags & HA_CLUSTERING) {
+        tmp_buff = buff + size;
+        if (hidden_primary_key) {
+            memcpy_fixed(tmp_buff, current_ident, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
+            size += TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
+        }
+        else {
+            bool tmp_bool = false;
+            size += place_key_into_dbt_buff(
+                &table->key_info[primary_key],
+                tmp_buff,
+                record,
+                &tmp_bool,
+                MAX_KEY_LENGTH //this parameter does not matter
+                );
         }
-        //
-        // accessing field_offset(key_part->field) instead off key_part->offset
-        // because key_part->offset is SET INCORRECTLY in add_index
-        // filed ticket 862 to look into this
-        //
-        buff = key_part->field->pack_key(buff, (uchar *) (record + field_offset(key_part->field)),
-#if MYSQL_VERSION_ID < 50123
-                                         key_part->length);
-#else
-                                         key_part->length, table->s->db_low_byte_first);
-#endif
-        key_length -= key_part->length;
     }
-    key->size = (buff - (uchar *) key->data);
+
+    key->size = size;
     DBUG_DUMP("key", (uchar *) key->data, key->size);
     dbug_tmp_restore_column_map(table->write_set, old_map);
     return key;
@@ -2334,7 +2502,7 @@ int ha_tokudb::write_row(uchar * record) {
         pthread_mutex_unlock(&share->mutex);
     }
 
-    if ((error = pack_row(&row, (const uchar *) record))){
+    if ((error = pack_row(&row, (const uchar *) record, true))){
         goto cleanup;
     }
 
@@ -2369,10 +2537,12 @@ int ha_tokudb::write_row(uchar * record) {
         last_dup_key = primary_key;
         goto cleanup;
     }
+
     //
     // now insertion for rest of indexes
     //
     for (uint keynr = 0; keynr < table_share->keys; keynr++) {
+        bool cluster_row_created = false;
         if (keynr == primary_key) {
             continue;
         }
@@ -2382,13 +2552,30 @@ int ha_tokudb::write_row(uchar * record) {
         if (put_flags == DB_NOOVERWRITE && (has_null || thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))) {
             put_flags = DB_YESOVERWRITE;
         }
-        error = share->key_file[keynr]->put(
-            share->key_file[keynr],
-            txn,
-            &key,
-            &prim_key,
-            put_flags
-            );
+        if (table->key_info[keynr].flags & HA_CLUSTERING) {
+            if (!cluster_row_created) {
+                if ((error = pack_row(&row, (const uchar *) record, false))){
+                    goto cleanup;
+                }
+                cluster_row_created = true;
+            }
+            error = share->key_file[keynr]->put(
+                share->key_file[keynr],
+                txn,
+                &key,
+                &row,
+                put_flags
+                );
+        }
+        else {
+            error = share->key_file[keynr]->put(
+                share->key_file[keynr],
+                txn,
+                &key,
+                &prim_key,
+                put_flags
+                );
+        }
         //
         // We break if we hit an error, unless it is a dup key error
         // and MySQL told us to ignore duplicate key errors
@@ -2456,7 +2643,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
         // Primary key changed or we are updating a key that can have duplicates.
         // Delete the old row and add a new one
         if (!(error = remove_key(trans, primary_key, old_row, old_key))) {
-            if (!(error = pack_row(&row, new_row))) {
+            if (!(error = pack_row(&row, new_row, true))) {
                 if ((error = share->file->put(share->file, trans, new_key, &row, share->key_type[primary_key]))) {
                     // Probably a duplicated key; restore old key and row if needed
                     last_dup_key = primary_key;
@@ -2466,7 +2653,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
     }
     else {
         // Primary key didn't change; just update the row data
-        if (!(error = pack_row(&row, new_row))) {
+        if (!(error = pack_row(&row, new_row, true))) {
            error = share->file->put(share->file, trans, new_key, &row, 0);
         }
     }
@@ -2484,7 +2671,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
 //
 int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
     TOKUDB_DBUG_ENTER("update_row");
-    DBT prim_key, key, old_prim_key;
+    DBT prim_key, key, old_prim_key, row;
     int error;
     bool primary_key_changed;
     bool has_null;
@@ -2555,6 +2742,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
     }
     // Update all other keys
     for (uint keynr = 0; keynr < table_share->keys; keynr++) {
+        bool cluster_row_created = false;
         if (keynr == primary_key) {
             continue;
         }
@@ -2568,13 +2756,30 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
             if (put_flags == DB_NOOVERWRITE && (has_null || thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))) {
                 put_flags = DB_YESOVERWRITE;
             }
-            error = share->key_file[keynr]->put(
-                share->key_file[keynr],
-                txn,
-                &key,
-                &prim_key,
-                put_flags
-                );
+            if (table->key_info[keynr].flags & HA_CLUSTERING) {
+                if (!cluster_row_created) {
+                    if ((error = pack_row(&row, (const uchar *) new_row, false))){
+                        goto cleanup;
+                    }
+                    cluster_row_created = true;
+                }
+                error = share->key_file[keynr]->put(
+                    share->key_file[keynr],
+                    txn,
+                    &key,
+                    &row,
+                    put_flags
+                    );
+            }
+            else {
+                error = share->key_file[keynr]->put(
+                    share->key_file[keynr],
+                    txn,
+                    &key,
+                    &prim_key,
+                    put_flags
+                    );
+            }
            //
             // We break if we hit an error, unless it is a dup key error
             // and MySQL told us to ignore duplicate key errors
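
The write_row and update_row changes above amount to one rule: for a clustering index, the value stored next to the secondary key is the full packed row (pack_row called with strip_pk = false), not the primary key, and the row should be packed at most once per statement. The patch tracks that with a cluster_row_created flag; the sketch below hoists the cache above the loop, which appears to be the intent. All types and helper names here are invented stand-ins:

#include <optional>
#include <string>
#include <vector>

struct Index { bool is_clustering; };

// Stand-ins for pack_row(..., strip_pk = false) and the DB->put call.
static std::string pack_full_row(const std::string &record) { return record; }
static void put(Index &, const std::string &, const std::string &) {}

// Clustering indexes store the complete packed row (PK columns included) as
// the value; plain secondaries keep storing just the primary key.
static void write_secondaries(std::vector<Index> &indexes,
                              const std::string &record,
                              const std::string &secondary_key,
                              const std::string &primary_key) {
    std::optional<std::string> full_row;   // packed lazily, at most once
    for (Index &idx : indexes) {
        if (idx.is_clustering) {
            if (!full_row) full_row = pack_full_row(record);
            put(idx, secondary_key, *full_row);
        } else {
            put(idx, secondary_key, primary_key);
        }
    }
}

int main() {
    std::vector<Index> indexes = {{false}, {true}};
    write_secondaries(indexes, "row bytes", "sec key", "pk bytes");
}
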
@@ -2635,7 +2840,12 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT
     else if (keynr == primary_key) {  // Unique key
         DBUG_PRINT("Unique key", ("index: %d", keynr));
         error = share->key_file[keynr]->del(share->key_file[keynr], trans, prim_key, 0);
-    }
+    }
+    else if (table->key_info[keynr].flags & HA_CLUSTERING) {
+        DBUG_PRINT("clustering key", ("index: %d", keynr));
+        create_dbt_key_from_table(&key, keynr, key_buff2, record, &has_null);
+        error = share->key_file[keynr]->del(share->key_file[keynr], trans, &key, 0);
+    }
     else {
         create_dbt_key_from_table(&key, keynr, key_buff2, record, &has_null);
         error = share->key_file[keynr]->delboth(
@@ -2818,6 +3028,16 @@ void ha_tokudb::extract_hidden_primary_key(uint keynr, DBT const *row, DBT const
     if (keynr == primary_key) {
         memcpy_fixed(current_ident, (char *) found_key->data, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
     }
+    //
+    // if clustering key, hidden primary key is at end of found_key
+    //
+    else if (table->key_info[keynr].flags & HA_CLUSTERING) {
+        memcpy_fixed(
+            current_ident,
+            (char *) found_key->data + found_key->size - TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH,
+            TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH
+            );
+    }
     else {
         memcpy_fixed(current_ident, (char *) row->data, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
     }
@@ -2838,7 +3058,7 @@ void ha_tokudb::read_key_only(uchar * buf, uint keynr, DBT const *row, DBT const
     TOKUDB_DBUG_ENTER("ha_tokudb::read_key_only");
     table->status = 0;
     unpack_key(buf, found_key, keynr);
-    if (!hidden_primary_key && (keynr != primary_key)) {
+    if (!hidden_primary_key && (keynr != primary_key) && !(table->key_info[keynr].flags & HA_CLUSTERING)) {
         unpack_key(buf, row, primary_key);
     }
     DBUG_VOID_RETURN;
@@ -2857,7 +3077,10 @@ void ha_tokudb::read_key_only(uchar * buf, uint keynr, DBT const *row, DBT const
 void ha_tokudb::read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT const *found_key) {
     TOKUDB_DBUG_ENTER("ha_tokudb::read_primary_key");
     table->status = 0;
+    //
+    // case where we read from a secondary table that is not clustering
+    //
-    if (keynr != primary_key) {
+    if (keynr != primary_key && !(table->key_info[keynr].flags & HA_CLUSTERING)) {
         //
         // create a DBT that has the same data as row,
         //
@@ -2866,8 +3089,17 @@ void ha_tokudb::read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT co
         last_key.size = row->size;
         memcpy(key_buff, row->data, row->size);
     }
+    //
+    // case where we read from a clustering key, so unpack the entire row
+    //
+    else if (keynr != primary_key && (table->key_info[keynr].flags & HA_CLUSTERING)) {
+        unpack_row(buf, row, found_key, false);
+    }
+    //
+    // case where we read from the primary key, so unpack the entire row
+    //
     else {
-        unpack_row(buf, row, found_key);
+        unpack_row(buf, row, found_key, true);
     }
     if (found_key) { DBUG_DUMP("read row key", (uchar *) found_key->data, found_key->size); }
     DBUG_VOID_RETURN;
@@ -2892,7 +3124,7 @@ int ha_tokudb::read_full_row(uchar * buf) {
         table->status = STATUS_NOT_FOUND;
         TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error);
     }
-    unpack_row(buf, &current_row, &last_key);
+    unpack_row(buf, &current_row, &last_key, true);
     TOKUDB_DBUG_RETURN(0);
 }
@@ -2929,13 +3161,20 @@ int ha_tokudb::read_row(uchar * buf, uint keynr, DBT const *row, DBT const *foun
         if (key_read && found_key) {
             // TOKUDB_DBUG_DUMP("key=", found_key->data, found_key->size);
             unpack_key(buf, found_key, keynr);
-            if (!hidden_primary_key) {
+            if (!hidden_primary_key && !(table->key_info[keynr].flags & HA_CLUSTERING)) {
                 // TOKUDB_DBUG_DUMP("row=", row->data, row->size);
                 unpack_key(buf, row, primary_key);
             }
             TOKUDB_DBUG_RETURN(0);
         }
         //
+        // in this case we have a clustering key, so there is no need to do a point query
+        //
+        if (table->key_info[keynr].flags & HA_CLUSTERING) {
+            unpack_row(buf, row, found_key, false);
+            TOKUDB_DBUG_RETURN(0);
+        }
+        //
         // create a DBT that has the same data as row,
         //
         DBT key;
@@ -2951,14 +3190,17 @@ int ha_tokudb::read_row(uchar * buf, uint keynr, DBT const *row, DBT const *foun
             table->status = STATUS_NOT_FOUND;
             TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error);
         }
-        unpack_row(buf, &current_row, &key);
+        unpack_row(buf, &current_row, &key, true);
     }
     else {
+        //
+        // in this case we are dealing with the primary key
+        //
         if (key_read && !hidden_primary_key) {
             unpack_key(buf, found_key, keynr);
         }
         else {
-            unpack_row(buf, row, found_key);
+            unpack_row(buf, row, found_key, true);
         }
     }
     if (found_key) { DBUG_DUMP("read row key", (uchar *) found_key->data, found_key->size); }
@@ -3212,7 +3454,7 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
     if (!error && do_read_row) {
         error = read_row(buf, active_index, &row, &last_key);
     }
-    else if (!error && !key_read && active_index != primary_key) {
+    else if (!error && !key_read && active_index != primary_key && !(table->key_info[active_index].flags & HA_CLUSTERING)) {
         error = read_full_row(buf);
     }
 
@@ -3251,7 +3493,7 @@ int ha_tokudb::index_next(uchar * buf) {
     //
     // still need to get entire contents of the row if operation done on
     // secondary DB and it was NOT a covering index
     //
-    if (!error && !key_read && (active_index != primary_key) ) {
+    if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
         error = read_full_row(buf);
     }
 cleanup:
@@ -3286,13 +3528,13 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
         u_int32_t flags = SET_READ_FLAG(0);
         error = handle_cursor_error(cursor->c_getf_next_dup(cursor, flags, SMART_DBT_CALLBACK, &info), HA_ERR_END_OF_FILE, active_index);
-        if (!error && !key_read && active_index != primary_key) {
+        if (!error && !key_read && active_index != primary_key && !(table->key_info[active_index].flags & HA_CLUSTERING)) {
             error = read_full_row(buf);
         }
     }
     else {
         u_int32_t flags = SET_READ_FLAG(0);
         error = handle_cursor_error(cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info), HA_ERR_END_OF_FILE, active_index);
-        if (!error && !key_read && active_index != primary_key) {
+        if (!error && !key_read && active_index != primary_key && !(table->key_info[active_index].flags & HA_CLUSTERING)) {
             error = read_full_row(buf);
         }
         if (!error && ::key_cmp_if_same(table, key, active_index, keylen))
@@ -3328,7 +3570,7 @@ int ha_tokudb::index_prev(uchar * buf) {
     //
     // still need to get entire contents of the row if operation done on
     // secondary DB and it was NOT a covering index
     //
-    if (!error && !key_read && (active_index != primary_key) ) {
+    if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
         error = read_full_row(buf);
     }
 
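
All of the read-path edits above follow one rule: a clustering index carries the whole row, so the follow-up point query into the primary (read_full_row) is only needed for a plain secondary index. A minimal sketch of that decision; IndexMeta and needs_primary_lookup are invented names, not handler API:

#include <cstdio>

struct IndexMeta { bool is_primary; bool is_clustering; };

// Mirrors the condition the patch adds throughout index_read, index_next,
// index_next_same, and index_prev.
static bool needs_primary_lookup(const IndexMeta &idx, bool covering_key_read) {
    if (covering_key_read) return false;  // the key alone answers the query
    if (idx.is_primary) return false;     // the row is already in hand
    if (idx.is_clustering) return false;  // the row travels with the key
    return true;                          // plain secondary: do the point query
}

int main() {
    IndexMeta clustering = {false, true};
    IndexMeta plain = {false, false};
    std::printf("%d %d\n", needs_primary_lookup(clustering, false),
                needs_primary_lookup(plain, false)); // prints: 0 1
}
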
@@ -4180,10 +4422,11 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
     char part[MAX_ALIAS_NAME + 10];
     for (uint i = 0; i < form->s->keys; i++) {
         if (i != primary_key) {
+            int flags = (form->s->key_info[i].flags & HA_CLUSTERING) ? 0 : DB_DUP + DB_DUPSORT;
             sprintf(part, "key-%s", form->s->key_info[i].name);
             make_name(newname, name, part);
             fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
-            error = create_sub_table(name_buff, NULL, DB_BTREE, DB_DUP + DB_DUPSORT);
+            error = create_sub_table(name_buff, NULL, DB_BTREE, flags);
             if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
                 TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, form->key_info[i].flags, error);
             }
@@ -4329,11 +4572,17 @@ double ha_tokudb::read_time(
     double total_scan;
     double ret_val;
 
-    if (index != primary_key) {
+    //
+    // if it is not the primary key and not a clustering key, return handler::read_time
+    //
+    if (index != primary_key && !(table->key_info[index].flags & HA_CLUSTERING)) {
         ret_val = handler::read_time(index, ranges, rows);
         goto cleanup;
     }
 
+    //
+    // for the primary key and for clustering keys, return a fraction of scan_time()
+    //
     total_scan = scan_time();
 
     if (stats.records < rows) {
@@ -4648,12 +4897,13 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
     uchar* tmp_key_buff = NULL;
     uchar* tmp_prim_key_buff = NULL;
     uchar* tmp_record = NULL;
-    THD* thd = ha_thd();
+    uchar* tmp_record2 = NULL;
 
     newname = (char *)my_malloc(share->table_name_length + NAME_CHAR_LEN, MYF(MY_WME));
     tmp_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME));
     tmp_prim_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME));
     tmp_record = (uchar *)my_malloc(table_arg->s->rec_buff_length, MYF(MY_WME));
+    tmp_record2 = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME));
 
     //
     // number of DB files we have open currently, before add_index is executed
     //
@@ -4692,10 +4942,11 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
     //
     char part[MAX_ALIAS_NAME + 10];
     for (uint i = 0; i < num_of_keys; i++) {
+        int flags = (key_info[i].flags & HA_CLUSTERING) ? 0 : DB_DUP + DB_DUPSORT;
         sprintf(part, "key-%s", key_info[i].name);
         make_name(newname, share->table_name, part);
         fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
-        error = create_sub_table(name_buff, NULL, DB_BTREE, DB_DUP + DB_DUPSORT);
+        error = create_sub_table(name_buff, NULL, DB_BTREE, flags);
         if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
             TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, key_info[i].flags, error);
         }
@@ -4779,16 +5030,26 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
 
         for (uint i = 0; i < num_of_keys; i++) {
-            DBT secondary_key;
+            bool cluster_row_created = false;
+            DBT secondary_key, row;
             bool has_null = false;
             create_dbt_key_from_key(&secondary_key, &key_info[i], tmp_key_buff, tmp_record, &has_null);
             uint curr_index = i + curr_num_DBs;
             u_int32_t put_flags = share->key_type[curr_index];
-            if (put_flags == DB_NOOVERWRITE && (has_null || thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))) {
-                put_flags = DB_YESOVERWRITE;
+            put_flags = DB_YESOVERWRITE;
+
+            if (key_info[i].flags & HA_CLUSTERING) {
+                if (!cluster_row_created) {
+                    if ((error = pack_row(&row, (const uchar *) tmp_record, false))){
+                        goto cleanup;
+                    }
+                    cluster_row_created = true;
+                }
+                error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &row, put_flags);
+            }
+            else {
+                error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &current_primary_key, put_flags);
             }
-
-            error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &current_primary_key, put_flags);
             if (error) {
                 //
                 // in the case of any error anywhere, we can just nuke all the files created, so we don't need
diff --git a/storage/tokudb/ha_tokudb.h b/storage/tokudb/ha_tokudb.h
index f603daef9db..5bc62199958 100644
--- a/storage/tokudb/ha_tokudb.h
+++ b/storage/tokudb/ha_tokudb.h
@@ -186,8 +186,10 @@ private:
     uchar current_ident[TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH];
 
     ulong max_row_length(const uchar * buf);
-    int pack_row(DBT * row, const uchar * record);
+    int pack_row(DBT * row, const uchar * record, bool strip_pk);
+    u_int32_t place_key_into_mysql_buff(uchar * record, uchar* data, uint index);
     void unpack_key(uchar * record, DBT const *key, uint index);
+    u_int32_t place_key_into_dbt_buff(KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length);
     DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
     DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
     DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length, uchar inf_byte);
@@ -327,6 +329,9 @@ public:
     bool primary_key_is_clustered() {
         return true;
     }
+    bool supports_clustered_keys() {
+        return true;
+    }
     int cmp_ref(const uchar * ref1, const uchar * ref2);
     bool check_if_incompatible_data(HA_CREATE_INFO * info, uint table_changes);
 
@@ -341,7 +346,7 @@ public:
     void read_key_only(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
     void read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
     int read_row(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
-    void unpack_row(uchar * record, DBT const *row, DBT const *key);
+    void unpack_row(uchar * record, DBT const *row, DBT const *key, bool pk_stripped);
 
     int heavi_ret_val;
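
Finally, the dispatch in tokudb_cmp_packed_key relies on two side channels set up in ha_tokudb::open: app_private holds the index's KEY and api_internal the primary key's KEY (NULL when the primary key is hidden). A standalone sketch of that flag-based dispatch, with the MySQL structures reduced to a couple of fields; KeyDesc, Db, and the flag value are invented for illustration:

#include <cstdint>
#include <cstdio>

constexpr uint32_t kClusteringFlag = 1u << 0;   // stand-in, not the real HA_CLUSTERING bit

struct KeyDesc { uint32_t flags; };
struct Db { void *app_private; void *api_internal; };

static int compare_plain(const KeyDesc *, const void *, const void *) { return 0; }
static int compare_clustered(const KeyDesc *, const KeyDesc *,
                             const void *, const void *) { return 0; }

// Same dispatch shape as tokudb_cmp_packed_key after the patch: the clustered
// comparator also needs the primary key's description (or NULL when hidden).
static int cmp_packed_key(Db *db, const void *a, const void *b) {
    const KeyDesc *key = static_cast<const KeyDesc *>(db->app_private);
    const KeyDesc *pk = static_cast<const KeyDesc *>(db->api_internal);
    if (key->flags & kClusteringFlag) {
        return compare_clustered(key, pk, a, b);
    }
    return compare_plain(key, a, b);
}

int main() {
    KeyDesc secondary = {kClusteringFlag};
    KeyDesc primary = {0};
    Db db = {&secondary, &primary};
    std::printf("%d\n", cmp_packed_key(&db, nullptr, nullptr));
}
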