#if TOKU_INCLUDE_UPSERT // Point updates and upserts // Restrictions: // No triggers // Statement or mixed replication // Does not support row based replication // Primary key must be defined // Simple and compound primary key // Int, char and varchar primary key types // No updates on fields that are part of any key // No clustering keys // Integer and char field updates // Update expressions: // x = constant // x = x+constant // x = x-constant // x = if(x=0,0,x-1) // Session variable enables fast updates and fast upserts // Session variable disables slow updates and slow upserts // Future features: // Support more primary key types // Force statement logging for fast updates // Support clustering keys using broadcast updates // Support primary key ranges using multicast messages // Support more complicated update expressions // Replace field_offset // Debug function to dump an Item static void dump_item(Item *item) { fprintf(stderr, "%u", item->type()); switch (item->type()) { case Item::FUNC_ITEM: { Item_func *func = static_cast(item); uint n = func->argument_count(); Item **arguments = func->arguments(); fprintf(stderr, ":func=%u,%s,%u(", func->functype(), func->func_name(), n); for (uint i = 0; i < n ; i++) { dump_item(arguments[i]); if (i < n-1) fprintf(stderr,","); } fprintf(stderr, ")"); break; } case Item::INT_ITEM: { Item_int *int_item = static_cast(item); fprintf(stderr, ":int=%lld", int_item->val_int()); break; } case Item::STRING_ITEM: { Item_string *str_item = static_cast(item); fprintf(stderr, ":str=%s", str_item->val_str(NULL)->c_ptr()); break; } case Item::FIELD_ITEM: { Item_field *field_item = static_cast(item); fprintf(stderr, ":field=%s.%s.%s", field_item->db_name, field_item->table_name, field_item->field_name); break; } case Item::COND_ITEM: { Item_cond *cond_item = static_cast(item); fprintf(stderr, ":cond=%s(\n", cond_item->func_name()); List_iterator li(*cond_item->argument_list()); Item *list_item; while ((list_item = li++)) { dump_item(list_item); fprintf(stderr, "\n"); } fprintf(stderr, ")\n"); break; } default: break; } } // Debug function to dump an Item list static void dump_item_list(const char *h, List &l) { fprintf(stderr, "%s elements=%u\n", h, l.elements); List_iterator li(l); Item *item; while ((item = li++) != NULL) { dump_item(item); fprintf(stderr, "\n"); } } // Find a Field by its Item name static Field *find_field_by_name(TABLE *table, Item *item) { if (item->type() != Item::FIELD_ITEM) return NULL; Item_field *field_item = static_cast(item); #if 0 if (strcmp(table->s->db.str, field_item->db_name) != 0 || strcmp(table->s->table_name.str, field_item->table_name) != 0) return NULL; Field *found_field = NULL; for (uint i = 0; i < table->s->fields; i++) { Field *test_field = table->s->field[i]; if (strcmp(field_item->field_name, test_field->field_name) == 0) { found_field = test_field; break; } } return found_field; #else // item->field may be a shortcut instead of the above table lookup return field_item->field; #endif } // Return the starting offset in the value for a particular index (selected by idx) of a // particular field (selected by expand_field_num). // This only works for fixed length fields static uint32_t fixed_field_offset(uint32_t null_bytes, KEY_AND_COL_INFO *kc_info, uint idx, uint expand_field_num) { uint32_t offset = null_bytes; for (uint i = 0; i < expand_field_num; i++) { if (bitmap_is_set(&kc_info->key_filters[idx], i)) continue; offset += kc_info->field_lengths[i]; } return offset; } static uint32_t var_field_index(TABLE *table, KEY_AND_COL_INFO *kc_info, uint idx, uint field_num) { assert(field_num < table->s->fields); uint v_index = 0; for (uint i = 0; i < table->s->fields; i++) { if (bitmap_is_set(&kc_info->key_filters[idx], i)) continue; if (kc_info->length_bytes[i]) { if (i == field_num) break; v_index++; } } return v_index; } static uint32_t blob_field_index(TABLE *table, KEY_AND_COL_INFO *kc_info, uint idx, uint field_num) { assert(field_num < table->s->fields); uint b_index; for (b_index = 0; b_index < kc_info->num_blobs; b_index++) { if (kc_info->blob_fields[b_index] == field_num) break; } assert(b_index < kc_info->num_blobs); return b_index; } // Determine if an update operation can be offloaded to the storage engine. // The update operation consists of a list of update expressions (fields[i] = values[i]), and a list // of where conditions (conds). The function returns 0 if the update is handled in the storage engine. // Otherwise, an error is returned. int ha_tokudb::fast_update(THD *thd, List &update_fields, List &update_values, Item *conds) { TOKUDB_DBUG_ENTER("ha_tokudb::fast_update"); int error = 0; unsigned line = 0; // debug if (tokudb_debug & TOKUDB_DEBUG_UPSERT) { dump_item_list("fields", update_fields); dump_item_list("values", update_values); if (conds) { fprintf(stderr, "conds\n"); dump_item(conds); fprintf(stderr, "\n"); } } if (update_fields.elements < 1 || update_fields.elements != update_values.elements) { error = ENOTSUP; // something is fishy with the parameters line = __LINE__; goto return_error; } if (!check_fast_update(thd, update_fields, update_values, conds)) { error = ENOTSUP; line = __LINE__; goto check_error; } error = send_update_message(update_fields, update_values, conds, transaction); if (error != 0) { line = __LINE__; goto check_error; } check_error: line = line; // debug if (error != 0) { if (get_disable_slow_update(thd)) error = HA_ERR_UNSUPPORTED; if (error != ENOTSUP) print_error(error, MYF(0)); } return_error: TOKUDB_DBUG_RETURN(error); } // Return true if an expression is a simple int expression or a simple function of +- int expression. static bool check_int_result(Item *item) { Item::Type t = item->type(); if (t == Item::INT_ITEM) return true; else if (t == Item::FUNC_ITEM) { Item_func *item_func = static_cast(item); if (strcmp(item_func->func_name(), "+") != 0 && strcmp(item_func->func_name(), "-") != 0) return false; if (item_func->argument_count() != 1) return false; Item **arguments = item_func->arguments(); if (arguments[0]->type() != Item::INT_ITEM) return false; return true; } else return false; } // Return true if an expression looks like field_name op constant. static bool check_x_op_constant(const char *field_name, Item *item, const char *op, Item **item_constant) { if (item->type() != Item::FUNC_ITEM) return false; Item_func *item_func = static_cast(item); if (strcmp(item_func->func_name(), op) != 0) return false; Item **arguments = item_func->arguments(); uint n = item_func->argument_count(); if (n != 2) return false; if (arguments[0]->type() != Item::FIELD_ITEM) return false; Item_field *arg0 = static_cast(arguments[0]); if (strcmp(field_name, arg0->field_name) != 0) return false; if (!check_int_result(arguments[1])) return false; *item_constant = arguments[1]; return true; } // Return true if an expression looks like field_name = constant static bool check_x_equal_0(const char *field_name, Item *item) { Item *item_constant; if (!check_x_op_constant(field_name, item, "=", &item_constant)) return false; if (item_constant->val_int() != 0) return false; return true; } // Return true if an expression looks like fieldname - 1 static bool check_x_minus_1(const char *field_name, Item *item) { Item *item_constant; if (!check_x_op_constant(field_name, item, "-", &item_constant)) return false; if (item_constant->val_int() != 1) return false; return true; } // Return true if an expression looks like if(fieldname=0, 0, fieldname-1) and // the field named by fieldname is an unsigned int. static bool check_decr_floor_expression(Field *lhs_field, Item *item) { if (item->type() != Item::FUNC_ITEM) return false; Item_func *item_func = static_cast(item); if (strcmp(item_func->func_name(), "if") != 0) return false; Item **arguments = item_func->arguments(); uint n = item_func->argument_count(); if (n != 3) return false; if (!check_x_equal_0(lhs_field->field_name, arguments[0])) return false; if (arguments[1]->type() != Item::INT_ITEM || arguments[1]->val_int() != 0) return false; if (!check_x_minus_1(lhs_field->field_name, arguments[2])) return false; if (!(lhs_field->flags & UNSIGNED_FLAG)) return false; return true; } // Check if lhs = rhs expression is simple. Return true if it is. static bool check_update_expression(Item *lhs_item, Item *rhs_item, TABLE *table) { Field *lhs_field = find_field_by_name(table, lhs_item); if (lhs_field == NULL) return false; if (!lhs_field->part_of_key.is_clear_all()) return false; enum_field_types lhs_type = lhs_field->type(); Item::Type rhs_type = rhs_item->type(); switch (lhs_type) { case MYSQL_TYPE_TINY: case MYSQL_TYPE_SHORT: case MYSQL_TYPE_INT24: case MYSQL_TYPE_LONG: case MYSQL_TYPE_LONGLONG: if (check_int_result(rhs_item)) return true; Item *item_constant; if (check_x_op_constant(lhs_field->field_name, rhs_item, "+", &item_constant)) return true; if (check_x_op_constant(lhs_field->field_name, rhs_item, "-", &item_constant)) return true; if (check_decr_floor_expression(lhs_field, rhs_item)) return true; break; case MYSQL_TYPE_STRING: if (rhs_type == Item::INT_ITEM || rhs_type == Item::STRING_ITEM) return true; break; case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_BLOB: if (rhs_type == Item::STRING_ITEM) return true; break; default: break; } return false; } // Check that all update expressions are simple. Return true if they are. static bool check_all_update_expressions(List &fields, List &values, TABLE *table) { List_iterator lhs_i(fields); List_iterator rhs_i(values); while (1) { Item *lhs_item = lhs_i++; if (lhs_item == NULL) break; Item *rhs_item = rhs_i++; if (rhs_item == NULL) assert(0); // can not happen if (!check_update_expression(lhs_item, rhs_item, table)) return false; } return true; } static bool full_field_in_key(TABLE *table, Field *field) { assert(table->s->primary_key < table->s->keys); KEY *key = &table->s->key_info[table->s->primary_key]; for (uint i = 0; i < get_key_parts(key); i++) { KEY_PART_INFO *key_part = &key->key_part[i]; if (strcmp(field->field_name, key_part->field->field_name) == 0) { return key_part->length == field->field_length; } } return false; } // Check that an expression looks like fieldname = constant, fieldname is part of the // primary key, and the named field is an int, char or varchar type. Return true if it does. static bool check_pk_field_equal_constant(Item *item, TABLE *table, MY_BITMAP *pk_fields) { if (item->type() != Item::FUNC_ITEM) return false; Item_func *func = static_cast(item); if (strcmp(func->func_name(), "=") != 0) return false; uint n = func->argument_count(); if (n != 2) return false; Item **arguments = func->arguments(); Field *field = find_field_by_name(table, arguments[0]); if (field == NULL) return false; if (!bitmap_test_and_clear(pk_fields, field->field_index)) return false; switch (field->type()) { case MYSQL_TYPE_TINY: case MYSQL_TYPE_SHORT: case MYSQL_TYPE_INT24: case MYSQL_TYPE_LONG: case MYSQL_TYPE_LONGLONG: return arguments[1]->type() == Item::INT_ITEM || arguments[1]->type() == Item::STRING_ITEM; case MYSQL_TYPE_STRING: case MYSQL_TYPE_VARCHAR: return full_field_in_key(table, field) && (arguments[1]->type() == Item::INT_ITEM || arguments[1]->type() == Item::STRING_ITEM); default: return false; } } // Check that the where condition covers all of the primary key components with fieldname = constant // expressions. Return true if it does. static bool check_point_update(Item *conds, TABLE *table) { bool result = false; if (conds == NULL) return false; // no where condition on the update if (table->s->primary_key >= table->s->keys) return false; // no primary key defined // use a bitmap of the primary key fields to keep track of those fields that are covered // by the where conditions MY_BITMAP pk_fields; if (bitmap_init(&pk_fields, NULL, table->s->fields, FALSE)) // 1 -> failure return false; KEY *key = &table->s->key_info[table->s->primary_key]; for (uint i = 0; i < get_key_parts(key); i++) bitmap_set_bit(&pk_fields, key->key_part[i].field->field_index); switch (conds->type()) { case Item::FUNC_ITEM: result = check_pk_field_equal_constant(conds, table, &pk_fields); break; case Item::COND_ITEM: { Item_cond *cond_item = static_cast(conds); if (strcmp(cond_item->func_name(), "and") != 0) break; List_iterator li(*cond_item->argument_list()); Item *list_item; result = true; while (result == true && (list_item = li++)) { result = check_pk_field_equal_constant(list_item, table, &pk_fields); } break; } default: break; } if (!bitmap_is_clear_all(&pk_fields)) result = false; bitmap_free(&pk_fields); return result; } // Return true if there are any clustering keys (except the primary). // Precompute this when the table is opened. static bool clustering_keys_exist(TABLE *table) { for (uint keynr = 0; keynr < table->s->keys; keynr++) { if (keynr != table->s->primary_key && (table->s->key_info[keynr].flags & HA_CLUSTERING)) return true; } return false; } static bool is_strict_mode(THD *thd) { #if 50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699 return thd->is_strict_mode(); #else return test(thd->variables.sql_mode & (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES)); #endif } #if 50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699 #include #elif 50500 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50599 #include #endif // Check if an update operation can be handled by this storage engine. Return true if it can. bool ha_tokudb::check_fast_update(THD *thd, List &fields, List &values, Item *conds) { if (!transaction) return false; // avoid strict mode arithmetic overflow issues if (is_strict_mode(thd)) return false; // no triggers if (table->triggers) return false; // no binlog if (mysql_bin_log.is_open() && !(thd->variables.binlog_format == BINLOG_FORMAT_STMT || thd->variables.binlog_format == BINLOG_FORMAT_MIXED)) return false; // no clustering keys (need to broadcast an increment into the clustering keys since we are selecting with the primary key) if (clustering_keys_exist(table)) return false; if (!check_all_update_expressions(fields, values, table)) return false; if (!check_point_update(conds, table)) return false; return true; } static void marshall_varchar_descriptor(tokudb::buffer &b, TABLE *table, KEY_AND_COL_INFO *kc_info, uint key_num) { b.append_ui('v'); b.append_ui(table->s->null_bytes + kc_info->mcp_info[key_num].fixed_field_size); uint32_t var_offset_bytes = kc_info->mcp_info[key_num].len_of_offsets; b.append_ui(var_offset_bytes); b.append_ui(var_offset_bytes == 0 ? 0 : kc_info->num_offset_bytes); } static void marshall_blobs_descriptor(tokudb::buffer &b, TABLE *table, KEY_AND_COL_INFO *kc_info) { b.append_ui('b'); uint32_t n = kc_info->num_blobs; b.append_ui(n); for (uint i = 0; i < n; i++) { uint blob_field_index = kc_info->blob_fields[i]; assert(blob_field_index < table->s->fields); uint8_t blob_field_length = table->s->field[blob_field_index]->row_pack_length(); b.append(&blob_field_length, sizeof blob_field_length); } } static inline uint32_t get_null_bit_position(uint32_t null_bit); // Marshall update operatins to a buffer. static void marshall_update(tokudb::buffer &b, Item *lhs_item, Item *rhs_item, TABLE *table, TOKUDB_SHARE *share) { // figure out the update operation type (again) Field *lhs_field = find_field_by_name(table, lhs_item); assert(lhs_field); // we found it before, so this should work // compute the update info uint32_t field_type; uint32_t field_null_num = 0; if (lhs_field->real_maybe_null()) { uint32_t field_num = lhs_field->field_index; field_null_num = ((field_num/8)*8 + get_null_bit_position(lhs_field->null_bit)) + 1; } uint32_t offset; void *v_ptr = NULL; uint32_t v_length; uint32_t update_operation; longlong v_ll; String v_str; switch (lhs_field->type()) { case MYSQL_TYPE_TINY: case MYSQL_TYPE_SHORT: case MYSQL_TYPE_INT24: case MYSQL_TYPE_LONG: case MYSQL_TYPE_LONGLONG: { Field_num *lhs_num = static_cast(lhs_field); field_type = lhs_num->unsigned_flag ? UPDATE_TYPE_UINT : UPDATE_TYPE_INT; offset = fixed_field_offset(table->s->null_bytes, &share->kc_info, table->s->primary_key, lhs_field->field_index); switch (rhs_item->type()) { case Item::INT_ITEM: { update_operation = '='; v_ll = rhs_item->val_int(); v_length = lhs_field->pack_length(); v_ptr = &v_ll; break; } case Item::FUNC_ITEM: { Item_func *rhs_func = static_cast(rhs_item); Item **arguments = rhs_func->arguments(); if (strcmp(rhs_func->func_name(), "if") == 0) { update_operation = '-'; // we only support one if function for now, and it is a descrement with floor. v_ll = 1; } else if (rhs_func->argument_count() == 1) { update_operation = '='; v_ll = rhs_func->val_int(); } else { update_operation = rhs_func->func_name()[0]; v_ll = arguments[1]->val_int(); } v_length = lhs_field->pack_length(); v_ptr = &v_ll; break; } default: assert(0); } break; } case MYSQL_TYPE_STRING: { update_operation = '='; field_type = lhs_field->binary() ? UPDATE_TYPE_BINARY : UPDATE_TYPE_CHAR; offset = fixed_field_offset(table->s->null_bytes, &share->kc_info, table->s->primary_key, lhs_field->field_index); v_str = *rhs_item->val_str(&v_str); v_length = v_str.length(); if (v_length >= lhs_field->pack_length()) { v_length = lhs_field->pack_length(); v_str.length(v_length); // truncate } else { v_length = lhs_field->pack_length(); uchar pad_char = lhs_field->binary() ? 0 : lhs_field->charset()->pad_char; v_str.fill(lhs_field->pack_length(), pad_char); // pad } v_ptr = v_str.c_ptr(); break; } case MYSQL_TYPE_VARCHAR: { update_operation = '='; field_type = lhs_field->binary() ? UPDATE_TYPE_VARBINARY : UPDATE_TYPE_VARCHAR; offset = var_field_index(table, &share->kc_info, table->s->primary_key, lhs_field->field_index); v_str = *rhs_item->val_str(&v_str); v_length = v_str.length(); if (v_length >= lhs_field->row_pack_length()) { v_length = lhs_field->row_pack_length(); v_str.length(v_length); // truncate } v_ptr = v_str.c_ptr(); break; } case MYSQL_TYPE_BLOB: { update_operation = '='; field_type = lhs_field->binary() ? UPDATE_TYPE_BLOB : UPDATE_TYPE_TEXT; offset = blob_field_index(table, &share->kc_info, table->s->primary_key, lhs_field->field_index); v_str = *rhs_item->val_str(&v_str); v_length = v_str.length(); if (v_length >= lhs_field->max_data_length()) { v_length = lhs_field->max_data_length(); v_str.length(v_length); // truncate } v_ptr = v_str.c_ptr(); break; } default: assert(0); } // marshall the update fields into the buffer b.append_ui(update_operation); b.append_ui(field_type); b.append_ui(field_null_num); b.append_ui(offset); b.append_ui(v_length); b.append(v_ptr, v_length); } // Save an item's value into the appropriate field. Return 0 if successful. static int save_in_field(Item *item, TABLE *table) { assert(item->type() == Item::FUNC_ITEM); Item_func *func = static_cast(item); assert(strcmp(func->func_name(), "=") == 0); uint n = func->argument_count(); assert(n == 2); Item **arguments = func->arguments(); assert(arguments[0]->type() == Item::FIELD_ITEM); Item_field *field_item = static_cast(arguments[0]); my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set); int error = arguments[1]->save_in_field(field_item->field, 0); dbug_tmp_restore_column_map(table->write_set, old_map); return error; } static void count_update_types(Field *lhs_field, uint *num_varchars, uint *num_blobs) { switch (lhs_field->type()) { case MYSQL_TYPE_VARCHAR: *num_varchars += 1; break; case MYSQL_TYPE_BLOB: *num_blobs += 1; break; default: break; } } // Generate an update message for an update operation and send it into the primary tree. Return 0 if successful. int ha_tokudb::send_update_message(List &update_fields, List &update_values, Item *conds, DB_TXN *txn) { int error; // Save the primary key from the where conditions Item::Type t = conds->type(); if (t == Item::FUNC_ITEM) { error = save_in_field(conds, table); } else if (t == Item::COND_ITEM) { Item_cond *cond_item = static_cast(conds); List_iterator li(*cond_item->argument_list()); Item *list_item; error = 0; while (error == 0 && (list_item = li++)) { error = save_in_field(list_item, table); } } else assert(0); if (error) return error; // put the primary key into key_buff and wrap it with key_dbt DBT key_dbt; bool has_null; create_dbt_key_from_table(&key_dbt, primary_key, key_buff, table->record[0], &has_null); // construct the update message tokudb::buffer update_message; uint8_t op = UPDATE_OP_UPDATE_2; update_message.append(&op, sizeof op); uint32_t num_updates = update_fields.elements; uint num_varchars = 0, num_blobs = 0; if (1) { List_iterator lhs_i(update_fields); Item *lhs_item; while ((lhs_item = lhs_i++)) { if (lhs_item == NULL) break; Field *lhs_field = find_field_by_name(table, lhs_item); assert(lhs_field); // we found it before, so this should work count_update_types(lhs_field, &num_varchars, &num_blobs); } if (num_varchars > 0 || num_blobs > 0) num_updates++; if (num_blobs > 0) num_updates++; } // append the updates update_message.append_ui(num_updates); if (num_varchars > 0 || num_blobs > 0) marshall_varchar_descriptor(update_message, table, &share->kc_info, table->s->primary_key); if (num_blobs > 0) marshall_blobs_descriptor(update_message, table, &share->kc_info); List_iterator lhs_i(update_fields); List_iterator rhs_i(update_values); while (error == 0) { Item *lhs_item = lhs_i++; if (lhs_item == NULL) break; Item *rhs_item = rhs_i++; if (rhs_item == NULL) assert(0); // can not happen marshall_update(update_message, lhs_item, rhs_item, table, share); } rw_rdlock(&share->num_DBs_lock); if (share->num_DBs > table->s->keys + test(hidden_primary_key)) { // hot index in progress error = ENOTSUP; // run on the slow path } else { // send the update message DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt); update_dbt.data = update_message.data(); update_dbt.size = update_message.size(); error = share->key_file[primary_key]->update(share->key_file[primary_key], txn, &key_dbt, &update_dbt, 0); } rw_unlock(&share->num_DBs_lock); return error; } // Determine if an upsert operation can be offloaded to the storage engine. // An upsert consists of a row and a list of update expressions (update_fields[i] = update_values[i]). // The function returns 0 if the upsert is handled in the storage engine. Otherwise, an error code is returned. int ha_tokudb::upsert(THD *thd, List &update_fields, List &update_values) { TOKUDB_DBUG_ENTER("ha_tokudb::upsert"); int error = 0; unsigned line = 0; // debug if (tokudb_debug & TOKUDB_DEBUG_UPSERT) { fprintf(stderr, "upsert\n"); dump_item_list("update_fields", update_fields); dump_item_list("update_values", update_values); } if (update_fields.elements < 1 || update_fields.elements != update_values.elements) { error = ENOTSUP; // not an upsert or something is fishy with the parameters line = __LINE__; goto return_error; } if (!check_upsert(thd, update_fields, update_values)) { error = ENOTSUP; line = __LINE__; goto check_error; } error = send_upsert_message(thd, update_fields, update_values, transaction); if (error != 0) { line = __LINE__; goto check_error; } check_error: line = line; // debug if (error != 0) { if (get_disable_slow_upsert(thd)) error = HA_ERR_UNSUPPORTED; if (error != ENOTSUP) print_error(error, MYF(0)); } return_error: TOKUDB_DBUG_RETURN(error); } // Check if an upsert can be handled by this storage engine. Return trus if it can. bool ha_tokudb::check_upsert(THD *thd, List &update_fields, List &update_values) { if (!transaction) return false; // avoid strict mode arithmetic overflow issues if (is_strict_mode(thd)) return false; // no triggers if (table->triggers) return false; // primary key must exist if (table->s->primary_key >= table->s->keys) return false; // no secondary keys if (table->s->keys > 1) return false; // no binlog if (mysql_bin_log.is_open() && !(thd->variables.binlog_format == BINLOG_FORMAT_STMT || thd->variables.binlog_format == BINLOG_FORMAT_MIXED)) return false; if (!check_all_update_expressions(update_fields, update_values, table)) return false; return true; } // Generate an upsert message and send it into the primary tree. Return 0 if successful. int ha_tokudb::send_upsert_message(THD *thd, List &update_fields, List &update_values, DB_TXN *txn) { int error = 0; // generate primary key DBT key_dbt; bool has_null; create_dbt_key_from_table(&key_dbt, primary_key, primary_key_buff, table->record[0], &has_null); // generate packed row DBT row; error = pack_row(&row, (const uchar *) table->record[0], primary_key); if (error) return error; tokudb::buffer update_message; // append the operation uint8_t op = UPDATE_OP_UPSERT_2; update_message.append(&op, sizeof op); // append the row update_message.append_ui(row.size); update_message.append(row.data, row.size); uint32_t num_updates = update_fields.elements; uint num_varchars = 0, num_blobs = 0; if (1) { List_iterator lhs_i(update_fields); Item *lhs_item; while ((lhs_item = lhs_i++)) { if (lhs_item == NULL) break; Field *lhs_field = find_field_by_name(table, lhs_item); assert(lhs_field); // we found it before, so this should work count_update_types(lhs_field, &num_varchars, &num_blobs); } if (num_varchars > 0 || num_blobs > 0) num_updates++; if (num_blobs > 0) num_updates++; } // append the updates update_message.append_ui(num_updates); if (num_varchars > 0 || num_blobs > 0) marshall_varchar_descriptor(update_message, table, &share->kc_info, table->s->primary_key); if (num_blobs > 0) marshall_blobs_descriptor(update_message, table, &share->kc_info); List_iterator lhs_i(update_fields); List_iterator rhs_i(update_values); while (1) { Item *lhs_item = lhs_i++; if (lhs_item == NULL) break; Item *rhs_item = rhs_i++; if (rhs_item == NULL) assert(0); // can not happen marshall_update(update_message, lhs_item, rhs_item, table, share); } rw_rdlock(&share->num_DBs_lock); if (share->num_DBs > table->s->keys + test(hidden_primary_key)) { // hot index in progress error = ENOTSUP; // run on the slow path } else { // send the upsert message DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt); update_dbt.data = update_message.data(); update_dbt.size = update_message.size(); error = share->key_file[primary_key]->update(share->key_file[primary_key], txn, &key_dbt, &update_dbt, 0); } rw_unlock(&share->num_DBs_lock); return error; } #endif