mirror of
				https://github.com/MariaDB/server.git
				synced 2025-11-03 20:36:16 +01:00 
			
		
		
		
	Added support to BACKUP STAGE to maria-backup
This is a port of the code from ES 10.6
See MDEV-5336 for backup stages description.
The following old options are not supported by the new code:
--rsync             ; This is because rsync will not work on tables
                      that are in used.
--no-backup-locks   ; This is disabled as mariadb-backup will always
                      use backup locks for better performance.
		
	
			
		
			
				
	
	
		
			480 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			480 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/******************************************************
 | 
						|
Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
 | 
						|
Copyright (c) 2022, MariaDB Corporation.
 | 
						|
 | 
						|
Compressing datasink implementation for XtraBackup.
 | 
						|
 | 
						|
This program is free software; you can redistribute it and/or modify
 | 
						|
it under the terms of the GNU General Public License as published by
 | 
						|
the Free Software Foundation; version 2 of the License.
 | 
						|
 | 
						|
This program is distributed in the hope that it will be useful,
 | 
						|
but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
GNU General Public License for more details.
 | 
						|
 | 
						|
You should have received a copy of the GNU General Public License
 | 
						|
along with this program; if not, write to the Free Software
 | 
						|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
 | 
						|
 | 
						|
*******************************************************/
 | 
						|
 | 
						|
#include <my_global.h>
 | 
						|
#include <mysql_version.h>
 | 
						|
#include <my_base.h>
 | 
						|
#include <quicklz.h>
 | 
						|
#include <zlib.h>
 | 
						|
#include "common.h"
 | 
						|
#include "datasink.h"
 | 
						|
 | 
						|
#define COMPRESS_CHUNK_SIZE ((size_t) (xtrabackup_compress_chunk_size))
 | 
						|
#define MY_QLZ_COMPRESS_OVERHEAD 400
 | 
						|
 | 
						|
typedef struct {
 | 
						|
	pthread_t		id;
 | 
						|
	uint			num;
 | 
						|
	pthread_mutex_t		data_mutex;
 | 
						|
	pthread_cond_t  	avail_cond;
 | 
						|
	pthread_cond_t  	data_cond;
 | 
						|
	pthread_cond_t  	done_cond;
 | 
						|
	pthread_t		data_avail;
 | 
						|
	my_bool			cancelled;
 | 
						|
	const char 		*from;
 | 
						|
	size_t			from_len;
 | 
						|
	char			*to;
 | 
						|
	size_t			to_len;
 | 
						|
	qlz_state_compress	state;
 | 
						|
	ulong			adler;
 | 
						|
} comp_thread_ctxt_t;
 | 
						|
 | 
						|
typedef struct {
 | 
						|
	comp_thread_ctxt_t	*threads;
 | 
						|
	uint			nthreads;
 | 
						|
} ds_compress_ctxt_t;
 | 
						|
 | 
						|
typedef struct {
 | 
						|
	ds_file_t		*dest_file;
 | 
						|
	ds_compress_ctxt_t	*comp_ctxt;
 | 
						|
	size_t			bytes_processed;
 | 
						|
} ds_compress_file_t;
 | 
						|
 | 
						|
/* Compression options */
 | 
						|
extern char		*xtrabackup_compress_alg;
 | 
						|
extern uint		xtrabackup_compress_threads;
 | 
						|
extern ulonglong	xtrabackup_compress_chunk_size;
 | 
						|
 | 
						|
static ds_ctxt_t *compress_init(const char *root);
 | 
						|
static ds_file_t *compress_open(ds_ctxt_t *ctxt, const char *path,
 | 
						|
				const MY_STAT *mystat, bool rewrite);
 | 
						|
static int compress_write(ds_file_t *file, const uchar *buf, size_t len);
 | 
						|
static int compress_close(ds_file_t *file);
 | 
						|
static void compress_deinit(ds_ctxt_t *ctxt);
 | 
						|
 | 
						|
datasink_t datasink_compress = {
 | 
						|
	&compress_init,
 | 
						|
	&compress_open,
 | 
						|
	&compress_write,
 | 
						|
	nullptr,
 | 
						|
	&compress_close,
 | 
						|
	&dummy_remove,
 | 
						|
	nullptr,
 | 
						|
	nullptr,
 | 
						|
	&compress_deinit
 | 
						|
};
 | 
						|
 | 
						|
static inline int write_uint32_le(ds_file_t *file, ulong n);
 | 
						|
static inline int write_uint64_le(ds_file_t *file, ulonglong n);
 | 
						|
 | 
						|
static comp_thread_ctxt_t *create_worker_threads(uint n);
 | 
						|
static void destroy_worker_threads(comp_thread_ctxt_t *threads, uint n);
 | 
						|
static void *compress_worker_thread_func(void *arg);
 | 
						|
 | 
						|
static
 | 
						|
ds_ctxt_t *
 | 
						|
compress_init(const char *root)
 | 
						|
{
 | 
						|
	ds_ctxt_t		*ctxt;
 | 
						|
	ds_compress_ctxt_t	*compress_ctxt;
 | 
						|
	comp_thread_ctxt_t	*threads;
 | 
						|
 | 
						|
	/* Create and initialize the worker threads */
 | 
						|
	threads = create_worker_threads(xtrabackup_compress_threads);
 | 
						|
	if (threads == NULL) {
 | 
						|
		msg("compress: failed to create worker threads.");
 | 
						|
		return NULL;
 | 
						|
	}
 | 
						|
 | 
						|
	ctxt = (ds_ctxt_t *) my_malloc(PSI_NOT_INSTRUMENTED,
 | 
						|
                  sizeof(ds_ctxt_t) + sizeof(ds_compress_ctxt_t), MYF(MY_FAE));
 | 
						|
 | 
						|
	compress_ctxt = (ds_compress_ctxt_t *) (ctxt + 1);
 | 
						|
	compress_ctxt->threads = threads;
 | 
						|
	compress_ctxt->nthreads = xtrabackup_compress_threads;
 | 
						|
 | 
						|
	ctxt->ptr = compress_ctxt;
 | 
						|
	ctxt->root = my_strdup(PSI_NOT_INSTRUMENTED, root, MYF(MY_FAE));
 | 
						|
 | 
						|
	return ctxt;
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
ds_file_t *
 | 
						|
compress_open(ds_ctxt_t *ctxt, const char *path,
 | 
						|
	const MY_STAT *mystat, bool rewrite)
 | 
						|
{
 | 
						|
  DBUG_ASSERT(rewrite == false);
 | 
						|
	ds_compress_ctxt_t	*comp_ctxt;
 | 
						|
	ds_ctxt_t		*dest_ctxt;
 | 
						|
 	ds_file_t		*dest_file;
 | 
						|
	char			new_name[FN_REFLEN];
 | 
						|
	size_t			name_len;
 | 
						|
	ds_file_t		*file;
 | 
						|
	ds_compress_file_t	*comp_file;
 | 
						|
 | 
						|
	xb_ad(ctxt->pipe_ctxt != NULL);
 | 
						|
	dest_ctxt = ctxt->pipe_ctxt;
 | 
						|
 | 
						|
	comp_ctxt = (ds_compress_ctxt_t *) ctxt->ptr;
 | 
						|
 | 
						|
	/* Append the .qp extension to the filename */
 | 
						|
	fn_format(new_name, path, "", ".qp", MYF(MY_APPEND_EXT));
 | 
						|
 | 
						|
	dest_file = ds_open(dest_ctxt, new_name, mystat);
 | 
						|
	if (dest_file == NULL) {
 | 
						|
		return NULL;
 | 
						|
	}
 | 
						|
 | 
						|
	/* Write the qpress archive header */
 | 
						|
	if (ds_write(dest_file, "qpress10", 8) ||
 | 
						|
	    write_uint64_le(dest_file, COMPRESS_CHUNK_SIZE)) {
 | 
						|
		goto err;
 | 
						|
	}
 | 
						|
 | 
						|
	/* We are going to create a one-file "flat" (i.e. with no
 | 
						|
	subdirectories) archive. So strip the directory part from the path and
 | 
						|
	remove the '.qp' suffix. */
 | 
						|
	fn_format(new_name, path, "", "", MYF(MY_REPLACE_DIR));
 | 
						|
 | 
						|
	/* Write the qpress file header */
 | 
						|
	name_len = strlen(new_name);
 | 
						|
	if (ds_write(dest_file, "F", 1) ||
 | 
						|
	    write_uint32_le(dest_file, (uint)name_len) ||
 | 
						|
	    /* we want to write the terminating \0 as well */
 | 
						|
	    ds_write(dest_file, new_name, name_len + 1)) {
 | 
						|
		goto err;
 | 
						|
	}
 | 
						|
 | 
						|
	file = (ds_file_t *) my_malloc(PSI_NOT_INSTRUMENTED,
 | 
						|
                  sizeof(ds_file_t) + sizeof(ds_compress_file_t), MYF(MY_FAE));
 | 
						|
	comp_file = (ds_compress_file_t *) (file + 1);
 | 
						|
	comp_file->dest_file = dest_file;
 | 
						|
	comp_file->comp_ctxt = comp_ctxt;
 | 
						|
	comp_file->bytes_processed = 0;
 | 
						|
 | 
						|
	file->ptr = comp_file;
 | 
						|
	file->path = dest_file->path;
 | 
						|
 | 
						|
	return file;
 | 
						|
 | 
						|
err:
 | 
						|
	ds_close(dest_file);
 | 
						|
	return NULL;
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
int
 | 
						|
compress_write(ds_file_t *file, const uchar *buf, size_t len)
 | 
						|
{
 | 
						|
	ds_compress_file_t	*comp_file;
 | 
						|
	ds_compress_ctxt_t	*comp_ctxt;
 | 
						|
	comp_thread_ctxt_t	*threads;
 | 
						|
	comp_thread_ctxt_t	*thd;
 | 
						|
	uint			nthreads;
 | 
						|
	uint			i;
 | 
						|
	const char		*ptr;
 | 
						|
	ds_file_t		*dest_file;
 | 
						|
 | 
						|
	comp_file = (ds_compress_file_t *) file->ptr;
 | 
						|
	comp_ctxt = comp_file->comp_ctxt;
 | 
						|
	dest_file = comp_file->dest_file;
 | 
						|
 | 
						|
	threads = comp_ctxt->threads;
 | 
						|
	nthreads = comp_ctxt->nthreads;
 | 
						|
 | 
						|
	const pthread_t self = pthread_self();
 | 
						|
 | 
						|
	ptr = (const char *) buf;
 | 
						|
	while (len > 0) {
 | 
						|
		bool wait = nthreads == 1;
 | 
						|
retry:
 | 
						|
		bool submitted = false;
 | 
						|
 | 
						|
		/* Send data to worker threads for compression */
 | 
						|
		for (i = 0; i < nthreads; i++) {
 | 
						|
			size_t chunk_len;
 | 
						|
 | 
						|
			thd = threads + i;
 | 
						|
 | 
						|
			pthread_mutex_lock(&thd->data_mutex);
 | 
						|
			if (thd->data_avail == pthread_t(~0UL)) {
 | 
						|
			} else if (!wait) {
 | 
						|
skip:
 | 
						|
				pthread_mutex_unlock(&thd->data_mutex);
 | 
						|
				continue;
 | 
						|
			} else {
 | 
						|
				for (;;) {
 | 
						|
					pthread_cond_wait(&thd->avail_cond,
 | 
						|
							  &thd->data_mutex);
 | 
						|
					if (thd->data_avail
 | 
						|
					    == pthread_t(~0UL)) {
 | 
						|
						break;
 | 
						|
					}
 | 
						|
					goto skip;
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
 | 
						|
				COMPRESS_CHUNK_SIZE : len;
 | 
						|
			thd->from = ptr;
 | 
						|
			thd->from_len = chunk_len;
 | 
						|
 | 
						|
			thd->data_avail = self;
 | 
						|
			pthread_cond_signal(&thd->data_cond);
 | 
						|
			pthread_mutex_unlock(&thd->data_mutex);
 | 
						|
 | 
						|
			submitted = true;
 | 
						|
			len -= chunk_len;
 | 
						|
			if (len == 0) {
 | 
						|
				break;
 | 
						|
			}
 | 
						|
			ptr += chunk_len;
 | 
						|
		}
 | 
						|
 | 
						|
		if (!submitted) {
 | 
						|
			wait = true;
 | 
						|
			goto retry;
 | 
						|
		}
 | 
						|
 | 
						|
		for (i = 0; i < nthreads; i++) {
 | 
						|
			thd = threads + i;
 | 
						|
 | 
						|
			pthread_mutex_lock(&thd->data_mutex);
 | 
						|
			if (thd->data_avail != self) {
 | 
						|
				pthread_mutex_unlock(&thd->data_mutex);
 | 
						|
				continue;
 | 
						|
			}
 | 
						|
 | 
						|
			while (!thd->to_len) {
 | 
						|
				pthread_cond_wait(&thd->done_cond,
 | 
						|
						  &thd->data_mutex);
 | 
						|
			}
 | 
						|
 | 
						|
			bool fail = ds_write(dest_file, "NEWBNEWB", 8) ||
 | 
						|
				write_uint64_le(dest_file,
 | 
						|
						comp_file->bytes_processed);
 | 
						|
			comp_file->bytes_processed += thd->from_len;
 | 
						|
 | 
						|
			if (!fail) {
 | 
						|
				fail = write_uint32_le(dest_file, thd->adler) ||
 | 
						|
					ds_write(dest_file, thd->to,
 | 
						|
						 thd->to_len);
 | 
						|
			}
 | 
						|
 | 
						|
			thd->to_len = 0;
 | 
						|
			thd->data_avail = pthread_t(~0UL);
 | 
						|
			pthread_cond_signal(&thd->avail_cond);
 | 
						|
			pthread_mutex_unlock(&thd->data_mutex);
 | 
						|
 | 
						|
			if (fail) {
 | 
						|
				msg("compress: write to the destination stream "
 | 
						|
				    "failed.");
 | 
						|
				return 1;
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return 0;
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
int
 | 
						|
compress_close(ds_file_t *file)
 | 
						|
{
 | 
						|
	ds_compress_file_t	*comp_file;
 | 
						|
	ds_file_t		*dest_file;
 | 
						|
	int			rc;
 | 
						|
 | 
						|
	comp_file = (ds_compress_file_t *) file->ptr;
 | 
						|
	dest_file = comp_file->dest_file;
 | 
						|
 | 
						|
	/* Write the qpress file trailer */
 | 
						|
	ds_write(dest_file, "ENDSENDS", 8);
 | 
						|
 | 
						|
	/* Supposedly the number of written bytes should be written as a
 | 
						|
	"recovery information" in the file trailer, but in reality qpress
 | 
						|
	always writes 8 zeros here. Let's do the same */
 | 
						|
 | 
						|
	write_uint64_le(dest_file, 0);
 | 
						|
 | 
						|
	rc = ds_close(dest_file);
 | 
						|
 | 
						|
	my_free(file);
 | 
						|
 | 
						|
	return rc;
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
void
 | 
						|
compress_deinit(ds_ctxt_t *ctxt)
 | 
						|
{
 | 
						|
	ds_compress_ctxt_t 	*comp_ctxt;
 | 
						|
 | 
						|
	xb_ad(ctxt->pipe_ctxt != NULL);
 | 
						|
 | 
						|
	comp_ctxt = (ds_compress_ctxt_t *) ctxt->ptr;;
 | 
						|
 | 
						|
	destroy_worker_threads(comp_ctxt->threads, comp_ctxt->nthreads);
 | 
						|
 | 
						|
	my_free(ctxt->root);
 | 
						|
	my_free(ctxt);
 | 
						|
}
 | 
						|
 | 
						|
static inline
 | 
						|
int
 | 
						|
write_uint32_le(ds_file_t *file, ulong n)
 | 
						|
{
 | 
						|
	char tmp[4];
 | 
						|
 | 
						|
	int4store(tmp, n);
 | 
						|
	return ds_write(file, tmp, sizeof(tmp));
 | 
						|
}
 | 
						|
 | 
						|
static inline
 | 
						|
int
 | 
						|
write_uint64_le(ds_file_t *file, ulonglong n)
 | 
						|
{
 | 
						|
	char tmp[8];
 | 
						|
 | 
						|
	int8store(tmp, n);
 | 
						|
	return ds_write(file, tmp, sizeof(tmp));
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
void
 | 
						|
destroy_worker_thread(comp_thread_ctxt_t *thd)
 | 
						|
{
 | 
						|
	pthread_mutex_lock(&thd->data_mutex);
 | 
						|
	thd->cancelled = TRUE;
 | 
						|
	pthread_cond_signal(&thd->data_cond);
 | 
						|
	pthread_mutex_unlock(&thd->data_mutex);
 | 
						|
 | 
						|
	pthread_join(thd->id, NULL);
 | 
						|
 | 
						|
	pthread_cond_destroy(&thd->avail_cond);
 | 
						|
	pthread_cond_destroy(&thd->data_cond);
 | 
						|
	pthread_cond_destroy(&thd->done_cond);
 | 
						|
	pthread_mutex_destroy(&thd->data_mutex);
 | 
						|
 | 
						|
	my_free(thd->to);
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
comp_thread_ctxt_t *
 | 
						|
create_worker_threads(uint n)
 | 
						|
{
 | 
						|
	comp_thread_ctxt_t	*threads;
 | 
						|
	uint 			i;
 | 
						|
 | 
						|
	threads = static_cast<comp_thread_ctxt_t*>
 | 
						|
		(my_malloc(PSI_NOT_INSTRUMENTED, n * sizeof *threads,
 | 
						|
			   MYF(MY_ZEROFILL|MY_FAE)));
 | 
						|
 | 
						|
	for (i = 0; i < n; i++) {
 | 
						|
		comp_thread_ctxt_t *thd = threads + i;
 | 
						|
 | 
						|
		thd->num = i + 1;
 | 
						|
		thd->to = static_cast<char*>
 | 
						|
			(my_malloc(PSI_NOT_INSTRUMENTED,
 | 
						|
				   COMPRESS_CHUNK_SIZE +
 | 
						|
				   MY_QLZ_COMPRESS_OVERHEAD,
 | 
						|
				   MYF(MY_FAE)));
 | 
						|
 | 
						|
		/* Initialize and data mutex and condition var */
 | 
						|
		if (pthread_mutex_init(&thd->data_mutex, NULL) ||
 | 
						|
		    pthread_cond_init(&thd->avail_cond, NULL) ||
 | 
						|
		    pthread_cond_init(&thd->data_cond, NULL) ||
 | 
						|
		    pthread_cond_init(&thd->done_cond, NULL)) {
 | 
						|
			goto err;
 | 
						|
		}
 | 
						|
 | 
						|
		thd->data_avail = pthread_t(~0UL);
 | 
						|
 | 
						|
		if (pthread_create(&thd->id, NULL, compress_worker_thread_func,
 | 
						|
				   thd)) {
 | 
						|
			msg("compress: pthread_create() failed: "
 | 
						|
			    "errno = %d", errno);
 | 
						|
			goto err;
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return threads;
 | 
						|
 | 
						|
err:
 | 
						|
	for (; i; i--) {
 | 
						|
		destroy_worker_thread(threads + i);
 | 
						|
	}
 | 
						|
 | 
						|
	my_free(threads);
 | 
						|
	return NULL;
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
void
 | 
						|
destroy_worker_threads(comp_thread_ctxt_t *threads, uint n)
 | 
						|
{
 | 
						|
	uint i;
 | 
						|
 | 
						|
	for (i = 0; i < n; i++) {
 | 
						|
		destroy_worker_thread(threads + i);
 | 
						|
	}
 | 
						|
 | 
						|
	my_free(threads);
 | 
						|
}
 | 
						|
 | 
						|
static
 | 
						|
void *
 | 
						|
compress_worker_thread_func(void *arg)
 | 
						|
{
 | 
						|
	comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg;
 | 
						|
 | 
						|
	pthread_mutex_lock(&thd->data_mutex);
 | 
						|
 | 
						|
	while (1) {
 | 
						|
		while (!thd->cancelled
 | 
						|
		       && (thd->to_len || thd->data_avail == pthread_t(~0UL))) {
 | 
						|
			pthread_cond_wait(&thd->data_cond, &thd->data_mutex);
 | 
						|
		}
 | 
						|
 | 
						|
		if (thd->cancelled)
 | 
						|
			break;
 | 
						|
		thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len,
 | 
						|
					   &thd->state);
 | 
						|
 | 
						|
		/* qpress uses 0x00010000 as the initial value, but its own
 | 
						|
		Adler-32 implementation treats the value differently:
 | 
						|
		  1. higher order bits are the sum of all bytes in the sequence
 | 
						|
		  2. lower order bits are the sum of resulting values at every
 | 
						|
		     step.
 | 
						|
		So it's the other way around as compared to zlib's adler32().
 | 
						|
		That's why  0x00000001 is being passed here to be compatible
 | 
						|
		with qpress implementation. */
 | 
						|
 | 
						|
		thd->adler = adler32(0x00000001, (uchar *) thd->to,
 | 
						|
				     (uInt)thd->to_len);
 | 
						|
		pthread_cond_signal(&thd->done_cond);
 | 
						|
	}
 | 
						|
 | 
						|
	pthread_mutex_unlock(&thd->data_mutex);
 | 
						|
 | 
						|
	return NULL;
 | 
						|
}
 |