References: lp:1299430 - initial support for tokudb replication in master-slave model

This commit is contained in:
Seppo Jaakola 2014-03-29 13:34:50 +02:00
parent 8fb80a58bf
commit f4defb0b7b
3 changed files with 304 additions and 1 deletions

View file

@ -71,7 +71,8 @@ void wsrep_register_hton(THD* thd, bool all)
THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt;
for (Ha_trx_info *i= trans->ha_list; WSREP(thd) && i; i = i->next())
{
if (i->ht()->db_type == DB_TYPE_INNODB)
if ((i->ht()->db_type == DB_TYPE_INNODB) ||
(i->ht()->db_type == DB_TYPE_TOKUDB))
{
trans_register_ha(thd, all, wsrep_hton);

View file

@ -138,6 +138,38 @@ static const char *ha_tokudb_exts[] = {
NullS
};
#ifdef WITH_WSREP
#include <wsrep_mysqld.h>
#include <my_md5.h>
#include <openssl/md5.h>
extern my_bool wsrep_certify_nonPK;
class binlog_trx_data;
extern handlerton *binlog_hton;
extern "C" int thd_binlog_format(const MYSQL_THD thd);
extern bool wsrep_prepare_key_for_innodb(const uchar *cache_key,
size_t cache_key_len,
const uchar* row_id,
size_t row_id_len,
wsrep_buf_t* key,
size_t* key_len);
extern handlerton * wsrep_hton;
static inline wsrep_ws_handle_t*
//wsrep_ws_handle_t*
//ha_tokudb::wsrep_ws_handle_t*
wsrep_ws_handle(THD* thd) {
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
DB_TXN* txn = (trx->all) ? trx->all : trx->stmt;
assert(txn);
WSREP_DEBUG("txn->id: %lu", txn->id64(txn));
return wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd),
(wsrep_trx_id_t)txn->id64(txn));
}
#endif /* WITH_WSREP */
//
// This offset is calculated starting from AFTER the NULL bytes
//
@ -4170,6 +4202,19 @@ int ha_tokudb::write_row(uchar * record) {
added_rows++;
trx->stmt_progress.inserted++;
track_progress(thd);
#ifdef WITH_WSREP
if (wsrep_thd_exec_mode(thd) == LOCAL_STATE &&
WSREP(thd) && !wsrep_consistency_check(thd) &&
(thd_sql_command(thd) != SQLCOM_LOAD ||
thd_binlog_format(thd) == BINLOG_FORMAT_ROW)) {
if (wsrep_append_keys(thd, false, record, NULL)) {
DBUG_PRINT("wsrep", ("row key failed"));
error = HA_ERR_INTERNAL_ERROR;
}
}
#endif
}
cleanup:
if (!num_DBs_locked_in_bulk) {
@ -8400,3 +8445,256 @@ namespace tokudb {
template size_t vlq_encode_ui(uint64_t n, void *p, size_t s);
template size_t vlq_decode_ui(uint64_t *np, void *p, size_t s);
};
#ifdef WITH_WSREP
static
int
wsrep_calc_row_hash(
/*================*/
uchar* digest, /*!< in/out: md5 sum */
const uchar* row, /*!< in: row in MySQL format */
TABLE* table, /*!< in: table in MySQL data
dictionary */
THD* thd) /*!< in: user thread */
{
Field* field;
enum_field_types field_mysql_type;
uint n_fields;
unsigned long int len;
const uchar* ptr;
unsigned long int col_type;
uint i;
*digest = rand();
#ifdef REMOVED
MD5_CTX ctx;
MD5_Init (&ctx);
n_fields = table->s->fields;
for (i = 0; i < n_fields; i++) {
uchar null_byte=0;
uchar true_byte=1;
ptr = (const byte*) row + get_field_offset(table, field);
field = table->field[i];
ptr = (const uchar*) row + get_field_offset(table, field);
len = field->pack_length();
field_mysql_type = field->type();
col_type = table->cols[i].mtype;
switch (col_type) {
case DATA_BLOB:
ptr = row_mysql_read_blob_ref(&len, ptr, len);
break;
case DATA_VARCHAR:
case DATA_BINARY:
case DATA_VARMYSQL:
if (field_mysql_type == MYSQL_TYPE_VARCHAR) {
/* This is a >= 5.0.3 type true VARCHAR where
the real payload data length is stored in
1 or 2 bytes */
ptr = row_mysql_read_true_varchar(
&len, ptr,
(ulint)
(((Field_varstring*)field)->length_bytes));
}
break;
default:
;
}
/*
if (field->null_ptr &&
field_in_record_is_null(table, field, (char*) row)) {
*/
if( field->real_maybe_null() && field->is_null_in_record(row)) {
MD5_Update (&ctx, &null_byte, 1);
} else {
MD5_Update (&ctx, &true_byte, 1);
MD5_Update (&ctx, ptr, len);
}
}
MD5_Final (digest, &ctx);
#endif
return(0);
}
#endif /* WITH_WSREP */
#ifdef WITH_WSREP
int
wsrep_append_key(
/*==================*/
THD *thd,
TABLE_SHARE *table_share,
TABLE *table,
const char* key,
uint16_t key_len,
bool shared
)
{
DBUG_ENTER("wsrep_append_key");
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
bool const copy = true;
#ifdef WSREP_DEBUG_PRINT
fprintf(stderr, "%s conn %ld, trx %llu, keylen %d, table %s ",
(shared) ? "Shared" : "Exclusive",
wsrep_thd_thread_id(thd), trx->id, key_len,
table_share->table_name.str);
for (int i=0; i<key_len; i++) {
fprintf(stderr, "%hhX, ", key[i]);
}
fprintf(stderr, "\n");
#endif
wsrep_buf_t wkey_part[3];
wsrep_key_t wkey = {wkey_part, 3};
if (!wsrep_prepare_key_for_innodb(
(const uchar*)table_share->table_cache_key.str,
table_share->table_cache_key.length,
(const uchar*)key, key_len,
wkey_part,
(size_t*)&wkey.key_parts_num)) {
WSREP_WARN("key prepare failed for: %s",
(wsrep_thd_query(thd)) ?
wsrep_thd_query(thd) : "void");
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
}
int rcode = (int)wsrep->append_key(
wsrep,
wsrep_ws_handle(thd),
&wkey,
1,
shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE,
copy);
if (rcode) {
DBUG_PRINT("wsrep", ("row key failed: %d", rcode));
WSREP_WARN("Appending row key failed: %s, %d",
(wsrep_thd_query(thd)) ?
wsrep_thd_query(thd) : "void", rcode);
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
}
DBUG_RETURN(0);
}
int
ha_tokudb::wsrep_append_keys(
/*==================*/
THD *thd,
bool shared,
const uchar* record0, /* in: row in MySQL format */
const uchar* record1) /* in: row in MySQL format */
{
int rcode;
DBUG_ENTER("wsrep_append_keys");
bool key_appended = false;
#ifdef REMOVED
trx_t *trx = thd_to_trx(thd);
if (table_share && table_share->tmp_table != NO_TMP_TABLE) {
WSREP_DEBUG("skipping tmp table DML: THD: %lu tmp: %d SQL: %s",
wsrep_thd_thread_id(thd),
table_share->tmp_table,
(wsrep_thd_query(thd)) ?
wsrep_thd_query(thd) : "void");
DBUG_RETURN(0);
}
ut_a(table->s->keys <= 256);
uint i;
for (i=0; i<table->s->keys; ++i) {
uint len;
char keyval0[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
char keyval1[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
char* key0 = &keyval0[1];
char* key1 = &keyval1[1];
KEY* key_info = table->key_info + i;
ibool is_null;
dict_index_t* idx = innobase_get_index(i);
dict_table_t* tab = (idx) ? idx->table : NULL;
keyval0[0] = (char)i;
keyval1[0] = (char)i;
if (!tab) {
WSREP_WARN("MySQL-InnoDB key mismatch %s %s",
table->s->table_name.str,
key_info->name);
}
if (key_info->flags & HA_NOSAME ||
((tab &&
dict_table_get_referenced_constraint(tab, idx)) ||
(!tab && referenced_by_foreign_key()))) {
if (key_info->flags & HA_NOSAME || shared)
key_appended = true;
len = wsrep_store_key_val_for_row(
table, i, key0, key_info->key_length,
record0, &is_null);
if (!is_null) {
rcode = wsrep_append_key(
thd, trx, table_share, table,
keyval0, len+1, shared);
if (rcode) DBUG_RETURN(rcode);
}
else
{
WSREP_DEBUG("NULL key skipped: %s",
wsrep_thd_query(thd));
}
if (record1) {
len = wsrep_store_key_val_for_row(
table, i, key1, key_info->key_length,
record1, &is_null);
if (!is_null && memcmp(key0, key1, len)) {
rcode = wsrep_append_key(
thd, trx, table_share,
table,
keyval1, len+1, shared);
if (rcode) DBUG_RETURN(rcode);
}
}
}
}
#endif
/* if no PK, calculate hash of full row, to be the key value */
if (!key_appended && wsrep_certify_nonPK) {
uchar digest[16];
int rcode;
wsrep_calc_row_hash(digest, record0, table, thd);
if ((rcode = wsrep_append_key(thd, table_share, table,
(const char*) digest, 16,
shared))) {
DBUG_RETURN(rcode);
}
if (record1) {
wsrep_calc_row_hash(
digest, record1, table, thd);
if ((rcode = wsrep_append_key(thd, table_share,
table,
(const char*) digest,
16, shared))) {
DBUG_RETURN(rcode);
}
}
DBUG_RETURN(0);
}
DBUG_RETURN(0);
}
#endif

View file

@ -788,6 +788,10 @@ private:
bool check_upsert(THD *thd, List<Item> &update_fields, List<Item> &update_values);
int send_upsert_message(THD *thd, List<Item> &update_fields, List<Item> &update_values, DB_TXN *txn);
#endif
#ifdef WITH_WSREP
int wsrep_append_keys(THD *thd, bool shared,
const uchar* record0, const uchar* record1);
#endif
};
static inline bool key_is_clustering(const KEY *key) {