This commit is contained in:
unknown 2008-10-31 15:06:56 +02:00
commit 24d1df1a56

View file

@ -121,6 +121,8 @@ struct st_translog_buffer
in case of flush by LSN it can be offset + size - TRANSLOG_PAGE_SIZE)
*/
TRANSLOG_ADDRESS next_buffer_offset;
/* Previous buffer offset to detect it flush finish */
TRANSLOG_ADDRESS prev_buffer_offset;
/*
How much is written (or will be written when copy_to_buffer_in_progress
become 0) to this buffer
@ -135,12 +137,12 @@ struct st_translog_buffer
/* list of waiting buffer ready threads */
struct st_my_thread_var *waiting_flush;
/*
Pointer on the buffer which overlap with this one (due to flush of
If true then previous buffer overlap with this one (due to flush of
loghandler, the last page of that buffer is the same as the first page
of this buffer) and have to be written first (because contain old
content of page which present in both buffers)
*/
struct st_translog_buffer *overlay;
my_bool overlay;
uint buffer_no;
/*
Lock for the buffer.
@ -175,6 +177,14 @@ struct st_translog_buffer
With file and offset it allow detect buffer changes
*/
uint8 ver;
/*
When previous buffer sent to disk it set its address here to allow
to detect when it is done
(we have to keep it in this buffer to lock buffers only in one direction).
*/
TRANSLOG_ADDRESS prev_sent_to_disk;
pthread_cond_t prev_sent_to_disk_cond;
};
@ -1421,9 +1431,12 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer)
/* list of waiting buffer ready threads */
buffer->waiting_flush= 0;
/* lock for the buffer. Current buffer also lock the handler */
if (pthread_mutex_init(&buffer->mutex, MY_MUTEX_INIT_FAST))
if (pthread_mutex_init(&buffer->mutex, MY_MUTEX_INIT_FAST) ||
pthread_cond_init(&buffer->prev_sent_to_disk_cond, 0))
DBUG_RETURN(1);
buffer->is_closing_buffer= 0;
buffer->prev_sent_to_disk= LSN_IMPOSSIBLE;
buffer->prev_buffer_offset= LSN_IMPOSSIBLE;
buffer->ver= 0;
DBUG_RETURN(0);
}
@ -2100,10 +2113,12 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
{
translog_lock_assert_owner();
translog_start_buffer(new_buffer, cursor, new_buffer_no);
new_buffer->prev_buffer_offset=
log_descriptor.buffers[old_buffer_no].offset;
new_buffer->prev_last_lsn=
BUFFER_MAX_LSN(log_descriptor.buffers + old_buffer_no);
}
log_descriptor.buffers[old_buffer_no].next_buffer_offset= new_buffer->offset;
new_buffer->prev_last_lsn=
BUFFER_MAX_LSN(log_descriptor.buffers + old_buffer_no);
DBUG_PRINT("info", ("prev_last_lsn set to (%lu,0x%lx) buffer: 0x%lx",
LSN_IN_PARTS(new_buffer->prev_last_lsn),
(ulong) new_buffer));
@ -2117,14 +2132,16 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
SYNOPSIS
translog_set_sent_to_disk()
lsn LSN to assign
in_buffers to assign to in_buffers_only
buffer buffer which we have sent to disk
TODO: use atomic operations if possible (64bit architectures?)
*/
static void translog_set_sent_to_disk(LSN lsn, TRANSLOG_ADDRESS in_buffers)
static void translog_set_sent_to_disk(struct st_translog_buffer *buffer)
{
LSN lsn= buffer->last_lsn;
TRANSLOG_ADDRESS in_buffers= buffer->next_buffer_offset;
DBUG_ENTER("translog_set_sent_to_disk");
pthread_mutex_lock(&log_descriptor.sent_to_disk_lock);
DBUG_PRINT("enter", ("lsn: (%lu,0x%lx) in_buffers: (%lu,0x%lx) "
@ -2415,6 +2432,51 @@ static uint16 translog_get_total_chunk_length(uchar *page, uint16 offset)
}
}
/*
@brief Waits previous buffer flush finish
@param buffer buffer for check
@retval 0 previous buffer flushed and this thread have to flush this one
@retval 1 previous buffer flushed and this buffer flushed by other thread too
*/
my_bool translog_prev_buffer_flush_wait(struct st_translog_buffer *buffer)
{
TRANSLOG_ADDRESS offset= buffer->offset;
TRANSLOG_FILE *file= buffer->file;
uint8 ver= buffer->ver;
DBUG_ENTER("translog_prev_buffer_flush_wait");
DBUG_PRINT("enter", ("buffer: 0x%lx #%u offset: (%lu,0x%lx) "
"prev sent: (%lu,0x%lx) prev offset: (%lu,0x%lx)",
(ulong) buffer, (uint) buffer->buffer_no,
LSN_IN_PARTS(buffer->offset),
LSN_IN_PARTS(buffer->prev_sent_to_disk),
LSN_IN_PARTS(buffer->prev_buffer_offset)));
translog_buffer_lock_assert_owner(buffer);
/*
if prev_sent_to_disk == LSN_IMPOSSIBLE then
prev_buffer_offset should be LSN_IMPOSSIBLE
because it means that this buffer was never used
*/
DBUG_ASSERT((buffer->prev_sent_to_disk == LSN_IMPOSSIBLE &&
buffer->prev_buffer_offset == LSN_IMPOSSIBLE) ||
buffer->prev_sent_to_disk != LSN_IMPOSSIBLE);
if (buffer->prev_buffer_offset != buffer->prev_sent_to_disk)
{
do {
pthread_cond_wait(&buffer->prev_sent_to_disk_cond, &buffer->mutex);
if (buffer->file != file || buffer->offset != offset ||
buffer->ver != ver)
{
translog_buffer_unlock(buffer);
DBUG_RETURN(1); /* some the thread flushed the buffer already */
}
} while(buffer->prev_buffer_offset != buffer->prev_sent_to_disk);
}
DBUG_RETURN(0);
}
/*
Flush given buffer
@ -2460,39 +2522,8 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
if (buffer->file != file || buffer->offset != offset || buffer->ver != ver)
DBUG_RETURN(0); /* some the thread flushed the buffer already */
if (buffer->overlay && buffer->overlay->file == buffer->file &&
cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size,
buffer->offset) > 0)
{
/*
This can't happen for normal translog_flush,
only during destroying the loghandler
*/
struct st_translog_buffer *overlay= buffer->overlay;
TRANSLOG_ADDRESS buffer_offset= buffer->offset;
TRANSLOG_FILE *fl= buffer->file;
uint8 ver= buffer->ver;
translog_buffer_unlock(buffer);
translog_buffer_lock(overlay);
/* rechecks under mutex protection that overlay is still our overlay */
if (buffer->overlay->file == fl &&
cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size,
buffer_offset) > 0)
{
translog_wait_for_buffer_free(overlay);
}
translog_buffer_unlock(overlay);
translog_buffer_lock(buffer);
if (buffer->file != fl || buffer_offset != buffer->offset ||
ver != buffer->ver)
{
/*
This means that somebody else flushed the buffer while we was
waiting for overlay then for locking buffer again.
*/
DBUG_RETURN(0);
}
}
if (buffer->overlay && translog_prev_buffer_flush_wait(buffer))
DBUG_RETURN(0); /* some the thread flushed the buffer already */
/*
Send page by page in the pagecache what we are going to write on the
@ -2553,10 +2584,34 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
file->is_sync= 0;
if (LSN_OFFSET(buffer->last_lsn) != 0) /* if buffer->last_lsn is set */
translog_set_sent_to_disk(buffer->last_lsn,
buffer->next_buffer_offset);
{
if (translog_prev_buffer_flush_wait(buffer))
DBUG_RETURN(0); /* some the thread flushed the buffer already */
translog_set_sent_to_disk(buffer);
}
else
translog_set_only_in_buffers(buffer->next_buffer_offset);
/* say to next buffer that we are finished */
{
struct st_translog_buffer *next_buffer=
log_descriptor.buffers + ((buffer->buffer_no + 1) % TRANSLOG_BUFFERS_NO);
if (likely(translog_status == TRANSLOG_OK)){
translog_buffer_lock(next_buffer);
next_buffer->prev_sent_to_disk= buffer->offset;
translog_buffer_unlock(next_buffer);
pthread_cond_broadcast(&next_buffer->prev_sent_to_disk_cond);
}
else
{
/*
It is shutdown =>
1) there is only one thread
2) mutexes of other buffers can be destroyed => we can't use them
*/
next_buffer->prev_sent_to_disk= buffer->offset;
}
}
/* Free buffer */
buffer->file= NULL;
buffer->overlay= 0;
@ -4640,6 +4695,7 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data)
}
translog_start_buffer(new_buffer, &log_descriptor.bc, new_buffer_no);
old_buffer->next_buffer_offset= new_buffer->offset;
new_buffer->prev_buffer_offset= old_buffer->offset;
translog_buffer_unlock(old_buffer);
offset-= min_offset;
}
@ -7355,7 +7411,7 @@ static void translog_force_current_buffer_to_finish()
log_descriptor.bc.ptr+= current_page_fill;
log_descriptor.bc.buffer->size= log_descriptor.bc.current_page_fill=
current_page_fill;
new_buffer->overlay= old_buffer;
new_buffer->overlay= 1;
}
else
translog_new_page_header(&log_descriptor.horizon, &log_descriptor.bc);
@ -7428,8 +7484,8 @@ static void translog_force_current_buffer_to_finish()
memcpy(new_buffer->buffer, data, current_page_fill);
}
old_buffer->next_buffer_offset= new_buffer->offset;
translog_buffer_lock(new_buffer);
new_buffer->prev_buffer_offset= old_buffer->offset;
translog_buffer_decrease_writers(new_buffer);
translog_buffer_unlock(new_buffer);