mariadb/extra/mariabackup/ds_buffer.cc
Monty 1c55b845e0 MDEV-32932 Port backup features from ES
Added support to BACKUP STAGE to maria-backup

This is a port of the code from ES 10.6
See MDEV-5336 for backup stages description.

The following old options are not supported by the new code:
--rsync             ; This is because rsync will not work on tables
                      that are in used.
--no-backup-locks   ; This is disabled as mariadb-backup will always
                      use backup locks for better performance.
2024-02-27 20:55:54 +02:00

194 lines
4.7 KiB
C++

/******************************************************
Copyright (c) 2012-2013 Percona LLC and/or its affiliates.
buffer datasink for XtraBackup.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
*******************************************************/
/* Does buffered output to a destination datasink set with ds_set_pipe().
Writes to the destination datasink are guaranteed to not be smaller than a
specified buffer size (DS_DEFAULT_BUFFER_SIZE by default), with the only
exception for the last write for a file. */
#include <my_global.h>
#include <my_base.h>
#include "ds_buffer.h"
#include "common.h"
#include "datasink.h"
#define DS_DEFAULT_BUFFER_SIZE (64 * 1024)
typedef struct {
ds_file_t *dst_file;
char *buf;
size_t pos;
size_t size;
} ds_buffer_file_t;
typedef struct {
size_t buffer_size;
} ds_buffer_ctxt_t;
static ds_ctxt_t *buffer_init(const char *root);
static ds_file_t *buffer_open(ds_ctxt_t *ctxt, const char *path,
const MY_STAT *mystat, bool rewrite);
static int buffer_write(ds_file_t *file, const uchar *buf, size_t len);
static int buffer_close(ds_file_t *file);
static void buffer_deinit(ds_ctxt_t *ctxt);
datasink_t datasink_buffer = {
&buffer_init,
&buffer_open,
&buffer_write,
nullptr,
&buffer_close,
&dummy_remove,
nullptr,
nullptr,
&buffer_deinit
};
/* Change the default buffer size */
void ds_buffer_set_size(ds_ctxt_t *ctxt, size_t size)
{
ds_buffer_ctxt_t *buffer_ctxt = (ds_buffer_ctxt_t *) ctxt->ptr;
buffer_ctxt->buffer_size = size;
}
static ds_ctxt_t *
buffer_init(const char *root)
{
ds_ctxt_t *ctxt;
ds_buffer_ctxt_t *buffer_ctxt;
ctxt = (ds_ctxt_t *)my_malloc(PSI_NOT_INSTRUMENTED,
sizeof(ds_ctxt_t) + sizeof(ds_buffer_ctxt_t), MYF(MY_FAE));
buffer_ctxt = (ds_buffer_ctxt_t *) (ctxt + 1);
buffer_ctxt->buffer_size = DS_DEFAULT_BUFFER_SIZE;
ctxt->ptr = buffer_ctxt;
ctxt->root = my_strdup(PSI_NOT_INSTRUMENTED, root, MYF(MY_FAE));
return ctxt;
}
static ds_file_t *
buffer_open(ds_ctxt_t *ctxt, const char *path,
const MY_STAT *mystat, bool rewrite)
{
DBUG_ASSERT(rewrite == false);
ds_buffer_ctxt_t *buffer_ctxt;
ds_ctxt_t *pipe_ctxt;
ds_file_t *dst_file;
ds_file_t *file;
ds_buffer_file_t *buffer_file;
pipe_ctxt = ctxt->pipe_ctxt;
xb_a(pipe_ctxt != NULL);
dst_file = ds_open(pipe_ctxt, path, mystat);
if (dst_file == NULL) {
die("ds_open(%s) failed", path);
}
buffer_ctxt = (ds_buffer_ctxt_t *) ctxt->ptr;
file = (ds_file_t *) my_malloc(PSI_NOT_INSTRUMENTED, sizeof(ds_file_t) +
sizeof(ds_buffer_file_t) + buffer_ctxt->buffer_size,
MYF(MY_FAE));
buffer_file = (ds_buffer_file_t *) (file + 1);
buffer_file->dst_file = dst_file;
buffer_file->buf = (char *) (buffer_file + 1);
buffer_file->size = buffer_ctxt->buffer_size;
buffer_file->pos = 0;
file->path = dst_file->path;
file->ptr = buffer_file;
return file;
}
static int
buffer_write(ds_file_t *file, const uchar *buf, size_t len)
{
ds_buffer_file_t *buffer_file;
buffer_file = (ds_buffer_file_t *) file->ptr;
while (len > 0) {
if (buffer_file->pos + len > buffer_file->size) {
if (buffer_file->pos > 0) {
size_t bytes;
bytes = buffer_file->size - buffer_file->pos;
memcpy(buffer_file->buf + buffer_file->pos, buf,
bytes);
if (ds_write(buffer_file->dst_file,
buffer_file->buf,
buffer_file->size)) {
return 1;
}
buffer_file->pos = 0;
buf += bytes;
len -= bytes;
} else {
/* We don't have any buffered bytes, just write
the entire source buffer */
if (ds_write(buffer_file->dst_file, buf, len)) {
return 1;
}
break;
}
} else {
memcpy(buffer_file->buf + buffer_file->pos, buf, len);
buffer_file->pos += len;
break;
}
}
return 0;
}
static int
buffer_close(ds_file_t *file)
{
ds_buffer_file_t *buffer_file;
int ret;
buffer_file = (ds_buffer_file_t *) file->ptr;
if (buffer_file->pos > 0) {
ds_write(buffer_file->dst_file, buffer_file->buf,
buffer_file->pos);
}
ret = ds_close(buffer_file->dst_file);
my_free(file);
return ret;
}
static void
buffer_deinit(ds_ctxt_t *ctxt)
{
my_free(ctxt->root);
my_free(ctxt);
}