mariadb/extra/mariabackup/ds_xbstream.cc
Monty 1c55b845e0 MDEV-32932 Port backup features from ES
Added support to BACKUP STAGE to maria-backup

This is a port of the code from ES 10.6
See MDEV-5336 for backup stages description.

The following old options are not supported by the new code:
--rsync             ; This is because rsync will not work on tables
                      that are in used.
--no-backup-locks   ; This is disabled as mariadb-backup will always
                      use backup locks for better performance.
2024-02-27 20:55:54 +02:00

274 lines
6.2 KiB
C++

/******************************************************
Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
Streaming implementation for XtraBackup.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
*******************************************************/
#include <my_global.h>
#include <my_base.h>
#include "common.h"
#include "datasink.h"
#include "xbstream.h"
typedef struct {
xb_wstream_t *xbstream;
ds_file_t *dest_file;
pthread_mutex_t mutex;
} ds_stream_ctxt_t;
typedef struct {
xb_wstream_file_t *xbstream_file;
ds_stream_ctxt_t *stream_ctxt;
} ds_stream_file_t;
/***********************************************************************
General streaming interface */
static ds_ctxt_t *xbstream_init(const char *root);
static ds_file_t *xbstream_open(ds_ctxt_t *ctxt, const char *path,
const MY_STAT *mystat, bool rewrite);
static int xbstream_write(ds_file_t *file, const uchar *buf, size_t len);
static int xbstream_seek_set(ds_file_t *file, my_off_t offset);
static int xbstream_close(ds_file_t *file);
static void xbstream_deinit(ds_ctxt_t *ctxt);
static int xbstream_rename(
ds_ctxt_t *ctxt, const char *old_path, const char *new_path);
static int xbstream_mremove(ds_ctxt_t *ctxt, const char *path);
datasink_t datasink_xbstream = {
&xbstream_init,
&xbstream_open,
&xbstream_write,
&xbstream_seek_set,
&xbstream_close,
&dummy_remove,
&xbstream_rename,
&xbstream_mremove,
&xbstream_deinit
};
static
ssize_t
my_xbstream_write_callback(void *userdata, const void *buf, size_t len)
{
ds_stream_ctxt_t *stream_ctxt;
stream_ctxt = (ds_stream_ctxt_t *) userdata;
xb_ad(stream_ctxt != NULL);
xb_ad(stream_ctxt->dest_file != NULL);
if (!ds_write(stream_ctxt->dest_file, buf, len)) {
return len;
}
return -1;
}
static
ds_ctxt_t *
xbstream_init(const char *root __attribute__((unused)))
{
ds_ctxt_t *ctxt;
ds_stream_ctxt_t *stream_ctxt;
xb_wstream_t *xbstream;
ctxt = (ds_ctxt_t *)my_malloc(PSI_NOT_INSTRUMENTED,
sizeof(ds_ctxt_t) + sizeof(ds_stream_ctxt_t), MYF(MY_FAE));
stream_ctxt = (ds_stream_ctxt_t *)(ctxt + 1);
if (pthread_mutex_init(&stream_ctxt->mutex, NULL)) {
msg("xbstream_init: pthread_mutex_init() failed.");
goto err;
}
xbstream = xb_stream_write_new(my_xbstream_write_callback, stream_ctxt);
if (xbstream == NULL) {
msg("xb_stream_write_new() failed.");
goto err;
}
stream_ctxt->xbstream = xbstream;
stream_ctxt->dest_file = NULL;
ctxt->ptr = stream_ctxt;
return ctxt;
err:
my_free(ctxt);
return NULL;
}
static
ds_file_t *
xbstream_open(ds_ctxt_t *ctxt, const char *path,
const MY_STAT *mystat, bool rewrite)
{
ds_file_t *file;
ds_stream_file_t *stream_file;
ds_stream_ctxt_t *stream_ctxt;
ds_ctxt_t *dest_ctxt;
xb_wstream_t *xbstream;
xb_wstream_file_t *xbstream_file;
xb_ad(ctxt->pipe_ctxt != NULL);
dest_ctxt = ctxt->pipe_ctxt;
stream_ctxt = (ds_stream_ctxt_t *) ctxt->ptr;
pthread_mutex_lock(&stream_ctxt->mutex);
if (stream_ctxt->dest_file == NULL) {
stream_ctxt->dest_file = ds_open(dest_ctxt, path, mystat);
}
pthread_mutex_unlock(&stream_ctxt->mutex);
if (stream_ctxt->dest_file == NULL) {
return NULL;
}
file = (ds_file_t *) my_malloc(PSI_NOT_INSTRUMENTED,
sizeof(ds_file_t) +
sizeof(ds_stream_file_t),
MYF(MY_FAE));
if (!file) {
msg("my_malloc() failed.");
goto err;
}
stream_file = (ds_stream_file_t *) (file + 1);
xbstream = stream_ctxt->xbstream;
xbstream_file = xb_stream_write_open(xbstream, path, mystat, rewrite);
if (xbstream_file == NULL) {
msg("xb_stream_write_open() failed.");
goto err;
}
stream_file->xbstream_file = xbstream_file;
stream_file->stream_ctxt = stream_ctxt;
file->ptr = stream_file;
file->path = stream_ctxt->dest_file->path;
return file;
err:
if (stream_ctxt->dest_file) {
ds_close(stream_ctxt->dest_file);
stream_ctxt->dest_file = NULL;
}
my_free(file);
return NULL;
}
static
int
xbstream_write(ds_file_t *file, const uchar *buf, size_t len)
{
ds_stream_file_t *stream_file;
xb_wstream_file_t *xbstream_file;
stream_file = (ds_stream_file_t *) file->ptr;
xbstream_file = stream_file->xbstream_file;
if (xb_stream_write_data(xbstream_file, buf, len)) {
msg("xb_stream_write_data() failed.");
return 1;
}
return 0;
}
static
int
xbstream_seek_set(ds_file_t *file, my_off_t offset)
{
ds_stream_file_t *stream_file;
xb_wstream_file_t *xbstream_file;
stream_file = (ds_stream_file_t *) file->ptr;
xbstream_file = stream_file->xbstream_file;
if (xb_stream_write_seek_set(xbstream_file, offset)) {
msg("xb_stream_write_seek_set() failed.");
return 1;
}
return 0;
}
static
int
xbstream_mremove(ds_ctxt_t *ctxt, const char *path) {
ds_stream_ctxt_t *stream_ctxt =
reinterpret_cast<ds_stream_ctxt_t *>(ctxt->ptr);
xb_wstream_t *xbstream = stream_ctxt->xbstream;
return xb_stream_write_remove(xbstream, path);
}
static
int
xbstream_rename(
ds_ctxt_t *ctxt, const char *old_path, const char *new_path) {
ds_stream_ctxt_t *stream_ctxt =
reinterpret_cast<ds_stream_ctxt_t *>(ctxt->ptr);
xb_wstream_t *xbstream = stream_ctxt->xbstream;
return xb_stream_write_rename(xbstream, old_path, new_path);
}
static
int
xbstream_close(ds_file_t *file)
{
ds_stream_file_t *stream_file;
int rc = 0;
stream_file = (ds_stream_file_t *)file->ptr;
rc = xb_stream_write_close(stream_file->xbstream_file);
my_free(file);
return rc;
}
static
void
xbstream_deinit(ds_ctxt_t *ctxt)
{
ds_stream_ctxt_t *stream_ctxt;
stream_ctxt = (ds_stream_ctxt_t *) ctxt->ptr;
if (xb_stream_write_done(stream_ctxt->xbstream)) {
msg("xb_stream_done() failed.");
}
if (stream_ctxt->dest_file) {
ds_close(stream_ctxt->dest_file);
stream_ctxt->dest_file = NULL;
}
pthread_mutex_destroy(&stream_ctxt->mutex);
my_free(ctxt);
}