mirror of
				https://github.com/MariaDB/server.git
				synced 2025-10-30 02:16:32 +01:00 
			
		
		
		
	 1c55b845e0
			
		
	
	
	1c55b845e0
	
	
	
		
			
			Added support for BACKUP STAGE to mariadb-backup
This is a port of the code from ES 10.6
See MDEV-5336 for backup stages description.
The following old options are not supported by the new code:
--rsync             ; This is because rsync will not work on tables
                      that are in use.
--no-backup-locks   ; This is disabled as mariadb-backup will always
                      use backup locks for better performance.
		
	
			
		
			
				
	
	
		
			274 lines
		
	
	
	
		
			6.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			274 lines
		
	
	
	
		
			6.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /******************************************************
 | |
| Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
 | |
| 
 | |
| Streaming implementation for XtraBackup.
 | |
| 
 | |
| This program is free software; you can redistribute it and/or modify
 | |
| it under the terms of the GNU General Public License as published by
 | |
| the Free Software Foundation; version 2 of the License.
 | |
| 
 | |
| This program is distributed in the hope that it will be useful,
 | |
| but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
| GNU General Public License for more details.
 | |
| 
 | |
| You should have received a copy of the GNU General Public License
 | |
| along with this program; if not, write to the Free Software
 | |
| Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
 | |
| 
 | |
| *******************************************************/
 | |
| 
 | |
| #include <my_global.h>
 | |
| #include <my_base.h>
 | |
| #include "common.h"
 | |
| #include "datasink.h"
 | |
| #include "xbstream.h"
 | |
| 
 | |
| typedef struct {
 | |
| 	xb_wstream_t	*xbstream;
 | |
| 	ds_file_t	*dest_file;
 | |
| 	pthread_mutex_t	mutex;
 | |
| } ds_stream_ctxt_t;
 | |
| 
 | |
| typedef struct {
 | |
| 	xb_wstream_file_t	*xbstream_file;
 | |
| 	ds_stream_ctxt_t	*stream_ctxt;
 | |
| } ds_stream_file_t;
 | |
| 
 | |
| /***********************************************************************
 | |
| General streaming interface */
 | |
| 
 | |
| static ds_ctxt_t *xbstream_init(const char *root);
 | |
| static ds_file_t *xbstream_open(ds_ctxt_t *ctxt, const char *path,
 | |
| 			const MY_STAT *mystat, bool rewrite);
 | |
| static int xbstream_write(ds_file_t *file, const uchar *buf, size_t len);
 | |
| static int xbstream_seek_set(ds_file_t *file, my_off_t offset);
 | |
| static int xbstream_close(ds_file_t *file);
 | |
| static void xbstream_deinit(ds_ctxt_t *ctxt);
 | |
| 
 | |
| static int xbstream_rename(
 | |
| 	ds_ctxt_t *ctxt, const char *old_path, const char *new_path);
 | |
| static int xbstream_mremove(ds_ctxt_t *ctxt, const char *path);
 | |
| 
 | |
| datasink_t datasink_xbstream = {
 | |
| 	&xbstream_init,
 | |
| 	&xbstream_open,
 | |
| 	&xbstream_write,
 | |
| 	&xbstream_seek_set,
 | |
| 	&xbstream_close,
 | |
| 	&dummy_remove,
 | |
| 	&xbstream_rename,
 | |
| 	&xbstream_mremove,
 | |
| 	&xbstream_deinit
 | |
| };
 | |
| 
 | |
| static
 | |
| ssize_t
 | |
| my_xbstream_write_callback(void *userdata, const void *buf, size_t len)
 | |
| {
 | |
| 	ds_stream_ctxt_t	*stream_ctxt;
 | |
| 
 | |
| 	stream_ctxt = (ds_stream_ctxt_t *) userdata;
 | |
| 
 | |
| 	xb_ad(stream_ctxt != NULL);
 | |
| 	xb_ad(stream_ctxt->dest_file != NULL);
 | |
| 
 | |
| 	if (!ds_write(stream_ctxt->dest_file, buf, len)) {
 | |
| 		return len;
 | |
| 	}
 | |
| 	return -1;
 | |
| }
 | |
| 
 | |
| static
 | |
| ds_ctxt_t *
 | |
| xbstream_init(const char *root __attribute__((unused)))
 | |
| {
 | |
| 	ds_ctxt_t		*ctxt;
 | |
| 	ds_stream_ctxt_t	*stream_ctxt;
 | |
| 	xb_wstream_t *xbstream;
 | |
| 
 | |
| 	ctxt = (ds_ctxt_t *)my_malloc(PSI_NOT_INSTRUMENTED,
 | |
|                     sizeof(ds_ctxt_t) + sizeof(ds_stream_ctxt_t), MYF(MY_FAE));
 | |
| 	stream_ctxt = (ds_stream_ctxt_t *)(ctxt + 1);
 | |
| 
 | |
| 	if (pthread_mutex_init(&stream_ctxt->mutex, NULL)) {
 | |
| 		msg("xbstream_init: pthread_mutex_init() failed.");
 | |
| 		goto err;
 | |
| 	}
 | |
| 
 | |
| 	xbstream = xb_stream_write_new(my_xbstream_write_callback, stream_ctxt);
 | |
| 	if (xbstream == NULL) {
 | |
| 		msg("xb_stream_write_new() failed.");
 | |
| 		goto err;
 | |
| 	}
 | |
| 	stream_ctxt->xbstream = xbstream;
 | |
| 	stream_ctxt->dest_file = NULL;
 | |
| 
 | |
| 	ctxt->ptr = stream_ctxt;
 | |
| 
 | |
| 	return ctxt;
 | |
| 
 | |
| err:
 | |
| 	my_free(ctxt);
 | |
| 	return NULL;
 | |
| }
 | |
| 
 | |
| static
 | |
| ds_file_t *
 | |
| xbstream_open(ds_ctxt_t *ctxt, const char *path,
 | |
| 	const MY_STAT *mystat, bool rewrite)
 | |
| {
 | |
| 	ds_file_t		*file;
 | |
| 	ds_stream_file_t	*stream_file;
 | |
| 	ds_stream_ctxt_t	*stream_ctxt;
 | |
| 	ds_ctxt_t		*dest_ctxt;
 | |
| 	xb_wstream_t		*xbstream;
 | |
| 	xb_wstream_file_t	*xbstream_file;
 | |
| 
 | |
| 
 | |
| 	xb_ad(ctxt->pipe_ctxt != NULL);
 | |
| 	dest_ctxt = ctxt->pipe_ctxt;
 | |
| 
 | |
| 	stream_ctxt = (ds_stream_ctxt_t *) ctxt->ptr;
 | |
| 
 | |
| 	pthread_mutex_lock(&stream_ctxt->mutex);
 | |
| 	if (stream_ctxt->dest_file == NULL) {
 | |
| 		stream_ctxt->dest_file = ds_open(dest_ctxt, path, mystat);
 | |
| 	}
 | |
| 	pthread_mutex_unlock(&stream_ctxt->mutex);
 | |
| 	if (stream_ctxt->dest_file == NULL) {
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	file = (ds_file_t *) my_malloc(PSI_NOT_INSTRUMENTED,
 | |
| 				       sizeof(ds_file_t) +
 | |
| 				       sizeof(ds_stream_file_t),
 | |
| 				       MYF(MY_FAE));
 | |
| 	if (!file) {
 | |
| 		msg("my_malloc() failed.");
 | |
| 		goto err;
 | |
| 	}
 | |
| 	stream_file = (ds_stream_file_t *) (file + 1);
 | |
| 
 | |
| 	xbstream = stream_ctxt->xbstream;
 | |
| 
 | |
| 	xbstream_file = xb_stream_write_open(xbstream, path, mystat, rewrite);
 | |
| 
 | |
| 	if (xbstream_file == NULL) {
 | |
| 		msg("xb_stream_write_open() failed.");
 | |
| 		goto err;
 | |
| 	}
 | |
| 
 | |
| 	stream_file->xbstream_file = xbstream_file;
 | |
| 	stream_file->stream_ctxt = stream_ctxt;
 | |
| 	file->ptr = stream_file;
 | |
| 	file->path = stream_ctxt->dest_file->path;
 | |
| 
 | |
| 	return file;
 | |
| 
 | |
| err:
 | |
| 	if (stream_ctxt->dest_file) {
 | |
| 		ds_close(stream_ctxt->dest_file);
 | |
| 		stream_ctxt->dest_file = NULL;
 | |
| 	}
 | |
| 	my_free(file);
 | |
| 
 | |
| 	return NULL;
 | |
| }
 | |
| 
 | |
| static
 | |
| int
 | |
| xbstream_write(ds_file_t *file, const uchar *buf, size_t len)
 | |
| {
 | |
| 	ds_stream_file_t	*stream_file;
 | |
| 	xb_wstream_file_t	*xbstream_file;
 | |
| 
 | |
| 
 | |
| 	stream_file = (ds_stream_file_t *) file->ptr;
 | |
| 
 | |
| 	xbstream_file = stream_file->xbstream_file;
 | |
| 
 | |
| 	if (xb_stream_write_data(xbstream_file, buf, len)) {
 | |
| 		msg("xb_stream_write_data() failed.");
 | |
| 		return 1;
 | |
| 	}
 | |
| 
 | |
| 	return 0;
 | |
| }
 | |
| 
 | |
| static
 | |
| int
 | |
| xbstream_seek_set(ds_file_t *file, my_off_t offset)
 | |
| {
 | |
| 	ds_stream_file_t	*stream_file;
 | |
| 	xb_wstream_file_t	*xbstream_file;
 | |
| 
 | |
| 
 | |
| 	stream_file = (ds_stream_file_t *) file->ptr;
 | |
| 
 | |
| 	xbstream_file = stream_file->xbstream_file;
 | |
| 
 | |
| 	if (xb_stream_write_seek_set(xbstream_file, offset)) {
 | |
| 		msg("xb_stream_write_seek_set() failed.");
 | |
| 		return 1;
 | |
| 	}
 | |
| 
 | |
| 	return 0;
 | |
| }
 | |
| 
 | |
| static
 | |
| int
 | |
| xbstream_mremove(ds_ctxt_t *ctxt, const char *path) {
 | |
| 	ds_stream_ctxt_t	*stream_ctxt =
 | |
| 		reinterpret_cast<ds_stream_ctxt_t *>(ctxt->ptr);
 | |
| 	xb_wstream_t		*xbstream = stream_ctxt->xbstream;
 | |
| 	return xb_stream_write_remove(xbstream, path);
 | |
| }
 | |
| 
 | |
| static
 | |
| int
 | |
| xbstream_rename(
 | |
| 	ds_ctxt_t *ctxt, const char *old_path, const char *new_path) {
 | |
| 	ds_stream_ctxt_t	*stream_ctxt =
 | |
| 		reinterpret_cast<ds_stream_ctxt_t *>(ctxt->ptr);
 | |
| 	xb_wstream_t		*xbstream = stream_ctxt->xbstream;
 | |
| 	return xb_stream_write_rename(xbstream, old_path, new_path);
 | |
| }
 | |
| 
 | |
| static
 | |
| int
 | |
| xbstream_close(ds_file_t *file)
 | |
| {
 | |
| 	ds_stream_file_t	*stream_file;
 | |
| 	int			rc = 0;
 | |
| 
 | |
| 	stream_file = (ds_stream_file_t *)file->ptr;
 | |
| 
 | |
| 	rc = xb_stream_write_close(stream_file->xbstream_file);
 | |
| 
 | |
| 	my_free(file);
 | |
| 
 | |
| 	return rc;
 | |
| }
 | |
| 
 | |
| static
 | |
| void
 | |
| xbstream_deinit(ds_ctxt_t *ctxt)
 | |
| {
 | |
| 	ds_stream_ctxt_t	*stream_ctxt;
 | |
| 
 | |
| 	stream_ctxt = (ds_stream_ctxt_t *) ctxt->ptr;
 | |
| 
 | |
| 	if (xb_stream_write_done(stream_ctxt->xbstream)) {
 | |
| 		msg("xb_stream_done() failed.");
 | |
| 	}
 | |
| 
 | |
| 	if (stream_ctxt->dest_file) {
 | |
| 		ds_close(stream_ctxt->dest_file);
 | |
| 		stream_ctxt->dest_file = NULL;
 | |
| 	}
 | |
| 
 | |
| 	pthread_mutex_destroy(&stream_ctxt->mutex);
 | |
| 
 | |
| 	my_free(ctxt);
 | |
| }
 |