mirror of
				https://github.com/MariaDB/server.git
				synced 2025-11-03 20:36:16 +01:00 
			
		
		
		
	Added support to BACKUP STAGE to maria-backup
This is a port of the code from ES 10.6
See MDEV-5336 for backup stages description.
The following old options are not supported by the new code:
--rsync             ; This is because rsync will not work on tables
                      that are in used.
--no-backup-locks   ; This is disabled as mariadb-backup will always
                      use backup locks for better performance.
		
	
			
		
			
				
	
	
		
			274 lines
		
	
	
	
		
			6.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			274 lines
		
	
	
	
		
			6.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/******************************************************
 | 
						|
Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
 | 
						|
 | 
						|
Streaming implementation for XtraBackup.
 | 
						|
 | 
						|
This program is free software; you can redistribute it and/or modify
 | 
						|
it under the terms of the GNU General Public License as published by
 | 
						|
the Free Software Foundation; version 2 of the License.
 | 
						|
 | 
						|
This program is distributed in the hope that it will be useful,
 | 
						|
but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
GNU General Public License for more details.
 | 
						|
 | 
						|
You should have received a copy of the GNU General Public License
 | 
						|
along with this program; if not, write to the Free Software
 | 
						|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
 | 
						|
 | 
						|
*******************************************************/
 | 
						|
 | 
						|
#include <my_global.h>
 | 
						|
#include <my_base.h>
 | 
						|
#include "common.h"
 | 
						|
#include "datasink.h"
 | 
						|
#include "xbstream.h"
 | 
						|
 | 
						|
typedef struct {
 | 
						|
	xb_wstream_t	*xbstream;
 | 
						|
	ds_file_t	*dest_file;
 | 
						|
	pthread_mutex_t	mutex;
 | 
						|
} ds_stream_ctxt_t;
 | 
						|
 | 
						|
typedef struct {
 | 
						|
	xb_wstream_file_t	*xbstream_file;
 | 
						|
	ds_stream_ctxt_t	*stream_ctxt;
 | 
						|
} ds_stream_file_t;
 | 
						|
 | 
						|
/***********************************************************************
 | 
						|
General streaming interface */
 | 
						|
 | 
						|
static ds_ctxt_t *xbstream_init(const char *root);
 | 
						|
static ds_file_t *xbstream_open(ds_ctxt_t *ctxt, const char *path,
 | 
						|
			const MY_STAT *mystat, bool rewrite);
 | 
						|
static int xbstream_write(ds_file_t *file, const uchar *buf, size_t len);
 | 
						|
static int xbstream_seek_set(ds_file_t *file, my_off_t offset);
 | 
						|
static int xbstream_close(ds_file_t *file);
 | 
						|
static void xbstream_deinit(ds_ctxt_t *ctxt);
 | 
						|
 | 
						|
static int xbstream_rename(
 | 
						|
	ds_ctxt_t *ctxt, const char *old_path, const char *new_path);
 | 
						|
static int xbstream_mremove(ds_ctxt_t *ctxt, const char *path);
 | 
						|
 | 
						|
datasink_t datasink_xbstream = {
 | 
						|
	&xbstream_init,
 | 
						|
	&xbstream_open,
 | 
						|
	&xbstream_write,
 | 
						|
	&xbstream_seek_set,
 | 
						|
	&xbstream_close,
 | 
						|
	&dummy_remove,
 | 
						|
	&xbstream_rename,
 | 
						|
	&xbstream_mremove,
 | 
						|
	&xbstream_deinit
 | 
						|
};
 | 
						|
 | 
						|
static
 | 
						|
ssize_t
 | 
						|
my_xbstream_write_callback(void *userdata, const void *buf, size_t len)
 | 
						|
{
 | 
						|
	ds_stream_ctxt_t	*stream_ctxt;
 | 
						|
 | 
						|
	stream_ctxt = (ds_stream_ctxt_t *) userdata;
 | 
						|
 | 
						|
	xb_ad(stream_ctxt != NULL);
 | 
						|
	xb_ad(stream_ctxt->dest_file != NULL);
 | 
						|
 | 
						|
	if (!ds_write(stream_ctxt->dest_file, buf, len)) {
 | 
						|
		return len;
 | 
						|
	}
 | 
						|
	return -1;
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
ds_ctxt_t *
 | 
						|
xbstream_init(const char *root __attribute__((unused)))
 | 
						|
{
 | 
						|
	ds_ctxt_t		*ctxt;
 | 
						|
	ds_stream_ctxt_t	*stream_ctxt;
 | 
						|
	xb_wstream_t *xbstream;
 | 
						|
 | 
						|
	ctxt = (ds_ctxt_t *)my_malloc(PSI_NOT_INSTRUMENTED,
 | 
						|
                    sizeof(ds_ctxt_t) + sizeof(ds_stream_ctxt_t), MYF(MY_FAE));
 | 
						|
	stream_ctxt = (ds_stream_ctxt_t *)(ctxt + 1);
 | 
						|
 | 
						|
	if (pthread_mutex_init(&stream_ctxt->mutex, NULL)) {
 | 
						|
		msg("xbstream_init: pthread_mutex_init() failed.");
 | 
						|
		goto err;
 | 
						|
	}
 | 
						|
 | 
						|
	xbstream = xb_stream_write_new(my_xbstream_write_callback, stream_ctxt);
 | 
						|
	if (xbstream == NULL) {
 | 
						|
		msg("xb_stream_write_new() failed.");
 | 
						|
		goto err;
 | 
						|
	}
 | 
						|
	stream_ctxt->xbstream = xbstream;
 | 
						|
	stream_ctxt->dest_file = NULL;
 | 
						|
 | 
						|
	ctxt->ptr = stream_ctxt;
 | 
						|
 | 
						|
	return ctxt;
 | 
						|
 | 
						|
err:
 | 
						|
	my_free(ctxt);
 | 
						|
	return NULL;
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
ds_file_t *
 | 
						|
xbstream_open(ds_ctxt_t *ctxt, const char *path,
 | 
						|
	const MY_STAT *mystat, bool rewrite)
 | 
						|
{
 | 
						|
	ds_file_t		*file;
 | 
						|
	ds_stream_file_t	*stream_file;
 | 
						|
	ds_stream_ctxt_t	*stream_ctxt;
 | 
						|
	ds_ctxt_t		*dest_ctxt;
 | 
						|
	xb_wstream_t		*xbstream;
 | 
						|
	xb_wstream_file_t	*xbstream_file;
 | 
						|
 | 
						|
 | 
						|
	xb_ad(ctxt->pipe_ctxt != NULL);
 | 
						|
	dest_ctxt = ctxt->pipe_ctxt;
 | 
						|
 | 
						|
	stream_ctxt = (ds_stream_ctxt_t *) ctxt->ptr;
 | 
						|
 | 
						|
	pthread_mutex_lock(&stream_ctxt->mutex);
 | 
						|
	if (stream_ctxt->dest_file == NULL) {
 | 
						|
		stream_ctxt->dest_file = ds_open(dest_ctxt, path, mystat);
 | 
						|
	}
 | 
						|
	pthread_mutex_unlock(&stream_ctxt->mutex);
 | 
						|
	if (stream_ctxt->dest_file == NULL) {
 | 
						|
		return NULL;
 | 
						|
	}
 | 
						|
 | 
						|
	file = (ds_file_t *) my_malloc(PSI_NOT_INSTRUMENTED,
 | 
						|
				       sizeof(ds_file_t) +
 | 
						|
				       sizeof(ds_stream_file_t),
 | 
						|
				       MYF(MY_FAE));
 | 
						|
	if (!file) {
 | 
						|
		msg("my_malloc() failed.");
 | 
						|
		goto err;
 | 
						|
	}
 | 
						|
	stream_file = (ds_stream_file_t *) (file + 1);
 | 
						|
 | 
						|
	xbstream = stream_ctxt->xbstream;
 | 
						|
 | 
						|
	xbstream_file = xb_stream_write_open(xbstream, path, mystat, rewrite);
 | 
						|
 | 
						|
	if (xbstream_file == NULL) {
 | 
						|
		msg("xb_stream_write_open() failed.");
 | 
						|
		goto err;
 | 
						|
	}
 | 
						|
 | 
						|
	stream_file->xbstream_file = xbstream_file;
 | 
						|
	stream_file->stream_ctxt = stream_ctxt;
 | 
						|
	file->ptr = stream_file;
 | 
						|
	file->path = stream_ctxt->dest_file->path;
 | 
						|
 | 
						|
	return file;
 | 
						|
 | 
						|
err:
 | 
						|
	if (stream_ctxt->dest_file) {
 | 
						|
		ds_close(stream_ctxt->dest_file);
 | 
						|
		stream_ctxt->dest_file = NULL;
 | 
						|
	}
 | 
						|
	my_free(file);
 | 
						|
 | 
						|
	return NULL;
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
int
 | 
						|
xbstream_write(ds_file_t *file, const uchar *buf, size_t len)
 | 
						|
{
 | 
						|
	ds_stream_file_t	*stream_file;
 | 
						|
	xb_wstream_file_t	*xbstream_file;
 | 
						|
 | 
						|
 | 
						|
	stream_file = (ds_stream_file_t *) file->ptr;
 | 
						|
 | 
						|
	xbstream_file = stream_file->xbstream_file;
 | 
						|
 | 
						|
	if (xb_stream_write_data(xbstream_file, buf, len)) {
 | 
						|
		msg("xb_stream_write_data() failed.");
 | 
						|
		return 1;
 | 
						|
	}
 | 
						|
 | 
						|
	return 0;
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
int
 | 
						|
xbstream_seek_set(ds_file_t *file, my_off_t offset)
 | 
						|
{
 | 
						|
	ds_stream_file_t	*stream_file;
 | 
						|
	xb_wstream_file_t	*xbstream_file;
 | 
						|
 | 
						|
 | 
						|
	stream_file = (ds_stream_file_t *) file->ptr;
 | 
						|
 | 
						|
	xbstream_file = stream_file->xbstream_file;
 | 
						|
 | 
						|
	if (xb_stream_write_seek_set(xbstream_file, offset)) {
 | 
						|
		msg("xb_stream_write_seek_set() failed.");
 | 
						|
		return 1;
 | 
						|
	}
 | 
						|
 | 
						|
	return 0;
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
int
 | 
						|
xbstream_mremove(ds_ctxt_t *ctxt, const char *path) {
 | 
						|
	ds_stream_ctxt_t	*stream_ctxt =
 | 
						|
		reinterpret_cast<ds_stream_ctxt_t *>(ctxt->ptr);
 | 
						|
	xb_wstream_t		*xbstream = stream_ctxt->xbstream;
 | 
						|
	return xb_stream_write_remove(xbstream, path);
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
int
 | 
						|
xbstream_rename(
 | 
						|
	ds_ctxt_t *ctxt, const char *old_path, const char *new_path) {
 | 
						|
	ds_stream_ctxt_t	*stream_ctxt =
 | 
						|
		reinterpret_cast<ds_stream_ctxt_t *>(ctxt->ptr);
 | 
						|
	xb_wstream_t		*xbstream = stream_ctxt->xbstream;
 | 
						|
	return xb_stream_write_rename(xbstream, old_path, new_path);
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
int
 | 
						|
xbstream_close(ds_file_t *file)
 | 
						|
{
 | 
						|
	ds_stream_file_t	*stream_file;
 | 
						|
	int			rc = 0;
 | 
						|
 | 
						|
	stream_file = (ds_stream_file_t *)file->ptr;
 | 
						|
 | 
						|
	rc = xb_stream_write_close(stream_file->xbstream_file);
 | 
						|
 | 
						|
	my_free(file);
 | 
						|
 | 
						|
	return rc;
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
void
 | 
						|
xbstream_deinit(ds_ctxt_t *ctxt)
 | 
						|
{
 | 
						|
	ds_stream_ctxt_t	*stream_ctxt;
 | 
						|
 | 
						|
	stream_ctxt = (ds_stream_ctxt_t *) ctxt->ptr;
 | 
						|
 | 
						|
	if (xb_stream_write_done(stream_ctxt->xbstream)) {
 | 
						|
		msg("xb_stream_done() failed.");
 | 
						|
	}
 | 
						|
 | 
						|
	if (stream_ctxt->dest_file) {
 | 
						|
		ds_close(stream_ctxt->dest_file);
 | 
						|
		stream_ctxt->dest_file = NULL;
 | 
						|
	}
 | 
						|
 | 
						|
	pthread_mutex_destroy(&stream_ctxt->mutex);
 | 
						|
 | 
						|
	my_free(ctxt);
 | 
						|
}
 |