mirror of
				https://github.com/MariaDB/server.git
				synced 2025-10-25 08:58:14 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			609 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			609 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /******************************************************
 | |
| Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
 | |
| 
 | |
| The xbstream utility: serialize/deserialize files in the XBSTREAM format.
 | |
| 
 | |
| This program is free software; you can redistribute it and/or modify
 | |
| it under the terms of the GNU General Public License as published by
 | |
| the Free Software Foundation; version 2 of the License.
 | |
| 
 | |
| This program is distributed in the hope that it will be useful,
 | |
| but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
| GNU General Public License for more details.
 | |
| 
 | |
| You should have received a copy of the GNU General Public License
 | |
| along with this program; if not, write to the Free Software
 | |
| Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
 | |
| 
 | |
| *******************************************************/
 | |
| 
 | |
| #define VER "1.0"
 | |
| #include <my_global.h>
 | |
| #include <my_base.h>
 | |
| #include <my_getopt.h>
 | |
| #include <hash.h>
 | |
| #include <my_pthread.h>
 | |
| #include "common.h"
 | |
| #include "xbstream.h"
 | |
| #include "datasink.h"
 | |
| #include <welcome_copyright_notice.h>
 | |
| 
 | |
| #define XBSTREAM_BUFFER_SIZE (10 * 1024 * 1024UL)
 | |
| 
 | |
| #define START_FILE_HASH_SIZE 16
 | |
| 
 | |
| typedef enum {
 | |
| 	RUN_MODE_NONE,
 | |
| 	RUN_MODE_CREATE,
 | |
| 	RUN_MODE_EXTRACT
 | |
| } run_mode_t;
 | |
| 
 | |
| /* Need the following definitions to avoid linking with ds_*.o and their link
 | |
| dependencies */
 | |
| datasink_t datasink_archive;
 | |
| datasink_t datasink_xbstream;
 | |
| datasink_t datasink_compress;
 | |
| datasink_t datasink_tmpfile;
 | |
| 
 | |
| static run_mode_t	opt_mode;
 | |
| static char *		opt_directory = NULL;
 | |
| static my_bool		opt_verbose = 0;
 | |
| static int		opt_parallel = 1;
 | |
| 
 | |
| static struct my_option my_long_options[] =
 | |
| {
 | |
| 	{"help", '?', "Display this help and exit.",
 | |
| 	 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
 | |
| 	{"create", 'c', "Stream the specified files to the standard output.",
 | |
| 	 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
 | |
| 	{"extract", 'x', "Extract to disk files from the stream on the "
 | |
| 	 "standard input.",
 | |
| 	 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
 | |
| 	{"directory", 'C', "Change the current directory to the specified one "
 | |
| 	 "before streaming or extracting.", &opt_directory, &opt_directory, 0,
 | |
| 	 GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
 | |
| 	{"verbose", 'v', "Print verbose output.", &opt_verbose, &opt_verbose,
 | |
| 	 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
 | |
| 	{"parallel", 'p', "Number of worker threads for reading / writing.",
 | |
| 	 &opt_parallel, &opt_parallel, 0, GET_INT, REQUIRED_ARG,
 | |
| 	 1, 1, INT_MAX, 0, 0, 0},
 | |
| 
 | |
| 	{0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
 | |
| };
 | |
| 
 | |
| typedef struct {
 | |
| 	HASH			*filehash;
 | |
| 	xb_rstream_t		*stream;
 | |
| 	ds_ctxt_t		*ds_ctxt;
 | |
| 	pthread_mutex_t		*mutex;
 | |
| } extract_ctxt_t;
 | |
| 
 | |
| typedef struct {
 | |
| 	char 		*path;
 | |
| 	uint		pathlen;
 | |
| 	my_off_t	offset;
 | |
| 	ds_file_t	*file;
 | |
| 	pthread_mutex_t	mutex;
 | |
| } file_entry_t;
 | |
| 
 | |
| static int get_options(int *argc, char ***argv);
 | |
| static int mode_create(int argc, char **argv);
 | |
| static int mode_extract(int n_threads, int argc, char **argv);
 | |
| static my_bool get_one_option(const struct my_option *opt,
 | |
| 			      const char *argument, const char *filename);
 | |
| 
 | |
| int
 | |
| main(int argc, char **argv)
 | |
| {
 | |
| 	MY_INIT(argv[0]);
 | |
| 
 | |
| 	if (get_options(&argc, &argv)) {
 | |
| 		goto err;
 | |
| 	}
 | |
| 
 | |
| 	if (opt_mode == RUN_MODE_NONE) {
 | |
| 		msg("%s: either -c or -x must be specified.", my_progname);
 | |
| 		goto err;
 | |
| 	}
 | |
| 
 | |
| 	/* Change the current directory if -C is specified */
 | |
| 	if (opt_directory && my_setwd(opt_directory, MYF(MY_WME))) {
 | |
| 		goto err;
 | |
| 	}
 | |
| 
 | |
| 	if (opt_mode == RUN_MODE_CREATE && mode_create(argc, argv)) {
 | |
| 		goto err;
 | |
| 	} else if (opt_mode == RUN_MODE_EXTRACT &&
 | |
| 		   mode_extract(opt_parallel, argc, argv)) {
 | |
| 		goto err;
 | |
| 	}
 | |
| 
 | |
| 	my_cleanup_options(my_long_options);
 | |
| 
 | |
| 	my_end(0);
 | |
| 
 | |
| 	return EXIT_SUCCESS;
 | |
| err:
 | |
| 	my_cleanup_options(my_long_options);
 | |
| 
 | |
| 	my_end(0);
 | |
| 
 | |
| 	exit(EXIT_FAILURE);
 | |
| }
 | |
| 
 | |
| static
 | |
| int
 | |
| get_options(int *argc, char ***argv)
 | |
| {
 | |
| 	int ho_error;
 | |
| 
 | |
| 	if ((ho_error= handle_options(argc, argv, my_long_options,
 | |
| 				      get_one_option)))
 | |
|         {
 | |
| 		exit(EXIT_FAILURE);
 | |
| 	}
 | |
| 
 | |
| 	return 0;
 | |
| }
 | |
| 
 | |
| static
 | |
| void
 | |
| usage(void)
 | |
| {
 | |
| 	print_version();
 | |
| 	puts("Copyright (C) 2011-2013 Percona LLC and/or its affiliates.");
 | |
| 	puts("This software comes with ABSOLUTELY NO WARRANTY. "
 | |
| 	     "This is free software,\nand you are welcome to modify and "
 | |
| 	     "redistribute it under the GPL license.\n");
 | |
| 
 | |
| 	puts("Serialize/deserialize files in the XBSTREAM format.\n");
 | |
| 
 | |
| 	puts("Usage: ");
 | |
| 	printf("  %s -c [OPTIONS...] FILES...	# stream specified files to "
 | |
| 	       "standard output.\n", my_progname);
 | |
| 	printf("  %s -x [OPTIONS...]		# extract files from the stream"
 | |
| 	       "on the standard input.\n", my_progname);
 | |
| 
 | |
| 	puts("\nOptions:");
 | |
| 	my_print_help(my_long_options);
 | |
| }
 | |
| 
 | |
| static
 | |
| int
 | |
| set_run_mode(run_mode_t mode)
 | |
| {
 | |
| 	if (opt_mode != RUN_MODE_NONE) {
 | |
| 		msg("%s: can't set specify both -c and -x.", my_progname);
 | |
| 		return 1;
 | |
| 	}
 | |
| 
 | |
| 	opt_mode = mode;
 | |
| 
 | |
| 	return 0;
 | |
| }
 | |
| 
 | |
| static
 | |
| my_bool
 | |
| get_one_option(const struct my_option *opt, const char *, const char *)
 | |
| {
 | |
| 	switch (opt->id) {
 | |
| 	case 'c':
 | |
| 		if (set_run_mode(RUN_MODE_CREATE)) {
 | |
| 			return TRUE;
 | |
| 		}
 | |
| 		break;
 | |
| 	case 'x':
 | |
| 		if (set_run_mode(RUN_MODE_EXTRACT)) {
 | |
| 			return TRUE;
 | |
| 		}
 | |
| 		break;
 | |
| 	case '?':
 | |
| 		usage();
 | |
| 		exit(0);
 | |
| 	}
 | |
| 
 | |
| 	return FALSE;
 | |
| }
 | |
| 
 | |
| static
 | |
| int
 | |
| stream_one_file(File file, xb_wstream_file_t *xbfile)
 | |
| {
 | |
| 	uchar	*buf;
 | |
| 	ssize_t	bytes;
 | |
| 	my_off_t	offset;
 | |
| 
 | |
| 	posix_fadvise(file, 0, 0, POSIX_FADV_SEQUENTIAL);
 | |
| 	offset = my_tell(file, MYF(MY_WME));
 | |
| 
 | |
| 	buf = (uchar*)(my_malloc(PSI_NOT_INSTRUMENTED, XBSTREAM_BUFFER_SIZE, MYF(MY_FAE)));
 | |
| 
 | |
| 	while ((bytes = (ssize_t)my_read(file, buf, XBSTREAM_BUFFER_SIZE,
 | |
| 				MYF(MY_WME))) > 0) {
 | |
| 		if (xb_stream_write_data(xbfile, buf, bytes)) {
 | |
| 			msg("%s: xb_stream_write_data() failed.",
 | |
| 			    my_progname);
 | |
| 			my_free(buf);
 | |
| 			return 1;
 | |
| 		}
 | |
| 		posix_fadvise(file, offset, XBSTREAM_BUFFER_SIZE,
 | |
| 			      POSIX_FADV_DONTNEED);
 | |
| 		offset += XBSTREAM_BUFFER_SIZE;
 | |
| 
 | |
| 	}
 | |
| 
 | |
| 	my_free(buf);
 | |
| 
 | |
| 	if (bytes < 0) {
 | |
| 		return 1;
 | |
| 	}
 | |
| 
 | |
| 	return 0;
 | |
| }
 | |
| 
 | |
| static
 | |
| int
 | |
| mode_create(int argc, char **argv)
 | |
| {
 | |
| 	int		i;
 | |
| 	MY_STAT		mystat;
 | |
| 	xb_wstream_t	*stream;
 | |
| 
 | |
| 	if (argc < 1) {
 | |
| 		msg("%s: no files are specified.", my_progname);
 | |
| 		return 1;
 | |
| 	}
 | |
| 
 | |
| 	stream = xb_stream_write_new(nullptr, nullptr);
 | |
| 	if (stream == NULL) {
 | |
| 		msg("%s: xb_stream_write_new() failed.", my_progname);
 | |
| 		return 1;
 | |
| 	}
 | |
| 
 | |
| 	for (i = 0; i < argc; i++) {
 | |
| 		char			*filepath = argv[i];
 | |
| 		File			src_file;
 | |
| 		xb_wstream_file_t	*file;
 | |
| 
 | |
| 		if (my_stat(filepath, &mystat, MYF(MY_WME)) == NULL) {
 | |
| 			goto err;
 | |
| 		}
 | |
| 		if (!MY_S_ISREG(mystat.st_mode)) {
 | |
| 			msg("%s: %s is not a regular file, exiting.",
 | |
| 			    my_progname, filepath);
 | |
| 			goto err;
 | |
| 		}
 | |
| 
 | |
| 		if ((src_file = my_open(filepath, O_RDONLY, MYF(MY_WME))) < 0) {
 | |
| 			msg("%s: failed to open %s.", my_progname, filepath);
 | |
| 			goto err;
 | |
| 		}
 | |
| 
 | |
| 		file = xb_stream_write_open(stream, filepath, &mystat, false);
 | |
| 		if (file == NULL) {
 | |
| 			goto err;
 | |
| 		}
 | |
| 
 | |
| 		if (opt_verbose) {
 | |
| 			msg("%s", filepath);
 | |
| 		}
 | |
| 
 | |
| 		if (stream_one_file(src_file, file) ||
 | |
| 		    xb_stream_write_close(file) ||
 | |
| 		    my_close(src_file, MYF(MY_WME))) {
 | |
| 			goto err;
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	xb_stream_write_done(stream);
 | |
| 
 | |
| 	return 0;
 | |
| err:
 | |
| 	xb_stream_write_done(stream);
 | |
| 
 | |
| 	return 1;
 | |
| }
 | |
| 
 | |
| static
 | |
| file_entry_t *
 | |
| file_entry_new(extract_ctxt_t *ctxt, const char *path, uint pathlen,
 | |
| 	uchar chunk_flags)
 | |
| {
 | |
| 	file_entry_t	*entry;
 | |
| 	ds_file_t	*file;
 | |
| 
 | |
| 	entry = (file_entry_t *) my_malloc(PSI_NOT_INSTRUMENTED, sizeof(file_entry_t),
 | |
| 					   MYF(MY_WME | MY_ZEROFILL));
 | |
| 	if (entry == NULL) {
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	entry->path = my_strndup(PSI_NOT_INSTRUMENTED, path, pathlen, MYF(MY_WME));
 | |
| 	if (entry->path == NULL) {
 | |
| 		goto err;
 | |
| 	}
 | |
| 	entry->pathlen = pathlen;
 | |
| 
 | |
| 	file = ds_open(ctxt->ds_ctxt, path, NULL,
 | |
| 		chunk_flags == XB_STREAM_FLAG_REWRITE);
 | |
| 
 | |
| 	if (file == NULL) {
 | |
| 		msg("%s: failed to create file.", my_progname);
 | |
| 		goto err;
 | |
| 	}
 | |
| 
 | |
| 	if (opt_verbose) {
 | |
| 		msg("%s", entry->path);
 | |
| 	}
 | |
| 
 | |
| 	entry->file = file;
 | |
| 
 | |
| 	pthread_mutex_init(&entry->mutex, NULL);
 | |
| 
 | |
| 	return entry;
 | |
| 
 | |
| err:
 | |
| 	if (entry->path != NULL) {
 | |
| 		my_free(entry->path);
 | |
| 	}
 | |
| 	my_free(entry);
 | |
| 
 | |
| 	return NULL;
 | |
| }
 | |
| 
 | |
| static
 | |
| const uchar *
 | |
| get_file_entry_key(const void *entry_, size_t *length, my_bool)
 | |
| {
 | |
|   const file_entry_t *entry= static_cast<const file_entry_t *>(entry_);
 | |
|   *length= entry->pathlen;
 | |
|   return reinterpret_cast<const uchar *>(entry->path);
 | |
| }
 | |
| 
 | |
| static
 | |
| void
 | |
| file_entry_free(void *entry_)
 | |
| {
 | |
|   file_entry_t *entry= static_cast<file_entry_t *>(entry_);
 | |
|   pthread_mutex_destroy(&entry->mutex);
 | |
|   ds_close(entry->file);
 | |
|   my_free(entry->path);
 | |
|   my_free(entry);
 | |
| }
 | |
| 
 | |
| static
 | |
| void *
 | |
| extract_worker_thread_func(void *arg)
 | |
| {
 | |
| 	xb_rstream_chunk_t	chunk;
 | |
| 	file_entry_t		*entry;
 | |
| 	xb_rstream_result_t	res;
 | |
| 
 | |
| 	extract_ctxt_t *ctxt = (extract_ctxt_t *) arg;
 | |
| 
 | |
| 	my_thread_init();
 | |
| 
 | |
| 	memset(&chunk, 0, sizeof(chunk));
 | |
| 
 | |
| 	while (1) {
 | |
| 
 | |
| 		pthread_mutex_lock(ctxt->mutex);
 | |
| 		res = xb_stream_read_chunk(ctxt->stream, &chunk);
 | |
| 
 | |
| 		if (res != XB_STREAM_READ_CHUNK) {
 | |
| 			pthread_mutex_unlock(ctxt->mutex);
 | |
| 			break;
 | |
| 		}
 | |
| 
 | |
| 		/* If unknown type and ignorable flag is set, skip this chunk */
 | |
| 		if (chunk.type == XB_CHUNK_TYPE_UNKNOWN && \
 | |
| 		    !(chunk.flags & XB_STREAM_FLAG_IGNORABLE)) {
 | |
| 			pthread_mutex_unlock(ctxt->mutex);
 | |
| 			continue;
 | |
| 		}
 | |
| 
 | |
| 		/* See if we already have this file open */
 | |
| 		entry = (file_entry_t *) my_hash_search(ctxt->filehash,
 | |
| 							(uchar *) chunk.path,
 | |
| 							chunk.pathlen);
 | |
| 
 | |
| 		if (entry && (chunk.type == XB_CHUNK_TYPE_REMOVE ||
 | |
| 			chunk.type == XB_CHUNK_TYPE_RENAME)) {
 | |
| 			msg("%s: rename and remove chunks can not be applied to opened file: %s",
 | |
| 				my_progname, chunk.path);
 | |
| 			pthread_mutex_unlock(ctxt->mutex);
 | |
| 			break;
 | |
| 		}
 | |
| 
 | |
| 		if (chunk.type == XB_CHUNK_TYPE_REMOVE) {
 | |
| 			if (ds_remove(ctxt->ds_ctxt, chunk.path)) {
 | |
| 				msg("%s: error on file removing: %s", my_progname, chunk.path);
 | |
| 				pthread_mutex_unlock(ctxt->mutex);
 | |
| 				res = XB_STREAM_READ_ERROR;
 | |
| 				break;
 | |
| 			}
 | |
| 			pthread_mutex_unlock(ctxt->mutex);
 | |
| 			continue;
 | |
| 		}
 | |
| 
 | |
| 		if (chunk.type == XB_CHUNK_TYPE_RENAME) {
 | |
| 			if (my_hash_search(ctxt->filehash,
 | |
| 				reinterpret_cast<const uchar *>(chunk.data), chunk.length)) {
 | |
| 				msg("%s: rename chunks can not be applied to opened file: %s",
 | |
| 					my_progname, reinterpret_cast<const uchar *>(chunk.data));
 | |
| 				pthread_mutex_unlock(ctxt->mutex);
 | |
| 				break;
 | |
| 			}
 | |
| 			if (ds_rename(ctxt->ds_ctxt, chunk.path,
 | |
| 				reinterpret_cast<const char *>(chunk.data))) {
 | |
| 				msg("%s: error on file renaming: %s to %s", my_progname,
 | |
| 					reinterpret_cast<const char *>(chunk.data), chunk.path);
 | |
| 				pthread_mutex_unlock(ctxt->mutex);
 | |
| 				res = XB_STREAM_READ_ERROR;
 | |
| 				break;
 | |
| 			}
 | |
| 			pthread_mutex_unlock(ctxt->mutex);
 | |
| 			continue;
 | |
| 		}
 | |
| 
 | |
| 		if (entry == NULL) {
 | |
| 			entry = file_entry_new(ctxt,
 | |
| 					       chunk.path,
 | |
| 					       chunk.pathlen,
 | |
| 								 chunk.flags);
 | |
| 			if (entry == NULL) {
 | |
| 				pthread_mutex_unlock(ctxt->mutex);
 | |
| 				break;
 | |
| 			}
 | |
| 			if (my_hash_insert(ctxt->filehash, (uchar *) entry)) {
 | |
| 				msg("%s: my_hash_insert() failed.",
 | |
| 				    my_progname);
 | |
| 				pthread_mutex_unlock(ctxt->mutex);
 | |
| 				break;
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		pthread_mutex_lock(&entry->mutex);
 | |
| 
 | |
| 		pthread_mutex_unlock(ctxt->mutex);
 | |
| 
 | |
| 		if (chunk.type == XB_CHUNK_TYPE_SEEK) {
 | |
| 			if (ds_seek_set(entry->file, chunk.offset)) {
 | |
| 				msg("%s: my_seek() failed.", my_progname);
 | |
| 				pthread_mutex_unlock(&entry->mutex);
 | |
| 				res = XB_STREAM_READ_ERROR;
 | |
| 				break;
 | |
| 			}
 | |
| 			entry->offset = chunk.offset;
 | |
| 			pthread_mutex_unlock(&entry->mutex);
 | |
| 			continue;
 | |
| 		}
 | |
| 
 | |
| 		res = xb_stream_validate_checksum(&chunk);
 | |
| 
 | |
| 		if (res != XB_STREAM_READ_CHUNK) {
 | |
| 			pthread_mutex_unlock(&entry->mutex);
 | |
| 			break;
 | |
| 		}
 | |
| 
 | |
| 		if (chunk.type == XB_CHUNK_TYPE_EOF) {
 | |
| 			pthread_mutex_unlock(&entry->mutex);
 | |
| 			pthread_mutex_lock(ctxt->mutex);
 | |
| 			my_hash_delete(ctxt->filehash, (uchar *) entry);
 | |
| 			pthread_mutex_unlock(ctxt->mutex);
 | |
| 
 | |
| 			continue;
 | |
| 		}
 | |
| 
 | |
| 		if (entry->offset != chunk.offset) {
 | |
| 			msg("%s: out-of-order chunk: real offset = 0x%llx, "
 | |
| 			    "expected offset = 0x%llx", my_progname,
 | |
| 			    chunk.offset, entry->offset);
 | |
| 			pthread_mutex_unlock(&entry->mutex);
 | |
| 			res = XB_STREAM_READ_ERROR;
 | |
| 			break;
 | |
| 		}
 | |
| 
 | |
| 		if (ds_write(entry->file, chunk.data, chunk.length)) {
 | |
| 			msg("%s: my_write() failed.", my_progname);
 | |
| 			pthread_mutex_unlock(&entry->mutex);
 | |
| 			res = XB_STREAM_READ_ERROR;
 | |
| 			break;
 | |
| 		}
 | |
| 
 | |
| 		entry->offset += chunk.length;
 | |
| 
 | |
| 		pthread_mutex_unlock(&entry->mutex);
 | |
| 	}
 | |
| 
 | |
| 	if (chunk.data)
 | |
| 		my_free(chunk.data);
 | |
| 
 | |
| 	my_thread_end();
 | |
| 
 | |
| 	return (void *)(res);
 | |
| }
 | |
| 
 | |
| 
 | |
| static
 | |
| int
 | |
| mode_extract(int n_threads, int argc __attribute__((unused)),
 | |
| 	     char **argv __attribute__((unused)))
 | |
| {
 | |
| 	xb_rstream_t		*stream = NULL;
 | |
| 	HASH			filehash;
 | |
| 	ds_ctxt_t		*ds_ctxt = NULL;
 | |
| 	extract_ctxt_t		ctxt;
 | |
| 	int			i;
 | |
| 	pthread_t		*tids = NULL;
 | |
| 	void			**retvals = NULL;
 | |
| 	pthread_mutex_t		mutex;
 | |
| 	int			ret = 0;
 | |
| 
 | |
|   if (my_hash_init(PSI_NOT_INSTRUMENTED, &filehash, &my_charset_bin,
 | |
|                    START_FILE_HASH_SIZE, 0, 0, get_file_entry_key,
 | |
|                    file_entry_free, MYF(0)))
 | |
|   {
 | |
|     msg("%s: failed to initialize file hash.", my_progname);
 | |
|     return 1;
 | |
|   }
 | |
| 
 | |
|   if (pthread_mutex_init(&mutex, NULL)) {
 | |
| 		msg("%s: failed to initialize mutex.", my_progname);
 | |
| 		my_hash_free(&filehash);
 | |
| 		return 1;
 | |
| 	}
 | |
| 
 | |
| 	/* If --directory is specified, it is already set as CWD by now. */
 | |
| 	ds_ctxt = ds_create(".", DS_TYPE_LOCAL);
 | |
| 	if (ds_ctxt == NULL) {
 | |
| 		ret = 1;
 | |
| 		goto exit;
 | |
| 	}
 | |
| 
 | |
| 
 | |
| 	stream = xb_stream_read_new();
 | |
| 	if (stream == NULL) {
 | |
| 		msg("%s: xb_stream_read_new() failed.", my_progname);
 | |
| 		pthread_mutex_destroy(&mutex);
 | |
| 		ret = 1;
 | |
| 		goto exit;
 | |
| 	}
 | |
| 
 | |
| 	ctxt.stream = stream;
 | |
| 	ctxt.filehash = &filehash;
 | |
| 	ctxt.ds_ctxt = ds_ctxt;
 | |
| 	ctxt.mutex = &mutex;
 | |
| 
 | |
| 	tids = (pthread_t *)calloc(n_threads, sizeof(pthread_t));
 | |
| 	retvals = (void **)calloc(n_threads, sizeof(void*));
 | |
| 
 | |
| 	for (i = 0; i < n_threads; i++)
 | |
| 		pthread_create(tids + i, NULL, extract_worker_thread_func,
 | |
| 			       &ctxt);
 | |
| 
 | |
| 	for (i = 0; i < n_threads; i++)
 | |
| 		pthread_join(tids[i], retvals + i);
 | |
| 
 | |
| 	for (i = 0; i < n_threads; i++) {
 | |
| 		if ((size_t)retvals[i] == XB_STREAM_READ_ERROR) {
 | |
| 			ret = 1;
 | |
| 			goto exit;
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| exit:
 | |
| 	pthread_mutex_destroy(&mutex);
 | |
| 
 | |
| 	free(tids);
 | |
| 	free(retvals);
 | |
| 
 | |
| 	my_hash_free(&filehash);
 | |
| 	if (ds_ctxt != NULL) {
 | |
| 		ds_destroy(ds_ctxt);
 | |
| 	}
 | |
| 	xb_stream_read_done(stream);
 | |
| 
 | |
| 	return ret;
 | |
| }
 | 
