IB: 0.2 part I

* SYS_VTQ internal InnoDB table;
* I_S.INNODB_SYS_VTQ table;
* vers_notify_vtq(): add record to SYS_VTQ on versioned DML;
* SYS_VTQ columns filled: TRX_ID, BEGIN_TS.
This commit is contained in:
Aleksey Midenkov 2016-09-22 05:56:34 +00:00
parent bd0b21d22c
commit 84e1971128
20 changed files with 618 additions and 17 deletions

View file

@ -401,6 +401,7 @@ enum enum_alter_inplace_result {
#define HA_LEX_CREATE_TMP_TABLE 1U
#define HA_CREATE_TMP_ALTER 8U
#define HA_LEX_CREATE_SEQUENCE 16U
#define HA_VERSIONED_TABLE 32U
#define HA_MAX_REC_LENGTH 65535

View file

@ -4627,6 +4627,12 @@ extern "C" int thd_rpl_is_parallel(const MYSQL_THD thd)
return thd->rgi_slave && thd->rgi_slave->is_parallel_exec;
}
/* Returns high resolution timestamp for the start
of the current query. */
extern "C" time_t thd_start_time(const MYSQL_THD thd)
{
return thd->start_time;
}
/* Returns high resolution timestamp for the start
of the current query. */

View file

@ -1878,6 +1878,115 @@ dict_create_or_check_sys_virtual()
return(err);
}
UNIV_INTERN
dberr_t
dict_create_or_check_vtq_table(void)
/*================================================*/
{
trx_t* trx;
my_bool srv_file_per_table_backup;
dberr_t err;
dberr_t sys_vtq_err;
ut_a(srv_get_active_thread_type() == SRV_NONE);
/* Note: The master thread has not been started at this point. */
sys_vtq_err = dict_check_if_system_table_exists(
"SYS_VTQ", DICT_NUM_FIELDS__SYS_VTQ + 1, 3);
if (sys_vtq_err == DB_SUCCESS) {
mutex_enter(&dict_sys->mutex);
dict_sys->sys_vtq = dict_table_get_low("SYS_VTQ");
mutex_exit(&dict_sys->mutex);
return(DB_SUCCESS);
}
trx = trx_allocate_for_mysql();
trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
trx->op_info = "creating VTQ sys table";
row_mysql_lock_data_dictionary(trx);
/* Check which incomplete table definition to drop. */
if (sys_vtq_err == DB_CORRUPTION) {
ib::warn() <<
"Dropping incompletely created "
"SYS_VTQ table.";
row_drop_table_for_mysql("SYS_VTQ", trx, false, TRUE);
}
ib::warn() <<
"Creating VTQ system table.";
srv_file_per_table_backup = srv_file_per_table;
/* We always want SYSTEM tables to be created inside the system
tablespace. */
srv_file_per_table = 0;
err = que_eval_sql(
NULL,
"PROCEDURE CREATE_VTQ_SYS_TABLE_PROC () IS\n"
"BEGIN\n"
"CREATE TABLE\n"
"SYS_VTQ(TRX_ID BIGINT UNSIGNED, BEGIN_TS BIGINT UNSIGNED,"
" COMMIT_TS BIGINT UNSIGNED, CONCURR_TRX BLOB);\n"
"CREATE UNIQUE CLUSTERED INDEX TRX_ID_IND"
" ON SYS_VTQ (TRX_ID);\n"
"CREATE INDEX BEGIN_TS_IND"
" ON SYS_VTQ (BEGIN_TS);\n"
"CREATE INDEX COMMIT_TS_IND"
" ON SYS_VTQ (COMMIT_TS);\n"
"END;\n",
FALSE, trx);
if (err != DB_SUCCESS) {
ib::error() << "Creation of SYS_VTQ"
" failed: " << ut_strerr(err) << ". Tablespace is"
" full or too many transactions."
" Dropping incompletely created tables.";
ut_ad(err == DB_OUT_OF_FILE_SPACE
|| err == DB_TOO_MANY_CONCURRENT_TRXS);
row_drop_table_for_mysql("SYS_VTQ", trx, false, TRUE);
if (err == DB_OUT_OF_FILE_SPACE) {
err = DB_MUST_GET_MORE_FILE_SPACE;
}
}
trx_commit_for_mysql(trx);
row_mysql_unlock_data_dictionary(trx);
trx_free_for_mysql(trx);
srv_file_per_table = srv_file_per_table_backup;
if (err == DB_SUCCESS) {
ib::info() <<
"VTQ system table created";
}
/* Note: The master thread has not been started at this point. */
/* Confirm and move to the non-LRU part of the table LRU list. */
sys_vtq_err = dict_check_if_system_table_exists(
"SYS_VTQ", DICT_NUM_FIELDS__SYS_VTQ + 1, 3);
ut_a(sys_vtq_err == DB_SUCCESS);
mutex_enter(&dict_sys->mutex);
dict_sys->sys_vtq = dict_table_get_low("SYS_VTQ");
mutex_exit(&dict_sys->mutex);
return(err);
}
/****************************************************************//**
Evaluate the given foreign key SQL statement.
@return error code or DB_SUCCESS */

View file

@ -50,7 +50,7 @@ Created 4/24/1996 Heikki Tuuri
#include <set>
/** Following are the InnoDB system tables. The positions in
this array are referenced by enum dict_system_table_id. */
this array are referenced by enum dict_system_id_t. */
static const char* SYSTEM_TABLE_NAME[] = {
"SYS_TABLES",
"SYS_INDEXES",
@ -60,7 +60,8 @@ static const char* SYSTEM_TABLE_NAME[] = {
"SYS_FOREIGN_COLS",
"SYS_TABLESPACES",
"SYS_DATAFILES",
"SYS_VIRTUAL"
"SYS_VIRTUAL",
"SYS_VTQ"
};
/** Loads a table definition and also all its index definitions.
@ -821,6 +822,64 @@ err_len:
return(NULL);
}
/********************************************************************//**
This function parses a SYS_VTQ record, extracts necessary
information from the record and returns it to the caller.
@return error message, or NULL on success */
UNIV_INTERN
const char*
dict_process_sys_vtq(
/*=======================*/
mem_heap_t* heap, /*!< in/out: heap memory */
const rec_t* rec, /*!< in: current rec */
ullong* col_trx_id, /*!< out: field values */
ullong* col_begin_ts,
ullong* col_commit_ts,
ullong* col_concurr_trx)
{
ulint len;
const byte* field;
if (rec_get_deleted_flag(rec, 0)) {
return("delete-marked record in SYS_VTQ");
}
if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_VTQ) {
return("wrong number of columns in SYS_VTQ record");
}
field = rec_get_nth_field_old(
rec, DICT_FLD__SYS_VTQ__TRX_ID, &len);
if (len != sizeof(col_trx_id)) {
err_len:
return("incorrect column length in SYS_VTQ");
}
*col_trx_id = mach_read_from_8(field);
field = rec_get_nth_field_old(
rec, DICT_FLD__SYS_VTQ__BEGIN_TS, &len);
if (len != sizeof(col_begin_ts)) {
goto err_len;
}
*col_begin_ts = mach_read_from_8(field);
field = rec_get_nth_field_old(
rec, DICT_FLD__SYS_VTQ__COMMIT_TS, &len);
if (len != sizeof(col_commit_ts)) {
goto err_len;
}
*col_commit_ts = mach_read_from_8(field);
field = rec_get_nth_field_old(
rec, DICT_FLD__SYS_VTQ__CONCURR_TRX, &len);
if (len != sizeof(col_concurr_trx)) {
goto err_len;
}
*col_concurr_trx = mach_read_from_8(field);
return(NULL);
}
/** Get the first filepath from SYS_DATAFILES for a given space_id.
@param[in] space_id Tablespace ID
@return First filepath (caller must invoke ut_free() on it)
@ -3714,3 +3773,4 @@ dict_table_open_on_index_id(
}
return table;
}

View file

@ -1781,9 +1781,7 @@ thd_start_time_in_secs(
/*===================*/
THD* thd) /*!< in: thread handle, or NULL */
{
// FIXME: This function should be added to the server code.
//return(thd_start_time(thd));
return(ulint(ut_time()));
return(thd_start_time(thd));
}
/** Enter InnoDB engine after checking the max number of user threads
@ -12855,6 +12853,10 @@ index_bad:
DBUG_EXECUTE_IF("innodb_test_wrong_fts_aux_table_name",
m_flags2 &= ~DICT_TF2_FTS_AUX_HEX_NAME;);
if (m_create_info->options & HA_VERSIONED_TABLE) {
m_flags2 |= DICT_TF2_VERSIONED;
}
DBUG_RETURN(true);
}
@ -22000,7 +22002,8 @@ i_s_innodb_sys_virtual,
i_s_innodb_mutexes,
i_s_innodb_sys_semaphore_waits,
i_s_innodb_tablespaces_encryption,
i_s_innodb_tablespaces_scrubbing
i_s_innodb_tablespaces_scrubbing,
i_s_innodb_vtq
maria_declare_plugin_end;
/** @brief Initialize the default value of innodb_commit_concurrency.

View file

@ -347,6 +347,54 @@ field_store_ulint(
# define I_S_AHI 0 /* Omit the IS_HASHED column */
#endif
/*******************************************************************//**
Auxiliary function to store ulint value in MYSQL_TYPE_LONGLONG field.
If the value is UINT64_UNDEFINED then the field it set to NULL.
@return 0 on success */
int
field_store_ullong(
/*==============*/
Field* field, /*!< in/out: target field for storage */
ullong n) /*!< in: value to store */
{
int ret;
if (n != UINT64_UNDEFINED) {
ret = field->store(n, 1);
field->set_notnull();
} else {
ret = 0; /* success */
field->set_null();
}
return(ret);
}
/*******************************************************************//**
Auxiliary function to store packed timestamp value in MYSQL_TYPE_DATETIME field.
If the value is ULINT_UNDEFINED then the field it set to NULL.
@return 0 on success */
int
field_store_packed_ts(
/*==============*/
Field* field, /*!< in/out: target field for storage */
ullong n) /*!< in: value to store */
{
int ret;
MYSQL_TIME tmp;
if (n != UINT64_UNDEFINED) {
unpack_time(n, &tmp);
ret = field->store_time(&tmp);
field->set_notnull();
} else {
ret = 0; /* success */
field->set_null();
}
return(ret);
}
/* Fields of the dynamic table INFORMATION_SCHEMA.innodb_trx */
static ST_FIELD_INFO innodb_trx_fields_info[] =
{
@ -9608,3 +9656,236 @@ UNIV_INTERN struct st_maria_plugin i_s_innodb_sys_semaphore_waits =
STRUCT_FLD(version_info, INNODB_VERSION_STR),
STRUCT_FLD(maturity, MariaDB_PLUGIN_MATURITY_STABLE),
};
/* Fields of the dynamic table INFORMATION_SCHEMA.innodb_vtq */
static ST_FIELD_INFO innodb_vtq_fields_info[] =
{
#define SYS_VTQ_TRX_ID 0
{ STRUCT_FLD(field_name, "trx_id"),
STRUCT_FLD(field_length, MY_INT64_NUM_DECIMAL_DIGITS),
STRUCT_FLD(field_type, MYSQL_TYPE_LONGLONG),
STRUCT_FLD(value, 0),
STRUCT_FLD(field_flags, MY_I_S_UNSIGNED),
STRUCT_FLD(old_name, ""),
STRUCT_FLD(open_method, SKIP_OPEN_TABLE) },
#define SYS_VTQ_BEGIN_TS 1
{ STRUCT_FLD(field_name, "begin_ts"),
STRUCT_FLD(field_length, 6),
STRUCT_FLD(field_type, MYSQL_TYPE_TIMESTAMP),
STRUCT_FLD(value, 0),
STRUCT_FLD(field_flags, 0),
STRUCT_FLD(old_name, ""),
STRUCT_FLD(open_method, SKIP_OPEN_TABLE) },
#define SYS_VTQ_COMMIT_TS 2
{ STRUCT_FLD(field_name, "commit_ts"),
STRUCT_FLD(field_length, 6),
STRUCT_FLD(field_type, MYSQL_TYPE_TIMESTAMP),
STRUCT_FLD(value, 0),
STRUCT_FLD(field_flags, 0),
STRUCT_FLD(old_name, ""),
STRUCT_FLD(open_method, SKIP_OPEN_TABLE) },
#define SYS_VTQ_CONCURR_TRX 3
{ STRUCT_FLD(field_name, "concurr_trx"),
STRUCT_FLD(field_length, 120),
STRUCT_FLD(field_type, MYSQL_TYPE_MEDIUM_BLOB),
STRUCT_FLD(value, 0),
STRUCT_FLD(field_flags, 0),
STRUCT_FLD(old_name, ""),
STRUCT_FLD(open_method, SKIP_OPEN_TABLE) },
END_OF_ST_FIELD_INFO
};
/**********************************************************************//**
Function to fill INFORMATION_SCHEMA.INNODB_SYS_VTQ with information
collected by scanning SYS_VTQ table.
@return 0 on success */
static
int
i_s_dict_fill_vtq(
/*========================*/
THD* thd, /*!< in: thread */
ullong col_trx_id, /*!< in: table fields */
ullong col_begin_ts,
ullong col_commit_ts,
ullong col_concurr_trx,
TABLE* table_to_fill) /*!< in/out: fill this table */
{
Field** fields;
DBUG_ENTER("i_s_dict_fill_vtq");
fields = table_to_fill->field;
OK(field_store_ullong(fields[SYS_VTQ_TRX_ID], col_trx_id));
OK(field_store_packed_ts(fields[SYS_VTQ_BEGIN_TS], col_begin_ts));
OK(field_store_packed_ts(fields[SYS_VTQ_COMMIT_TS], col_commit_ts));
OK(field_store_ullong(fields[SYS_VTQ_CONCURR_TRX], col_concurr_trx));
OK(schema_table_store_record(thd, table_to_fill));
DBUG_RETURN(0);
}
/*******************************************************************//**
Function to populate INFORMATION_SCHEMA.INNODB_SYS_VTQ table.
Loop through each record in SYS_VTQ, and extract the column
information and fill the INFORMATION_SCHEMA.INNODB_SYS_VTQ table.
@return 0 on success */
static const int I_S_SYS_VTQ_LIMIT = 1000; // maximum number of records in I_S.INNODB_SYS_VTQ
static
int
i_s_sys_vtq_fill_table(
/*=========================*/
THD* thd, /*!< in: thread */
TABLE_LIST* tables, /*!< in/out: tables to fill */
Item* ) /*!< in: condition (not used) */
{
btr_pcur_t pcur;
const rec_t* rec;
mem_heap_t* heap;
mtr_t mtr;
int err = 0;
DBUG_ENTER("i_s_sys_vtq_fill_table");
RETURN_IF_INNODB_NOT_STARTED(tables->schema_table_name);
/* deny access to user without PROCESS_ACL privilege */
if (check_global_access(thd, PROCESS_ACL)) {
DBUG_RETURN(0);
}
heap = mem_heap_create(1000);
mutex_enter(&dict_sys->mutex);
mtr_start(&mtr);
rec = dict_startscan_system(&pcur, &mtr, SYS_VTQ);
for (int i = 0; rec && i < I_S_SYS_VTQ_LIMIT; ++i) {
const char* err_msg;
ullong col_trx_id;
ullong col_begin_ts;
ullong col_commit_ts;
ullong col_concurr_trx;
/* Extract necessary information from a SYS_VTQ row */
err_msg = dict_process_sys_vtq(
heap,
rec,
&col_trx_id,
&col_begin_ts,
&col_commit_ts,
&col_concurr_trx);
mtr_commit(&mtr);
mutex_exit(&dict_sys->mutex);
if (!err_msg) {
err = i_s_dict_fill_vtq(
thd,
col_trx_id,
col_begin_ts,
col_commit_ts,
col_concurr_trx,
tables->table);
} else {
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_CANT_FIND_SYSTEM_REC, "%s",
err_msg);
err = 1;
}
if (err)
break;
mem_heap_empty(heap);
/* Get the next record */
mutex_enter(&dict_sys->mutex);
mtr_start(&mtr);
rec = dict_getnext_system(&pcur, &mtr);
}
if (!err) {
mtr_commit(&mtr);
mutex_exit(&dict_sys->mutex);
}
mem_heap_free(heap);
DBUG_RETURN(err);
}
/*******************************************************************//**
Bind the dynamic table INFORMATION_SCHEMA.innodb_vtq
@return 0 on success */
static
int
innodb_vtq_init(
/*===================*/
void* p) /*!< in/out: table schema object */
{
ST_SCHEMA_TABLE* schema;
DBUG_ENTER("innodb_vtq_init");
schema = (ST_SCHEMA_TABLE*) p;
schema->fields_info = innodb_vtq_fields_info;
schema->fill_table = i_s_sys_vtq_fill_table;
DBUG_RETURN(0);
}
UNIV_INTERN struct st_maria_plugin i_s_innodb_vtq =
{
/* the plugin type (a MYSQL_XXX_PLUGIN value) */
/* int */
STRUCT_FLD(type, MYSQL_INFORMATION_SCHEMA_PLUGIN),
/* pointer to type-specific plugin descriptor */
/* void* */
STRUCT_FLD(info, &i_s_info),
/* plugin name */
/* const char* */
STRUCT_FLD(name, "INNODB_VTQ"),
/* plugin author (for SHOW PLUGINS) */
/* const char* */
STRUCT_FLD(author, plugin_author),
/* general descriptive text (for SHOW PLUGINS) */
/* const char* */
STRUCT_FLD(descr, "InnoDB Versioning Transaction Query table"),
/* the plugin license (PLUGIN_LICENSE_XXX) */
/* int */
STRUCT_FLD(license, PLUGIN_LICENSE_GPL),
/* the function to invoke when plugin is loaded */
/* int (*)(void*); */
STRUCT_FLD(init, innodb_vtq_init),
/* the function to invoke when plugin is unloaded */
/* int (*)(void*); */
STRUCT_FLD(deinit, i_s_common_deinit),
/* plugin version (for SHOW PLUGINS) */
/* unsigned int */
STRUCT_FLD(version, INNODB_VERSION_SHORT),
/* struct st_mysql_show_var* */
STRUCT_FLD(status_vars, NULL),
/* struct st_mysql_sys_var** */
STRUCT_FLD(system_vars, NULL),
/* Maria extension */
STRUCT_FLD(version_info, INNODB_VERSION_STR),
STRUCT_FLD(maturity, MariaDB_PLUGIN_MATURITY_GAMMA),
};

View file

@ -64,6 +64,7 @@ extern struct st_maria_plugin i_s_innodb_sys_virtual;
extern struct st_maria_plugin i_s_innodb_tablespaces_encryption;
extern struct st_maria_plugin i_s_innodb_tablespaces_scrubbing;
extern struct st_maria_plugin i_s_innodb_sys_semaphore_waits;
extern struct st_maria_plugin i_s_innodb_vtq;
/** maximum number of buffer page info we would cache. */
#define MAX_BUF_INFO_CACHED 10000

View file

@ -325,6 +325,23 @@ enum dict_fld_sys_datafiles_enum {
DICT_FLD__SYS_DATAFILES__PATH = 3,
DICT_NUM_FIELDS__SYS_DATAFILES = 4
};
/* The columns in SYS_VTQ */
enum dict_col_sys_vtq_enum
{
DICT_COL__SYS_VTQ__TRX_ID = 0,
DICT_NUM_COLS__SYS_VTQ = 1
};
/* The field numbers in the SYS_VTQ clustered index */
enum dict_fld_sys_vtq_enum
{
DICT_FLD__SYS_VTQ__TRX_ID = 0,
DICT_FLD__SYS_VTQ__DB_TRX_ID = 1,
DICT_FLD__SYS_VTQ__DB_ROLL_PTR = 2,
DICT_FLD__SYS_VTQ__BEGIN_TS = 3,
DICT_FLD__SYS_VTQ__COMMIT_TS = 4,
DICT_FLD__SYS_VTQ__CONCURR_TRX = 5,
DICT_NUM_FIELDS__SYS_VTQ = 6
};
/* The columns in SYS_VIRTUAL */
enum dict_col_sys_virtual_enum {

View file

@ -312,6 +312,10 @@ struct tab_node_t{
storage */
};
UNIV_INTERN
dberr_t
dict_create_or_check_vtq_table(void);
/* Table create node states */
#define TABLE_BUILD_TABLE_DEF 1
#define TABLE_BUILD_COL_DEF 2

View file

@ -1738,6 +1738,7 @@ struct dict_sys_t{
dict_table_t* sys_indexes; /*!< SYS_INDEXES table */
dict_table_t* sys_fields; /*!< SYS_FIELDS table */
dict_table_t* sys_virtual; /*!< SYS_VIRTUAL table */
dict_table_t* sys_vtq; /*!< SYS_VTQ table */
/*=============================*/
UT_LIST_BASE_NODE_T(dict_table_t)

View file

@ -52,6 +52,7 @@ enum dict_system_id_t {
SYS_TABLESPACES,
SYS_DATAFILES,
SYS_VIRTUAL,
SYS_VTQ,
/* This must be last item. Defines the number of system tables. */
SYS_NUM_SYSTEM_TABLES
@ -314,6 +315,20 @@ dict_process_sys_datafiles(
const rec_t* rec, /*!< in: current SYS_DATAFILES rec */
ulint* space, /*!< out: pace id */
const char** path); /*!< out: datafile path */
/********************************************************************//**
This function parses a SYS_VTQ record, extracts necessary
information from the record and returns it to the caller.
@return error message, or NULL on success */
UNIV_INTERN
const char*
dict_process_sys_vtq(
/*=======================*/
mem_heap_t* heap, /*!< in/out: heap memory */
const rec_t* rec, /*!< in: current rec */
ullong* col_trx_id, /*!< out: field values */
ullong* col_begin_ts,
ullong* col_commit_ts,
ullong* col_concurr_trx);
/** Update the record for space_id in SYS_TABLESPACES to this filepath.
@param[in] space_id Tablespace ID
@ -338,4 +353,9 @@ dict_replace_tablespace_and_filepath(
const char* filepath,
ulint fsp_flags);
UNIV_INTERN
dict_table_t*
get_vtq_table();
#endif

View file

@ -300,7 +300,7 @@ ROW_FORMAT=REDUNDANT. InnoDB engines do not check these flags
for unknown bits in order to protect backward incompatibility. */
/* @{ */
/** Total number of bits in table->flags2. */
#define DICT_TF2_BITS 9
#define DICT_TF2_BITS 10
#define DICT_TF2_UNUSED_BIT_MASK (~0U << DICT_TF2_BITS | \
1U << DICT_TF_POS_SHARED_SPACE)
#define DICT_TF2_BIT_MASK ~DICT_TF2_UNUSED_BIT_MASK
@ -329,6 +329,8 @@ use its own tablespace instead of the system tablespace. */
index tables) of a FTS table are in HEX format. */
#define DICT_TF2_FTS_AUX_HEX_NAME 64U
#define DICT_TF2_VERSIONED 512
/* @} */
#define DICT_TF2_FLAG_SET(table, flag) \

View file

@ -380,9 +380,6 @@ struct que_thr_t{
UT_LIST_NODE_T(que_thr_t)
thrs; /*!< list of thread nodes of the fork
node */
UT_LIST_NODE_T(que_thr_t)
trx_thrs; /*!< lists of threads in wait list of
the trx */
UT_LIST_NODE_T(que_thr_t)
queue; /*!< list of runnable thread nodes in
the server task queue */

View file

@ -180,6 +180,11 @@ row_ins_step(
/*=========*/
que_thr_t* thr); /*!< in: query thread */
/***********************************************************//**
Inserts a row to SYS_VTQ table.
@return error state */
dberr_t vers_notify_vtq(que_thr_t * thr, mem_heap_t * heap);
/* Insert node structure */
struct ins_node_t{

View file

@ -1267,6 +1267,7 @@ struct trx_t {
os_event_t wsrep_event; /* event waited for in srv_conc_slot */
#endif /* WITH_WSREP */
bool vtq_notified;
ulint magic_n;
/** @return whether any persistent undo log has been generated */

View file

@ -494,6 +494,8 @@ typedef uint64_t ib_uint64_t;
typedef uint32_t ib_uint32_t;
#endif /* _WIN32 */
typedef ib_uint64_t ullong;
#ifdef _WIN64
typedef unsigned __int64 ulint;
typedef __int64 lint;

View file

@ -3798,3 +3798,67 @@ error_handling:
return(thr);
}
inline
void set_row_field_8(dtuple_t* row, int field_num, ib_uint64_t data, mem_heap_t* heap)
{
dfield_t* dfield = dtuple_get_nth_field(row, field_num);
byte* buf = static_cast<byte*>(mem_heap_alloc(heap, 8));
mach_write_to_8(buf, data);
dfield_set_data(dfield, buf, 8);
}
#include "my_time.h"
#include "sql_time.h"
/***********************************************************//**
Inserts a row to SYS_VTQ table.
@return error state */
UNIV_INTERN
dberr_t
vers_notify_vtq(que_thr_t* thr, mem_heap_t* heap)
{
dberr_t err;
trx_t* trx = thr_get_trx(thr);
dict_table_t* sys_vtq = dict_sys->sys_vtq;
ins_node_t* node = ins_node_create(INS_DIRECT, sys_vtq, heap);
node->select = NULL;
node->values_list = NULL; // for INS_VALUES
dtuple_t* row = dtuple_create(heap, dict_table_get_n_cols(sys_vtq));
dict_table_copy_types(row, sys_vtq);
struct tm unix_time;
MYSQL_TIME mysql_time;
localtime_r(&trx->start_time, &unix_time);
localtime_to_TIME(&mysql_time, &unix_time);
mysql_time.second_part = trx->start_time_micro;
ullong start_time = pack_time(&mysql_time);
set_row_field_8(row, DICT_FLD__SYS_VTQ__TRX_ID, trx->id, heap);
set_row_field_8(row, DICT_FLD__SYS_VTQ__BEGIN_TS - 2, start_time, heap);
set_row_field_8(row, DICT_FLD__SYS_VTQ__COMMIT_TS - 2, start_time, heap);
set_row_field_8(row, DICT_FLD__SYS_VTQ__CONCURR_TRX - 2, 3, heap);
ins_node_set_new_row(node, row);
trx_write_trx_id(node->trx_id_buf, trx->id);
err = lock_table(0, node->table, LOCK_IX, thr);
DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait",
err = DB_LOCK_WAIT;);
if (err != DB_SUCCESS) {
goto end_func;
}
node->trx_id = trx->id;
node->state = INS_NODE_ALLOC_ROW_ID;
err = row_ins(node, thr);
end_func:
trx->error_state = err;
if (err != DB_SUCCESS)
fprintf(stderr, "InnoDB: failed to insert VTQ record (see SQL error message)\n");
return err;
}

View file

@ -1537,6 +1537,14 @@ error_exit:
node->duplicate = NULL;
if (!trx->vtq_notified && DICT_TF2_FLAG_IS_SET(node->table, DICT_TF2_VERSIONED)) {
trx->vtq_notified = true;
err = vers_notify_vtq(thr, node->table->heap);
if (err != DB_SUCCESS) {
goto error_exit;
}
}
if (dict_table_has_fts_index(table)) {
doc_id_t doc_id;
@ -1981,7 +1989,7 @@ run_again:
err = trx->error_state;
if (err != DB_SUCCESS) {
error_exit:
que_thr_stop_for_mysql(thr);
if (err == DB_RECORD_NOT_FOUND) {
@ -2129,6 +2137,14 @@ run_again:
}
}
if (!trx->vtq_notified && DICT_TF2_FLAG_IS_SET(node->table, DICT_TF2_VERSIONED)) {
trx->vtq_notified = true;
err = vers_notify_vtq(thr, node->table->heap);
if (err != DB_SUCCESS) {
goto error;
}
}
trx->op_info = "";
que_thr_stop_for_mysql_no_error(thr, trx);

View file

@ -2534,6 +2534,10 @@ files_checked:
err = dict_create_or_check_sys_tablespace();
if (err == DB_SUCCESS) {
err = dict_create_or_check_sys_virtual();
if (err == DB_SUCCESS) {
/* Create the SYS_VTQ system table */
err = dict_create_or_check_vtq_table();
}
}
}
switch (err) {

View file

@ -1349,15 +1349,22 @@ trx_start_low(
}
}
if (trx->mysql_thd != NULL) {
trx->start_time = thd_start_time_in_secs(trx->mysql_thd);
trx->start_time_micro = thd_query_start_micro(trx->mysql_thd);
ut_usectime((ulong *)&trx->start_time,
(ulong *)&trx->start_time_micro);
} else {
trx->start_time = ut_time();
trx->start_time_micro = 0;
if (trx->mysql_thd != NULL) {
time_t start_time = thd_start_time_in_secs(trx->mysql_thd);
ib_uint64_t start_utime = thd_query_start_micro(trx->mysql_thd);
if (start_time < trx->start_time ||
(start_time == trx->start_time && start_utime < trx->start_time_micro))
{
trx->start_time = start_time;
trx->start_time_micro = start_utime;
}
}
trx->vtq_notified = false;
ut_a(trx->error_state == DB_SUCCESS);
MONITOR_INC(MONITOR_TRX_ACTIVE);