diff --git a/storage/maria/ma_loghandler.c b/storage/maria/ma_loghandler.c index 2ef3d3fc193..bde9360f06f 100644 --- a/storage/maria/ma_loghandler.c +++ b/storage/maria/ma_loghandler.c @@ -155,7 +155,7 @@ struct st_translog_descriptor /* maximum LSN of the current (not finished) file */ LSN max_lsn; - /* Last flushed LSN */ + /* Last flushed LSN (protected by log_flush_lock) */ LSN flushed; /* Last LSN sent to the disk (but maybe not written yet) */ LSN sent_to_file; @@ -220,6 +220,14 @@ LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES]; #ifndef DBUG_OFF + +#define translog_buffer_lock_assert_owner(B) \ + safe_mutex_assert_owner(&B->mutex); +void translog_lock_assert_owner() +{ + translog_buffer_lock_assert_owner(log_descriptor.bc.buffer); +} + /** @brief check the description table validity @@ -294,6 +302,9 @@ static void check_translog_description_table(int num) } DBUG_VOID_RETURN; } +#else +#define translog_buffer_lock_assert_owner(B) +#define translog_lock_assert_owner() #endif static LOG_DESC INIT_LOGREC_FIXED_RECORD_0LSN_EXAMPLE= @@ -3051,13 +3062,6 @@ void translog_destroy() -#define translog_buffer_lock_assert_owner(B) \ - safe_mutex_assert_owner(&B->mutex); -void translog_lock_assert_owner() -{ - translog_buffer_lock_assert_owner(log_descriptor.bc.buffer); -} - /* Start new page @@ -3314,6 +3318,7 @@ static inline void translog_buffer_increase_writers(struct st_translog_buffer *buffer) { DBUG_ENTER("translog_buffer_increase_writers"); + translog_buffer_lock_assert_owner(buffer); buffer->copy_to_buffer_in_progress++; DBUG_PRINT("info", ("copy_to_buffer_in_progress. Buffer #%u 0x%lx: %d", (uint) buffer->buffer_no, (ulong) buffer, @@ -3334,6 +3339,7 @@ translog_buffer_increase_writers(struct st_translog_buffer *buffer) static void translog_buffer_decrease_writers(struct st_translog_buffer *buffer) { DBUG_ENTER("translog_buffer_decrease_writers"); + translog_buffer_lock_assert_owner(buffer); buffer->copy_to_buffer_in_progress--; DBUG_PRINT("info", ("copy_to_buffer_in_progress. Buffer #%u 0x%lx: %d", (uint) buffer->buffer_no, (ulong) buffer, @@ -6147,6 +6153,30 @@ static void translog_force_current_buffer_to_finish() log_descriptor.bc.write_counter= write_counter; log_descriptor.bc.previous_offset= previous_offset; + /* + Advances this log pointer, increases writers and let other threads to + write to the log while we process old page content + */ + if (left) + { + log_descriptor.bc.ptr+= current_page_fill; + log_descriptor.bc.buffer->size= log_descriptor.bc.current_page_fill= + current_page_fill; + new_buffer->overlay= old_buffer; + } + else + translog_new_page_header(&log_descriptor.horizon, &log_descriptor.bc); + translog_buffer_increase_writers(new_buffer); + translog_buffer_unlock(new_buffer); + + /* + We have to wait until all writers finish before start changing the + pages by applying protection and copying the page content in the + new buffer. + */ + translog_wait_for_writers(old_buffer); + + if (data[TRANSLOG_PAGE_FLAGS] & TRANSLOG_SECTOR_PROTECTION) { translog_put_sector_protection(data, &log_descriptor.bc); @@ -6175,19 +6205,17 @@ static void translog_force_current_buffer_to_finish() if (left) { /* - TODO: do not copy begining of the page if we have no CRC or sector + TODO: do not copy beginning of the page if we have no CRC or sector checks on */ memcpy(new_buffer->buffer, data, current_page_fill); - log_descriptor.bc.ptr+= current_page_fill; - log_descriptor.bc.buffer->size= log_descriptor.bc.current_page_fill= - current_page_fill; - new_buffer->overlay= old_buffer; } - else - translog_new_page_header(&log_descriptor.horizon, &log_descriptor.bc); old_buffer->next_buffer_offset= new_buffer->offset; + translog_buffer_lock(new_buffer); + translog_buffer_decrease_writers(new_buffer); + translog_buffer_unlock(new_buffer); + DBUG_VOID_RETURN; } @@ -6246,6 +6274,7 @@ my_bool translog_flush(LSN lsn) { DBUG_PRINT("info", ("already flushed: (%lu,0x%lx)", LSN_IN_PARTS(log_descriptor.flushed))); + translog_unlock(); goto out; } /* send to the file if it is not sent */ @@ -6282,9 +6311,9 @@ my_bool translog_flush(LSN lsn) rc= 1; goto out; } - if (!full_circle) - translog_lock(); + translog_lock(); } + translog_unlock(); for (i= LSN_FILE_NO(old_flushed); i <= LSN_FILE_NO(lsn); i++) { @@ -6313,7 +6342,6 @@ my_bool translog_flush(LSN lsn) /** @todo LOG decide if syncing of directory is needed */ rc|= my_sync(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD)); out: - translog_unlock(); pthread_mutex_unlock(&log_descriptor.log_flush_lock); DBUG_RETURN(rc); }