mirror of
https://github.com/MariaDB/server.git
synced 2025-01-15 19:42:28 +01:00
609 lines
14 KiB
C++
609 lines
14 KiB
C++
/******************************************************
Copyright (c) 2011-2013 Percona LLC and/or its affiliates.

The xbstream utility: serialize/deserialize files in the XBSTREAM format.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*******************************************************/
|
|
|
|
#define VER "1.0"
|
|
#include <my_global.h>
|
|
#include <my_base.h>
|
|
#include <my_getopt.h>
|
|
#include <hash.h>
|
|
#include <my_pthread.h>
|
|
#include "common.h"
|
|
#include "xbstream.h"
|
|
#include "datasink.h"
|
|
#include <welcome_copyright_notice.h>
|
|
|
|
/* Size in bytes of the read buffer used when streaming a file (-c mode). */
#define XBSTREAM_BUFFER_SIZE	(10 * 1024 * 1024UL)

/* Initial size passed to my_hash_init() for the hash of open files
(-x mode). */
#define START_FILE_HASH_SIZE	16
|
|
|
|
/* Operating mode requested on the command line. */
typedef enum {
	RUN_MODE_NONE = 0,	/* neither -c nor -x seen (value of a
				zero-initialised opt_mode) */
	RUN_MODE_CREATE,	/* -c: serialize files into a stream */
	RUN_MODE_EXTRACT	/* -x: deserialize a stream into files */
} run_mode_t;
|
|
|
|
/* Need the following definitions to avoid linking with ds_*.o and their link
|
|
dependencies */
|
|
datasink_t datasink_archive;
|
|
datasink_t datasink_xbstream;
|
|
datasink_t datasink_compress;
|
|
datasink_t datasink_tmpfile;
|
|
|
|
static run_mode_t opt_mode;
|
|
static char * opt_directory = NULL;
|
|
static my_bool opt_verbose = 0;
|
|
static int opt_parallel = 1;
|
|
|
|
static struct my_option my_long_options[] =
|
|
{
|
|
{"help", '?', "Display this help and exit.",
|
|
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
|
|
{"create", 'c', "Stream the specified files to the standard output.",
|
|
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
|
|
{"extract", 'x', "Extract to disk files from the stream on the "
|
|
"standard input.",
|
|
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
|
|
{"directory", 'C', "Change the current directory to the specified one "
|
|
"before streaming or extracting.", &opt_directory, &opt_directory, 0,
|
|
GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
|
|
{"verbose", 'v', "Print verbose output.", &opt_verbose, &opt_verbose,
|
|
0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
|
|
{"parallel", 'p', "Number of worker threads for reading / writing.",
|
|
&opt_parallel, &opt_parallel, 0, GET_INT, REQUIRED_ARG,
|
|
1, 1, INT_MAX, 0, 0, 0},
|
|
|
|
{0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
|
|
};
|
|
|
|
typedef struct {
|
|
HASH *filehash;
|
|
xb_rstream_t *stream;
|
|
ds_ctxt_t *ds_ctxt;
|
|
pthread_mutex_t *mutex;
|
|
} extract_ctxt_t;
|
|
|
|
typedef struct {
|
|
char *path;
|
|
uint pathlen;
|
|
my_off_t offset;
|
|
ds_file_t *file;
|
|
pthread_mutex_t mutex;
|
|
} file_entry_t;
|
|
|
|
static int get_options(int *argc, char ***argv);
|
|
static int mode_create(int argc, char **argv);
|
|
static int mode_extract(int n_threads, int argc, char **argv);
|
|
static my_bool get_one_option(const struct my_option *opt,
|
|
const char *argument, const char *filename);
|
|
|
|
int
|
|
main(int argc, char **argv)
|
|
{
|
|
MY_INIT(argv[0]);
|
|
|
|
if (get_options(&argc, &argv)) {
|
|
goto err;
|
|
}
|
|
|
|
if (opt_mode == RUN_MODE_NONE) {
|
|
msg("%s: either -c or -x must be specified.", my_progname);
|
|
goto err;
|
|
}
|
|
|
|
/* Change the current directory if -C is specified */
|
|
if (opt_directory && my_setwd(opt_directory, MYF(MY_WME))) {
|
|
goto err;
|
|
}
|
|
|
|
if (opt_mode == RUN_MODE_CREATE && mode_create(argc, argv)) {
|
|
goto err;
|
|
} else if (opt_mode == RUN_MODE_EXTRACT &&
|
|
mode_extract(opt_parallel, argc, argv)) {
|
|
goto err;
|
|
}
|
|
|
|
my_cleanup_options(my_long_options);
|
|
|
|
my_end(0);
|
|
|
|
return EXIT_SUCCESS;
|
|
err:
|
|
my_cleanup_options(my_long_options);
|
|
|
|
my_end(0);
|
|
|
|
exit(EXIT_FAILURE);
|
|
}
|
|
|
|
static
|
|
int
|
|
get_options(int *argc, char ***argv)
|
|
{
|
|
int ho_error;
|
|
|
|
if ((ho_error= handle_options(argc, argv, my_long_options,
|
|
get_one_option)))
|
|
{
|
|
exit(EXIT_FAILURE);
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static
|
|
void
|
|
usage(void)
|
|
{
|
|
print_version();
|
|
puts("Copyright (C) 2011-2013 Percona LLC and/or its affiliates.");
|
|
puts("This software comes with ABSOLUTELY NO WARRANTY. "
|
|
"This is free software,\nand you are welcome to modify and "
|
|
"redistribute it under the GPL license.\n");
|
|
|
|
puts("Serialize/deserialize files in the XBSTREAM format.\n");
|
|
|
|
puts("Usage: ");
|
|
printf(" %s -c [OPTIONS...] FILES... # stream specified files to "
|
|
"standard output.\n", my_progname);
|
|
printf(" %s -x [OPTIONS...] # extract files from the stream"
|
|
"on the standard input.\n", my_progname);
|
|
|
|
puts("\nOptions:");
|
|
my_print_help(my_long_options);
|
|
}
|
|
|
|
static
|
|
int
|
|
set_run_mode(run_mode_t mode)
|
|
{
|
|
if (opt_mode != RUN_MODE_NONE) {
|
|
msg("%s: can't set specify both -c and -x.", my_progname);
|
|
return 1;
|
|
}
|
|
|
|
opt_mode = mode;
|
|
|
|
return 0;
|
|
}
|
|
|
|
static
|
|
my_bool
|
|
get_one_option(const struct my_option *opt, const char *, const char *)
|
|
{
|
|
switch (opt->id) {
|
|
case 'c':
|
|
if (set_run_mode(RUN_MODE_CREATE)) {
|
|
return TRUE;
|
|
}
|
|
break;
|
|
case 'x':
|
|
if (set_run_mode(RUN_MODE_EXTRACT)) {
|
|
return TRUE;
|
|
}
|
|
break;
|
|
case '?':
|
|
usage();
|
|
exit(0);
|
|
}
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
static
|
|
int
|
|
stream_one_file(File file, xb_wstream_file_t *xbfile)
|
|
{
|
|
uchar *buf;
|
|
ssize_t bytes;
|
|
my_off_t offset;
|
|
|
|
posix_fadvise(file, 0, 0, POSIX_FADV_SEQUENTIAL);
|
|
offset = my_tell(file, MYF(MY_WME));
|
|
|
|
buf = (uchar*)(my_malloc(PSI_NOT_INSTRUMENTED, XBSTREAM_BUFFER_SIZE, MYF(MY_FAE)));
|
|
|
|
while ((bytes = (ssize_t)my_read(file, buf, XBSTREAM_BUFFER_SIZE,
|
|
MYF(MY_WME))) > 0) {
|
|
if (xb_stream_write_data(xbfile, buf, bytes)) {
|
|
msg("%s: xb_stream_write_data() failed.",
|
|
my_progname);
|
|
my_free(buf);
|
|
return 1;
|
|
}
|
|
posix_fadvise(file, offset, XBSTREAM_BUFFER_SIZE,
|
|
POSIX_FADV_DONTNEED);
|
|
offset += XBSTREAM_BUFFER_SIZE;
|
|
|
|
}
|
|
|
|
my_free(buf);
|
|
|
|
if (bytes < 0) {
|
|
return 1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static
|
|
int
|
|
mode_create(int argc, char **argv)
|
|
{
|
|
int i;
|
|
MY_STAT mystat;
|
|
xb_wstream_t *stream;
|
|
|
|
if (argc < 1) {
|
|
msg("%s: no files are specified.", my_progname);
|
|
return 1;
|
|
}
|
|
|
|
stream = xb_stream_write_new(nullptr, nullptr);
|
|
if (stream == NULL) {
|
|
msg("%s: xb_stream_write_new() failed.", my_progname);
|
|
return 1;
|
|
}
|
|
|
|
for (i = 0; i < argc; i++) {
|
|
char *filepath = argv[i];
|
|
File src_file;
|
|
xb_wstream_file_t *file;
|
|
|
|
if (my_stat(filepath, &mystat, MYF(MY_WME)) == NULL) {
|
|
goto err;
|
|
}
|
|
if (!MY_S_ISREG(mystat.st_mode)) {
|
|
msg("%s: %s is not a regular file, exiting.",
|
|
my_progname, filepath);
|
|
goto err;
|
|
}
|
|
|
|
if ((src_file = my_open(filepath, O_RDONLY, MYF(MY_WME))) < 0) {
|
|
msg("%s: failed to open %s.", my_progname, filepath);
|
|
goto err;
|
|
}
|
|
|
|
file = xb_stream_write_open(stream, filepath, &mystat, false);
|
|
if (file == NULL) {
|
|
goto err;
|
|
}
|
|
|
|
if (opt_verbose) {
|
|
msg("%s", filepath);
|
|
}
|
|
|
|
if (stream_one_file(src_file, file) ||
|
|
xb_stream_write_close(file) ||
|
|
my_close(src_file, MYF(MY_WME))) {
|
|
goto err;
|
|
}
|
|
}
|
|
|
|
xb_stream_write_done(stream);
|
|
|
|
return 0;
|
|
err:
|
|
xb_stream_write_done(stream);
|
|
|
|
return 1;
|
|
}
|
|
|
|
static
|
|
file_entry_t *
|
|
file_entry_new(extract_ctxt_t *ctxt, const char *path, uint pathlen,
|
|
uchar chunk_flags)
|
|
{
|
|
file_entry_t *entry;
|
|
ds_file_t *file;
|
|
|
|
entry = (file_entry_t *) my_malloc(PSI_NOT_INSTRUMENTED, sizeof(file_entry_t),
|
|
MYF(MY_WME | MY_ZEROFILL));
|
|
if (entry == NULL) {
|
|
return NULL;
|
|
}
|
|
|
|
entry->path = my_strndup(PSI_NOT_INSTRUMENTED, path, pathlen, MYF(MY_WME));
|
|
if (entry->path == NULL) {
|
|
goto err;
|
|
}
|
|
entry->pathlen = pathlen;
|
|
|
|
file = ds_open(ctxt->ds_ctxt, path, NULL,
|
|
chunk_flags == XB_STREAM_FLAG_REWRITE);
|
|
|
|
if (file == NULL) {
|
|
msg("%s: failed to create file.", my_progname);
|
|
goto err;
|
|
}
|
|
|
|
if (opt_verbose) {
|
|
msg("%s", entry->path);
|
|
}
|
|
|
|
entry->file = file;
|
|
|
|
pthread_mutex_init(&entry->mutex, NULL);
|
|
|
|
return entry;
|
|
|
|
err:
|
|
if (entry->path != NULL) {
|
|
my_free(entry->path);
|
|
}
|
|
my_free(entry);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
static
|
|
const uchar *
|
|
get_file_entry_key(const void *entry_, size_t *length, my_bool)
|
|
{
|
|
const file_entry_t *entry= static_cast<const file_entry_t *>(entry_);
|
|
*length= entry->pathlen;
|
|
return reinterpret_cast<const uchar *>(entry->path);
|
|
}
|
|
|
|
static
|
|
void
|
|
file_entry_free(void *entry_)
|
|
{
|
|
file_entry_t *entry= static_cast<file_entry_t *>(entry_);
|
|
pthread_mutex_destroy(&entry->mutex);
|
|
ds_close(entry->file);
|
|
my_free(entry->path);
|
|
my_free(entry);
|
|
}
|
|
|
|
static
|
|
void *
|
|
extract_worker_thread_func(void *arg)
|
|
{
|
|
xb_rstream_chunk_t chunk;
|
|
file_entry_t *entry;
|
|
xb_rstream_result_t res;
|
|
|
|
extract_ctxt_t *ctxt = (extract_ctxt_t *) arg;
|
|
|
|
my_thread_init();
|
|
|
|
memset(&chunk, 0, sizeof(chunk));
|
|
|
|
while (1) {
|
|
|
|
pthread_mutex_lock(ctxt->mutex);
|
|
res = xb_stream_read_chunk(ctxt->stream, &chunk);
|
|
|
|
if (res != XB_STREAM_READ_CHUNK) {
|
|
pthread_mutex_unlock(ctxt->mutex);
|
|
break;
|
|
}
|
|
|
|
/* If unknown type and ignorable flag is set, skip this chunk */
|
|
if (chunk.type == XB_CHUNK_TYPE_UNKNOWN && \
|
|
!(chunk.flags & XB_STREAM_FLAG_IGNORABLE)) {
|
|
pthread_mutex_unlock(ctxt->mutex);
|
|
continue;
|
|
}
|
|
|
|
/* See if we already have this file open */
|
|
entry = (file_entry_t *) my_hash_search(ctxt->filehash,
|
|
(uchar *) chunk.path,
|
|
chunk.pathlen);
|
|
|
|
if (entry && (chunk.type == XB_CHUNK_TYPE_REMOVE ||
|
|
chunk.type == XB_CHUNK_TYPE_RENAME)) {
|
|
msg("%s: rename and remove chunks can not be applied to opened file: %s",
|
|
my_progname, chunk.path);
|
|
pthread_mutex_unlock(ctxt->mutex);
|
|
break;
|
|
}
|
|
|
|
if (chunk.type == XB_CHUNK_TYPE_REMOVE) {
|
|
if (ds_remove(ctxt->ds_ctxt, chunk.path)) {
|
|
msg("%s: error on file removing: %s", my_progname, chunk.path);
|
|
pthread_mutex_unlock(ctxt->mutex);
|
|
res = XB_STREAM_READ_ERROR;
|
|
break;
|
|
}
|
|
pthread_mutex_unlock(ctxt->mutex);
|
|
continue;
|
|
}
|
|
|
|
if (chunk.type == XB_CHUNK_TYPE_RENAME) {
|
|
if (my_hash_search(ctxt->filehash,
|
|
reinterpret_cast<const uchar *>(chunk.data), chunk.length)) {
|
|
msg("%s: rename chunks can not be applied to opened file: %s",
|
|
my_progname, reinterpret_cast<const uchar *>(chunk.data));
|
|
pthread_mutex_unlock(ctxt->mutex);
|
|
break;
|
|
}
|
|
if (ds_rename(ctxt->ds_ctxt, chunk.path,
|
|
reinterpret_cast<const char *>(chunk.data))) {
|
|
msg("%s: error on file renaming: %s to %s", my_progname,
|
|
reinterpret_cast<const char *>(chunk.data), chunk.path);
|
|
pthread_mutex_unlock(ctxt->mutex);
|
|
res = XB_STREAM_READ_ERROR;
|
|
break;
|
|
}
|
|
pthread_mutex_unlock(ctxt->mutex);
|
|
continue;
|
|
}
|
|
|
|
if (entry == NULL) {
|
|
entry = file_entry_new(ctxt,
|
|
chunk.path,
|
|
chunk.pathlen,
|
|
chunk.flags);
|
|
if (entry == NULL) {
|
|
pthread_mutex_unlock(ctxt->mutex);
|
|
break;
|
|
}
|
|
if (my_hash_insert(ctxt->filehash, (uchar *) entry)) {
|
|
msg("%s: my_hash_insert() failed.",
|
|
my_progname);
|
|
pthread_mutex_unlock(ctxt->mutex);
|
|
break;
|
|
}
|
|
}
|
|
|
|
pthread_mutex_lock(&entry->mutex);
|
|
|
|
pthread_mutex_unlock(ctxt->mutex);
|
|
|
|
if (chunk.type == XB_CHUNK_TYPE_SEEK) {
|
|
if (ds_seek_set(entry->file, chunk.offset)) {
|
|
msg("%s: my_seek() failed.", my_progname);
|
|
pthread_mutex_unlock(&entry->mutex);
|
|
res = XB_STREAM_READ_ERROR;
|
|
break;
|
|
}
|
|
entry->offset = chunk.offset;
|
|
pthread_mutex_unlock(&entry->mutex);
|
|
continue;
|
|
}
|
|
|
|
res = xb_stream_validate_checksum(&chunk);
|
|
|
|
if (res != XB_STREAM_READ_CHUNK) {
|
|
pthread_mutex_unlock(&entry->mutex);
|
|
break;
|
|
}
|
|
|
|
if (chunk.type == XB_CHUNK_TYPE_EOF) {
|
|
pthread_mutex_unlock(&entry->mutex);
|
|
pthread_mutex_lock(ctxt->mutex);
|
|
my_hash_delete(ctxt->filehash, (uchar *) entry);
|
|
pthread_mutex_unlock(ctxt->mutex);
|
|
|
|
continue;
|
|
}
|
|
|
|
if (entry->offset != chunk.offset) {
|
|
msg("%s: out-of-order chunk: real offset = 0x%llx, "
|
|
"expected offset = 0x%llx", my_progname,
|
|
chunk.offset, entry->offset);
|
|
pthread_mutex_unlock(&entry->mutex);
|
|
res = XB_STREAM_READ_ERROR;
|
|
break;
|
|
}
|
|
|
|
if (ds_write(entry->file, chunk.data, chunk.length)) {
|
|
msg("%s: my_write() failed.", my_progname);
|
|
pthread_mutex_unlock(&entry->mutex);
|
|
res = XB_STREAM_READ_ERROR;
|
|
break;
|
|
}
|
|
|
|
entry->offset += chunk.length;
|
|
|
|
pthread_mutex_unlock(&entry->mutex);
|
|
}
|
|
|
|
if (chunk.data)
|
|
my_free(chunk.data);
|
|
|
|
my_thread_end();
|
|
|
|
return (void *)(res);
|
|
}
|
|
|
|
|
|
static
|
|
int
|
|
mode_extract(int n_threads, int argc __attribute__((unused)),
|
|
char **argv __attribute__((unused)))
|
|
{
|
|
xb_rstream_t *stream = NULL;
|
|
HASH filehash;
|
|
ds_ctxt_t *ds_ctxt = NULL;
|
|
extract_ctxt_t ctxt;
|
|
int i;
|
|
pthread_t *tids = NULL;
|
|
void **retvals = NULL;
|
|
pthread_mutex_t mutex;
|
|
int ret = 0;
|
|
|
|
if (my_hash_init(PSI_NOT_INSTRUMENTED, &filehash, &my_charset_bin,
|
|
START_FILE_HASH_SIZE, 0, 0, get_file_entry_key,
|
|
file_entry_free, MYF(0)))
|
|
{
|
|
msg("%s: failed to initialize file hash.", my_progname);
|
|
return 1;
|
|
}
|
|
|
|
if (pthread_mutex_init(&mutex, NULL)) {
|
|
msg("%s: failed to initialize mutex.", my_progname);
|
|
my_hash_free(&filehash);
|
|
return 1;
|
|
}
|
|
|
|
/* If --directory is specified, it is already set as CWD by now. */
|
|
ds_ctxt = ds_create(".", DS_TYPE_LOCAL);
|
|
if (ds_ctxt == NULL) {
|
|
ret = 1;
|
|
goto exit;
|
|
}
|
|
|
|
|
|
stream = xb_stream_read_new();
|
|
if (stream == NULL) {
|
|
msg("%s: xb_stream_read_new() failed.", my_progname);
|
|
pthread_mutex_destroy(&mutex);
|
|
ret = 1;
|
|
goto exit;
|
|
}
|
|
|
|
ctxt.stream = stream;
|
|
ctxt.filehash = &filehash;
|
|
ctxt.ds_ctxt = ds_ctxt;
|
|
ctxt.mutex = &mutex;
|
|
|
|
tids = (pthread_t *)calloc(n_threads, sizeof(pthread_t));
|
|
retvals = (void **)calloc(n_threads, sizeof(void*));
|
|
|
|
for (i = 0; i < n_threads; i++)
|
|
pthread_create(tids + i, NULL, extract_worker_thread_func,
|
|
&ctxt);
|
|
|
|
for (i = 0; i < n_threads; i++)
|
|
pthread_join(tids[i], retvals + i);
|
|
|
|
for (i = 0; i < n_threads; i++) {
|
|
if ((size_t)retvals[i] == XB_STREAM_READ_ERROR) {
|
|
ret = 1;
|
|
goto exit;
|
|
}
|
|
}
|
|
|
|
exit:
|
|
pthread_mutex_destroy(&mutex);
|
|
|
|
free(tids);
|
|
free(retvals);
|
|
|
|
my_hash_free(&filehash);
|
|
if (ds_ctxt != NULL) {
|
|
ds_destroy(ds_ctxt);
|
|
}
|
|
xb_stream_read_done(stream);
|
|
|
|
return ret;
|
|
}
|