diff --git a/storage/tokudb/ha_tokudb.cc b/storage/tokudb/ha_tokudb.cc index 857a99ca36f..ea22e0d5da5 100644 --- a/storage/tokudb/ha_tokudb.cc +++ b/storage/tokudb/ha_tokudb.cc @@ -962,122 +962,126 @@ cleanup: return error; } -int generate_keys_vals_for_put(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra) { +int generate_row_for_put( + DB *dest_db, + DB *src_db, + DBT *dest_key, + DBT *dest_val, + const DBT *src_key, + const DBT *src_val, + void *extra + ) +{ int error; - DBT pk_key, pk_val; - ROW_BUFFERS row_buffs = NULL; - bzero(&pk_key, sizeof(pk_key)); - bzero(&pk_val, sizeof(pk_val)); - - pk_key.size = *(u_int32_t *)row->data; - pk_key.data = (uchar *)row->data + sizeof(u_int32_t); - pk_val.size = row->size - pk_key.size - sizeof(u_int32_t); - pk_val.data = (uchar *)pk_key.data + pk_key.size; + DB* curr_db = dest_db; + uchar* row_desc = NULL; + u_int32_t desc_size; + uchar* buff = NULL; + u_int32_t max_key_len = 0; - row_buffs = (ROW_BUFFERS)extra; - for ( u_int32_t i = 0; i < num_dbs; i++) { - DB* curr_db = dbs[i]; - uchar* row_desc = NULL; - u_int32_t desc_size; + row_desc = (uchar *)curr_db->descriptor->data; + row_desc += (*(u_int32_t *)row_desc); + desc_size = (*(u_int32_t *)row_desc) - 4; + row_desc += 4; + + if (is_key_pk(row_desc, desc_size)) { + assert(dest_key->flags != DB_DBT_USERMEM); + assert(dest_val->flags != DB_DBT_USERMEM); + if (dest_key->flags == DB_DBT_REALLOC && dest_key->data != NULL) { + free(dest_key->data); + } + if (dest_val->flags == DB_DBT_REALLOC && dest_val->data != NULL) { + free(dest_val->data); + } + dest_key->data = src_key->data; + dest_key->size = src_key->size; + dest_key->flags = DB_DBT_TEMPMEMORY; + dest_val->data = src_val->data; + dest_val->size = src_val->size; + dest_val->flags = DB_DBT_TEMPMEMORY; + error = 0; + goto cleanup; + } + if (dest_key->flags == DB_DBT_USERMEM) { + buff = (uchar *)dest_key->data; + } + else if (dest_key->flags == DB_DBT_REALLOC) { + max_key_len = max_key_size_from_desc(row_desc, desc_size); + max_key_len += src_key->size; - row_desc = (uchar *)curr_db->descriptor->data; - row_desc += (*(u_int32_t *)row_desc); - desc_size = (*(u_int32_t *)row_desc) - 4; - row_desc += 4; - - if (is_key_pk(row_desc, desc_size)) { - keys[i].data = pk_key.data; - keys[i].size = pk_key.size; - vals[i].data = pk_val.data; - vals[i].size = pk_val.size; - continue; + if (max_key_len > dest_key->ulen) { + void* old_ptr = dest_key->data; + void* new_ptr = NULL; + new_ptr = realloc(old_ptr, max_key_len); + assert(new_ptr); + dest_key->data = new_ptr; + dest_key->ulen = max_key_len; + } + + buff = (uchar *)dest_key->data; + assert(buff != NULL && max_key_len > 0); + } + else { + assert(false); + } + + dest_key->size = pack_key_from_desc( + buff, + row_desc, + desc_size, + src_key, + src_val + ); + assert(dest_key->ulen >= dest_key->size); + if (tokudb_debug & TOKUDB_DEBUG_CHECK_KEY && !max_key_len) { + max_key_len = max_key_size_from_desc(row_desc, desc_size); + max_key_len += src_key->size; + } + if (max_key_len) { + assert(max_key_len >= dest_key->size); + } + + row_desc += desc_size; + desc_size = (*(u_int32_t *)row_desc) - 4; + row_desc += 4; + if (!is_key_clustering(row_desc, desc_size)) { + dest_val->size = 0; + } + else { + uchar* buff = NULL; + if (dest_val->flags == DB_DBT_USERMEM) { + buff = (uchar *)dest_val->data; + } + else if (dest_val->flags == DB_DBT_REALLOC){ + if (dest_val->ulen < src_val->size) { + void* old_ptr = dest_val->data; + void* new_ptr = NULL; + new_ptr = realloc(old_ptr, src_val->size); + assert(new_ptr); + dest_val->data = new_ptr; + dest_val->ulen = src_val->size; + } + buff = (uchar *)dest_val->data; + assert(buff != NULL); } else { - uchar* buff = NULL; - u_int32_t max_key_len = 0; - if (row_buffs != NULL) { - buff = row_buffs->key_buff[i]; - } - else { - max_key_len = max_key_size_from_desc(row_desc, desc_size); - max_key_len += pk_key.size; - buff = (uchar *)my_malloc(max_key_len, MYF(MY_WME)); - assert(buff != NULL && max_key_len > 0); - } - keys[i].size = pack_key_from_desc( - buff, - row_desc, - desc_size, - &pk_key, - &pk_val - ); - if (tokudb_debug & TOKUDB_DEBUG_CHECK_KEY && !max_key_len) { - max_key_len = max_key_size_from_desc(row_desc, desc_size); - max_key_len += pk_key.size; - } - if (max_key_len) { - assert(max_key_len >= keys[i].size); - } - keys[i].data = buff; - } - row_desc += desc_size; - desc_size = (*(u_int32_t *)row_desc) - 4; - row_desc += 4; - if (!is_key_clustering(row_desc, desc_size)) { - bzero(&vals[i], sizeof(DBT)); - } - else { - uchar* buff = NULL; - if (row_buffs != NULL) { - buff = row_buffs->rec_buff[i]; - } - else { - buff = (uchar *)my_malloc(pk_val.size, MYF(MY_WME)); - assert(buff != NULL); - } - vals[i].size = pack_clustering_val_from_desc( - buff, - row_desc, - desc_size, - &pk_val - ); - vals[i].data = buff; + assert(false); } + dest_val->size = pack_clustering_val_from_desc( + buff, + row_desc, + desc_size, + src_val + ); + assert(dest_val->ulen >= dest_val->size); } error = 0; - +cleanup: return error; } -int cleanup_keys_vals_for_put(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra) { - if (extra == NULL) { - // - // handle allocation of buffers in recovery case later - // - for (u_int32_t i = 0; i < num_dbs; i++) { - DB* curr_db = dbs[i]; - uchar* row_desc = NULL; - u_int32_t desc_size; - - row_desc = (uchar *)curr_db->descriptor->data; - row_desc += (*(u_int32_t *)row_desc); - desc_size = (*(u_int32_t *)row_desc) - 4; - row_desc += 4; - - if (is_key_pk(row_desc, desc_size)) { - continue; - } - else { - my_free(keys[i].data, MYF(MY_ALLOW_ZERO_PTR)); - my_free(vals[i].data, MYF(MY_ALLOW_ZERO_PTR)); - } - } - } - return 0; -} - ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, table_arg) // flags defined in sql\handler.h @@ -1107,6 +1111,8 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t ai_metadata_update_required = false; bzero(mult_key_buff, sizeof(mult_key_buff)); bzero(mult_rec_buff, sizeof(mult_rec_buff)); + bzero(mult_key_dbt, sizeof(mult_key_dbt)); + bzero(mult_rec_dbt, sizeof(mult_rec_dbt)); } // @@ -1586,7 +1592,7 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) { } alloced_rec_buff_length = table_share->rec_buff_length + table_share->fields; - rec_buff = (uchar *) my_malloc(alloced_rec_buff_length + max_key_length + sizeof(u_int32_t), MYF(MY_WME)); + rec_buff = (uchar *) my_malloc(alloced_rec_buff_length, MYF(MY_WME)); if (rec_buff == NULL) { ret_val = 1; @@ -1599,9 +1605,15 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) { } mult_key_buff[i] = (uchar *)my_malloc(max_key_length, MYF(MY_WME)); assert(mult_key_buff[i] != NULL); + mult_key_dbt[i].ulen = max_key_length; + mult_key_dbt[i].flags = DB_DBT_USERMEM; + mult_key_dbt[i].data = mult_key_buff[i]; if (table_share->key_info[i].flags & HA_CLUSTERING) { mult_rec_buff[i] = (uchar *) my_malloc(alloced_rec_buff_length, MYF(MY_WME)); assert(mult_rec_buff[i]); + mult_rec_dbt[i].ulen = alloced_rec_buff_length; + mult_rec_dbt[i].flags = DB_DBT_USERMEM; + mult_rec_dbt[i].data = mult_rec_buff[i]; } } alloced_mult_rec_buff_length = alloced_rec_buff_length; @@ -1889,7 +1901,7 @@ int ha_tokudb::__close(int mutex_is_locked) { bool ha_tokudb::fix_rec_buff_for_blob(ulong length) { if (!rec_buff || (length > alloced_rec_buff_length)) { uchar *newptr; - if (!(newptr = (uchar *) my_realloc((void *) rec_buff, length+max_key_length+sizeof(u_int32_t), MYF(MY_ALLOW_ZERO_PTR)))) + if (!(newptr = (uchar *) my_realloc((void *) rec_buff, length, MYF(MY_ALLOW_ZERO_PTR)))) return 1; rec_buff = newptr; alloced_rec_buff_length = length; @@ -1906,6 +1918,9 @@ void ha_tokudb::fix_mult_rec_buff() { assert(false); } mult_rec_buff[i] = newptr; + mult_rec_dbt[i].ulen = alloced_rec_buff_length; + mult_rec_dbt[i].flags = DB_DBT_USERMEM; + mult_rec_dbt[i].data = mult_rec_buff[i]; } } alloced_mult_rec_buff_length = alloced_rec_buff_length; @@ -1942,12 +1957,10 @@ ulong ha_tokudb::max_row_length(const uchar * buf) { int ha_tokudb::pack_row( DBT * row, - uchar* buf, const uchar* record, uint index ) { - uchar* dest_buf = NULL; uchar* fixed_field_ptr = NULL; uchar* var_field_offset_ptr = NULL; uchar* start_field_data_ptr = NULL; @@ -1957,18 +1970,17 @@ int ha_tokudb::pack_row( my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set); - if ((buf == NULL) && table_share->blob_fields) { + if (table_share->blob_fields) { if (fix_rec_buff_for_blob(max_row_length(record))) { r = HA_ERR_OUT_OF_MEM; goto cleanup; } } - dest_buf = (buf == NULL) ? rec_buff : buf; /* Copy null bits */ - memcpy(dest_buf, record, table_share->null_bytes); - fixed_field_ptr = dest_buf + table_share->null_bytes; + memcpy(rec_buff, record, table_share->null_bytes); + fixed_field_ptr = rec_buff + table_share->null_bytes; var_field_offset_ptr = fixed_field_ptr + share->kc_info.mcp_info[index].var_len_offset; start_field_data_ptr = var_field_offset_ptr + share->kc_info.mcp_info[index].len_of_offsets; var_field_data_ptr = var_field_offset_ptr + share->kc_info.mcp_info[index].len_of_offsets; @@ -2011,8 +2023,8 @@ int ha_tokudb::pack_row( ); } - row->data = dest_buf; - row->size = (size_t) (var_field_data_ptr - dest_buf); + row->data = rec_buff; + row->size = (size_t) (var_field_data_ptr - rec_buff); r = 0; cleanup: @@ -2960,7 +2972,7 @@ int ha_tokudb::test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val) { // test key packing of clustering keys // if (table->key_info[keynr].flags & HA_CLUSTERING) { - error = pack_row(&row, mult_rec_buff[keynr], (const uchar *) record, keynr); + error = pack_row(&row, (const uchar *) record, keynr); if (error) { goto cleanup; } uchar* tmp_buff = NULL; tmp_buff = (uchar *)my_malloc(alloced_rec_buff_length,MYF(MY_WME)); @@ -2977,7 +2989,7 @@ int ha_tokudb::test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val) { &tmp_pk_val ); assert(tmp_num_bytes == row.size); - cmp = memcmp(tmp_buff,mult_rec_buff[keynr],tmp_num_bytes); + cmp = memcmp(tmp_buff,rec_buff,tmp_num_bytes); assert(cmp == 0); my_free(tmp_buff,MYF(MY_ALLOW_ZERO_PTR)); } @@ -3054,7 +3066,7 @@ int ha_tokudb::insert_rows_to_dictionaries(uchar* record, DBT* pk_key, DBT* pk_v put_flags = DB_YESOVERWRITE; if (table->key_info[keynr].flags & HA_CLUSTERING) { - error = pack_row(&row, NULL, (const uchar *) record, keynr); + error = pack_row(&row, (const uchar *) record, keynr); if (error) { goto cleanup; } } else { @@ -3085,17 +3097,10 @@ cleanup: return error; } -int ha_tokudb::insert_rows_to_dictionaries_mult(uchar* row_buff, u_int32_t row_buff_size, DB_TXN* txn, THD* thd) { +int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN* txn, THD* thd) { int error; - DBT row; - struct row_buffers row_buff_struct; bool is_replace_into; uint curr_num_DBs = table->s->keys + test(hidden_primary_key); - bzero(&row, sizeof(row)); - row.data = row_buff; - row.size = row_buff_size; - row_buff_struct.key_buff = mult_key_buff; - row_buff_struct.rec_buff = mult_rec_buff; is_replace_into = (thd_sql_command(thd) == SQLCOM_REPLACE) || (thd_sql_command(thd) == SQLCOM_REPLACE_SELECT); @@ -3105,7 +3110,21 @@ int ha_tokudb::insert_rows_to_dictionaries_mult(uchar* row_buff, u_int32_t row_b else { share->mult_put_flags[primary_key] = DB_NOOVERWRITE; } - error = db_env->put_multiple(db_env, txn, &row, curr_num_DBs, share->key_file, share->mult_put_flags, &row_buff_struct); + + error = db_env->put_multiple( + db_env, + NULL, + txn, + pk_key, + pk_val, + curr_num_DBs, + share->key_file, + mult_key_dbt, + mult_rec_dbt, + share->mult_put_flags, + NULL + ); + // // We break if we hit an error, unless it is a dup key error // and MySQL told us to ignore duplicate key errors @@ -3184,12 +3203,8 @@ int ha_tokudb::write_row(uchar * record) { } } - create_dbt_key_from_table(&prim_key, primary_key, rec_buff + sizeof(u_int32_t), record, &has_null); - // - // copy len of pk at beginning of rec_buff - // - memcpy(rec_buff, &prim_key.size, sizeof(u_int32_t)); - if ((error = pack_row(&row, rec_buff + prim_key.size+sizeof(u_int32_t), (const uchar *) record, primary_key))){ + create_dbt_key_from_table(&prim_key, primary_key, primary_key_buff, record, &has_null); + if ((error = pack_row(&row, (const uchar *) record, primary_key))){ goto cleanup; } @@ -3220,7 +3235,7 @@ int ha_tokudb::write_row(uchar * record) { if (error) { goto cleanup; } } else { - error = insert_rows_to_dictionaries_mult(rec_buff, sizeof(u_int32_t) + prim_key.size + row.size, txn, thd); + error = insert_rows_to_dictionaries_mult(&prim_key, &row, txn, thd); if (error) { goto cleanup; } } @@ -3287,7 +3302,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons error = remove_key(trans, primary_key, old_row, old_key); if (error) { goto cleanup; } - error = pack_row(&row, NULL, new_row, primary_key); + error = pack_row(&row, new_row, primary_key); if (error) { goto cleanup; } error = share->file->put(share->file, trans, new_key, &row, put_flags); @@ -3298,7 +3313,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons } else { // Primary key didn't change; just update the row data - error = pack_row(&row, NULL, new_row, primary_key); + error = pack_row(&row, new_row, primary_key); if (error) { goto cleanup; } // @@ -3437,7 +3452,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { put_flags = DB_YESOVERWRITE; if (table->key_info[keynr].flags & HA_CLUSTERING) { - error = pack_row(&row, NULL, (const uchar *) new_row, keynr); + error = pack_row(&row, (const uchar *) new_row, keynr); if (error){ goto cleanup; } } else { @@ -6140,7 +6155,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { } if (key_info[i].flags & HA_CLUSTERING) { - if ((error = pack_row(&row, NULL, (const uchar *) tmp_record, curr_index))){ + if ((error = pack_row(&row, (const uchar *) tmp_record, curr_index))){ goto cleanup; } error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &row, put_flags); diff --git a/storage/tokudb/ha_tokudb.h b/storage/tokudb/ha_tokudb.h index 1bcd6a64402..c5dfa18691f 100644 --- a/storage/tokudb/ha_tokudb.h +++ b/storage/tokudb/ha_tokudb.h @@ -97,8 +97,15 @@ typedef enum { } TABLE_LOCK_TYPE; int create_tokudb_trx_data_instance(tokudb_trx_data** out_trx); -int generate_keys_vals_for_put(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra); -int cleanup_keys_vals_for_put(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra); +int generate_row_for_put( + DB *dest_db, + DB *src_db, + DBT *dest_key, + DBT *dest_val, + const DBT *src_key, + const DBT *src_val, + void *extra + ); class ha_tokudb : public handler { @@ -152,6 +159,9 @@ private: // uchar* mult_key_buff[MAX_KEY]; uchar* mult_rec_buff[MAX_KEY]; + DBT mult_key_dbt[MAX_KEY + 1]; + DBT mult_rec_dbt[MAX_KEY + 1]; + ulong alloced_mult_rec_buff_length; // @@ -241,7 +251,6 @@ private: ulong max_row_length(const uchar * buf); int pack_row( DBT * row, - uchar* buf, const uchar* record, uint index ); @@ -291,7 +300,7 @@ private: int is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn); int do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd); int insert_rows_to_dictionaries(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn); - int insert_rows_to_dictionaries_mult(uchar* row, u_int32_t row_size, DB_TXN* txn, THD* thd); + int insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN* txn, THD* thd); int test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val); diff --git a/storage/tokudb/hatoku_cmp.cc b/storage/tokudb/hatoku_cmp.cc index 66f9e42b0c4..5c50f3672c5 100755 --- a/storage/tokudb/hatoku_cmp.cc +++ b/storage/tokudb/hatoku_cmp.cc @@ -2044,7 +2044,7 @@ u_int32_t pack_clustering_val_from_desc( uchar* buf, void* row_desc, u_int32_t row_desc_size, - DBT* pk_val + const DBT* pk_val ) { uchar* null_bytes_src_ptr = NULL; @@ -2521,8 +2521,8 @@ u_int32_t pack_key_from_desc( uchar* buf, void* row_desc, u_int32_t row_desc_size, - DBT* pk_key, - DBT* pk_val + const DBT* pk_key, + const DBT* pk_val ) { MULTI_COL_PACK_INFO mcp_info; diff --git a/storage/tokudb/hatoku_cmp.h b/storage/tokudb/hatoku_cmp.h index c64b478c4dd..6c9729eda36 100755 --- a/storage/tokudb/hatoku_cmp.h +++ b/storage/tokudb/hatoku_cmp.h @@ -238,7 +238,7 @@ u_int32_t pack_clustering_val_from_desc( uchar* buf, void* row_desc, u_int32_t row_desc_size, - DBT* pk_val + const DBT* pk_val ); u_int32_t get_max_secondary_key_pack_desc_size( @@ -275,8 +275,8 @@ u_int32_t pack_key_from_desc( uchar* buf, void* row_desc, u_int32_t row_desc_size, - DBT* pk_key, - DBT* pk_val + const DBT* pk_key, + const DBT* pk_val ); diff --git a/storage/tokudb/hatoku_hton.cc b/storage/tokudb/hatoku_hton.cc index a780a056c48..c1239c157ed 100644 --- a/storage/tokudb/hatoku_hton.cc +++ b/storage/tokudb/hatoku_hton.cc @@ -273,7 +273,7 @@ static int tokudb_init_func(void *p) { if (tokudb_debug & TOKUDB_DEBUG_INIT) TOKUDB_TRACE("%s:env open:flags=%x\n", __FUNCTION__, tokudb_init_flags); - r = db_env->set_multiple_callbacks(db_env, generate_keys_vals_for_put, cleanup_keys_vals_for_put, NULL, NULL); + r = db_env->set_generate_row_callback_for_put(db_env,generate_row_for_put); assert(!r); r = db_env->open(db_env, tokudb_home, tokudb_init_flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);