MDEV-35189: Updating cache for INNODB_LOCKS et al is suboptimal

ha_storage_put_memlim(): Invoke my_crc32c() to "fold", and traverse
the hash table only once.
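
As an illustration of that pattern, here is a minimal stand-alone sketch
(node_t, storage_t, and the FNV-1a hash are stand-ins, not MariaDB code;
in the patch, my_crc32c() plays the role of the hash and the chain lives
in hash_table_t). Keeping a pointer to the chain link ("after") lets a
single traversal both detect a duplicate and stop at the exact link where
a new node must be attached:

#include <cstdint>
#include <cstring>

struct node_t { const void *data; size_t len; node_t *next; };

struct storage_t {
  node_t *cells[1021] = {};                       // chained hash table

  static uint32_t fold(const void *p, size_t n) { // stand-in for my_crc32c()
    uint32_t h = 2166136261u;                     // FNV-1a, illustration only
    for (size_t i = 0; i < n; i++)
      h = (h ^ static_cast<const uint8_t *>(p)[i]) * 16777619u;
    return h;
  }

  const void *put(const void *data, size_t len) {
    node_t **after = &cells[fold(data, len) % 1021];
    for (; *after; after = &(*after)->next)       // the only traversal
      if ((*after)->len == len && !memcmp((*after)->data, data, len))
        return (*after)->data;                    // duplicate: reuse it
    // *after is now the null link at the end of the chain; the real code
    // also copies the bytes into its own heap, which is omitted here.
    *after = new node_t{data, len, nullptr};
    return data;
  }
};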

fold_lock(): Remove some redundant conditions and use my_crc32c()
instead of ut_fold_ulint_pair().
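
For reference, a plain bitwise CRC-32C and an illustrative record-lock
fold in the style of the patch are sketched below; my_crc32c() computes
the same Castagnoli polynomial using SSE4.2/ARMv8 instructions where
available, and trx_id/page_id here are stand-ins for the lock fields.
Unlike ut_fold_ulint_pair(), which combines words through repeated
shift/add/xor rounds, this folds the whole 16-byte key in one pass:

#include <cstdint>
#include <cstring>

// Bitwise CRC-32C (reflected polynomial 0x82F63B78), reference model only.
static uint32_t crc32c(uint32_t crc, const void *buf, size_t len) {
  const uint8_t *p = static_cast<const uint8_t *>(buf);
  crc = ~crc;
  while (len--) {
    crc ^= *p++;
    for (int k = 0; k < 8; k++)
      crc = (crc >> 1) ^ ((crc & 1) ? 0x82F63B78u : 0u);
  }
  return ~crc;
}

// Serialize the 8-byte transaction id and the 8-byte page id into one
// buffer, seed the CRC with the heap number, and fold it all at once.
static uint32_t fold_rec_lock(uint64_t trx_id, uint64_t page_id,
                              uint16_t heap_no) {
  char buf[8 + 8];
  memcpy(buf, &trx_id, 8);
  memcpy(buf + 8, &page_id, 8);
  return crc32c(heap_no, buf, sizeof buf);
}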

trx_i_s_cache_t::add(): Replaces add_lock_to_cache(),
search_innodb_locks(), and locks_row_eq_lock(). Avoid duplicated
traversal of the hash table.
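
A compressed sketch of the resulting control flow (row_t and alloc_row are
placeholders; in the patch the chain holds i_s_locks_row_t linked through
the new "next" member, and allocation goes through
table_cache_create_empty_row() and fill_locks_row()):

#include <cstdint>

struct row_t { uint64_t key; row_t *next; };

// Find-or-create in one chain walk: "after" ends either at the matching
// row or at the null link where the new row gets attached.
static row_t *add(row_t **chain, uint64_t key, row_t *(*alloc_row)(uint64_t)) {
  row_t **after = chain;
  while (row_t *row = *after) {
    if (row->key == key)
      return row;               // duplicate: return the existing row
    after = &row->next;
  }
  row_t *row = alloc_row(key);  // may fail, like the real allocation
  if (row)
    *after = row;               // link in at the end of the chain
  return row;                   // nullptr on allocation failure
}

The old HASH_SEARCH/HASH_INSERT pair walked the chain once to look for a
duplicate and again to append, computing the fold each time.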

Reviewed by: Debarun Banerjee
Marko Mäkelä 2024-11-12 12:17:34 +02:00
parent 074831ec61
commit ccb6cd8053
4 changed files with 83 additions and 320 deletions

@@ -29,46 +29,6 @@ Created September 22, 2007 Vasil Dimov
 #include "ha0storage.h"
 #include "hash0hash.h"
 #include "mem0mem.h"
-#include "ut0rnd.h"
-
-/*******************************************************************//**
-Retrieves a data from a storage. If it is present, a pointer to the
-stored copy of data is returned, otherwise NULL is returned. */
-static
-const void*
-ha_storage_get(
-/*===========*/
-    ha_storage_t*   storage,    /*!< in: hash storage */
-    const void*     data,       /*!< in: data to check for */
-    ulint           data_len)   /*!< in: data length */
-{
-    ha_storage_node_t*  node;
-    ulint               fold;
-
-    /* avoid repetitive calls to ut_fold_binary() in the HASH_SEARCH
-    macro */
-    fold = ut_fold_binary(static_cast<const byte*>(data), data_len);
-
-#define IS_FOUND \
-    node->data_len == data_len && memcmp(node->data, data, data_len) == 0
-
-    HASH_SEARCH(
-        next,               /* node->"next" */
-        &storage->hash,     /* the hash table */
-        fold,               /* key */
-        ha_storage_node_t*, /* type of node->next */
-        node,               /* auxiliary variable */
-        ,                   /* assertion */
-        IS_FOUND);          /* search criteria */
-
-    if (node == NULL) {
-        return(NULL);
-    }
-    /* else */
-
-    return(node->data);
-}
-
 /*******************************************************************//**
 Copies data into the storage and returns a pointer to the copy. If the
@@ -87,54 +47,29 @@ ha_storage_put_memlim(
     ulint       data_len,   /*!< in: data length */
     ulint       memlim)     /*!< in: memory limit to obey */
 {
-    void*               raw;
-    ha_storage_node_t*  node;
-    const void*         data_copy;
-    ulint               fold;
+    const uint32_t fold= my_crc32c(0, data, data_len);
+    ha_storage_node_t** after = reinterpret_cast<ha_storage_node_t**>
+        (&storage->hash.cell_get(fold)->node);
+    for (; *after; after= &(*after)->next)
+        if ((*after)->data_len == data_len &&
+            !memcmp((*after)->data, data, data_len))
+            return (*after)->data;
 
-    /* check if data chunk is already present */
-    data_copy = ha_storage_get(storage, data, data_len);
-    if (data_copy != NULL) {
-        return(data_copy);
-    }
-
     /* not present */
 
     /* check if we are allowed to allocate data_len bytes */
-    if (memlim > 0
-        && ha_storage_get_size(storage) + data_len > memlim) {
-        return(NULL);
-    }
+    if (memlim > 0 && ha_storage_get_size(storage) + data_len > memlim)
+        return nullptr;
 
     /* we put the auxiliary node struct and the data itself in one
     continuous block */
-    raw = mem_heap_alloc(storage->heap,
-                         sizeof(ha_storage_node_t) + data_len);
-
-    node = (ha_storage_node_t*) raw;
-    data_copy = (byte*) raw + sizeof(*node);
-    memcpy((byte*) raw + sizeof(*node), data, data_len);
-
-    node->data_len = data_len;
-    node->data = data_copy;
-
-    /* avoid repetitive calls to ut_fold_binary() in the HASH_INSERT
-    macro */
-    fold = ut_fold_binary(static_cast<const byte*>(data), data_len);
-
-    HASH_INSERT(
-        ha_storage_node_t,  /* type used in the hash chain */
-        next,               /* node->"next" */
-        &storage->hash,     /* the hash table */
-        fold,               /* key */
-        node);              /* add this data to the hash */
-
-    /* the output should not be changed because it will spoil the
-    hash table */
-    return(data_copy);
+    ha_storage_node_t *node= static_cast<ha_storage_node_t*>
+        (mem_heap_alloc(storage->heap, sizeof *node + data_len));
+    node->data_len= data_len;
+    node->data= &node[1];
+    memcpy(const_cast<void*>(node->data), data, data_len);
+    *after= node;
+    return node->data;
 }
 
 #ifdef UNIV_COMPILE_TEST_FUNCS

@@ -39,7 +39,7 @@ struct hash_cell_t
   @param insert the being-inserted element
   @param next the next-element pointer in T */
   template<typename T>
-  void append(T &insert, T *T::*next)
+  void append(T &insert, T *T::*next) noexcept
   {
     void **after;
     for (after= &node; *after;
@@ -174,17 +174,21 @@ struct hash_table_t
   /** Create the hash table.
   @param n the lower bound of n_cells */
-  void create(ulint n)
+  void create(ulint n) noexcept
   {
     n_cells= ut_find_prime(n);
     array= static_cast<hash_cell_t*>(ut_zalloc_nokey(n_cells * sizeof *array));
   }
 
   /** Clear the hash table. */
-  void clear() { memset(array, 0, n_cells * sizeof *array); }
+  void clear() noexcept { memset(array, 0, n_cells * sizeof *array); }
 
   /** Free the hash table. */
-  void free() { ut_free(array); array= nullptr; }
+  void free() noexcept { ut_free(array); array= nullptr; }
 
-  ulint calc_hash(ulint fold) const { return ut_hash_ulint(fold, n_cells); }
+  ulint calc_hash(ulint fold) const noexcept
+  { return ut_hash_ulint(fold, n_cells); }
+
+  hash_cell_t *cell_get(ulint fold) const noexcept
+  { return &array[calc_hash(fold)]; }
 };
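
A hypothetical caller of the new cell_get() accessor (assuming the
hash0hash.h definitions above; item_t is illustrative): compute the fold
once, fetch the cell, and walk the intrusive chain directly instead of
going through the HASH_SEARCH macro.

#include <cstdint>

struct item_t { uint32_t key; item_t *next; };

static item_t *find(const hash_table_t &table, ulint fold, uint32_t key)
{
  for (item_t *i= static_cast<item_t*>(table.cell_get(fold)->node);
       i; i= i->next)
    if (i->key == key)
      return i;
  return nullptr;
}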

@@ -70,20 +70,6 @@ do { \
     } \
 } while (0)
 
-/** A row of INFORMATION_SCHEMA.innodb_locks */
-struct i_s_locks_row_t;
-
-/** Objects of trx_i_s_cache_t::locks_hash */
-struct i_s_hash_chain_t;
-
-/** Objects of this type are added to the hash table
-trx_i_s_cache_t::locks_hash */
-struct i_s_hash_chain_t {
-    i_s_locks_row_t*    value;  /*!< row of
-                                INFORMATION_SCHEMA.innodb_locks*/
-    i_s_hash_chain_t*   next;   /*!< next item in the hash chain */
-};
-
 /** This structure represents INFORMATION_SCHEMA.innodb_locks row */
 struct i_s_locks_row_t {
     trx_id_t    lock_trx_id;    /*!< transaction identifier */
@@ -106,7 +92,7 @@ struct i_s_locks_row_t {
     table_id_t  lock_table_id;
                     /*!< table identifier from
                     lock_get_table_id */
-    i_s_hash_chain_t hash_chain;    /*!< hash table chain node for
+    i_s_locks_row_t* next;          /*!< hash table chain node for
                                     trx_i_s_cache_t::locks_hash */
     /* @} */
 };

@@ -59,47 +59,6 @@ now, then 39th chunk would accommodate 1677416425 rows and all chunks
 would accommodate 3354832851 rows. */
 #define MEM_CHUNKS_IN_TABLE_CACHE 39
 
-/** The following are some testing auxiliary macros. Do not enable them
-in a production environment. */
-/* @{ */
-
-#if 0
-/** If this is enabled then lock folds will always be different
-resulting in equal rows being put in a different cells of the hash
-table. Checking for duplicates will be flawed because different
-fold will be calculated when a row is searched in the hash table. */
-#define TEST_LOCK_FOLD_ALWAYS_DIFFERENT
-#endif
-
-#if 0
-/** This effectively kills the search-for-duplicate-before-adding-a-row
-function, but searching in the hash is still performed. It will always
-be assumed that lock is not present and insertion will be performed in
-the hash table. */
-#define TEST_NO_LOCKS_ROW_IS_EVER_EQUAL_TO_LOCK_T
-#endif
-
-#if 0
-/** This aggressively repeats adding each row many times. Depending on
-the above settings this may be noop or may result in lots of rows being
-added. */
-#define TEST_ADD_EACH_LOCKS_ROW_MANY_TIMES
-#endif
-
-#if 0
-/** Very similar to TEST_NO_LOCKS_ROW_IS_EVER_EQUAL_TO_LOCK_T but hash
-table search is not performed at all. */
-#define TEST_DO_NOT_CHECK_FOR_DUPLICATE_ROWS
-#endif
-
-#if 0
-/** Do not insert each row into the hash table, duplicates may appear
-if this is enabled, also if this is enabled searching into the hash is
-noop because it will be empty. */
-#define TEST_DO_NOT_INSERT_INTO_THE_HASH_TABLE
-#endif
-
-/* @} */
-
 /** Memory limit passed to ha_storage_put_memlim().
 @param cache hash storage
 @return maximum allowed allocation size */
@@ -162,6 +121,13 @@ struct trx_i_s_cache_t {
     bool        is_truncated;   /*!< this is true if the memory
                                 limit was hit and thus the data
                                 in the cache is truncated */
+
+  /** Adds an element.
+  @param lock     element to be added
+  @param heap_no  record lock heap number, or 0xFFFF for table lock
+  @return the existing or added lock
+  @retval nullptr if memory cannot be allocated */
+  i_s_locks_row_t *add(const lock_t &lock, uint16_t heap_no) noexcept;
 };
 
 /** This is the intermediate buffer where data needed to fill the
@@ -778,7 +744,7 @@ static bool fill_locks_row(
     row->lock_table_id = table->id;
 
-    row->hash_chain.value = row;
+    row->next = nullptr;
 
     ut_ad(i_s_locks_row_validate(row));
     return true;
@@ -819,112 +785,18 @@ static
 ulint
 fold_lock(
 /*======*/
-    const lock_t*   lock,   /*!< in: lock object to fold */
-    ulint           heap_no)/*!< in: lock's record number
+    const lock_t&   lock,   /*!< in: lock object to fold */
+    uint16_t        heap_no)/*!< in: lock's record number
                             or 0xFFFF if the lock
                             is a table lock */
 {
-#ifdef TEST_LOCK_FOLD_ALWAYS_DIFFERENT
-    static ulint    fold = 0;
-
-    return(fold++);
-#else
-    ulint   ret;
-
-    if (!lock->is_table()) {
-        ut_a(heap_no != 0xFFFF);
-
-        ret = ut_fold_ulint_pair((ulint) lock->trx->id,
-                                 lock->un_member.rec_lock.page_id.
-                                 fold());
-
-        ret = ut_fold_ulint_pair(ret, heap_no);
-    } else {
-        /* this check is actually not necessary for continuing
-        correct operation, but something must have gone wrong if
-        it fails. */
-        ut_a(heap_no == 0xFFFF);
-
-        ret = (ulint) lock_get_table(*lock)->id;
-    }
-
-    return(ret);
-#endif
-}
-
-/*******************************************************************//**
-Checks whether i_s_locks_row_t object represents a lock_t object.
-@return TRUE if they match */
-static
-ibool
-locks_row_eq_lock(
-/*==============*/
-    const i_s_locks_row_t*  row,    /*!< in: innodb_locks row */
-    const lock_t*           lock,   /*!< in: lock object */
-    ulint                   heap_no)/*!< in: lock's record number
-                                    or 0xFFFF if the lock
-                                    is a table lock */
-{
-    ut_ad(i_s_locks_row_validate(row));
-#ifdef TEST_NO_LOCKS_ROW_IS_EVER_EQUAL_TO_LOCK_T
-    return(0);
-#else
-    if (!lock->is_table()) {
-        ut_a(heap_no != 0xFFFF);
-
-        return(row->lock_trx_id == lock->trx->id
-               && row->lock_page == lock->un_member.rec_lock.page_id
-               && row->lock_rec == heap_no);
-    } else {
-        /* this check is actually not necessary for continuing
-        correct operation, but something must have gone wrong if
-        it fails. */
-        ut_a(heap_no == 0xFFFF);
-
-        return(row->lock_trx_id == lock->trx->id
-               && row->lock_table_id == lock_get_table(*lock)->id);
-    }
-#endif
-}
-
-/*******************************************************************//**
-Searches for a row in the innodb_locks cache that has a specified id.
-This happens in O(1) time since a hash table is used. Returns pointer to
-the row or NULL if none is found.
-@return row or NULL */
-static
-i_s_locks_row_t*
-search_innodb_locks(
-/*================*/
-    trx_i_s_cache_t*    cache,  /*!< in: cache */
-    const lock_t*       lock,   /*!< in: lock to search for */
-    uint16_t            heap_no)/*!< in: lock's record number
-                                or 0xFFFF if the lock
-                                is a table lock */
-{
-    i_s_hash_chain_t*   hash_chain;
-
-    HASH_SEARCH(
-        /* hash_chain->"next" */
-        next,
-        /* the hash table */
-        &cache->locks_hash,
-        /* fold */
-        fold_lock(lock, heap_no),
-        /* the type of the next variable */
-        i_s_hash_chain_t*,
-        /* auxiliary variable */
-        hash_chain,
-        /* assertion on every traversed item */
-        ut_ad(i_s_locks_row_validate(hash_chain->value)),
-        /* this determines if we have found the lock */
-        locks_row_eq_lock(hash_chain->value, lock, heap_no));
-
-    if (hash_chain == NULL) {
-        return(NULL);
-    }
-    /* else */
-
-    return(hash_chain->value);
+  ut_ad((heap_no == 0xFFFF) == lock.is_table());
+  if (heap_no == 0xFFFF)
+    return ulint(lock.un_member.tab_lock.table->id);
+  char buf[8 + 8];
+  memcpy(buf, &lock.trx->id, 8);
+  memcpy(buf + 8, &lock.un_member.rec_lock.page_id, 8);
+  return my_crc32c(heap_no, buf, sizeof buf);
 }
 
 /*******************************************************************//**
@@ -933,67 +805,40 @@ Returns a pointer to the added row. If the row is already present then
 no row is added and a pointer to the existing row is returned.
 If row can not be allocated then NULL is returned.
 @return row */
-static
-i_s_locks_row_t*
-add_lock_to_cache(
-/*==============*/
-    trx_i_s_cache_t*    cache,  /*!< in/out: cache */
-    const lock_t*       lock,   /*!< in: the element to add */
-    uint16_t            heap_no)/*!< in: lock's record number
-                                or 0 if the lock
-                                is a table lock */
+i_s_locks_row_t *
+trx_i_s_cache_t::add(const lock_t &lock, uint16_t heap_no) noexcept
 {
-    i_s_locks_row_t*    dst_row;
+  ut_ad(lock.is_table() == (heap_no == 0xFFFF));
 
-#ifdef TEST_ADD_EACH_LOCKS_ROW_MANY_TIMES
-    ulint   i;
-    for (i = 0; i < 10000; i++) {
-#endif
-#ifndef TEST_DO_NOT_CHECK_FOR_DUPLICATE_ROWS
-    /* quit if this lock is already present */
-    dst_row = search_innodb_locks(cache, lock, heap_no);
-    if (dst_row != NULL) {
-
-        ut_ad(i_s_locks_row_validate(dst_row));
-        return(dst_row);
-    }
-#endif
+  i_s_locks_row_t** after= reinterpret_cast<i_s_locks_row_t**>
+    (&locks_hash.cell_get(fold_lock(lock, heap_no))->node);
+  while (i_s_locks_row_t *row= *after)
+  {
+    ut_ad(i_s_locks_row_validate(row));
+    if (row->lock_trx_id == lock.trx->id &&
+        (heap_no == 0xFFFF
+         ? row->lock_table_id == lock.un_member.tab_lock.table->id
+         : (row->lock_rec == heap_no &&
+            row->lock_page == lock.un_member.rec_lock.page_id)))
+      return row;
+    after= &row->next;
+  }
 
-    dst_row = (i_s_locks_row_t*)
-        table_cache_create_empty_row(&cache->innodb_locks, cache);
-
-    /* memory could not be allocated */
-    if (dst_row == NULL) {
-
-        return(NULL);
-    }
-
-    if (!fill_locks_row(dst_row, lock, heap_no, cache)) {
-
-        /* memory could not be allocated */
-        cache->innodb_locks.rows_used--;
-        return(NULL);
-    }
-
-#ifndef TEST_DO_NOT_INSERT_INTO_THE_HASH_TABLE
-    HASH_INSERT(
-        /* the type used in the hash chain */
-        i_s_hash_chain_t,
-        /* hash_chain->"next" */
-        next,
-        /* the hash table */
-        &cache->locks_hash,
-        /* fold */
-        fold_lock(lock, heap_no),
-        /* add this data to the hash */
-        &dst_row->hash_chain);
-#endif
-#ifdef TEST_ADD_EACH_LOCKS_ROW_MANY_TIMES
-    } /* for()-loop */
-#endif
-
-    ut_ad(i_s_locks_row_validate(dst_row));
-    return(dst_row);
+  i_s_locks_row_t *dst_row= static_cast<i_s_locks_row_t*>
+    (table_cache_create_empty_row(&innodb_locks, this));
+  if (dst_row)
+  {
+    if (!fill_locks_row(dst_row, &lock, heap_no, this))
+    {
+      innodb_locks.rows_used--;
+      dst_row= nullptr;
+    }
+    else
+    {
+      *after= dst_row;
+      ut_ad(i_s_locks_row_validate(dst_row));
+    }
+  }
+  return dst_row;
 }
 
 /*******************************************************************//**
@@ -1057,12 +902,10 @@ add_trx_relevant_locks_to_cache(
     i_s_locks_row_t*        blocking_lock_row;
     lock_queue_iterator_t   iter;
 
-    uint16_t wait_lock_heap_no
-        = wait_lock_get_heap_no(wait_lock);
+    uint16_t heap_no = wait_lock_get_heap_no(wait_lock);
 
     /* add the requested lock */
-    *requested_lock_row = add_lock_to_cache(cache, wait_lock,
-                                            wait_lock_heap_no);
+    *requested_lock_row = cache->add(*wait_lock, heap_no);
 
     /* memory could not be allocated */
     if (*requested_lock_row == NULL) {
@@ -1083,13 +926,8 @@
 
     /* add the lock that is
     blocking wait_lock */
-    blocking_lock_row
-        = add_lock_to_cache(
-            cache, curr_lock,
-            /* heap_no is the same
-            for the wait and waited
-            locks */
-            wait_lock_heap_no);
+    blocking_lock_row = cache->add(*curr_lock,
+                                   heap_no);
 
     /* memory could not be allocated */
     if (blocking_lock_row == NULL) {