mirror of
https://github.com/MariaDB/server.git
synced 2025-01-23 15:24:16 +01:00
41d68cabee
Introduce Log_event_writer() that encapsulates writing data to an IO_CACHE with automatic checksum calculation. Now all events properly checksum themselves as needed. Use Log_event_writer in MYSQL_BIN_LOG::write_cache() instead of copy-pasting its logic all over. Later Log_event_writer will also do encryption.
493 lines
14 KiB
C++
493 lines
14 KiB
C++
/* Copyright (C) 2013 Codership Oy <info@codership.com>
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation; version 2 of the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License along
|
|
with this program; if not, write to the Free Software Foundation, Inc.,
|
|
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
|
|
|
|
#include "wsrep_binlog.h"
|
|
#include "wsrep_priv.h"
|
|
#include "log.h"
|
|
#include "log_event.h"
|
|
#include "wsrep_applier.h"
|
|
|
|
extern handlerton *binlog_hton;
|
|
/*
|
|
Write the contents of a cache to a memory buffer.
|
|
|
|
This function quite the same as MYSQL_BIN_LOG::write_cache(),
|
|
with the exception that here we write in buffer instead of log file.
|
|
*/
|
|
int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len)
|
|
{
|
|
*buf= NULL;
|
|
*buf_len= 0;
|
|
|
|
my_off_t const saved_pos(my_b_tell(cache));
|
|
|
|
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
|
|
{
|
|
WSREP_ERROR("failed to initialize io-cache");
|
|
return ER_ERROR_ON_WRITE;
|
|
}
|
|
|
|
uint length = my_b_bytes_in_cache(cache);
|
|
if (unlikely(0 == length)) length = my_b_fill(cache);
|
|
|
|
size_t total_length = 0;
|
|
|
|
if (likely(length > 0)) do
|
|
{
|
|
total_length += length;
|
|
/*
|
|
Bail out if buffer grows too large.
|
|
A temporary fix to avoid allocating indefinitely large buffer,
|
|
not a real limit on a writeset size which includes other things
|
|
like header and keys.
|
|
*/
|
|
if (total_length > wsrep_max_ws_size)
|
|
{
|
|
WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
|
|
wsrep_max_ws_size, total_length);
|
|
goto error;
|
|
}
|
|
uchar* tmp = (uchar *)my_realloc(*buf, total_length,
|
|
MYF(MY_ALLOW_ZERO_PTR));
|
|
if (!tmp)
|
|
{
|
|
WSREP_ERROR("could not (re)allocate buffer: %zu + %u",
|
|
*buf_len, length);
|
|
goto error;
|
|
}
|
|
*buf = tmp;
|
|
|
|
memcpy(*buf + *buf_len, cache->read_pos, length);
|
|
*buf_len = total_length;
|
|
|
|
if (cache->file < 0)
|
|
{
|
|
cache->read_pos= cache->read_end;
|
|
break;
|
|
}
|
|
} while ((length = my_b_fill(cache)));
|
|
|
|
if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
|
|
{
|
|
WSREP_WARN("failed to initialize io-cache");
|
|
goto cleanup;
|
|
}
|
|
|
|
return 0;
|
|
|
|
error:
|
|
if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
|
|
{
|
|
WSREP_WARN("failed to initialize io-cache");
|
|
}
|
|
cleanup:
|
|
my_free(*buf);
|
|
*buf= NULL;
|
|
*buf_len= 0;
|
|
return ER_ERROR_ON_WRITE;
|
|
}
|
|
|
|
#define STACK_SIZE 4096 /* 4K - for buffer preallocated on the stack:
|
|
* many transactions would fit in there
|
|
* so there is no need to reach for the heap */
|
|
|
|
/* Returns minimum multiple of HEAP_PAGE_SIZE that is >= length */
|
|
static inline size_t
|
|
heap_size(size_t length)
|
|
{
|
|
return (length + HEAP_PAGE_SIZE - 1)/HEAP_PAGE_SIZE*HEAP_PAGE_SIZE;
|
|
}
|
|
|
|
/* append data to writeset */
|
|
static inline wsrep_status_t
|
|
wsrep_append_data(wsrep_t* const wsrep,
|
|
wsrep_ws_handle_t* const ws,
|
|
const void* const data,
|
|
size_t const len)
|
|
{
|
|
struct wsrep_buf const buff = { data, len };
|
|
wsrep_status_t const rc(wsrep->append_data(wsrep, ws, &buff, 1,
|
|
WSREP_DATA_ORDERED, true));
|
|
if (rc != WSREP_OK)
|
|
{
|
|
WSREP_WARN("append_data() returned %d", rc);
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
/*
|
|
Write the contents of a cache to wsrep provider.
|
|
|
|
This function quite the same as MYSQL_BIN_LOG::write_cache(),
|
|
with the exception that here we write in buffer instead of log file.
|
|
|
|
This version reads all of cache into single buffer and then appends to a
|
|
writeset at once.
|
|
*/
|
|
static int wsrep_write_cache_once(wsrep_t* const wsrep,
|
|
THD* const thd,
|
|
IO_CACHE* const cache,
|
|
size_t* const len)
|
|
{
|
|
my_off_t const saved_pos(my_b_tell(cache));
|
|
|
|
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
|
|
{
|
|
WSREP_ERROR("failed to initialize io-cache");
|
|
return ER_ERROR_ON_WRITE;
|
|
}
|
|
|
|
int err(WSREP_OK);
|
|
|
|
size_t total_length(0);
|
|
uchar stack_buf[STACK_SIZE]; /* to avoid dynamic allocations for few data*/
|
|
uchar* heap_buf(NULL);
|
|
uchar* buf(stack_buf);
|
|
size_t allocated(sizeof(stack_buf));
|
|
size_t used(0);
|
|
|
|
uint length(my_b_bytes_in_cache(cache));
|
|
if (unlikely(0 == length)) length = my_b_fill(cache);
|
|
|
|
if (likely(length > 0)) do
|
|
{
|
|
total_length += length;
|
|
/*
|
|
Bail out if buffer grows too large.
|
|
A temporary fix to avoid allocating indefinitely large buffer,
|
|
not a real limit on a writeset size which includes other things
|
|
like header and keys.
|
|
*/
|
|
if (unlikely(total_length > wsrep_max_ws_size))
|
|
{
|
|
WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
|
|
wsrep_max_ws_size, total_length);
|
|
err = WSREP_TRX_SIZE_EXCEEDED;
|
|
goto cleanup;
|
|
}
|
|
|
|
if (total_length > allocated)
|
|
{
|
|
size_t const new_size(heap_size(total_length));
|
|
uchar* tmp = (uchar *)my_realloc(heap_buf, new_size,
|
|
MYF(MY_ALLOW_ZERO_PTR));
|
|
if (!tmp)
|
|
{
|
|
WSREP_ERROR("could not (re)allocate buffer: %zu + %u",
|
|
allocated, length);
|
|
err = WSREP_TRX_SIZE_EXCEEDED;
|
|
goto cleanup;
|
|
}
|
|
|
|
heap_buf = tmp;
|
|
buf = heap_buf;
|
|
allocated = new_size;
|
|
|
|
if (used <= STACK_SIZE && used > 0) // there's data in stack_buf
|
|
{
|
|
DBUG_ASSERT(buf == stack_buf);
|
|
memcpy(heap_buf, stack_buf, used);
|
|
}
|
|
}
|
|
|
|
memcpy(buf + used, cache->read_pos, length);
|
|
used = total_length;
|
|
if (cache->file < 0)
|
|
{
|
|
cache->read_pos= cache->read_end;
|
|
break;
|
|
}
|
|
} while ((length = my_b_fill(cache)));
|
|
|
|
if (used > 0)
|
|
err = wsrep_append_data(wsrep, &thd->wsrep_ws_handle, buf, used);
|
|
|
|
if (WSREP_OK == err) *len = total_length;
|
|
|
|
cleanup:
|
|
if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
|
|
{
|
|
WSREP_ERROR("failed to reinitialize io-cache");
|
|
}
|
|
|
|
if (unlikely(WSREP_OK != err))
|
|
{
|
|
wsrep_dump_rbr_buf_with_header(thd, buf, used);
|
|
}
|
|
|
|
my_free(heap_buf);
|
|
return err;
|
|
}
|
|
|
|
/*
|
|
Write the contents of a cache to wsrep provider.
|
|
|
|
This function quite the same as MYSQL_BIN_LOG::write_cache(),
|
|
with the exception that here we write in buffer instead of log file.
|
|
|
|
This version uses incremental data appending as it reads it from cache.
|
|
*/
|
|
static int wsrep_write_cache_inc(wsrep_t* const wsrep,
|
|
THD* const thd,
|
|
IO_CACHE* const cache,
|
|
size_t* const len)
|
|
{
|
|
my_off_t const saved_pos(my_b_tell(cache));
|
|
|
|
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
|
|
{
|
|
WSREP_ERROR("failed to initialize io-cache");
|
|
return WSREP_TRX_ERROR;
|
|
}
|
|
|
|
int err(WSREP_OK);
|
|
|
|
size_t total_length(0);
|
|
|
|
uint length(my_b_bytes_in_cache(cache));
|
|
if (unlikely(0 == length)) length = my_b_fill(cache);
|
|
|
|
if (likely(length > 0)) do
|
|
{
|
|
total_length += length;
|
|
/* bail out if buffer grows too large
|
|
not a real limit on a writeset size which includes other things
|
|
like header and keys.
|
|
*/
|
|
if (unlikely(total_length > wsrep_max_ws_size))
|
|
{
|
|
WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
|
|
wsrep_max_ws_size, total_length);
|
|
err = WSREP_TRX_SIZE_EXCEEDED;
|
|
goto cleanup;
|
|
}
|
|
|
|
if(WSREP_OK != (err=wsrep_append_data(wsrep, &thd->wsrep_ws_handle,
|
|
cache->read_pos, length)))
|
|
goto cleanup;
|
|
|
|
if (cache->file < 0)
|
|
{
|
|
cache->read_pos= cache->read_end;
|
|
break;
|
|
}
|
|
} while ((length = my_b_fill(cache)));
|
|
|
|
if (WSREP_OK == err) *len = total_length;
|
|
|
|
cleanup:
|
|
if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
|
|
{
|
|
WSREP_ERROR("failed to reinitialize io-cache");
|
|
}
|
|
|
|
return err;
|
|
}
|
|
|
|
/*
|
|
Write the contents of a cache to wsrep provider.
|
|
|
|
This function quite the same as MYSQL_BIN_LOG::write_cache(),
|
|
with the exception that here we write in buffer instead of log file.
|
|
*/
|
|
int wsrep_write_cache(wsrep_t* const wsrep,
|
|
THD* const thd,
|
|
IO_CACHE* const cache,
|
|
size_t* const len)
|
|
{
|
|
if (wsrep_incremental_data_collection) {
|
|
return wsrep_write_cache_inc(wsrep, thd, cache, len);
|
|
}
|
|
else {
|
|
return wsrep_write_cache_once(wsrep, thd, cache, len);
|
|
}
|
|
}
|
|
|
|
void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len)
|
|
{
|
|
char filename[PATH_MAX]= {0};
|
|
int len= snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld.log",
|
|
wsrep_data_home_dir, thd->thread_id,
|
|
(long long)wsrep_thd_trx_seqno(thd));
|
|
if (len >= PATH_MAX)
|
|
{
|
|
WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
|
|
return;
|
|
}
|
|
|
|
FILE *of= fopen(filename, "wb");
|
|
if (of)
|
|
{
|
|
fwrite (rbr_buf, buf_len, 1, of);
|
|
fclose(of);
|
|
}
|
|
else
|
|
{
|
|
WSREP_ERROR("Failed to open file '%s': %d (%s)",
|
|
filename, errno, strerror(errno));
|
|
}
|
|
}
|
|
|
|
/*
|
|
wsrep exploits binlog's caches even if binlogging itself is not
|
|
activated. In such case connection close needs calling
|
|
actual binlog's method.
|
|
Todo: split binlog hton from its caches to use ones by wsrep
|
|
without referring to binlog's stuff.
|
|
*/
|
|
int wsrep_binlog_close_connection(THD* thd)
|
|
{
|
|
DBUG_ENTER("wsrep_binlog_close_connection");
|
|
if (thd_get_ha_data(thd, binlog_hton) != NULL)
|
|
binlog_hton->close_connection (binlog_hton, thd);
|
|
DBUG_RETURN(0);
|
|
}
|
|
|
|
int wsrep_binlog_savepoint_set(THD *thd, void *sv)
|
|
{
|
|
if (!wsrep_emulate_bin_log) return 0;
|
|
int rcode = binlog_hton->savepoint_set(binlog_hton, thd, sv);
|
|
return rcode;
|
|
}
|
|
|
|
int wsrep_binlog_savepoint_rollback(THD *thd, void *sv)
|
|
{
|
|
if (!wsrep_emulate_bin_log) return 0;
|
|
int rcode = binlog_hton->savepoint_rollback(binlog_hton, thd, sv);
|
|
return rcode;
|
|
}
|
|
|
|
#if 0
|
|
void wsrep_dump_rbr_direct(THD* thd, IO_CACHE* cache)
|
|
{
|
|
char filename[PATH_MAX]= {0};
|
|
int len= snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld.log",
|
|
wsrep_data_home_dir, thd->thread_id,
|
|
(long long)wsrep_thd_trx_seqno(thd));
|
|
size_t bytes_in_cache = 0;
|
|
// check path
|
|
if (len >= PATH_MAX)
|
|
{
|
|
WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
|
|
return ;
|
|
}
|
|
// init cache
|
|
my_off_t const saved_pos(my_b_tell(cache));
|
|
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
|
|
{
|
|
WSREP_ERROR("failed to initialize io-cache");
|
|
return ;
|
|
}
|
|
// open file
|
|
FILE* of = fopen(filename, "wb");
|
|
if (!of)
|
|
{
|
|
WSREP_ERROR("Failed to open file '%s': %d (%s)",
|
|
filename, errno, strerror(errno));
|
|
goto cleanup;
|
|
}
|
|
// ready to write
|
|
bytes_in_cache= my_b_bytes_in_cache(cache);
|
|
if (unlikely(bytes_in_cache == 0)) bytes_in_cache = my_b_fill(cache);
|
|
if (likely(bytes_in_cache > 0)) do
|
|
{
|
|
if (my_fwrite(of, cache->read_pos, bytes_in_cache,
|
|
MYF(MY_WME | MY_NABP)) == (size_t) -1)
|
|
{
|
|
WSREP_ERROR("Failed to write file '%s'", filename);
|
|
goto cleanup;
|
|
}
|
|
|
|
if (cache->file < 0)
|
|
{
|
|
cache->read_pos= cache->read_end;
|
|
break;
|
|
}
|
|
} while ((bytes_in_cache= my_b_fill(cache)));
|
|
if(cache->error == -1)
|
|
{
|
|
WSREP_ERROR("RBR inconsistent");
|
|
goto cleanup;
|
|
}
|
|
cleanup:
|
|
// init back
|
|
if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
|
|
{
|
|
WSREP_ERROR("failed to reinitialize io-cache");
|
|
}
|
|
// close file
|
|
if (of) fclose(of);
|
|
}
|
|
#endif
|
|
|
|
void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end)
|
|
{
|
|
thd->binlog_flush_pending_rows_event(stmt_end);
|
|
}
|
|
|
|
/* Dump replication buffer along with header to a file. */
|
|
void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
|
|
size_t buf_len)
|
|
{
|
|
char filename[PATH_MAX]= {0};
|
|
File file;
|
|
IO_CACHE cache;
|
|
Log_event_writer writer(&cache);
|
|
Format_description_log_event *ev= wsrep_get_apply_format(thd);
|
|
|
|
int len= my_snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld_v2.log",
|
|
wsrep_data_home_dir, thd->thread_id,
|
|
(long long) wsrep_thd_trx_seqno(thd));
|
|
|
|
if (len >= PATH_MAX)
|
|
{
|
|
WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
|
|
return;
|
|
}
|
|
|
|
if ((file= mysql_file_open(key_file_wsrep_gra_log, filename,
|
|
O_RDWR | O_CREAT | O_BINARY, MYF(MY_WME))) < 0)
|
|
{
|
|
WSREP_ERROR("Failed to open file '%s' : %d (%s)",
|
|
filename, errno, strerror(errno));
|
|
goto cleanup1;
|
|
}
|
|
|
|
if (init_io_cache(&cache, file, 0, WRITE_CACHE, 0, 0, MYF(MY_WME | MY_NABP)))
|
|
{
|
|
mysql_file_close(file, MYF(MY_WME));
|
|
goto cleanup2;
|
|
}
|
|
|
|
if (my_b_safe_write(&cache, BINLOG_MAGIC, BIN_LOG_HEADER_SIZE))
|
|
{
|
|
goto cleanup2;
|
|
}
|
|
|
|
if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) ||
|
|
flush_io_cache(&cache))
|
|
{
|
|
WSREP_ERROR("Failed to write to '%s'.", filename);
|
|
goto cleanup2;
|
|
}
|
|
|
|
cleanup2:
|
|
end_io_cache(&cache);
|
|
|
|
cleanup1:
|
|
mysql_file_close(file, MYF(MY_WME));
|
|
}
|
|
|