MDEV-29911 InnoDB recovery and mariadb-backup --prepare fail to report detailed progress

The progress reporting of InnoDB crash recovery was rather intermittent.
Nothing was reported during the single-threaded log record parsing, which
could consume minutes when parsing a large log. During log application,
progress was only reported by background threads that would be invoked
on data page read completion.

With this change, progress will be reported in detail, like this:

InnoDB: Starting crash recovery from checkpoint LSN=503549688
InnoDB: Parsed redo log up to LSN=1990840177; to recover: 124806 pages
InnoDB: Parsed redo log up to LSN=2729777071; to recover: 186123 pages
InnoDB: Parsed redo log up to LSN=3488599173; to recover: 248397 pages
InnoDB: Parsed redo log up to LSN=4177856618; to recover: 306469 pages
InnoDB: Multi-batch recovery needed at LSN 4189599815
InnoDB: End of log at LSN=4483551634
InnoDB: To recover: LSN 4189599815/4483551634; 307490 pages
InnoDB: To recover: LSN 4189599815/4483551634; 197159 pages
InnoDB: To recover: LSN 4189599815/4483551634; 67623 pages
InnoDB: Parsed redo log up to LSN=4353924218; to recover: 102083 pages
...
InnoDB: log sequence number 4483551634 ...

The previous messages "Starting a batch to recover" and
"Starting a final batch to recover" are replaced by
"To recover: ... pages" messages.

If a batch lasts longer than 15 seconds, progress will be reported
every 15 seconds, showing the number of remaining pages. For a non-final
batch, the "To recover:" message includes two end LSNs: that of the
batch, and that of the recovered log. This is the primary measure of
progress. The batch will end once the number of pages to recover
reaches 0.
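
The 15-second throttle is the existing recv_sys_t::report() logic; its
old inline definition (which this patch moves out of line, as seen in
the header diff below) amounts to:

  bool recv_sys_t::report(time_t time)
  {
    /* report at most once every 15 seconds */
    if (time - progress_time < 15)
      return false;
    progress_time= time;
    return true;
  }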

If recovery is possible in a single batch, the output will look like this,
with a shorter "To recover:" message that counts only the remaining pages:

InnoDB: Starting crash recovery from checkpoint LSN=503549688
InnoDB: Parsed redo log up to LSN=1998701027; to recover: 125560 pages
InnoDB: Parsed redo log up to LSN=2734136874; to recover: 186446 pages
InnoDB: Parsed redo log up to LSN=3499505504; to recover: 249378 pages
InnoDB: Parsed redo log up to LSN=4183247844; to recover: 306964 pages
InnoDB: End of log at LSN=4483551634
...
InnoDB: To recover: 331797 pages
...
InnoDB: log sequence number 4483551634 ...

We will also speed up recovery by improving the memory management and by
implementing multi-threaded recovery of data pages that will not need
to be read into the buffer pool ("fake read"). Log application in the
"fake read" threads will be protected by an atomic being_processed field
and an exclusive buf_page_t::latch.
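
As a rough sketch of that protocol (using std::atomic instead of the
actual Atomic_relaxed wrapper; see the page_recv_t diff below for the
real field):

  /* 0=not in progress, 1=log is being applied,
  -1=log has been applied and the entry may be erased */
  std::atomic<int8_t> being_processed{0};

  /* in a "fake read" worker, with the page latched exclusively: */
  void apply_buffered_log()
  {
    /* being_processed was set to 1 under recv_sys.mutex
    when the batch claimed this page */
    /* ... apply the buffered log records to the page ... */
    being_processed.store(-1, std::memory_order_relaxed);
    /* recv_sys_t::garbage_collect() will erase the entry later */
  }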

Recovery will reserve for data pages two thirds of the buffer pool,
or 256 pages, whichever is smaller. Previously, we could use at most
one third of the buffer pool for buffered log records. With large
buffer pools, this typically meant that recovery unnecessarily
consisted of multiple batches.
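
In code form, the reservation rule amounts to something like this
(a sketch; the function name is hypothetical):

  #include <algorithm>

  /* pages reserved for reading and recovering data pages; the rest
  of the buffer pool may hold buffered redo log records */
  size_t recovery_data_page_reserve(size_t buf_pool_pages)
  {
    return std::min<size_t>(buf_pool_pages * 2 / 3, 256);
  }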

If recovery runs out of memory, it will "roll back" or "rewind" the current
mini-transaction. The recv_sys.lsn and recv_sys.pages will correspond
to the "out of memory LSN", at the end of the previous complete
mini-transaction.

If recovery runs out of memory while executing the final recovery batch,
we can simply invoke recv_sys.apply(false) to make room, and resume
parsing.

If recovery runs out of memory before the final batch, we will scan
the redo log to the end (recv_sys.scanned_lsn) and check for any missing
or inconsistent files. If recv_init_crash_recovery_spaces() does not
report any potentially missing tablespaces, we can make use of the
already stored recv_sys.pages and only rewind to the "out of memory LSN".
Else, we must keep parsing and invoking recv_validate_tablespace()
until an error has been found or everything has been resolved, and
ultimately rewind to the checkpoint LSN.
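
In outline, the out-of-memory handling is (pseudocode; the helper
names are hypothetical):

  /* parse_mtr<true>() returned GOT_OOM */
  if (last_batch)
    recv_sys.apply(false);        /* make room, then resume parsing */
  else
  {
    scan_to_end_of_log();         /* advances recv_sys.scanned_lsn */
    if (!tablespaces_potentially_missing())
      rewind_to(oom_lsn);         /* keep the stored recv_sys.pages */
    else
      rewind_to(checkpoint_lsn);  /* keep validating; re-parse all */
  }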

recv_sys_t::pages_it: A cached iterator to recv_sys.pages

recv_sys_t::parse_mtr(): Remove an ATTRIBUTE_NOINLINE that would
prevent tail call optimization in recv_sys_t::parse_pmem().

recv_sys_t::parse(), recv_sys_t::parse_mtr(), recv_sys_t::parse_pmem():
Add template<bool store> parameter. Redo log record parsing
(store=false) is better specialized from store=true
(with bool if_exists) so that we can avoid some conditional branches
in frequently invoked low-level code.
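
A minimal standalone illustration of the idea (not the actual parser):
the branch on the template parameter is a compile-time constant, so the
store=false instantiation contains no storage code at all.

  void buffer_record(const byte *rec, bool if_exists); /* hypothetical */

  template<bool store>
  size_t parse(const byte *l, size_t len, bool if_exists)
  {
    size_t records= 0;
    for (size_t i= 0; i < len; i++)
    {
      if (store) /* constant-folded; dead code eliminated when !store */
        buffer_record(l + i, if_exists);
      records++;
    }
    return records;
  }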

recv_sys_t::is_memory_exhausted(): Remove. The special parse() status
GOT_OOM will report an out-of-memory condition at the low level.

recv_sys_t::rewind(), page_recv_t::recs_t::rewind():
Remove all log starting with a specific LSN.

recv_scan_log(): Separate some code for only parsing, not storing log.
In rewound_lsn, remember the LSN at which last_phase=false recovery
ran out of memory. This is where the next call to recv_scan_log()
will resume storing the log. This replaces recv_sys.last_stored_lsn.

recv_sys_t::parse(): Evaluate the template parameter store in a few more
cases, to allow dead code to be eliminated at compile time.

recv_sys_t::scanned_lsn: The end of the log found by recv_scan_log().
The special value 1 means that recv_sys has been initialized but
no log has been parsed.

IORequest::write_complete(), IORequest::read_complete():
Replaces fil_aio_callback().

read_io_callback(), write_io_callback(): Replaces io_callback().

IORequest::fake_read_complete(), fake_io_callback(), os_fake_read():
Process a "fake read" request for concurrent recovery.

recv_sys_t::apply_batch(): Choose a number of successive pages
for a recovery batch.

recv_sys_t::erase(recv_sys_t::map::iterator): Remove log records for a
page whose recovery is not in progress. Log application threads
will not invoke this; they will only set being_processed=-1 to indicate
that the entry is no longer needed.

recv_sys_t::garbage_collect(): Remove all being_processed=-1 entries.
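
A simplified sketch of what that garbage collection amounts to (the
real implementation also keeps the cached recv_sys.pages_it valid):

  void recv_sys_t::garbage_collect()
  {
    mysql_mutex_assert_owner(&mutex);
    for (map::iterator p= pages.begin(); p != pages.end(); )
    {
      map::iterator e= p++;
      if (e->second.being_processed == -1) /* log fully applied */
        erase(e);                  /* free the records and the entry */
    }
  }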

recv_sys_t::wait_for_pool(): Wait for some space to become available
in the buffer pool.

mlog_init_t::mark_ibuf_exist(): Avoid calls to
recv_sys::recover_low() via ibuf_page_exists() and buf_page_get_low().
Such calls would lead to double locking of recv_sys.mutex, which
depending on implementation could cause a deadlock. We will use
lower-level calls to look up index pages.

buf_LRU_block_remove_hashed(): Disable consistency checks for freed
ROW_FORMAT=COMPRESSED pages. Their contents could be uninitialized garbage.
This fixes an occasional failure of the test
innodb.innodb_bulk_create_index_debug.

Tested by: Matthias Leich
Author: Marko Mäkelä
Date: 2023-05-19 15:15:38 +03:00
Parent: 2ec42f793d
Commit: 2f9e264781
12 changed files with 1332 additions and 927 deletions


@@ -3146,7 +3146,7 @@ static bool xtrabackup_copy_logfile()
       if (log_sys.buf[recv_sys.offset] <= 1)
         break;
-      if (recv_sys.parse_mtr(STORE_NO) == recv_sys_t::OK)
+      if (recv_sys.parse_mtr<false>(false) == recv_sys_t::OK)
       {
         do
         {
@@ -3156,7 +3156,7 @@ static bool xtrabackup_copy_logfile()
                                  sequence_offset));
           *seq= 1;
         }
-        while ((r= recv_sys.parse_mtr(STORE_NO)) == recv_sys_t::OK);
+        while ((r= recv_sys.parse_mtr<false>(false)) == recv_sys_t::OK);
         if (ds_write(dst_log_file, log_sys.buf + start_offset,
                      recv_sys.offset - start_offset))


@@ -372,7 +372,7 @@ void buf_dblwr_t::recover()
     const uint32_t space_id= page_get_space_id(page);
     const page_id_t page_id(space_id, page_no);
-    if (recv_sys.lsn < lsn)
+    if (recv_sys.scanned_lsn < lsn)
     {
       ib::info() << "Ignoring a doublewrite copy of page " << page_id
                  << " with future log sequence number " << lsn;


@@ -2591,6 +2591,7 @@ ATTRIBUTE_COLD void buf_flush_page_cleaner_init()
 /** Flush the buffer pool on shutdown. */
 ATTRIBUTE_COLD void buf_flush_buffer_pool()
 {
+  ut_ad(!os_aio_pending_reads());
   ut_ad(!buf_page_cleaner_is_active);
   ut_ad(!buf_flush_sync_lsn);


@@ -1093,7 +1093,11 @@ static bool buf_LRU_block_remove_hashed(buf_page_t *bpage, const page_id_t id,
 		ut_a(!zip || !bpage->oldest_modification());
 		ut_ad(bpage->zip_size());
+		/* Skip consistency checks if the page was freed.
+		In recovery, we could get a sole FREE_PAGE record
+		and nothing else, for a ROW_FORMAT=COMPRESSED page.
+		Its contents would be garbage. */
+		if (!bpage->is_freed())
 		switch (fil_page_get_type(page)) {
 		case FIL_PAGE_TYPE_ALLOCATED:
 		case FIL_PAGE_INODE:
@@ -1224,6 +1228,7 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state)
   buf_pool_t::hash_chain &chain= buf_pool.page_hash.cell_get(id.fold());
   page_hash_latch &hash_lock= buf_pool.page_hash.lock_get(chain);
+  recv_sys.free_corrupted_page(id);
   mysql_mutex_lock(&mutex);
   hash_lock.lock();
@@ -1248,8 +1253,6 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state)
     buf_LRU_block_free_hashed_page(reinterpret_cast<buf_block_t*>(bpage));
   mysql_mutex_unlock(&mutex);
-
-  recv_sys.free_corrupted_page(id);
 }

 /** Update buf_pool.LRU_old_ratio.


@@ -655,60 +655,35 @@ failed:
   return count;
 }

-/** @return whether a page has been freed */
-inline bool fil_space_t::is_freed(uint32_t page)
-{
-  std::lock_guard<std::mutex> freed_lock(freed_range_mutex);
-  return freed_ranges.contains(page);
-}
-
-/** Issues read requests for pages which recovery wants to read in.
-@param space_id  tablespace identifier
-@param page_nos  page numbers to read, in ascending order */
-void buf_read_recv_pages(uint32_t space_id, st_::span<uint32_t> page_nos)
-{
-	fil_space_t* space = fil_space_t::get(space_id);
-
-	if (!space) {
-		/* The tablespace is missing or unreadable: do nothing */
-		return;
-	}
-
-	const ulint zip_size = space->zip_size();
-
-	for (ulint i = 0; i < page_nos.size(); i++) {
-		/* Ignore if the page already present in freed ranges. */
-		if (space->is_freed(page_nos[i])) {
-			continue;
-		}
-
-		const page_id_t	cur_page_id(space_id, page_nos[i]);
-
-		ulint limit = 0;
-		for (ulint j = 0; j < buf_pool.n_chunks; j++) {
-			limit += buf_pool.chunks[j].size / 2;
-		}
-
-		if (os_aio_pending_reads() >= limit) {
-			os_aio_wait_until_no_pending_reads(false);
-		}
-
-		space->reacquire();
-		switch (buf_read_page_low(space, false, BUF_READ_ANY_PAGE,
-					  cur_page_id, zip_size, true)) {
-		case DB_SUCCESS: case DB_SUCCESS_LOCKED_REC:
-			break;
-		default:
-			sql_print_error("InnoDB: Recovery failed to read page "
-					UINT32PF " from %s",
-					cur_page_id.page_no(),
-					space->chain.start->name);
-		}
-	}
-
-	DBUG_PRINT("ib_buf", ("recovery read (%zu pages) for %s",
-			      page_nos.size(), space->chain.start->name));
-	space->release();
+/** Schedule a page for recovery.
+@param space    tablespace
+@param page_id  page identifier
+@param recs     log records
+@param init     page initialization, or nullptr if the page needs to be read */
+void buf_read_recover(fil_space_t *space, const page_id_t page_id,
+                      page_recv_t &recs, recv_init *init)
+{
+  ut_ad(space->id == page_id.space());
+  space->reacquire();
+  const ulint zip_size= space->zip_size();
+
+  if (init)
+  {
+    if (buf_page_t *bpage= buf_page_init_for_read(BUF_READ_ANY_PAGE, page_id,
+                                                  zip_size, true))
+    {
+      ut_ad(bpage->in_file());
+      os_fake_read(IORequest{bpage, (buf_tmp_buffer_t*) &recs,
+                             UT_LIST_GET_FIRST(space->chain),
+                             IORequest::READ_ASYNC}, ptrdiff_t(init));
+    }
+  }
+  else if (dberr_t err= buf_read_page_low(space, false, BUF_READ_ANY_PAGE,
+                                          page_id, zip_size, true))
+  {
+    if (err != DB_SUCCESS_LOCKED_REC)
+      sql_print_error("InnoDB: Recovery failed to read page "
+                      UINT32PF " from %s",
+                      page_id.page_no(), space->chain.start->name);
+  }
 }


@@ -2775,38 +2775,41 @@ func_exit:
 #include <tpool.h>

-/** Callback for AIO completion */
-void fil_aio_callback(const IORequest &request)
-{
-  ut_ad(fil_validate_skip());
-  ut_ad(request.node);
-
-  if (!request.bpage)
-  {
-    ut_ad(!srv_read_only_mode);
-    if (request.type == IORequest::DBLWR_BATCH)
-      buf_dblwr.flush_buffered_writes_completed(request);
-    else
-      ut_ad(request.type == IORequest::WRITE_ASYNC);
-write_completed:
-    request.node->complete_write();
-  }
-  else if (request.is_write())
-  {
-    buf_page_write_complete(request);
-    goto write_completed;
-  }
-  else
-  {
-    ut_ad(request.is_read());
-
-    /* IMPORTANT: since i/o handling for reads will read also the insert
-    buffer in fil_system.sys_space, we have to be very careful not to
-    introduce deadlocks. We never close fil_system.sys_space data
-    files and never issue asynchronous reads of change buffer pages. */
-    const page_id_t id(request.bpage->id());
-    if (dberr_t err= request.bpage->read_complete(*request.node))
-    {
-      if (recv_recovery_is_on() && !srv_force_recovery)
-      {
+void IORequest::write_complete() const
+{
+  ut_ad(fil_validate_skip());
+  ut_ad(node);
+  ut_ad(is_write());
+
+  if (!bpage)
+  {
+    ut_ad(!srv_read_only_mode);
+    if (type == IORequest::DBLWR_BATCH)
+      buf_dblwr.flush_buffered_writes_completed(*this);
+    else
+      ut_ad(type == IORequest::WRITE_ASYNC);
+  }
+  else
+    buf_page_write_complete(*this);
+
+  node->complete_write();
+  node->space->release();
+}
+
+void IORequest::read_complete() const
+{
+  ut_ad(fil_validate_skip());
+  ut_ad(node);
+  ut_ad(is_read());
+  ut_ad(bpage);
+
+  /* IMPORTANT: since i/o handling for reads will read also the insert
+  buffer in fil_system.sys_space, we have to be very careful not to
+  introduce deadlocks. We never close fil_system.sys_space data files
+  and never issue asynchronous reads of change buffer pages. */
+  const page_id_t id(bpage->id());
+  if (dberr_t err= bpage->read_complete(*node))
+  {
+    if (recv_recovery_is_on() && !srv_force_recovery)
+    {
@@ -2817,11 +2820,10 @@ write_completed:
       if (err != DB_FAIL)
         ib::error() << "Failed to read page " << id.page_no()
-                    << " from file '" << request.node->name << "': " << err;
-    }
+                    << " from file '" << node->name << "': " << err;
   }

-  request.node->space->release();
+  node->space->release();
 }

 /** Flush to disk the writes in file spaces of the given type


@@ -75,8 +75,7 @@ struct buf_pool_info_t
 	ulint	flush_list_len;		/*!< Length of buf_pool.flush_list */
 	ulint	n_pend_unzip;		/*!< buf_pool.n_pend_unzip, pages
 					pending decompress */
-	ulint	n_pend_reads;		/*!< buf_pool.n_pend_reads, pages
-					pending read */
+	ulint	n_pend_reads;		/*!< os_aio_pending_reads() */
 	ulint	n_pending_flush_lru;	/*!< Pages pending flush in LRU */
 	ulint	n_pending_flush_list;	/*!< Pages pending flush in FLUSH
 					LIST */


@@ -102,10 +102,13 @@ which could result in a deadlock if the OS does not support asynchronous io.
 ulint
 buf_read_ahead_linear(const page_id_t page_id, ulint zip_size, bool ibuf);

-/** Issue read requests for pages that need to be recovered.
-@param space_id  tablespace identifier
-@param page_nos  page numbers to read, in ascending order */
-void buf_read_recv_pages(uint32_t space_id, st_::span<uint32_t> page_nos);
+/** Schedule a page for recovery.
+@param space    tablespace
+@param page_id  page identifier
+@param recs     log records
+@param init     page initialization, or nullptr if the page needs to be read */
+void buf_read_recover(fil_space_t *space, const page_id_t page_id,
+                      page_recv_t &recs, recv_init *init);

 /** @name Modes used in read-ahead @{ */
 /** read only pages belonging to the insert buffer tree */


@@ -38,9 +38,9 @@ Created 9/20/1997 Heikki Tuuri
 #define recv_recovery_is_on() UNIV_UNLIKELY(recv_sys.recovery_on)

 ATTRIBUTE_COLD MY_ATTRIBUTE((nonnull, warn_unused_result))
-/** Apply any buffered redo log to a page that was just read from a data file.
-@param[in,out]	space	tablespace
-@param[in,out]	bpage	buffer pool page
+/** Apply any buffered redo log to a page.
+@param space   tablespace
+@param bpage   buffer pool page
 @return whether the page was recovered correctly */
 bool recv_recover_page(fil_space_t* space, buf_page_t* bpage);
@@ -49,17 +49,6 @@ of first system tablespace page
 @return error code or DB_SUCCESS */
 dberr_t recv_recovery_from_checkpoint_start();

-/** Whether to store redo log records in recv_sys.pages */
-enum store_t {
-  /** Do not store redo log records. */
-  STORE_NO,
-  /** Store redo log records. */
-  STORE_YES,
-  /** Store redo log records if the tablespace exists. */
-  STORE_IF_EXISTS
-};
-
 /** Report an operation to create, delete, or rename a file during backup.
 @param[in]	space_id	tablespace identifier
 @param[in]	type		file operation redo log type
@@ -125,21 +114,15 @@ struct recv_dblwr_t
   list pages;
 };

-/** the recovery state and buffered records for a page */
+/** recv_sys.pages entry; protected by recv_sys.mutex */
 struct page_recv_t
 {
-  /** Recovery state; protected by recv_sys.mutex */
-  enum
-  {
-    /** not yet processed */
-    RECV_NOT_PROCESSED,
-    /** not processed; the page will be reinitialized */
-    RECV_WILL_NOT_READ,
-    /** page is being read */
-    RECV_BEING_READ,
-    /** log records are being applied on the page */
-    RECV_BEING_PROCESSED
-  } state= RECV_NOT_PROCESSED;
+  /** Recovery status: 0=not in progress, 1=log is being applied,
+  -1=log has been applied and the entry may be erased.
+  Transitions from 1 to -1 are NOT protected by recv_sys.mutex. */
+  Atomic_relaxed<int8_t> being_processed{0};
+  /** Whether reading the page will be skipped */
+  bool skip_read= false;
   /** Latest written byte offset when applying the log records.
   @see mtr_t::m_last_offset */
   uint16_t last_offset= 1;
@@ -162,6 +145,9 @@ struct page_recv_t
       head= recs;
       tail= recs;
     }
+    /** Remove the last records for the page
+    @param start_lsn start of the removed log */
+    ATTRIBUTE_COLD void rewind(lsn_t start_lsn);

     /** @return the last log snippet */
     const log_rec_t* last() const { return tail; }
@@ -180,8 +166,8 @@ struct page_recv_t
     iterator begin() { return head; }
     iterator end() { return NULL; }
     bool empty() const { ut_ad(!head == !tail); return !head; }
-    /** Clear and free the records; @see recv_sys_t::alloc() */
-    inline void clear();
+    /** Clear and free the records; @see recv_sys_t::add() */
+    void clear();
   } log;

   /** Trim old log records for a page.
@@ -190,21 +176,27 @@ struct page_recv_t
   inline bool trim(lsn_t start_lsn);
   /** Ignore any earlier redo log records for this page. */
   inline void will_not_read();
-  /** @return whether the log records for the page are being processed */
-  bool is_being_processed() const { return state == RECV_BEING_PROCESSED; }
+};
+
+/** A page initialization operation that was parsed from the redo log */
+struct recv_init
+{
+  /** log sequence number of the page initialization */
+  lsn_t lsn;
+  /** Whether btr_page_create() avoided a read of the page.
+  At the end of the last recovery batch, mark_ibuf_exist()
+  will mark pages for which this flag is set. */
+  bool created;
 };

 /** Recovery system data structure */
 struct recv_sys_t
 {
-  /** mutex protecting apply_log_recs and page_recv_t::state */
-  mysql_mutex_t mutex;
+  using init= recv_init;
+
+  /** mutex protecting this as well as some of page_recv_t */
+  alignas(CPU_LEVEL1_DCACHE_LINESIZE) mysql_mutex_t mutex;
 private:
-  /** condition variable for
-  !apply_batch_on || pages.empty() || found_corrupt_log || found_corrupt_fs */
-  pthread_cond_t cond;
-  /** whether recv_apply_hashed_log_recs() is running */
-  bool apply_batch_on;
   /** set when finding a corrupt log block or record, or there is a
   log parsing buffer overflow */
   bool found_corrupt_log;
@@ -226,6 +218,8 @@ public:
   size_t offset;
   /** log sequence number of the first non-parsed record */
   lsn_t lsn;
+  /** log sequence number of the last parsed mini-transaction */
+  lsn_t scanned_lsn;
   /** log sequence number at the end of the FILE_CHECKPOINT record, or 0 */
   lsn_t file_checkpoint;
   /** the time when progress was last reported */
@@ -238,6 +232,9 @@ public:
   map pages;

 private:
+  /** iterator to pages, used by parse() */
+  map::iterator pages_it;
+
   /** Process a record that indicates that a tablespace size is being shrunk.
   @param page_id first page that is not in the file
   @param lsn log sequence number of the shrink operation */
@@ -257,30 +254,42 @@ public:
   /** The contents of the doublewrite buffer */
   recv_dblwr_t dblwr;

-  /** Last added LSN to pages, before switching to STORE_NO */
-  lsn_t last_stored_lsn= 0;
-
   inline void read(os_offset_t offset, span<byte> buf);
   inline size_t files_size();
   void close_files() { files.clear(); files.shrink_to_fit(); }

+  /** Advance pages_it if it matches the iterator */
+  void pages_it_invalidate(const map::iterator &p)
+  {
+    mysql_mutex_assert_owner(&mutex);
+    if (pages_it == p)
+      pages_it++;
+  }
+  /** Invalidate pages_it if it points to the given tablespace */
+  void pages_it_invalidate(uint32_t space_id)
+  {
+    mysql_mutex_assert_owner(&mutex);
+    if (pages_it != pages.end() && pages_it->first.space() == space_id)
+      pages_it= pages.end();
+  }
+
 private:
   /** Attempt to initialize a page based on redo log records.
-  @param page_id  page identifier
-  @param p        iterator pointing to page_id
+  @param p        iterator
   @param mtr      mini-transaction
   @param b        pre-allocated buffer pool block
+  @param init     page initialization
   @return the recovered block
   @retval nullptr if the page cannot be initialized based on log records
   @retval -1      if the page cannot be recovered due to corruption */
-  inline buf_block_t *recover_low(const page_id_t page_id, map::iterator &p,
-                                  mtr_t &mtr, buf_block_t *b);
+  inline buf_block_t *recover_low(const map::iterator &p, mtr_t &mtr,
+                                  buf_block_t *b, init &init);

   /** Attempt to initialize a page based on redo log records.
   @param page_id  page identifier
   @return the recovered block
   @retval nullptr if the page cannot be initialized based on log records
   @retval -1      if the page cannot be recovered due to corruption */
-  buf_block_t *recover_low(const page_id_t page_id);
+  ATTRIBUTE_COLD buf_block_t *recover_low(const page_id_t page_id);

   /** All found log files (multiple ones are possible if we are upgrading
   from before MariaDB Server 10.5.1) */
@@ -289,10 +298,27 @@ private:
   /** Base node of the redo block list.
   List elements are linked via buf_block_t::unzip_LRU. */
   UT_LIST_BASE_NODE_T(buf_block_t) blocks;
+
+  /** Allocate a block from the buffer pool for recv_sys.pages */
+  ATTRIBUTE_COLD buf_block_t *add_block();
+
+  /** Wait for buffer pool to become available.
+  @param pages number of buffer pool pages needed */
+  ATTRIBUTE_COLD void wait_for_pool(size_t pages);
+
+  /** Free log for processed pages. */
+  void garbage_collect();
+
+  /** Apply a recovery batch.
+  @param space_id   current tablespace identifier
+  @param space      current tablespace
+  @param free_block spare buffer block
+  @param last_batch whether it is possible to write more redo log
+  @return whether the caller must provide a new free_block */
+  bool apply_batch(uint32_t space_id, fil_space_t *&space,
+                   buf_block_t *&free_block, bool last_batch);
+
 public:
-  /** Check whether the number of read redo log blocks exceeds the maximum.
-  @return whether the memory is exhausted */
-  inline bool is_memory_exhausted();
   /** Apply buffered log to persistent data pages.
   @param last_batch whether it is possible to write more redo log */
   void apply(bool last_batch);
@@ -310,7 +336,7 @@ public:
   /** Clean up after create() */
   void close();

-  bool is_initialised() const { return last_stored_lsn != 0; }
+  bool is_initialised() const { return scanned_lsn != 0; }

   /** Find the latest checkpoint.
   @return error code or DB_SUCCESS */
@@ -321,60 +347,76 @@ public:
   @param start_lsn start LSN of the mini-transaction
   @param lsn      @see mtr_t::commit_lsn()
   @param l        redo log snippet
-  @param len      length of l, in bytes */
-  inline void add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
-                  const byte *l, size_t len);
+  @param len      length of l, in bytes
+  @return whether we ran out of memory */
+  bool add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
+           const byte *l, size_t len);

-  enum parse_mtr_result { OK, PREMATURE_EOF, GOT_EOF };
+  /** Parsing result */
+  enum parse_mtr_result {
+    /** a record was successfully parsed */
+    OK,
+    /** the log ended prematurely (need to read more) */
+    PREMATURE_EOF,
+    /** the end of the log was reached */
+    GOT_EOF,
+    /** parse<true>(l, false) ran out of memory */
+    GOT_OOM
+  };

 private:
   /** Parse and register one log_t::FORMAT_10_8 mini-transaction.
-  @param store   whether to store the records
-  @param l       log data source */
-  template<typename source>
-  inline parse_mtr_result parse(store_t store, source& l) noexcept;
+  @tparam store     whether to store the records
+  @param l          log data source
+  @param if_exists  if store: whether to check if the tablespace exists */
+  template<typename source,bool store>
+  inline parse_mtr_result parse(source &l, bool if_exists) noexcept;
+
+  /** Rewind a mini-transaction when parse() runs out of memory.
+  @param l      log data source
+  @param begin  start of the mini-transaction */
+  template<typename source>
+  ATTRIBUTE_COLD void rewind(source &l, source &begin) noexcept;
+
+  /** Report progress in terms of LSN or pages remaining */
+  ATTRIBUTE_COLD void report_progress() const;
 public:
   /** Parse and register one log_t::FORMAT_10_8 mini-transaction,
   handling log_sys.is_pmem() buffer wrap-around.
-  @param store   whether to store the records */
-  static parse_mtr_result parse_mtr(store_t store) noexcept;
+  @tparam store     whether to store the records
+  @param if_exists  if store: whether to check if the tablespace exists */
+  template<bool store>
+  static parse_mtr_result parse_mtr(bool if_exists) noexcept;

   /** Parse and register one log_t::FORMAT_10_8 mini-transaction,
   handling log_sys.is_pmem() buffer wrap-around.
-  @param store   whether to store the records */
-  static parse_mtr_result parse_pmem(store_t store) noexcept
+  @tparam store     whether to store the records
+  @param if_exists  if store: whether to check if the tablespace exists */
+  template<bool store>
+  static parse_mtr_result parse_pmem(bool if_exists) noexcept
 #ifdef HAVE_PMEM
   ;
 #else
-  { return parse_mtr(store); }
+  { return parse_mtr<store>(if_exists); }
 #endif

+  /** Erase log records for a page. */
+  void erase(map::iterator p);
+
   /** Clear a fully processed set of stored redo log records. */
-  inline void clear();
+  void clear();

   /** Determine whether redo log recovery progress should be reported.
   @param time  the current time
   @return whether progress should be reported
   (the last report was at least 15 seconds ago) */
-  bool report(time_t time)
-  {
-    if (time - progress_time < 15)
-      return false;
-    progress_time= time;
-    return true;
-  }
+  bool report(time_t time);

   /** The alloc() memory alignment, in bytes */
   static constexpr size_t ALIGNMENT= sizeof(size_t);

-  /** Allocate memory for log_rec_t
-  @param len  allocation size, in bytes
-  @return pointer to len bytes of memory (never NULL) */
-  inline void *alloc(size_t len);
-
   /** Free a redo log snippet.
-  @param data buffer returned by alloc() */
+  @param data buffer allocated in add() */
   inline void free(const void *data);

   /** Remove records for a corrupted page.
@@ -386,8 +428,6 @@ public:
   ATTRIBUTE_COLD void set_corrupt_fs();
   /** Flag log file corruption during recovery. */
   ATTRIBUTE_COLD void set_corrupt_log();
-  /** Possibly finish a recovery batch. */
-  inline void maybe_finish_batch();

   /** @return whether data file corruption was found */
   bool is_corrupt_fs() const { return UNIV_UNLIKELY(found_corrupt_fs); }
@@ -405,12 +445,13 @@ public:
   }

   /** Try to recover a tablespace that was not readable earlier
-  @param p          iterator, initially pointing to page_id_t{space_id,0};
-                    the records will be freed and the iterator advanced
+  @param p          iterator
   @param name       tablespace file name
   @param free_block spare buffer block
-  @return whether recovery failed */
-  bool recover_deferred(map::iterator &p, const std::string &name,
-                        buf_block_t *&free_block);
+  @return recovered tablespace
+  @retval nullptr if recovery failed */
+  fil_space_t *recover_deferred(const map::iterator &p,
+                                const std::string &name,
+                                buf_block_t *&free_block);
 };


@@ -212,6 +212,10 @@ public:
   bool is_LRU() const { return (type & (WRITE_LRU ^ WRITE_ASYNC)) != 0; }
   bool is_async() const { return (type & (READ_SYNC ^ READ_ASYNC)) != 0; }

+  void write_complete() const;
+  void read_complete() const;
+  void fake_read_complete(os_offset_t offset) const;
+
   /** If requested, free storage space associated with a section of the file.
   @param off   byte offset from the start (SEEK_SET)
   @param len   size of the hole in bytes
@@ -1040,6 +1044,11 @@ int os_aio_init();
 Frees the asynchronous io system. */
 void os_aio_free();

+/** Submit a fake read request during crash recovery.
+@param type   fake read request
+@param offset additional context */
+void os_fake_read(const IORequest &type, os_offset_t offset);
+
 /** Request a read or write.
 @param type I/O request
 @param buf buffer

(File diff suppressed because it is too large.)


@@ -3411,15 +3411,12 @@ os_file_get_status(
 	return(ret);
 }

-extern void fil_aio_callback(const IORequest &request);
-
-static void io_callback(tpool::aiocb *cb)
+static void io_callback_errorcheck(const tpool::aiocb *cb)
 {
-  const IORequest &request= *static_cast<const IORequest*>
-    (static_cast<const void*>(cb->m_userdata));
   if (cb->m_err != DB_SUCCESS)
   {
+    const IORequest &request= *static_cast<const IORequest*>
+      (static_cast<const void*>(cb->m_userdata));
     ib::fatal() << "IO Error: " << cb->m_err << " during " <<
       (request.is_async() ? "async " : "sync ") <<
       (request.is_LRU() ? "lru " : "") <<
@@ -3427,19 +3424,36 @@ static void io_callback(tpool::aiocb *cb)
       " of " << cb->m_len << " bytes, for file " << cb->m_fh << ", returned " <<
       cb->m_ret_len;
   }
-  /* Return cb back to cache*/
-  if (cb->m_opcode == tpool::aio_opcode::AIO_PREAD)
-  {
-    ut_ad(read_slots->contains(cb));
-    fil_aio_callback(request);
-    read_slots->release(cb);
-  }
-  else
-  {
-    ut_ad(write_slots->contains(cb));
-    fil_aio_callback(request);
-    write_slots->release(cb);
-  }
+}
+
+static void fake_io_callback(void *c)
+{
+  tpool::aiocb *cb= static_cast<tpool::aiocb*>(c);
+  ut_ad(read_slots->contains(cb));
+  static_cast<const IORequest*>(static_cast<const void*>(cb->m_userdata))->
+    fake_read_complete(cb->m_offset);
+  read_slots->release(cb);
+}
+
+static void read_io_callback(void *c)
+{
+  tpool::aiocb *cb= static_cast<tpool::aiocb*>(c);
+  ut_ad(cb->m_opcode == tpool::aio_opcode::AIO_PREAD);
+  io_callback_errorcheck(cb);
+  ut_ad(read_slots->contains(cb));
+  static_cast<const IORequest*>
+    (static_cast<const void*>(cb->m_userdata))->read_complete();
+  read_slots->release(cb);
+}
+
+static void write_io_callback(void *c)
+{
+  tpool::aiocb *cb= static_cast<tpool::aiocb*>(c);
+  ut_ad(cb->m_opcode == tpool::aio_opcode::AIO_PWRITE);
+  ut_ad(write_slots->contains(cb));
+  static_cast<const IORequest*>
+    (static_cast<const void*>(cb->m_userdata))->write_complete();
+  write_slots->release(cb);
 }

 #ifdef LINUX_NATIVE_AIO
@@ -3684,6 +3698,28 @@ void os_aio_wait_until_no_pending_reads(bool declare)
     tpool::tpool_wait_end();
 }

+/** Submit a fake read request during crash recovery.
+@param type   fake read request
+@param offset additional context */
+void os_fake_read(const IORequest &type, os_offset_t offset)
+{
+  tpool::aiocb *cb= read_slots->acquire();
+
+  cb->m_group= read_slots->get_task_group();
+  cb->m_fh= type.node->handle.m_file;
+  cb->m_buffer= nullptr;
+  cb->m_len= 0;
+  cb->m_offset= offset;
+  cb->m_opcode= tpool::aio_opcode::AIO_PREAD;
+  new (cb->m_userdata) IORequest{type};
+  cb->m_internal_task.m_func= fake_io_callback;
+  cb->m_internal_task.m_arg= cb;
+  cb->m_internal_task.m_group= cb->m_group;
+
+  srv_thread_pool->submit_task(&cb->m_internal_task);
+}
+
 /** Request a read or write.
 @param type I/O request
 @param buf buffer
@@ -3729,23 +3765,32 @@ func_exit:
 		return err;
 	}

+	io_slots* slots;
+	tpool::callback_func callback;
+	tpool::aio_opcode opcode;
+
 	if (type.is_read()) {
 		++os_n_file_reads;
+		slots = read_slots;
+		callback = read_io_callback;
+		opcode = tpool::aio_opcode::AIO_PREAD;
 	} else {
 		++os_n_file_writes;
+		slots = write_slots;
+		callback = write_io_callback;
+		opcode = tpool::aio_opcode::AIO_PWRITE;
 	}

 	compile_time_assert(sizeof(IORequest) <= tpool::MAX_AIO_USERDATA_LEN);
-	io_slots* slots= type.is_read() ? read_slots : write_slots;
 	tpool::aiocb* cb = slots->acquire();

 	cb->m_buffer = buf;
-	cb->m_callback = (tpool::callback_func)io_callback;
+	cb->m_callback = callback;
 	cb->m_group = slots->get_task_group();
 	cb->m_fh = type.node->handle.m_file;
 	cb->m_len = (int)n;
 	cb->m_offset = offset;
-	cb->m_opcode = type.is_read() ? tpool::aio_opcode::AIO_PREAD : tpool::aio_opcode::AIO_PWRITE;
+	cb->m_opcode = opcode;
 	new (cb->m_userdata) IORequest{type};

 	if (srv_thread_pool->submit_io(cb)) {
@@ -3753,6 +3798,7 @@ func_exit:
 		os_file_handle_error(type.node->name, type.is_read()
 				     ? "aio read" : "aio write");
 		err = DB_IO_ERROR;
+		type.node->space->release();
 	}

 	goto func_exit;