MDEV-34515: Contention between purge and workload

In a Sysbench oltp_update_index workload that involves 1 table,
serious contention between the workload and the purge of history
was observed. This was at its worst when the table contained only 1 record.

This turned out to be fixed by setting innodb_purge_batch_size=128,
which corresponds to the number of usable persistent rollback segments.
When we go above that, there would be contention between row_purge_poss_sec()
and the workload, typically on the clustered index page latch, sometimes
also on a secondary index page latch. It might be that with smaller
batches, trx_sys.history_size() will end up pausing all concurrent
transaction start/commit frequently enough so that purge will be able
to make some progress, so that there would be less contention on the
index page latches between purge and SQL execution.

In commit aa719b5010 (part of MDEV-32050)
the interpretation of the parameter innodb_purge_batch_size was slightly
changed. It would correspond to the maximum desired size of the
purge_sys.pages cache. Before that change, the parameter was referring to
a number of undo log pages, but the accounting might have been inaccurate.

To avoid a regression, we will reduce the default value to
innodb_purge_batch_size=127, which will also be compatible with
innodb_undo_tablespaces>1 (which will disable rollback segment 0).

Additionally, some logic in the purge and MVCC checks is simplified.
The purge tasks will make use of purge_sys.pages when accessing undo
log pages to find out if a secondary index record can be removed.
If an undo page needs to be looked up in buf_pool.page_hash, we will
merely buffer-fix it. This is correct, because the undo pages are
append-only in nature. Holding purge_sys.latch or purge_sys.end_latch
or the fact that the current thread is executing as a part of an
in-progress purge batch will prevent the contents of the undo page from
being freed and subsequently reused. The buffer-fix will prevent the
page from being evicted from the buffer pool. Thanks to this logic,
we can refer to the undo log record directly in the buffer pool page
and avoid copying the record.
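
A rough sketch of the resulting access pattern (purge_sys_t::get_page()
is introduced by this commit; the exact shape of the purge_sys.pages
cache and this helper function are illustrative assumptions):

    /* Hypothetical sketch: read an undo log record in place. */
    const byte *undo_rec_in_place(page_id_t id, uint16_t offset)
    {
      /* Within a purge batch the page may already be cached (and thus
      buffer-fixed) in purge_sys.pages; otherwise buffer-fix it now. */
      buf_block_t *block= purge_sys.get_page(id);
      if (!block)
        return nullptr; /* the undo page was corrupted or freed */
      /* No page latch is needed: undo pages are append-only, and the
      buffer-fix prevents eviction, so the record can be read in place
      without copying. */
      return block->page.frame + offset;
    }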

buf_pool_t::page_fix(): Look up and buffer-fix a page. This is useful
for accessing undo log pages, which are append-only by nature.
There will be no need to deal with change buffer or ROW_FORMAT=COMPRESSED
in that case.

purge_sys_t::view_guard::view_guard(): Allow the type of guard to be
acquired: end_latch, latch, or no latch (in case we are a purge thread).
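
A minimal usage sketch, based on the new constructor and view()
accessor shown in the trx0purge.h hunk below:

    {
      /* VIEW rd-locks purge_sys.latch and view() returns purge_sys.view;
      END_VIEW would use end_latch and purge_sys.end_view; PURGE (inside
      a purge batch) acquires no latch at all. */
      purge_sys_t::view_guard g{purge_sys_t::view_guard::VIEW};
      const ReadViewBase &v= g.view();
      /* ... visibility checks against v ... */
    } /* the destructor releases the latch, if one was acquired */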

purge_sys_t::view_guard::get(): Read-only accessor to purge_sys.pages.

purge_sys_t::get_page(): Invoke buf_pool_t::page_fix().

row_vers_old_has_index_entry(): Replaced with row_purge_is_unsafe()
and row_undo_mod_sec_is_unsafe().

trx_undo_get_undo_rec(): Merged to trx_undo_prev_version_build().

row_purge_poss_sec(): Add the parameter mtr and remove redundant
or unused parameters sec_pcur, sec_mtr, is_tree. We will use the
caller's mtr object but release any acquired page latches before
returning.
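
Schematically, this follows the mini-transaction savepoint pattern
(cf. the new row_purge_poss_sec() body in the row0purge.cc hunk below):

    const auto savepoint= mtr->get_savepoint();
    /* ... reposition the cursor on the clustered index record,
    acquiring page latches in the caller's mtr ... */
    mtr->rollback_to_savepoint(savepoint); /* release only those latches */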

btr_cur_get_page(), page_cur_get_page(): Do not invoke page_align().

row_purge_remove_sec_if_poss_leaf(): Return the value of PAGE_MAX_TRX_ID
to be checked against the page in row_purge_remove_sec_if_poss_tree().
If the secondary index page was not changed meanwhile, it will be
unnecessary to invoke row_purge_poss_sec() again.
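
Schematically (cf. the new row_purge_remove_sec_if_poss_tree() below):

    if (page_max_trx_id
        == page_get_max_trx_id(btr_cur_get_page(&pcur.btr_cur))
        || row_purge_poss_sec(node, index, entry, &mtr)) {
      /* remove the delete-marked secondary index record */
    }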

trx_undo_prev_version_build(): Access any undo log pages using
the caller's mini-transaction object.
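
For example, a call site now reads as follows (matching the updated
signature; cf. the row0sel.cc hunk below):

    dberr_t err= trx_undo_prev_version_build(rec, clust_index, offsets,
                                             heap, &old_vers, &mtr, 0,
                                             nullptr, nullptr);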

row_purge_vc_matches_cluster(): Moved to the only compilation unit that
needs it.

Reviewed by: Debarun Banerjee
Marko Mäkelä 2024-08-26 12:23:06 +03:00
parent d58734d781
commit b7b9f3ce82
23 changed files with 958 additions and 887 deletions

View file

@@ -1,19 +1,19 @@
SET @global_start_value = @@global.innodb_purge_batch_size;
SELECT @global_start_value;
@global_start_value
-1000
+127
'#--------------------FN_DYNVARS_046_01------------------------#'
SET @@global.innodb_purge_batch_size = 1;
SET @@global.innodb_purge_batch_size = DEFAULT;
SELECT @@global.innodb_purge_batch_size;
@@global.innodb_purge_batch_size
-1000
+127
'#---------------------FN_DYNVARS_046_02-------------------------#'
SET innodb_purge_batch_size = 1;
ERROR HY000: Variable 'innodb_purge_batch_size' is a GLOBAL variable and should be set with SET GLOBAL
SELECT @@innodb_purge_batch_size;
@@innodb_purge_batch_size
-1000
+127
SELECT local.innodb_purge_batch_size;
ERROR 42S02: Unknown table 'local' in field list
SET global innodb_purge_batch_size = 1;
@@ -112,4 +112,4 @@ SELECT @@global.innodb_purge_batch_size;
SET @@global.innodb_purge_batch_size = @global_start_value;
SELECT @@global.innodb_purge_batch_size;
@@global.innodb_purge_batch_size
-1000
+127

View file

@@ -1293,7 +1293,7 @@ READ_ONLY NO
COMMAND_LINE_ARGUMENT OPTIONAL
VARIABLE_NAME INNODB_PURGE_BATCH_SIZE
SESSION_VALUE NULL
-DEFAULT_VALUE 1000
+DEFAULT_VALUE 127
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE BIGINT UNSIGNED
VARIABLE_COMMENT Number of UNDO log pages to purge in one batch from the history list.

View file

@@ -1277,7 +1277,7 @@ dberr_t btr_cur_t::search_leaf(const dtuple_t *tuple, page_cur_mode_t mode,
ut_ad(buf_mode == BUF_GET_IF_IN_POOL_OR_WATCH);
auto& chain = buf_pool.page_hash.cell_get(page_id.fold());
-if (!row_purge_poss_sec(purge_node, index(), tuple))
+if (!row_purge_poss_sec(purge_node, index(), tuple, mtr))
/* The record cannot be purged yet. */
flag= BTR_CUR_DELETE_REF;
else if (ibuf_insert(IBUF_OP_DELETE, tuple, index(),

View file

@@ -2476,6 +2476,51 @@ static bool buf_page_ibuf_merge_try(buf_block_t *block, ulint rw_latch,
return false;
}
buf_block_t* buf_pool_t::page_fix(const page_id_t id)
{
ha_handler_stats *const stats= mariadb_stats;
buf_inc_get(stats);
auto& chain= page_hash.cell_get(id.fold());
page_hash_latch &hash_lock= page_hash.lock_get(chain);
for (;;)
{
hash_lock.lock_shared();
buf_page_t *b= page_hash.get(id, chain);
if (b)
{
uint32_t state= b->fix();
hash_lock.unlock_shared();
ut_ad(!b->in_zip_hash);
ut_ad(b->frame);
ut_ad(state >= buf_page_t::FREED);
if (state >= buf_page_t::READ_FIX && state < buf_page_t::WRITE_FIX)
{
b->lock.s_lock();
state= b->state();
ut_ad(state < buf_page_t::READ_FIX || state >= buf_page_t::WRITE_FIX);
b->lock.s_unlock();
}
if (UNIV_UNLIKELY(state < buf_page_t::UNFIXED))
{
/* The page was marked as freed or corrupted. */
b->unfix();
b= nullptr;
}
return reinterpret_cast<buf_block_t*>(b);
}
hash_lock.unlock_shared();
switch (buf_read_page(id, 0)) {
default:
return nullptr;
case DB_SUCCESS:
case DB_SUCCESS_LOCKED_REC:
mariadb_increment_pages_read(stats);
buf_read_ahead_random(id, 0, false);
}
}
}
/** Low level function used to get access to a database page.
@param[in] page_id page id
@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0

View file

@@ -18946,7 +18946,7 @@ static MYSQL_SYSVAR_ULONG(purge_batch_size, srv_purge_batch_size,
PLUGIN_VAR_OPCMDARG,
"Number of UNDO log pages to purge in one batch from the history list.",
NULL, NULL,
-1000, /* Default setting */
+127, /* Default setting */
1, /* Minimum value */
innodb_purge_batch_size_MAX, 0);

View file

@@ -78,14 +78,10 @@ page_zip_des_t*
btr_cur_get_page_zip(
/*=================*/
btr_cur_t* cursor);/*!< in: tree cursor */
-/*********************************************************//**
-Returns the page of a tree cursor.
-@return pointer to page */
-UNIV_INLINE
-page_t*
-btr_cur_get_page(
-/*=============*/
-btr_cur_t* cursor);/*!< in: tree cursor */
+/** Returns the page of a tree cursor.
+@return pointer to page */
+#define btr_cur_get_page(cursor) (cursor)->block()->page.frame
/*********************************************************//**
Returns the index of a cursor.
@param cursor b-tree cursor

View file

@@ -48,18 +48,6 @@ btr_cur_get_page_zip(
return(buf_block_get_page_zip(btr_cur_get_block(cursor)));
}
-/*********************************************************//**
-Returns the page of a tree cursor.
-@return pointer to page */
-UNIV_INLINE
-page_t*
-btr_cur_get_page(
-/*=============*/
-btr_cur_t* cursor) /*!< in: tree cursor */
-{
-return(page_align(page_cur_get_rec(&(cursor->page_cur))));
-}
/*********************************************************//**
Positions a tree cursor at a given record. */
UNIV_INLINE

View file

@@ -1416,6 +1416,12 @@ public:
}
public:
+/** Look up and buffer-fix a page.
+@param id page identifier
+@return undo log page, buffer-fixed
+@retval nullptr if the undo page was corrupted or freed */
+buf_block_t *page_fix(const page_id_t id);
/** @return whether the buffer pool contains a page
@tparam allow_watch whether to allow watch_is_sentinel()
@param page_id page identifier

View file

@@ -31,14 +31,6 @@ Created 10/4/1994 Heikki Tuuri
#ifdef UNIV_DEBUG
-/*********************************************************//**
-Gets pointer to the page frame where the cursor is positioned.
-@return page */
-UNIV_INLINE
-page_t*
-page_cur_get_page(
-/*==============*/
-page_cur_t* cur); /*!< in: page cursor */
/*********************************************************//**
Gets pointer to the buffer block where the cursor is positioned.
@return page */
UNIV_INLINE
@@ -60,12 +52,12 @@ page_cur_get_page_zip(
UNIV_INLINE
rec_t *page_cur_get_rec(const page_cur_t *cur);
#else /* UNIV_DEBUG */
-# define page_cur_get_page(cur) page_align((cur)->rec)
# define page_cur_get_block(cur) (cur)->block
# define page_cur_get_page_zip(cur) buf_block_get_page_zip((cur)->block)
# define page_cur_get_rec(cur) (cur)->rec
#endif /* UNIV_DEBUG */
-# define is_page_cur_get_page_zip(cur) is_buf_block_get_page_zip((cur)->block)
+#define page_cur_get_page(cur) page_cur_get_block(cur)->page.frame
+#define is_page_cur_get_page_zip(cur) is_buf_block_get_page_zip((cur)->block)
/*********************************************************//**
Sets the cursor object to point before the first user record
on the page. */

View file

@@ -25,18 +25,6 @@ Created 10/4/1994 Heikki Tuuri
*************************************************************************/
#ifdef UNIV_DEBUG
-/*********************************************************//**
-Gets pointer to the page frame where the cursor is positioned.
-@return page */
-UNIV_INLINE
-page_t*
-page_cur_get_page(
-/*==============*/
-page_cur_t* cur) /*!< in: page cursor */
-{
-return page_align(page_cur_get_rec(cur));
-}
/*********************************************************//**
Gets pointer to the buffer block where the cursor is positioned.
@return page */

View file

@@ -50,26 +50,13 @@ inserts a record that the secondary index entry would refer to.
However, in that case, the user transaction would also re-insert the
secondary index entry after purge has removed it and released the leaf
page latch.
-@param[in,out] node row purge node
-@param[in] index secondary index
-@param[in] entry secondary index entry
-@param[in,out] sec_pcur secondary index cursor or NULL
-if it is called for purge buffering
-operation.
-@param[in,out] sec_mtr mini-transaction which holds
-secondary index entry or NULL if it is
-called for purge buffering operation.
-@param[in] is_tree true=pessimistic purge,
-false=optimistic (leaf-page only)
-@return true if the secondary index record can be purged */
-bool
-row_purge_poss_sec(
-purge_node_t* node,
-dict_index_t* index,
-const dtuple_t* entry,
-btr_pcur_t* sec_pcur=NULL,
-mtr_t* sec_mtr=NULL,
-bool is_tree=false);
+@param node row purge node
+@param index secondary index
+@param entry secondary index entry
+@param mtr mini-transaction for looking up clustered index
+@return whether the secondary index record can be purged */
+bool row_purge_poss_sec(purge_node_t *node, dict_index_t *index,
+const dtuple_t *entry, mtr_t *mtr);
/***************************************************************
Does the purge operation.

View file

@@ -54,32 +54,47 @@ row_vers_impl_x_locked(
dict_index_t* index,
const rec_offs* offsets);
-/** Finds out if a version of the record, where the version >= the current
-purge_sys.view, should have ientry as its secondary index entry. We check
-if there is any not delete marked version of the record where the trx
-id >= purge view, and the secondary index entry == ientry; exactly in
-this case we return TRUE.
-@param[in] also_curr TRUE if also rec is included in the versions
-to search; otherwise only versions prior
-to it are searched
-@param[in] rec record in the clustered index; the caller
-must have a latch on the page
-@param[in] mtr mtr holding the latch on rec; it will
-also hold the latch on purge_view
-@param[in] index secondary index
-@param[in] ientry secondary index entry
-@param[in] roll_ptr roll_ptr for the purge record
-@param[in] trx_id transaction ID on the purging record
-@return TRUE if earlier version should have */
-bool
-row_vers_old_has_index_entry(
-bool also_curr,
-const rec_t* rec,
-mtr_t* mtr,
-dict_index_t* index,
-const dtuple_t* ientry,
-roll_ptr_t roll_ptr,
-trx_id_t trx_id);
+/** Find out whether data tuple has missing data type
+for indexed virtual column.
+@param tuple data tuple
+@param index virtual index
+@return true if tuple has missing column type */
+bool dtuple_vcol_data_missing(const dtuple_t &tuple,
+const dict_index_t &index);
+/** build virtual column value from current cluster index record data
+@param[in,out] row the cluster index row in dtuple form
+@param[in] clust_index clustered index
+@param[in] index the secondary index
+@param[in] heap heap used to build virtual dtuple. */
+bool
+row_vers_build_clust_v_col(
+dtuple_t* row,
+dict_index_t* clust_index,
+dict_index_t* index,
+mem_heap_t* heap);
+/** Build a dtuple contains virtual column data for current cluster index
+@param[in] rec cluster index rec
+@param[in] clust_index cluster index
+@param[in] clust_offsets cluster rec offset
+@param[in] index secondary index
+@param[in] trx_id transaction ID on the purging record,
+or 0 if called outside purge
+@param[in] roll_ptr roll_ptr for the purge record
+@param[in,out] heap heap memory
+@param[in,out] v_heap heap memory to keep virtual column tuple
+@param[in,out] mtr mini-transaction
+@return dtuple contains virtual column data */
+dtuple_t*
+row_vers_build_cur_vrow(
+const rec_t* rec,
+dict_index_t* clust_index,
+rec_offs** clust_offsets,
+dict_index_t* index,
+trx_id_t trx_id,
+roll_ptr_t roll_ptr,
+mem_heap_t* heap,
+mem_heap_t* v_heap,
+mtr_t* mtr);
/*****************************************************************//**
Constructs the version of a clustered index record which a consistent

View file

@@ -438,10 +438,17 @@ public:
struct view_guard
{
-inline view_guard();
+enum guard { END_VIEW= -1, PURGE= 0, VIEW= 1};
+guard latch;
+inline view_guard(guard latch);
inline ~view_guard();
+/** Fetch an undo log page.
+@param id page identifier
+@param mtr mini-transaction
+@return reference to buffer page, possibly buffer-fixed in mtr */
+inline const buf_block_t *get(const page_id_t id, mtr_t *mtr);
-/** @return purge_sys.view */
+/** @return purge_sys.view or purge_sys.end_view */
inline const ReadViewBase &view() const;
};
@@ -470,14 +477,39 @@ public:
/** The global data structure coordinating a purge */
extern purge_sys_t purge_sys;
-purge_sys_t::view_guard::view_guard()
-{ purge_sys.latch.rd_lock(SRW_LOCK_CALL); }
+purge_sys_t::view_guard::view_guard(purge_sys_t::view_guard::guard latch) :
+latch(latch)
+{
+switch (latch) {
+case VIEW:
+purge_sys.latch.rd_lock(SRW_LOCK_CALL);
+break;
+case END_VIEW:
+purge_sys.end_latch.rd_lock();
+break;
+case PURGE:
+/* the access is within a purge batch; purge_coordinator_task
+will wait for all workers to complete before updating the views */
+break;
+}
+}
purge_sys_t::view_guard::~view_guard()
-{ purge_sys.latch.rd_unlock(); }
+{
+switch (latch) {
+case VIEW:
+purge_sys.latch.rd_unlock();
+break;
+case END_VIEW:
+purge_sys.end_latch.rd_unlock();
+break;
+case PURGE:
+break;
+}
+}
const ReadViewBase &purge_sys_t::view_guard::view() const
-{ return purge_sys.view; }
+{ return latch == END_VIEW ? purge_sys.end_view : purge_sys.view; }
purge_sys_t::end_view_guard::end_view_guard()
{ purge_sys.end_latch.rd_lock(); }

View file

@@ -157,50 +157,44 @@ trx_undo_report_row_operation(
/** TRX_UNDO_PREV_IN_PURGE tells trx_undo_prev_version_build() that it
is being called purge view and we would like to get the purge record
even it is in the purge view (in normal case, it will return without
-fetching the purge record */
+fetching the purge record) */
static constexpr ulint TRX_UNDO_PREV_IN_PURGE = 1;
/** This tells trx_undo_prev_version_build() to fetch the old value in
the undo log (which is the after image for an update) */
static constexpr ulint TRX_UNDO_GET_OLD_V_VALUE = 2;
-/** indicate a call from row_vers_old_has_index_entry() */
+/** indicate a call from row_undo_mod_sec_is_unsafe() */
static constexpr ulint TRX_UNDO_CHECK_PURGEABILITY = 4;
+/** indicate a call from row_purge_is_unsafe() */
+static constexpr ulint TRX_UNDO_CHECK_PURGE_PAGES = 8;
/** Build a previous version of a clustered index record. The caller
must hold a latch on the index page of the clustered index record.
@param rec version of a clustered index record
@param index clustered index
@param offsets rec_get_offsets(rec, index)
-@param heap memory heap from which the memory needed is
-allocated
-@param old_vers previous version or NULL if rec is the
-first inserted version, or if history data
-has been deleted (an error), or if the purge
-could have removed the version
-though it has not yet done so
-@param v_heap memory heap used to create vrow
-dtuple if it is not yet created. This heap
-diffs from "heap" above in that it could be
+@param heap memory heap from which the memory needed is allocated
+@param old_vers previous version, or NULL if rec is the first inserted
+version, or if history data has been deleted (an error),
+or if the purge could have removed the version though
+it has not yet done so
+@param mtr mini-transaction
+@param v_status TRX_UNDO_PREV_IN_PURGE, ...
+@param v_heap memory heap used to create vrow dtuple if it is not yet
+created. This heap diffs from "heap" above in that it could be
prebuilt->old_vers_heap for selection
@param vrow virtual column info, if any
-@param v_status status determine if it is going into this
-function by purge thread or not.
-And if we read "after image" of undo log
@return error code
@retval DB_SUCCESS if previous version was successfully built,
or if it was an insert or the undo record refers to the table before rebuild
@retval DB_MISSING_HISTORY if the history is missing */
-dberr_t
-trx_undo_prev_version_build(
-const rec_t *rec,
-dict_index_t *index,
-rec_offs *offsets,
-mem_heap_t *heap,
-rec_t **old_vers,
-mem_heap_t *v_heap,
-dtuple_t **vrow,
-ulint v_status);
+dberr_t trx_undo_prev_version_build(const rec_t *rec, dict_index_t *index,
+rec_offs *offsets, mem_heap_t *heap,
+rec_t **old_vers, mtr_t *mtr,
+ulint v_status,
+mem_heap_t *v_heap, dtuple_t **vrow);
/** Read from an undo log record a non-virtual column value.
@param ptr pointer to remaining part of the undo record

View file

@@ -160,7 +160,7 @@ may be pointing to garbage (an undo log record discarded by purge),
but it will never be dereferenced, because the purge view is older
than any active transaction.
-For details see: row_vers_old_has_index_entry() and row_purge_poss_sec()
+For details see: row_undo_mod_sec_is_unsafe() and row_purge_poss_sec()
*/

View file

@@ -3857,7 +3857,7 @@ UndorecApplier::get_old_rec(const dtuple_t &tuple, dict_index_t *index,
if (is_same(roll_ptr))
return version;
trx_undo_prev_version_build(version, index, *offsets, heap, &prev_version,
-nullptr, nullptr, 0);
+&mtr, 0, nullptr, nullptr);
version= prev_version;
}
while (version);
@@ -4026,7 +4026,7 @@ void UndorecApplier::log_update(const dtuple_t &tuple,
copy_rec= rec_copy(mem_heap_alloc(
heap, rec_offs_size(offsets)), match_rec, offsets);
trx_undo_prev_version_build(match_rec, clust_index, offsets, heap,
-&prev_version, nullptr, nullptr, 0);
+&prev_version, &mtr, 0, nullptr, nullptr);
prev_offsets= rec_get_offsets(prev_version, clust_index, prev_offsets,
clust_index->n_core_fields,

View file

@@ -271,6 +271,433 @@ row_purge_remove_clust_if_poss(
return(false);
}
/** Check a virtual column value index secondary virtual index matches
that of current cluster index record, which is recreated from information
stored in undo log
@param[in] rec record in the clustered index
@param[in] icentry the index entry built from a cluster row
@param[in] clust_index cluster index
@param[in] clust_offsets offsets on the cluster record
@param[in] index the secondary index
@param[in] ientry the secondary index entry
@param[in] roll_ptr the rollback pointer for the purging record
@param[in] trx_id trx id for the purging record
@param[in,out] mtr mini-transaction
@param[in,out] v_row dtuple holding the virtual rows (if needed)
@return true if matches, false otherwise */
static
bool
row_purge_vc_matches_cluster(
const rec_t* rec,
const dtuple_t* icentry,
dict_index_t* clust_index,
rec_offs* clust_offsets,
dict_index_t* index,
const dtuple_t* ientry,
roll_ptr_t roll_ptr,
trx_id_t trx_id,
mtr_t* mtr,
dtuple_t** vrow)
{
const rec_t* version;
rec_t* prev_version;
mem_heap_t* heap2;
mem_heap_t* heap = NULL;
mem_heap_t* tuple_heap;
ulint num_v = dict_table_get_n_v_cols(index->table);
bool compare[REC_MAX_N_FIELDS];
ulint n_fields = dtuple_get_n_fields(ientry);
ulint n_non_v_col = 0;
ulint n_cmp_v_col = 0;
const dfield_t* field1;
dfield_t* field2;
ulint i;
/* First compare non-virtual columns (primary keys) */
ut_ad(index->n_fields == n_fields);
ut_ad(n_fields == dtuple_get_n_fields(icentry));
ut_ad(mtr->memo_contains_page_flagged(rec,
MTR_MEMO_PAGE_S_FIX
| MTR_MEMO_PAGE_X_FIX));
{
const dfield_t* a = ientry->fields;
const dfield_t* b = icentry->fields;
for (const dict_field_t *ifield = index->fields,
*const end = &index->fields[index->n_fields];
ifield != end; ifield++, a++, b++) {
if (!ifield->col->is_virtual()) {
if (cmp_dfield_dfield(a, b)) {
return false;
}
n_non_v_col++;
}
}
}
tuple_heap = mem_heap_create(1024);
ut_ad(n_fields > n_non_v_col);
*vrow = dtuple_create_with_vcol(tuple_heap, 0, num_v);
dtuple_init_v_fld(*vrow);
for (i = 0; i < num_v; i++) {
dfield_get_type(dtuple_get_nth_v_field(*vrow, i))->mtype
= DATA_MISSING;
compare[i] = false;
}
version = rec;
while (n_cmp_v_col < n_fields - n_non_v_col) {
heap2 = heap;
heap = mem_heap_create(1024);
roll_ptr_t cur_roll_ptr = row_get_rec_roll_ptr(
version, clust_index, clust_offsets);
ut_ad(cur_roll_ptr != 0);
ut_ad(roll_ptr != 0);
trx_undo_prev_version_build(
version, clust_index, clust_offsets,
heap, &prev_version, mtr,
TRX_UNDO_PREV_IN_PURGE | TRX_UNDO_GET_OLD_V_VALUE,
nullptr, vrow);
if (heap2) {
mem_heap_free(heap2);
}
if (!prev_version) {
/* Versions end here */
goto func_exit;
}
clust_offsets = rec_get_offsets(prev_version, clust_index,
NULL,
clust_index->n_core_fields,
ULINT_UNDEFINED, &heap);
ulint entry_len = dict_index_get_n_fields(index);
for (i = 0; i < entry_len; i++) {
const dict_field_t* ind_field
= dict_index_get_nth_field(index, i);
const dict_col_t* col = ind_field->col;
field1 = dtuple_get_nth_field(ientry, i);
if (!col->is_virtual()) {
continue;
}
const dict_v_col_t* v_col
= reinterpret_cast<const dict_v_col_t*>(col);
field2
= dtuple_get_nth_v_field(*vrow, v_col->v_pos);
if ((dfield_get_type(field2)->mtype != DATA_MISSING)
&& (!compare[v_col->v_pos])) {
if (ind_field->prefix_len != 0
&& !dfield_is_null(field2)) {
field2->len = unsigned(
dtype_get_at_most_n_mbchars(
field2->type.prtype,
field2->type.mbminlen,
field2->type.mbmaxlen,
ind_field->prefix_len,
field2->len,
static_cast<char*>
(field2->data)));
}
/* The index field mismatch */
if (cmp_dfield_dfield(field2, field1)) {
mem_heap_free(tuple_heap);
mem_heap_free(heap);
return(false);
}
compare[v_col->v_pos] = true;
n_cmp_v_col++;
}
}
trx_id_t rec_trx_id = row_get_rec_trx_id(
prev_version, clust_index, clust_offsets);
if (rec_trx_id < trx_id || roll_ptr == cur_roll_ptr) {
break;
}
version = prev_version;
}
func_exit:
if (n_cmp_v_col == 0) {
*vrow = NULL;
}
mem_heap_free(tuple_heap);
mem_heap_free(heap);
/* FIXME: In the case of n_cmp_v_col is not the same as
n_fields - n_non_v_col, callback is needed to compare the rest
columns. At the timebeing, we will need to return true */
return (true);
}
/** Finds out if a version of the record, where the version >= the current
purge_sys.view, should have ientry as its secondary index entry. We check
if there is any not delete marked version of the record where the trx
id >= purge view, and the secondary index entry == ientry; exactly in
this case we return TRUE.
@param node purge node
@param index secondary index
@param ientry secondary index entry
@param mtr mini-transaction
@return whether ientry cannot be purged */
static bool row_purge_is_unsafe(const purge_node_t &node,
dict_index_t *index,
const dtuple_t *ientry, mtr_t *mtr)
{
const rec_t* rec = btr_pcur_get_rec(&node.pcur);
roll_ptr_t roll_ptr = node.roll_ptr;
trx_id_t trx_id = node.trx_id;
const rec_t* version;
rec_t* prev_version;
dict_index_t* clust_index = node.pcur.index();
rec_offs* clust_offsets;
mem_heap_t* heap;
dtuple_t* row;
const dtuple_t* entry;
dtuple_t* vrow = NULL;
mem_heap_t* v_heap = NULL;
dtuple_t* cur_vrow = NULL;
ut_ad(index->table == clust_index->table);
heap = mem_heap_create(1024);
clust_offsets = rec_get_offsets(rec, clust_index, NULL,
clust_index->n_core_fields,
ULINT_UNDEFINED, &heap);
if (dict_index_has_virtual(index)) {
v_heap = mem_heap_create(100);
}
if (!rec_get_deleted_flag(rec, rec_offs_comp(clust_offsets))) {
row_ext_t* ext;
/* The top of the stack of versions is locked by the
mtr holding a latch on the page containing the
clustered index record. The bottom of the stack is
locked by the fact that the purge_sys.view must
'overtake' any read view of an active transaction.
Thus, it is safe to fetch the prefixes for
externally stored columns. */
row = row_build(ROW_COPY_POINTERS, clust_index,
rec, clust_offsets,
NULL, NULL, NULL, &ext, heap);
if (dict_index_has_virtual(index)) {
#ifdef DBUG_OFF
# define dbug_v_purge false
#else /* DBUG_OFF */
bool dbug_v_purge = false;
#endif /* DBUG_OFF */
DBUG_EXECUTE_IF(
"ib_purge_virtual_index_callback",
dbug_v_purge = true;);
roll_ptr_t t_roll_ptr = row_get_rec_roll_ptr(
rec, clust_index, clust_offsets);
/* if the row is newly inserted, then the virtual
columns need to be computed */
if (trx_undo_roll_ptr_is_insert(t_roll_ptr)
|| dbug_v_purge) {
if (!row_vers_build_clust_v_col(
row, clust_index, index, heap)) {
goto unsafe_to_purge;
}
entry = row_build_index_entry(
row, ext, index, heap);
if (entry && !dtuple_coll_cmp(ientry, entry)) {
goto unsafe_to_purge;
}
} else {
/* Build index entry out of row */
entry = row_build_index_entry(row, ext, index, heap);
/* entry could only be NULL if
the clustered index record is an uncommitted
inserted record whose BLOBs have not been
written yet. The secondary index record
can be safely removed, because it cannot
possibly refer to this incomplete
clustered index record. (Insert would
always first be completed for the
clustered index record, then proceed to
secondary indexes.) */
if (entry && row_purge_vc_matches_cluster(
rec, entry,
clust_index, clust_offsets,
index, ientry, roll_ptr,
trx_id, mtr, &vrow)) {
goto unsafe_to_purge;
}
}
clust_offsets = rec_get_offsets(rec, clust_index, NULL,
clust_index
->n_core_fields,
ULINT_UNDEFINED, &heap);
} else {
entry = row_build_index_entry(
row, ext, index, heap);
/* If entry == NULL, the record contains unset BLOB
pointers. This must be a freshly inserted record. If
this is called from
row_purge_remove_sec_if_poss_low(), the thread will
hold latches on the clustered index and the secondary
index. Because the insert works in three steps:
(1) insert the record to clustered index
(2) store the BLOBs and update BLOB pointers
(3) insert records to secondary indexes
the purge thread can safely ignore freshly inserted
records and delete the secondary index record. The
thread that inserted the new record will be inserting
the secondary index records. */
/* NOTE that we cannot do the comparison as binary
fields because the row is maybe being modified so that
the clustered index record has already been updated to
a different binary value in a char field, but the
collation identifies the old and new value anyway! */
if (entry && !dtuple_coll_cmp(ientry, entry)) {
unsafe_to_purge:
mem_heap_free(heap);
if (v_heap) {
mem_heap_free(v_heap);
}
return true;
}
}
} else if (dict_index_has_virtual(index)) {
/* The current cluster index record could be
deleted, but the previous version of it might not. We will
need to get the virtual column data from undo record
associated with current cluster index */
cur_vrow = row_vers_build_cur_vrow(
rec, clust_index, &clust_offsets,
index, trx_id, roll_ptr, heap, v_heap, mtr);
}
version = rec;
for (;;) {
mem_heap_t* heap2 = heap;
heap = mem_heap_create(1024);
vrow = NULL;
trx_undo_prev_version_build(version,
clust_index, clust_offsets,
heap, &prev_version, mtr,
TRX_UNDO_CHECK_PURGE_PAGES,
nullptr,
dict_index_has_virtual(index)
? &vrow : nullptr);
mem_heap_free(heap2); /* free version and clust_offsets */
if (!prev_version) {
/* Versions end here */
mem_heap_free(heap);
if (v_heap) {
mem_heap_free(v_heap);
}
return false;
}
clust_offsets = rec_get_offsets(prev_version, clust_index,
NULL,
clust_index->n_core_fields,
ULINT_UNDEFINED, &heap);
if (dict_index_has_virtual(index)) {
if (vrow) {
if (dtuple_vcol_data_missing(*vrow, *index)) {
goto nochange_index;
}
/* Keep the virtual row info for the next
version, unless it is changed */
mem_heap_empty(v_heap);
cur_vrow = dtuple_copy(vrow, v_heap);
dtuple_dup_v_fld(cur_vrow, v_heap);
}
if (!cur_vrow) {
/* Nothing for this index has changed,
continue */
nochange_index:
version = prev_version;
continue;
}
}
if (!rec_get_deleted_flag(prev_version,
rec_offs_comp(clust_offsets))) {
row_ext_t* ext;
/* The stack of versions is locked by mtr.
Thus, it is safe to fetch the prefixes for
externally stored columns. */
row = row_build(ROW_COPY_POINTERS, clust_index,
prev_version, clust_offsets,
NULL, NULL, NULL, &ext, heap);
if (dict_index_has_virtual(index)) {
ut_ad(cur_vrow);
ut_ad(row->n_v_fields == cur_vrow->n_v_fields);
dtuple_copy_v_fields(row, cur_vrow);
}
entry = row_build_index_entry(row, ext, index, heap);
/* If entry == NULL, the record contains unset
BLOB pointers. This must be a freshly
inserted record that we can safely ignore.
For the justification, see the comments after
the previous row_build_index_entry() call. */
/* NOTE that we cannot do the comparison as binary
fields because maybe the secondary index record has
already been updated to a different binary value in
a char field, but the collation identifies the old
and new value anyway! */
if (entry && !dtuple_coll_cmp(ientry, entry)) {
goto unsafe_to_purge;
}
}
version = prev_version;
}
}
/** Determines if it is possible to remove a secondary index entry.
Removal is possible if the secondary index entry does not refer to any
not delete marked version of a clustered index record where DB_TRX_ID
@@ -284,66 +711,45 @@ inserts a record that the secondary index entry would refer to.
However, in that case, the user transaction would also re-insert the
secondary index entry after purge has removed it and released the leaf
page latch.
-@param[in,out] node row purge node
-@param[in] index secondary index
-@param[in] entry secondary index entry
-@param[in,out] sec_pcur secondary index cursor or NULL
-if it is called for purge buffering
-operation.
-@param[in,out] sec_mtr mini-transaction which holds
-secondary index entry or NULL if it is
-called for purge buffering operation.
-@param[in] is_tree true=pessimistic purge,
-false=optimistic (leaf-page only)
-@return true if the secondary index record can be purged */
-bool
-row_purge_poss_sec(
-purge_node_t* node,
-dict_index_t* index,
-const dtuple_t* entry,
-btr_pcur_t* sec_pcur,
-mtr_t* sec_mtr,
-bool is_tree)
+@param node row purge node
+@param index secondary index
+@param entry secondary index entry
+@param mtr mini-transaction for looking up clustered index
+@return whether the secondary index record can be purged */
+bool row_purge_poss_sec(purge_node_t *node, dict_index_t *index,
+const dtuple_t *entry, mtr_t *mtr)
{
-bool can_delete;
-mtr_t mtr;
-ut_ad(!dict_index_is_clust(index));
-mtr_start(&mtr);
-can_delete = !row_purge_reposition_pcur(BTR_SEARCH_LEAF, node, &mtr)
-|| !row_vers_old_has_index_entry(true,
-btr_pcur_get_rec(&node->pcur),
-&mtr, index, entry,
-node->roll_ptr, node->trx_id);
-/* Persistent cursor is closed if reposition fails. */
-if (node->found_clust) {
-btr_pcur_commit_specify_mtr(&node->pcur, &mtr);
-} else {
-mtr.commit();
-}
-ut_ad(mtr.has_committed());
+ut_ad(!index->is_clust());
+const auto savepoint= mtr->get_savepoint();
+bool can_delete= !row_purge_reposition_pcur(BTR_SEARCH_LEAF, node, mtr);
+if (!can_delete)
+{
+ut_ad(node->pcur.pos_state == BTR_PCUR_IS_POSITIONED);
+can_delete= !row_purge_is_unsafe(*node, index, entry, mtr);
+node->pcur.pos_state = BTR_PCUR_WAS_POSITIONED;
+node->pcur.latch_mode= BTR_NO_LATCHES;
+}
+mtr->rollback_to_savepoint(savepoint);
return can_delete;
}
-/***************************************************************
-Removes a secondary index entry if possible, by modifying the
-index tree. Does not try to buffer the delete.
-@return TRUE if success or if not found */
-static MY_ATTRIBUTE((nonnull, warn_unused_result))
-ibool
-row_purge_remove_sec_if_poss_tree(
-/*==============================*/
-purge_node_t* node, /*!< in: row purge node */
-dict_index_t* index, /*!< in: index */
-const dtuple_t* entry) /*!< in: index entry */
+__attribute__((nonnull, warn_unused_result))
+/** Remove a secondary index entry if possible, by modifying the index tree.
+@param node purge node
+@param index secondary index
+@param entry index entry
+@param page_max_trx_id the PAGE_MAX_TRX_ID
+when row_purge_remove_sec_if_poss_leaf() was invoked
+@return whether the operation succeeded */
+static bool row_purge_remove_sec_if_poss_tree(purge_node_t *node,
+dict_index_t *index,
+const dtuple_t *entry,
+trx_id_t page_max_trx_id)
{
btr_pcur_t pcur;
-ibool success = TRUE;
+bool success = true;
dberr_t err;
mtr_t mtr;
@@ -389,7 +795,9 @@ row_purge_remove_sec_if_poss_tree(
we should do nothing. */
found:
-if (row_purge_poss_sec(node, index, entry, &pcur, &mtr, true)) {
+if (page_max_trx_id
+== page_get_max_trx_id(btr_cur_get_page(&pcur.btr_cur))
+|| row_purge_poss_sec(node, index, entry, &mtr)) {
/* Remove the index record, which should have been
marked for deletion. */
@@ -428,26 +836,23 @@
func_exit:
btr_pcur_close(&pcur); // FIXME: need this?
mtr.commit();
-return(success);
+return success;
}
-/***************************************************************
-Removes a secondary index entry without modifying the index tree,
-if possible.
-@retval true if success or if not found
-@retval false if row_purge_remove_sec_if_poss_tree() should be invoked */
-static MY_ATTRIBUTE((nonnull, warn_unused_result))
-bool
-row_purge_remove_sec_if_poss_leaf(
-/*==============================*/
-purge_node_t* node, /*!< in: row purge node */
-dict_index_t* index, /*!< in: index */
-const dtuple_t* entry) /*!< in: index entry */
+__attribute__((nonnull, warn_unused_result))
+/** Remove a secondary index entry if possible, without modifying the tree.
+@param node purge node
+@param index secondary index
+@param entry index entry
+@return PAGE_MAX_TRX_ID for row_purge_remove_sec_if_poss_tree()
+@retval 0 if success or if not found */
+static trx_id_t row_purge_remove_sec_if_poss_leaf(purge_node_t *node,
+dict_index_t *index,
+const dtuple_t *entry)
{
mtr_t mtr;
btr_pcur_t pcur;
-bool success = true;
+trx_id_t page_max_trx_id = 0;
log_free_check();
ut_ad(index->table == node->table);
@@ -478,7 +883,7 @@
found:
/* Before attempting to purge a record, check
if it is safe to do so. */
-if (row_purge_poss_sec(node, index, entry, &pcur, &mtr, false)) {
+if (row_purge_poss_sec(node, index, entry, &mtr)) {
btr_cur_t* btr_cur = btr_pcur_get_btr_cur(&pcur);
/* Only delete-marked records should be purged. */
@@ -526,8 +931,11 @@ found:
}
}
-success = btr_cur_optimistic_delete(btr_cur, 0, &mtr)
-!= DB_FAIL;
+if (btr_cur_optimistic_delete(btr_cur, 0, &mtr)
+== DB_FAIL) {
+page_max_trx_id = page_get_max_trx_id(
+btr_cur_get_page(btr_cur));
+}
}
/* (The index entry is still needed,
@@ -539,15 +947,15 @@ found:
/* The deletion was buffered. */
case ROW_NOT_FOUND:
/* The index entry does not exist, nothing to do. */
+goto func_exit;
+}
+ut_ad("invalid state" == 0);
func_exit:
mtr.commit();
cleanup:
-btr_pcur_close(&pcur); // FIXME: do we need these? when is btr_cur->rtr_info set?
+btr_pcur_close(&pcur); // FIXME: remove? when is btr_cur->rtr_info set?
-return(success);
+return page_max_trx_id;
}
-ut_error;
-return(false);
}
/***********************************************************//**
@@ -560,38 +968,21 @@ row_purge_remove_sec_if_poss(
dict_index_t* index, /*!< in: index */
const dtuple_t* entry) /*!< in: index entry */
{
-ibool success;
-ulint n_tries = 0;
-/* fputs("Purge: Removing secondary record\n", stderr); */
-if (!entry) {
-/* The node->row must have lacked some fields of this
-index. This is possible when the undo log record was
-written before this index was created. */
-return;
-}
-if (row_purge_remove_sec_if_poss_leaf(node, index, entry)) {
-return;
-}
-retry:
-success = row_purge_remove_sec_if_poss_tree(node, index, entry);
-/* The delete operation may fail if we have little
-file space left: TODO: easiest to crash the database
-and restart with more file space */
-if (!success && n_tries < BTR_CUR_RETRY_DELETE_N_TIMES) {
-n_tries++;
-std::this_thread::sleep_for(BTR_CUR_RETRY_SLEEP_TIME);
-goto retry;
-}
-ut_a(success);
+if (UNIV_UNLIKELY(!entry))
+/* The node->row must have lacked some fields of this index. This
+is possible when the undo log record was written before this index
+was created. */
+return;
+if (trx_id_t page_max_trx_id=
+row_purge_remove_sec_if_poss_leaf(node, index, entry))
+for (auto n_tries= BTR_CUR_RETRY_DELETE_N_TIMES;
+!row_purge_remove_sec_if_poss_tree(node, index, entry,
+page_max_trx_id);
+std::this_thread::sleep_for(BTR_CUR_RETRY_SLEEP_TIME))
+/* The delete operation may fail if we have little
+file space left (if innodb_file_per_table=0?) */
+ut_a(--n_tries);
}
/***********************************************************//** /***********************************************************//**

View file

@@ -6614,7 +6614,7 @@ rec_loop:
err= trx_undo_prev_version_build(clust_rec,
clust_index, clust_offsets,
vers_heap, &old_vers,
-nullptr, nullptr, 0);
+&mtr, 0, nullptr, nullptr);
if (prev_heap)
mem_heap_free(prev_heap);
if (err != DB_SUCCESS)

View file

@@ -470,6 +470,144 @@ func_exit:
return(err);
}
/** Find out if an accessible version of a clustered index record
corresponds to a secondary index entry.
@param rec record in a latched clustered index page
@param index secondary index
@param ientry secondary index entry
@param mtr mini-transaction
@return whether an accessible non-delete-marked version of rec
corresponds to ientry */
static bool row_undo_mod_sec_is_unsafe(const rec_t *rec, dict_index_t *index,
const dtuple_t *ientry, mtr_t *mtr)
{
const rec_t* version;
rec_t* prev_version;
dict_index_t* clust_index;
rec_offs* clust_offsets;
mem_heap_t* heap;
mem_heap_t* heap2;
dtuple_t* row;
const dtuple_t* entry;
ulint comp;
dtuple_t* vrow = NULL;
mem_heap_t* v_heap = NULL;
dtuple_t* cur_vrow = NULL;
clust_index = dict_table_get_first_index(index->table);
comp = page_rec_is_comp(rec);
ut_ad(!dict_table_is_comp(index->table) == !comp);
heap = mem_heap_create(1024);
clust_offsets = rec_get_offsets(rec, clust_index, NULL,
clust_index->n_core_fields,
ULINT_UNDEFINED, &heap);
if (dict_index_has_virtual(index)) {
v_heap = mem_heap_create(100);
/* The current cluster index record could be
deleted, but the previous version of it might not. We will
need to get the virtual column data from undo record
associated with current cluster index */
cur_vrow = row_vers_build_cur_vrow(
rec, clust_index, &clust_offsets,
index, 0, 0, heap, v_heap, mtr);
}
version = rec;
for (;;) {
heap2 = heap;
heap = mem_heap_create(1024);
vrow = NULL;
trx_undo_prev_version_build(version,
clust_index, clust_offsets,
heap, &prev_version,
mtr, TRX_UNDO_CHECK_PURGEABILITY,
nullptr,
dict_index_has_virtual(index)
? &vrow : nullptr);
mem_heap_free(heap2); /* free version and clust_offsets */
if (!prev_version) {
break;
}
clust_offsets = rec_get_offsets(prev_version, clust_index,
NULL,
clust_index->n_core_fields,
ULINT_UNDEFINED, &heap);
if (dict_index_has_virtual(index)) {
if (vrow) {
if (dtuple_vcol_data_missing(*vrow, *index)) {
goto nochange_index;
}
/* Keep the virtual row info for the next
version, unless it is changed */
mem_heap_empty(v_heap);
cur_vrow = dtuple_copy(vrow, v_heap);
dtuple_dup_v_fld(cur_vrow, v_heap);
}
if (!cur_vrow) {
/* Nothing for this index has changed,
continue */
nochange_index:
version = prev_version;
continue;
}
}
if (!rec_get_deleted_flag(prev_version, comp)) {
row_ext_t* ext;
/* The stack of versions is locked by mtr.
Thus, it is safe to fetch the prefixes for
externally stored columns. */
row = row_build(ROW_COPY_POINTERS, clust_index,
prev_version, clust_offsets,
NULL, NULL, NULL, &ext, heap);
if (dict_index_has_virtual(index)) {
ut_ad(cur_vrow);
ut_ad(row->n_v_fields == cur_vrow->n_v_fields);
dtuple_copy_v_fields(row, cur_vrow);
}
entry = row_build_index_entry(row, ext, index, heap);
/* If entry == NULL, the record contains unset
BLOB pointers. This must be a freshly
inserted record that we can safely ignore.
For the justification, see the comments after
the previous row_build_index_entry() call. */
/* NOTE that we cannot do the comparison as binary
fields because maybe the secondary index record has
already been updated to a different binary value in
a char field, but the collation identifies the old
and new value anyway! */
if (entry && !dtuple_coll_cmp(ientry, entry)) {
break;
}
}
version = prev_version;
}
mem_heap_free(heap);
if (v_heap) {
mem_heap_free(v_heap);
}
return !!prev_version;
}
/***********************************************************//**
Delete marks or removes a secondary index entry if found.
@return DB_SUCCESS, DB_FAIL, or DB_OUT_OF_FILE_SPACE */
@@ -488,7 +626,6 @@ row_undo_mod_del_mark_or_remove_sec_low(
btr_cur_t* btr_cur;
dberr_t err = DB_SUCCESS;
mtr_t mtr;
-mtr_t mtr_vers;
const bool modify_leaf = mode == BTR_MODIFY_LEAF;
row_mtr_start(&mtr, index, !modify_leaf);
@@ -555,17 +692,14 @@ found:
which cannot be purged yet, requires its existence. If some requires,
we should delete mark the record. */
-mtr_vers.start();
-ut_a(node->pcur.restore_position(BTR_SEARCH_LEAF, &mtr_vers) ==
+ut_a(node->pcur.restore_position(BTR_SEARCH_LEAF, &mtr) ==
btr_pcur_t::SAME_ALL);
/* For temporary table, we can skip to check older version of
clustered index entry, because there is no MVCC or purge. */
if (node->table->is_temporary()
-|| row_vers_old_has_index_entry(
-false, btr_pcur_get_rec(&node->pcur),
-&mtr_vers, index, entry, 0, 0)) {
+|| row_undo_mod_sec_is_unsafe(
+btr_pcur_get_rec(&node->pcur), index, entry, &mtr)) {
btr_rec_set_deleted<true>(btr_cur_get_block(btr_cur),
btr_cur_get_rec(btr_cur), &mtr);
} else {
@@ -599,7 +733,9 @@ found:
}
}
-btr_pcur_commit_specify_mtr(&(node->pcur), &mtr_vers);
+ut_ad(node->pcur.pos_state == BTR_PCUR_IS_POSITIONED);
+node->pcur.pos_state = BTR_PCUR_WAS_POSITIONED;
+node->pcur.latch_mode = BTR_NO_LATCHES;
func_exit:
btr_pcur_close(&pcur);

View file

@@ -702,7 +702,7 @@ fetch; output: fetched length of the prefix
@param[in,out] heap heap where to allocate
@return BLOB prefix
@retval NULL if the record is incomplete (should only happen
-in row_vers_vc_matches_cluster() executed concurrently with another purge) */
+in row_purge_vc_matches_cluster() executed concurrently with another purge) */
static
byte*
row_upd_ext_fetch(

View file

@@ -194,8 +194,8 @@ row_vers_impl_x_locked_low(
trx_undo_prev_version_build(
version, clust_index, clust_offsets,
-heap, &prev_version, NULL,
-dict_index_has_virtual(index) ? &vrow : NULL, 0);
+heap, &prev_version, mtr, 0, NULL,
+dict_index_has_virtual(index) ? &vrow : NULL);
ut_d(trx->mutex_lock());
const bool committed = trx_state_eq(
@@ -446,7 +446,6 @@ row_vers_impl_x_locked(
@param[in] clust_index clustered index
@param[in] index the secondary index
@param[in] heap heap used to build virtual dtuple. */
-static
bool
row_vers_build_clust_v_col(
dtuple_t* row,
@@ -490,26 +489,25 @@ row_vers_build_clust_v_col(
}
/** Build latest virtual column data from undo log
-@param[in] in_purge whether this is the purge thread
@param[in] rec clustered index record
@param[in] clust_index clustered index
@param[in,out] clust_offsets offsets on the clustered index record
@param[in] index the secondary index
+@param[in] trx_id transaction ID on the purging record,
+or 0 if called outside purge
@param[in] roll_ptr the rollback pointer for the purging record
-@param[in] trx_id trx id for the purging record
@param[in,out] v_heap heap used to build vrow
@param[out] v_row dtuple holding the virtual rows
@param[in,out] mtr mtr holding the latch on rec */
static
void
row_vers_build_cur_vrow_low(
-bool in_purge,
const rec_t* rec,
dict_index_t* clust_index,
rec_offs* clust_offsets,
dict_index_t* index,
-roll_ptr_t roll_ptr,
trx_id_t trx_id,
+roll_ptr_t roll_ptr,
mem_heap_t* v_heap,
dtuple_t** vrow,
mtr_t* mtr)
@@ -539,7 +537,7 @@ row_vers_build_cur_vrow_low(
/* If this is called by purge thread, set TRX_UNDO_PREV_IN_PURGE
bit to search the undo log until we hit the current undo log with
roll_ptr */
-const ulint status = in_purge
+const ulint status = trx_id
? TRX_UNDO_PREV_IN_PURGE | TRX_UNDO_GET_OLD_V_VALUE
: TRX_UNDO_GET_OLD_V_VALUE;
@@ -551,7 +549,7 @@ row_vers_build_cur_vrow_low(
trx_undo_prev_version_build(
version, clust_index, clust_offsets,
-heap, &prev_version, NULL, vrow, status);
+heap, &prev_version, mtr, status, nullptr, vrow);
if (heap2) {
mem_heap_free(heap2);
@@ -603,212 +601,27 @@ row_vers_build_cur_vrow_low(
mem_heap_free(heap);
}
-/** Check a virtual column value index secondary virtual index matches
-that of current cluster index record, which is recreated from information
-stored in undo log
-@param[in]	rec		record in the clustered index
-@param[in]	icentry		the index entry built from a cluster row
-@param[in]	clust_index	cluster index
-@param[in]	clust_offsets	offsets on the cluster record
-@param[in]	index		the secondary index
-@param[in]	ientry		the secondary index entry
-@param[in]	roll_ptr	the rollback pointer for the purging record
-@param[in]	trx_id		trx id for the purging record
-@param[in,out]	v_heap		heap used to build virtual dtuple
-@param[in,out]	v_row		dtuple holding the virtual rows (if needed)
-@param[in]	mtr		mtr holding the latch on rec
-@return true if matches, false otherwise */
-static
-bool
-row_vers_vc_matches_cluster(
-	const rec_t*	rec,
-	const dtuple_t*	icentry,
-	dict_index_t*	clust_index,
-	rec_offs*	clust_offsets,
-	dict_index_t*	index,
-	const dtuple_t*	ientry,
-	roll_ptr_t	roll_ptr,
-	trx_id_t	trx_id,
-	mem_heap_t*	v_heap,
-	dtuple_t**	vrow,
-	mtr_t*		mtr)
-{
-	const rec_t*	version;
-	rec_t*		prev_version;
-	mem_heap_t*	heap2;
-	mem_heap_t*	heap = NULL;
-	mem_heap_t*	tuple_heap;
-	ulint		num_v = dict_table_get_n_v_cols(index->table);
-	bool		compare[REC_MAX_N_FIELDS];
-	ulint		n_fields = dtuple_get_n_fields(ientry);
-	ulint		n_non_v_col = 0;
-	ulint		n_cmp_v_col = 0;
-	const dfield_t*	field1;
-	dfield_t*	field2;
-	ulint		i;
-
-	/* First compare non-virtual columns (primary keys) */
-	ut_ad(index->n_fields == n_fields);
-	ut_ad(n_fields == dtuple_get_n_fields(icentry));
-	ut_ad(mtr->memo_contains_page_flagged(rec,
-					      MTR_MEMO_PAGE_S_FIX
-					      | MTR_MEMO_PAGE_X_FIX));
-
-	{
-		const dfield_t*	a = ientry->fields;
-		const dfield_t*	b = icentry->fields;
-
-		for (const dict_field_t *ifield = index->fields,
-			     *const end = &index->fields[index->n_fields];
-		     ifield != end; ifield++, a++, b++) {
-			if (!ifield->col->is_virtual()) {
-				if (cmp_dfield_dfield(a, b)) {
-					return false;
-				}
-				n_non_v_col++;
-			}
-		}
-	}
-
-	tuple_heap = mem_heap_create(1024);
-
-	ut_ad(n_fields > n_non_v_col);
-
-	*vrow = dtuple_create_with_vcol(v_heap ? v_heap : tuple_heap, 0, num_v);
-	dtuple_init_v_fld(*vrow);
-
-	for (i = 0; i < num_v; i++) {
-		dfield_get_type(dtuple_get_nth_v_field(*vrow, i))->mtype
-			= DATA_MISSING;
-		compare[i] = false;
-	}
-
-	version = rec;
-
-	while (n_cmp_v_col < n_fields - n_non_v_col) {
-		heap2 = heap;
-		heap = mem_heap_create(1024);
-		roll_ptr_t	cur_roll_ptr = row_get_rec_roll_ptr(
-			version, clust_index, clust_offsets);
-
-		ut_ad(cur_roll_ptr != 0);
-		ut_ad(roll_ptr != 0);
-
-		trx_undo_prev_version_build(
-			version, clust_index, clust_offsets,
-			heap, &prev_version, NULL, vrow,
-			TRX_UNDO_PREV_IN_PURGE | TRX_UNDO_GET_OLD_V_VALUE);
-
-		if (heap2) {
-			mem_heap_free(heap2);
-		}
-
-		if (!prev_version) {
-			/* Versions end here */
-			goto func_exit;
-		}
-
-		clust_offsets = rec_get_offsets(prev_version, clust_index,
-						NULL,
-						clust_index->n_core_fields,
-						ULINT_UNDEFINED, &heap);
-
-		ulint	entry_len = dict_index_get_n_fields(index);
-
-		for (i = 0; i < entry_len; i++) {
-			const dict_field_t*	ind_field
-				= dict_index_get_nth_field(index, i);
-			const dict_col_t*	col = ind_field->col;
-			field1 = dtuple_get_nth_field(ientry, i);
-
-			if (!col->is_virtual()) {
-				continue;
-			}
-
-			const dict_v_col_t*	v_col
-				= reinterpret_cast<const dict_v_col_t*>(col);
-			field2
-				= dtuple_get_nth_v_field(*vrow, v_col->v_pos);
-
-			if ((dfield_get_type(field2)->mtype != DATA_MISSING)
-			    && (!compare[v_col->v_pos])) {
-
-				if (ind_field->prefix_len != 0
-				    && !dfield_is_null(field2)) {
-					field2->len = unsigned(
-						dtype_get_at_most_n_mbchars(
-							field2->type.prtype,
-							field2->type.mbminlen,
-							field2->type.mbmaxlen,
-							ind_field->prefix_len,
-							field2->len,
-							static_cast<char*>
-							(field2->data)));
-				}
-
-				/* The index field mismatch */
-				if (v_heap
-				    || cmp_dfield_dfield(field2, field1) != 0) {
-					if (v_heap) {
-						dtuple_dup_v_fld(*vrow, v_heap);
-					}
-
-					mem_heap_free(tuple_heap);
-					mem_heap_free(heap);
-					return(false);
-				}
-
-				compare[v_col->v_pos] = true;
-				n_cmp_v_col++;
-			}
-		}
-
-		trx_id_t	rec_trx_id = row_get_rec_trx_id(
-			prev_version, clust_index, clust_offsets);
-
-		if (rec_trx_id < trx_id || roll_ptr == cur_roll_ptr) {
-			break;
-		}
-
-		version = prev_version;
-	}
-
-func_exit:
-	if (n_cmp_v_col == 0) {
-		*vrow = NULL;
-	}
-
-	mem_heap_free(tuple_heap);
-	mem_heap_free(heap);
-
-	/* FIXME: In the case of n_cmp_v_col is not the same as
-	n_fields - n_non_v_col, callback is needed to compare the rest
-	columns. At the timebeing, we will need to return true */
-	return (true);
-}
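Note: the removed loop stops as soon as it sees a version written by an older
transaction, or reaches the undo record that is being purged. A simplified
restatement of that stop condition (the helper name is hypothetical):

/* Hedged sketch: when the virtual-column version walk may stop.
   Versions written by transactions older than the purging one, or the
   purged undo record itself, cannot supply newer virtual column values. */
static bool vc_walk_done(trx_id_t rec_trx_id, trx_id_t purge_trx_id,
			 roll_ptr_t cur_roll_ptr, roll_ptr_t purge_roll_ptr)
{
	return rec_trx_id < purge_trx_id || cur_roll_ptr == purge_roll_ptr;
}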
 /** Build a dtuple contains virtual column data for current cluster index
-@param[in]	in_purge	called by purge thread
 @param[in]	rec		cluster index rec
 @param[in]	clust_index	cluster index
 @param[in]	clust_offsets	cluster rec offset
 @param[in]	index		secondary index
+@param[in]	trx_id		transaction ID on the purging record,
+				or 0 if called outside purge
 @param[in]	roll_ptr	roll_ptr for the purge record
-@param[in]	trx_id		transaction ID on the purging record
 @param[in,out]	heap		heap memory
-@param[in,out]	v_heap		heap memory to keep virtual colum dtuple
-@param[in]	mtr		mtr holding the latch on rec
+@param[in,out]	v_heap		heap memory to keep virtual column tuple
+@param[in,out]	mtr		mini-transaction
 @return dtuple contains virtual column data */
-static
 dtuple_t*
 row_vers_build_cur_vrow(
-	bool		in_purge,
 	const rec_t*	rec,
 	dict_index_t*	clust_index,
 	rec_offs**	clust_offsets,
 	dict_index_t*	index,
-	roll_ptr_t	roll_ptr,
 	trx_id_t	trx_id,
+	roll_ptr_t	roll_ptr,
 	mem_heap_t*	heap,
 	mem_heap_t*	v_heap,
 	mtr_t*		mtr)
@@ -841,8 +654,8 @@ row_vers_build_cur_vrow(
 	} else {
 		/* Try to fetch virtual column data from undo log */
 		row_vers_build_cur_vrow_low(
-			in_purge, rec, clust_index, *clust_offsets,
-			index, roll_ptr, trx_id, v_heap, &cur_vrow, mtr);
+			rec, clust_index, *clust_offsets,
+			index, trx_id, roll_ptr, v_heap, &cur_vrow, mtr);
 	}
 
 	*clust_offsets = rec_get_offsets(rec, clust_index, NULL,
@@ -856,292 +669,23 @@ for indexed virtual column.
 @param tuple	data tuple
 @param index	virtual index
 @return true if tuple has missing column type */
-static bool dtuple_vcol_data_missing(const dtuple_t &tuple,
-                                     dict_index_t *index)
+bool dtuple_vcol_data_missing(const dtuple_t &tuple,
+                              const dict_index_t &index)
 {
-  for (ulint i= 0; i < index->n_uniq; i++)
+  for (ulint i= 0; i < index.n_uniq; i++)
   {
-    dict_col_t *col= index->fields[i].col;
+    dict_col_t *col= index.fields[i].col;
     if (!col->is_virtual())
       continue;
     dict_v_col_t *vcol= reinterpret_cast<dict_v_col_t*>(col);
-    for (ulint j= 0; j < index->table->n_v_cols; j++)
-    {
-      if (vcol == &index->table->v_cols[j]
-          && tuple.v_fields[j].type.mtype == DATA_MISSING)
+    for (ulint j= 0; j < index.table->n_v_cols; j++)
+      if (vcol == &index.table->v_cols[j] &&
+          tuple.v_fields[j].type.mtype == DATA_MISSING)
         return true;
-    }
   }
   return false;
 }
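With dtuple_vcol_data_missing() now taking references and external linkage, a
hedged caller-side sketch (the surrounding variables are illustrative, not
taken from the patch):

/* Hedged usage sketch: a rebuilt version whose indexed virtual column
   is still DATA_MISSING cannot be compared; treat the index as
   unchanged for that version, as the former caller did via its
   nochange_index label. */
if (vrow && dtuple_vcol_data_missing(*vrow, *index))
  index_unchanged= true; /* illustrative variable, not in the patch */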
-/** Finds out if a version of the record, where the version >= the current
-purge_sys.view, should have ientry as its secondary index entry. We check
-if there is any not delete marked version of the record where the trx
-id >= purge view, and the secondary index entry == ientry; exactly in
-this case we return TRUE.
-@param[in]	also_curr	TRUE if also rec is included in the versions
-				to search; otherwise only versions prior
-				to it are searched
-@param[in]	rec		record in the clustered index; the caller
-				must have a latch on the page
-@param[in]	mtr		mtr holding the latch on rec; it will
-				also hold the latch on purge_view
-@param[in]	index		secondary index
-@param[in]	ientry		secondary index entry
-@param[in]	roll_ptr	roll_ptr for the purge record
-@param[in]	trx_id		transaction ID on the purging record
-@return TRUE if earlier version should have */
-bool
-row_vers_old_has_index_entry(
-	bool		also_curr,
-	const rec_t*	rec,
-	mtr_t*		mtr,
-	dict_index_t*	index,
-	const dtuple_t*	ientry,
-	roll_ptr_t	roll_ptr,
-	trx_id_t	trx_id)
-{
-	const rec_t*	version;
-	rec_t*		prev_version;
-	dict_index_t*	clust_index;
-	rec_offs*	clust_offsets;
-	mem_heap_t*	heap;
-	mem_heap_t*	heap2;
-	dtuple_t*	row;
-	const dtuple_t*	entry;
-	ulint		comp;
-	dtuple_t*	vrow = NULL;
-	mem_heap_t*	v_heap = NULL;
-	dtuple_t*	cur_vrow = NULL;
-
-	ut_ad(mtr->memo_contains_page_flagged(rec, MTR_MEMO_PAGE_X_FIX
-					      | MTR_MEMO_PAGE_S_FIX));
-
-	clust_index = dict_table_get_first_index(index->table);
-
-	comp = page_rec_is_comp(rec);
-	ut_ad(!dict_table_is_comp(index->table) == !comp);
-	heap = mem_heap_create(1024);
-	clust_offsets = rec_get_offsets(rec, clust_index, NULL,
-					clust_index->n_core_fields,
-					ULINT_UNDEFINED, &heap);
-
-	if (dict_index_has_virtual(index)) {
-		v_heap = mem_heap_create(100);
-	}
-
-	DBUG_EXECUTE_IF("ib_purge_virtual_index_crash",
-			DBUG_SUICIDE(););
-
-	if (also_curr && !rec_get_deleted_flag(rec, comp)) {
-		row_ext_t*	ext;
-
-		/* The top of the stack of versions is locked by the
-		mtr holding a latch on the page containing the
-		clustered index record. The bottom of the stack is
-		locked by the fact that the purge_sys.view must
-		'overtake' any read view of an active transaction.
-		Thus, it is safe to fetch the prefixes for
-		externally stored columns. */
-
-		row = row_build(ROW_COPY_POINTERS, clust_index,
-				rec, clust_offsets,
-				NULL, NULL, NULL, &ext, heap);
-
-		if (dict_index_has_virtual(index)) {
-
-#ifdef DBUG_OFF
-# define dbug_v_purge false
-#else /* DBUG_OFF */
-			bool	dbug_v_purge = false;
-#endif /* DBUG_OFF */
-
-			DBUG_EXECUTE_IF(
-				"ib_purge_virtual_index_callback",
-				dbug_v_purge = true;);
-
-			roll_ptr_t t_roll_ptr = row_get_rec_roll_ptr(
-				rec, clust_index, clust_offsets);
-
-			/* if the row is newly inserted, then the virtual
-			columns need to be computed */
-			if (trx_undo_roll_ptr_is_insert(t_roll_ptr)
-			    || dbug_v_purge) {
-
-				if (!row_vers_build_clust_v_col(
-					    row, clust_index, index, heap)) {
-					goto unsafe_to_purge;
-				}
-
-				entry = row_build_index_entry(
-					row, ext, index, heap);
-				if (entry && !dtuple_coll_cmp(ientry, entry)) {
-					goto unsafe_to_purge;
-				}
-			} else {
-				/* Build index entry out of row */
-				entry = row_build_index_entry(row, ext, index,
-							      heap);
-
-				/* entry could only be NULL if
-				the clustered index record is an uncommitted
-				inserted record whose BLOBs have not been
-				written yet. The secondary index record
-				can be safely removed, because it cannot
-				possibly refer to this incomplete
-				clustered index record. (Insert would
-				always first be completed for the
-				clustered index record, then proceed to
-				secondary indexes.) */
-
-				if (entry && row_vers_vc_matches_cluster(
-					    rec, entry,
-					    clust_index, clust_offsets,
-					    index, ientry, roll_ptr,
-					    trx_id, NULL, &vrow, mtr)) {
-					goto unsafe_to_purge;
-				}
-			}
-			clust_offsets = rec_get_offsets(rec, clust_index, NULL,
-							clust_index
-							->n_core_fields,
-							ULINT_UNDEFINED, &heap);
-		} else {
-
-			entry = row_build_index_entry(
-				row, ext, index, heap);
-
-			/* If entry == NULL, the record contains unset BLOB
-			pointers. This must be a freshly inserted record. If
-			this is called from
-			row_purge_remove_sec_if_poss_low(), the thread will
-			hold latches on the clustered index and the secondary
-			index. Because the insert works in three steps:
-
-				(1) insert the record to clustered index
-				(2) store the BLOBs and update BLOB pointers
-				(3) insert records to secondary indexes
-
-			the purge thread can safely ignore freshly inserted
-			records and delete the secondary index record. The
-			thread that inserted the new record will be inserting
-			the secondary index records. */
-
-			/* NOTE that we cannot do the comparison as binary
-			fields because the row is maybe being modified so that
-			the clustered index record has already been updated to
-			a different binary value in a char field, but the
-			collation identifies the old and new value anyway! */
-			if (entry && !dtuple_coll_cmp(ientry, entry)) {
-unsafe_to_purge:
-				mem_heap_free(heap);
-
-				if (v_heap) {
-					mem_heap_free(v_heap);
-				}
-				return true;
-			}
-		}
-	} else if (dict_index_has_virtual(index)) {
-		/* The current cluster index record could be
-		deleted, but the previous version of it might not. We will
-		need to get the virtual column data from undo record
-		associated with current cluster index */
-
-		cur_vrow = row_vers_build_cur_vrow(
-			also_curr, rec, clust_index, &clust_offsets,
-			index, roll_ptr, trx_id, heap, v_heap, mtr);
-	}
-
-	version = rec;
-
-	for (;;) {
-		heap2 = heap;
-		heap = mem_heap_create(1024);
-		vrow = NULL;
-
-		trx_undo_prev_version_build(version,
-					    clust_index, clust_offsets,
-					    heap, &prev_version, nullptr,
-					    dict_index_has_virtual(index)
-					    ? &vrow : nullptr,
-					    TRX_UNDO_CHECK_PURGEABILITY);
-		mem_heap_free(heap2); /* free version and clust_offsets */
-
-		if (!prev_version) {
-			/* Versions end here */
-			mem_heap_free(heap);
-
-			if (v_heap) {
-				mem_heap_free(v_heap);
-			}
-
-			return false;
-		}
-
-		clust_offsets = rec_get_offsets(prev_version, clust_index,
-						NULL,
-						clust_index->n_core_fields,
-						ULINT_UNDEFINED, &heap);
-
-		if (dict_index_has_virtual(index)) {
-			if (vrow) {
-				if (dtuple_vcol_data_missing(*vrow, index)) {
-					goto nochange_index;
-				}
-				/* Keep the virtual row info for the next
-				version, unless it is changed */
-				mem_heap_empty(v_heap);
-				cur_vrow = dtuple_copy(vrow, v_heap);
-				dtuple_dup_v_fld(cur_vrow, v_heap);
-			}
-
-			if (!cur_vrow) {
-				/* Nothing for this index has changed,
-				continue */
-nochange_index:
-				version = prev_version;
-				continue;
-			}
-		}
-
-		if (!rec_get_deleted_flag(prev_version, comp)) {
-			row_ext_t*	ext;
-
-			/* The stack of versions is locked by mtr.
-			Thus, it is safe to fetch the prefixes for
-			externally stored columns. */
-
-			row = row_build(ROW_COPY_POINTERS, clust_index,
-					prev_version, clust_offsets,
-					NULL, NULL, NULL, &ext, heap);
-
-			if (dict_index_has_virtual(index)) {
-				ut_ad(cur_vrow);
-				ut_ad(row->n_v_fields == cur_vrow->n_v_fields);
-				dtuple_copy_v_fields(row, cur_vrow);
-			}
-
-			entry = row_build_index_entry(row, ext, index, heap);
-
-			/* If entry == NULL, the record contains unset
-			BLOB pointers. This must be a freshly
-			inserted record that we can safely ignore.
-			For the justification, see the comments after
-			the previous row_build_index_entry() call. */
-
-			/* NOTE that we cannot do the comparison as binary
-			fields because maybe the secondary index record has
-			already been updated to a different binary value in
-			a char field, but the collation identifies the old
-			and new value anyway! */
-
-			if (entry && !dtuple_coll_cmp(ientry, entry)) {
-				goto unsafe_to_purge;
-			}
-		}
-
-		version = prev_version;
-	}
-}
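Note: both branches of the removed function bottom out in the same test. As a
hedged restatement (the helper name is hypothetical, not part of the patch):

/* Hedged sketch: the core purgeability test. A NULL entry means the
   version has unset BLOB pointers, i.e. a freshly inserted record that
   purge can safely ignore. The comparison must use the collation, not
   memcmp(), because a char field may have been rewritten to a different
   binary value that the collation still considers equal. */
static bool version_blocks_purge(const dtuple_t *entry,
				 const dtuple_t *ientry)
{
	return entry && !dtuple_coll_cmp(ientry, entry);
}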
 /*****************************************************************//**
 Constructs the version of a clustered index record which a consistent
 read should see. We assume that the trx id stored in rec is such that
@@ -1208,7 +752,7 @@ row_vers_build_for_consistent_read(
 		err = trx_undo_prev_version_build(
 			version, index, *offsets, heap,
-			&prev_version, NULL, vrow, 0);
+			&prev_version, mtr, 0, NULL, vrow);
 
 		if (prev_heap != NULL) {
 			mem_heap_free(prev_heap);
@@ -1370,8 +914,8 @@ committed_version_trx:
 		heap = mem_heap_create(1024);
 
 		if (trx_undo_prev_version_build(version, index, *offsets, heap,
-						&prev_version, in_heap, vrow,
-						0) != DB_SUCCESS) {
+						&prev_version, mtr, 0,
+						in_heap, vrow) != DB_SUCCESS) {
 			mem_heap_free(heap);
 			heap = heap2;
 			heap2 = NULL;

@@ -776,26 +776,18 @@ not_free:
 buf_block_t *purge_sys_t::get_page(page_id_t id)
 {
+  ut_ad(!recv_sys.recovery_on);
+
   buf_block_t*& undo_page= pages[id];
 
-  if (undo_page)
-    return undo_page;
-
-  mtr_t mtr;
-  mtr.start();
-  undo_page=
-    buf_page_get_gen(id, 0, RW_S_LATCH, nullptr, BUF_GET_POSSIBLY_FREED, &mtr);
-  if (UNIV_LIKELY(undo_page != nullptr))
+  if (!undo_page)
   {
-    undo_page->fix();
-    mtr.commit();
-    return undo_page;
+    undo_page= buf_pool.page_fix(id); // batch_cleanup() will unfix()
+    if (!undo_page)
+      pages.erase(id);
   }
-  mtr.commit();
-  pages.erase(id);
-  return nullptr;
+
+  return undo_page;
 }
bool purge_sys_t::rseg_get_next_history_log() bool purge_sys_t::rseg_get_next_history_log()
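A hedged usage sketch of the cached lookup (page_id and offset are
illustrative): within one purge batch, repeated lookups of the same undo page
hit purge_sys.pages and reuse the already buffer-fixed block.

/* Hedged sketch: reading an undo record through the per-batch cache.
   The first call buffer-fixes the block via buf_pool.page_fix();
   later calls return the cached pointer until batch_cleanup()
   unfixes every entry in purge_sys.pages. */
if (buf_block_t *block= purge_sys.get_page(page_id))
{
  const trx_undo_rec_t *undo_rec= block->page.frame + offset;
  /* ... parse undo_rec while the block remains buffer-fixed ... */
}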

@@ -2038,91 +2038,38 @@ err_exit:
 /*============== BUILDING PREVIOUS VERSION OF A RECORD ===============*/
 
-/** Copy an undo record to heap.
-@param[in]	roll_ptr	roll pointer to a record that exists
-@param[in,out]	heap		memory heap where copied */
-static
-trx_undo_rec_t*
-trx_undo_get_undo_rec_low(
-	roll_ptr_t	roll_ptr,
-	mem_heap_t*	heap)
-{
-	ulint		rseg_id;
-	uint32_t	page_no;
-	uint16_t	offset;
-	bool		is_insert;
-	mtr_t		mtr;
-
-	trx_undo_decode_roll_ptr(roll_ptr, &is_insert, &rseg_id, &page_no,
-				 &offset);
-	ut_ad(page_no > FSP_FIRST_INODE_PAGE_NO);
-	ut_ad(offset >= TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
-	trx_rseg_t *rseg= &trx_sys.rseg_array[rseg_id];
-	ut_ad(rseg->is_persistent());
-
-	mtr.start();
-
-	trx_undo_rec_t *undo_rec= nullptr;
-	if (buf_block_t* undo_page=
-	    buf_page_get(page_id_t(rseg->space->id, page_no), 0,
-			 RW_S_LATCH, &mtr))
-	{
-		buf_page_make_young_if_needed(&undo_page->page);
-		undo_rec= undo_page->page.frame + offset;
-		const size_t end= mach_read_from_2(undo_rec);
-		if (UNIV_UNLIKELY(end <= offset ||
-				  end >= srv_page_size - FIL_PAGE_DATA_END))
-			undo_rec= nullptr;
-		else
-		{
-			size_t len{end - offset};
-			undo_rec= static_cast<trx_undo_rec_t*>
-				(mem_heap_dup(heap, undo_rec, len));
-			mach_write_to_2(undo_rec, len);
-		}
-	}
-
-	mtr.commit();
-	return undo_rec;
-}
-
-/** Copy an undo record to heap, to check if a secondary index record
-can be safely purged.
-@param trx_id   DB_TRX_ID corresponding to roll_ptr
-@param name     table name
-@param roll_ptr DB_ROLL_PTR pointing to the undo log record
-@param heap     memory heap for allocation
-@return copy of the record
-@retval nullptr if the version is visible to purge_sys.view */
-static trx_undo_rec_t *trx_undo_get_rec_if_purgeable(trx_id_t trx_id,
-                                                     const table_name_t &name,
-                                                     roll_ptr_t roll_ptr,
-                                                     mem_heap_t* heap)
-{
-  {
-    purge_sys_t::view_guard check;
-    if (!check.view().changes_visible(trx_id))
-      return trx_undo_get_undo_rec_low(roll_ptr, heap);
-  }
-  return nullptr;
-}
-
-/** Copy an undo record to heap.
-@param trx_id   DB_TRX_ID corresponding to roll_ptr
-@param name     table name
-@param roll_ptr DB_ROLL_PTR pointing to the undo log record
-@param heap     memory heap for allocation
-@return copy of the record
-@retval nullptr if the undo log is not available */
-static trx_undo_rec_t *trx_undo_get_undo_rec(trx_id_t trx_id,
-                                             const table_name_t &name,
-                                             roll_ptr_t roll_ptr,
-                                             mem_heap_t *heap)
-{
-  {
-    purge_sys_t::end_view_guard check;
-    if (!check.view().changes_visible(trx_id))
-      return trx_undo_get_undo_rec_low(roll_ptr, heap);
-  }
-  return nullptr;
-}
+static dberr_t trx_undo_prev_version(const rec_t *rec, dict_index_t *index,
+                                     rec_offs *offsets, mem_heap_t *heap,
+                                     rec_t **old_vers, mem_heap_t *v_heap,
+                                     dtuple_t **vrow, ulint v_status,
+                                     const trx_undo_rec_t *undo_rec);
+
+inline const buf_block_t *
+purge_sys_t::view_guard::get(const page_id_t id, mtr_t *mtr)
+{
+  buf_block_t *block;
+  ut_ad(mtr->is_active());
+  if (!latch)
+  {
+    decltype(purge_sys.pages)::const_iterator i= purge_sys.pages.find(id);
+    if (i != purge_sys.pages.end())
+    {
+      block= i->second;
+      ut_ad(block);
+      return block;
+    }
+  }
+  block= buf_pool.page_fix(id);
+  if (block)
+  {
+    mtr->memo_push(block, MTR_MEMO_BUF_FIX);
+    if (latch)
+      /* In MVCC operations (outside purge tasks), we will refresh the
+      buf_pool.LRU position. In purge, we expect the page to be freed
+      soon, at the end of the current batch. */
+      buf_page_make_young_if_needed(&block->page);
+  }
+  return block;
+}
 /** Build a previous version of a clustered index record. The caller
@@ -2130,78 +2077,89 @@ must hold a latch on the index page of the clustered index record.
 @param rec	version of a clustered index record
 @param index	clustered index
 @param offsets	rec_get_offsets(rec, index)
-@param heap	memory heap from which the memory needed is
-		allocated
-@param old_vers	previous version or NULL if rec is the
-		first inserted version, or if history data
-		has been deleted (an error), or if the purge
-		could have removed the version
-		though it has not yet done so
-@param v_heap	memory heap used to create vrow
-		dtuple if it is not yet created. This heap
-		diffs from "heap" above in that it could be
+@param heap	memory heap from which the memory needed is allocated
+@param old_vers	previous version, or NULL if rec is the first inserted
+		version, or if history data has been deleted (an error),
+		or if the purge could have removed the version though
+		it has not yet done so
+@param mtr	mini-transaction
+@param v_status	TRX_UNDO_PREV_IN_PURGE, ...
+@param v_heap	memory heap used to create vrow dtuple if it is not yet
+		created. This heap diffs from "heap" above in that it could be
 		prebuilt->old_vers_heap for selection
-@param v_row	virtual column info, if any
-@param v_status	status determine if it is going into this
-		function by purge thread or not.
-		And if we read "after image" of undo log
-@param undo_block undo log block which was cached during
-		online dml apply or nullptr
+@param vrow	virtual column info, if any
 @return error code
 @retval DB_SUCCESS if previous version was successfully built,
 or if it was an insert or the undo record refers to the table before rebuild
 @retval DB_MISSING_HISTORY if the history is missing */
 TRANSACTIONAL_TARGET
-dberr_t
-trx_undo_prev_version_build(
-	const rec_t	*rec,
-	dict_index_t	*index,
-	rec_offs	*offsets,
-	mem_heap_t	*heap,
-	rec_t		**old_vers,
-	mem_heap_t	*v_heap,
-	dtuple_t	**vrow,
-	ulint		v_status)
+dberr_t trx_undo_prev_version_build(const rec_t *rec, dict_index_t *index,
+                                    rec_offs *offsets, mem_heap_t *heap,
+                                    rec_t **old_vers, mtr_t *mtr,
+                                    ulint v_status,
+                                    mem_heap_t *v_heap, dtuple_t **vrow)
 {
-	dtuple_t*	entry;
-	trx_id_t	rec_trx_id;
-	undo_no_t	undo_no;
-	table_id_t	table_id;
-	trx_id_t	trx_id;
-	roll_ptr_t	roll_ptr;
-	upd_t*		update;
-	byte		type;
-	byte		info_bits;
-	byte		cmpl_info;
-	bool		dummy_extern;
-	byte*		buf;
-
   ut_ad(!index->table->is_temporary());
   ut_ad(rec_offs_validate(rec, index, offsets));
 
-	roll_ptr = row_get_rec_roll_ptr(rec, index, offsets);
-
-	*old_vers = NULL;
-
-	if (trx_undo_roll_ptr_is_insert(roll_ptr)) {
+  const roll_ptr_t roll_ptr= row_get_rec_roll_ptr(rec, index, offsets);
+  *old_vers= nullptr;
+
+  if (trx_undo_roll_ptr_is_insert(roll_ptr))
     /* The record rec is the first inserted version */
     return DB_SUCCESS;
-	}
-
-	mariadb_increment_undo_records_read();
-	rec_trx_id = row_get_rec_trx_id(rec, index, offsets);
-
+
+  ut_ad(roll_ptr < 1ULL << 55);
+  ut_ad(uint16_t(roll_ptr) >= TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
+  ut_ad(uint32_t(roll_ptr >> 16) >= FSP_FIRST_INODE_PAGE_NO);
+
+  const trx_id_t rec_trx_id= row_get_rec_trx_id(rec, index, offsets);
   ut_ad(!index->table->skip_alter_undo);
-
-	trx_undo_rec_t*	undo_rec = v_status == TRX_UNDO_CHECK_PURGEABILITY
-		? trx_undo_get_rec_if_purgeable(rec_trx_id, index->table->name,
-						roll_ptr, heap)
-		: trx_undo_get_undo_rec(rec_trx_id, index->table->name,
-					roll_ptr, heap);
-	if (!undo_rec) {
-		return DB_MISSING_HISTORY;
-	}
+  mariadb_increment_undo_records_read();
+
+  const auto savepoint= mtr->get_savepoint();
+  dberr_t err= DB_MISSING_HISTORY;
+  purge_sys_t::view_guard check{v_status == TRX_UNDO_CHECK_PURGE_PAGES
+                                ? purge_sys_t::view_guard::PURGE
+                                : v_status == TRX_UNDO_CHECK_PURGEABILITY
+                                ? purge_sys_t::view_guard::VIEW
+                                : purge_sys_t::view_guard::END_VIEW};
+  if (!check.view().changes_visible(rec_trx_id))
+  {
+    trx_undo_rec_t *undo_rec= nullptr;
+    static_assert(ROLL_PTR_RSEG_ID_POS == 48, "");
+    static_assert(ROLL_PTR_PAGE_POS == 16, "");
+    if (const buf_block_t *undo_page=
+        check.get(page_id_t{trx_sys.rseg_array[(roll_ptr >> 48) & 0x7f].
+                            space->id,
+                            uint32_t(roll_ptr >> 16)}, mtr))
+    {
+      static_assert(ROLL_PTR_BYTE_POS == 0, "");
+      const uint16_t offset{uint16_t(roll_ptr)};
+      undo_rec= undo_page->page.frame + offset;
+      const size_t end= mach_read_from_2(undo_rec);
+      if (UNIV_UNLIKELY(end > offset &&
+                        end < srv_page_size - FIL_PAGE_DATA_END))
+        err= trx_undo_prev_version(rec, index, offsets, heap,
                                   old_vers, v_heap, vrow, v_status,
+                                   undo_rec);
+    }
+  }
+
+  mtr->rollback_to_savepoint(savepoint);
+  return err;
+}
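The static_asserts above pin the DB_ROLL_PTR layout that the inline decoding
relies on. Spelled out as a hedged sketch (the helper name is hypothetical):

/* Hedged sketch of the roll_ptr decoding used above:
   1 bit insert flag | 7 bits rollback segment id | 32 bits page number
   | 16 bits byte offset within the undo page. */
static page_id_t roll_ptr_undo_page_id(roll_ptr_t roll_ptr)
{
  const ulint rseg_id= ulint(roll_ptr >> 48) & 0x7f;
  const uint32_t page_no= uint32_t(roll_ptr >> 16);
  return page_id_t{trx_sys.rseg_array[rseg_id].space->id, page_no};
}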
+static dberr_t trx_undo_prev_version(const rec_t *rec, dict_index_t *index,
+                                     rec_offs *offsets, mem_heap_t *heap,
+                                     rec_t **old_vers, mem_heap_t *v_heap,
+                                     dtuple_t **vrow, ulint v_status,
+                                     const trx_undo_rec_t *undo_rec)
+{
+	byte type, cmpl_info;
+	bool dummy_extern;
+	undo_no_t undo_no;
+	table_id_t table_id;
 	const byte *ptr =
 		trx_undo_rec_get_pars(undo_rec, &type, &cmpl_info,
 				      &dummy_extern, &undo_no, &table_id);
@@ -2213,6 +2171,10 @@ trx_undo_prev_version_build(
 		return DB_SUCCESS;
 	}
 
+	trx_id_t trx_id;
+	roll_ptr_t roll_ptr;
+	byte info_bits;
+
 	ptr = trx_undo_update_rec_get_sys_cols(ptr, &trx_id, &roll_ptr,
 					       &info_bits);
@@ -2240,10 +2202,12 @@ trx_undo_prev_version_build(
 	ptr = trx_undo_rec_skip_row_ref(ptr, index);
 
+	upd_t* update;
 	ptr = trx_undo_update_rec_get_update(ptr, index, type, trx_id,
 					     roll_ptr, info_bits,
 					     heap, &update);
 	ut_a(ptr);
 
+	byte* buf;
 	if (row_upd_changes_field_size_or_external(index, offsets, update)) {
 		/* We should confirm the existence of disowned external data,
@@ -2269,9 +2233,10 @@ trx_undo_prev_version_build(
 		those fields that update updates to become externally stored
 		fields. Store the info: */
 
-		entry = row_rec_to_index_entry(rec, index, offsets, heap);
+		dtuple_t* entry = row_rec_to_index_entry(rec, index, offsets,
+							 heap);
 		/* The page containing the clustered index record
-		corresponding to entry is latched in mtr. Thus the
+		corresponding to entry is latched. Thus the
 		following call is safe. */
 
 		if (!row_upd_index_replace_new_col_vals(entry, *index, update,
							heap)) {