mariadb/storage/maria/ma_sort.c
unknown 2f6f08ed88 Added MARIA_SHARE *share to a lot of places to make code simpler
Changed info->s -> share to get more efficient code
Updated arguments to page accessor functions to use MARIA_SHARE * instead of MARIA_HA *.
Tested running tests in quick mode (no balance page on insert and only when critical on delete)
Fixed bug in underflow handling in quick mode
Fixed bug in log handler where it accessed not initialized variable
Fixed bug in log handler where it didn't free mutex in unlikely error condition
Removed double write of page in case of some underflow conditions
Added DBUG_PRINT in safemutex lock/unlock


dbug/dbug.c:
  Compile without SAFE_MUTEX (to be able to use DBUG_PRINT in safe_mutex code)
  Use calls to get/set my_thread_var->dbug. (Make dbug independent of compile time options for mysys)
include/my_pthread.h:
  Added prototypes for my_thread_var_get_dbug() & my_thread_var_set_dbug()
mysql-test/lib/mtr_report.pl:
  Don't check warnings in log files if we are using --extern
mysys/my_thr_init.c:
  Added my_thread_var_get_dbug() & my_thread_var_set_dbug()
mysys/thr_mutex.c:
  Added DBUG printing of addresses to mutex for lock/unlock
storage/maria/ma_blockrec.c:
  Fixed comment
storage/maria/ma_check.c:
  Added MARIA_SHARE *share to a lot of places to make code simpler
  info->s -> share
  Updated arguments to page accessor functions
storage/maria/ma_close.c:
  Indentation fixes
storage/maria/ma_create.c:
  Calculate min_key_length correctly
storage/maria/ma_dbug.c:
  Indentation fixes
storage/maria/ma_delete.c:
  Added MARIA_SHARE *share to a lot of places to make code simpler
  info->s -> share
  Updated arguments to page accessor functions
  Removed some writing of key pages that underflow (will be written by caller)
  Fixed crashing bug in underflow handling when using quick mode
storage/maria/ma_delete_all.c:
  Indentation fixes
storage/maria/ma_dynrec.c:
  Indentation fixes
storage/maria/ma_extra.c:
  Fixed indentation
  Removed old useless code
  Reset share->changed if we have written state
storage/maria/ma_ft_update.c:
  Added MARIA_SHARE *share to a lot of places to make code simpler
  info->s -> share
  Updated arguments to page accessor functions
storage/maria/ma_info.c:
  Indentation fixes
storage/maria/ma_key_recover.c:
  Added MARIA_SHARE *share to a lot of places to make code simpler
  info->s -> share
  Updated arguments to page accessor functions
storage/maria/ma_locking.c:
  Indentation fixes
storage/maria/ma_loghandler.c:
  Removed wrapper functions translog_mutex_lock and translog_mutex_unlock (safemutex now does same kind of printing)
  Renamed LOGREC_REDO_INSERT_ROW_BLOB to LOGREC_REDO_INSERT_NOT_USED to mark it free
  Fixed some DBUG_PRINT to ensure that convert-dbug-for-diff works
  Fixed bug in translog_flush() that caused log to stop syncing to disk
  Added missing mutex_unlock in case of error
storage/maria/ma_loghandler.h:
  Renamed LOGREC_REDO_INSERT_ROW_BLOB to LOGREC_REDO_INSERT_NOT_USED to mark it free
storage/maria/ma_open.c:
  Indentation fixes
storage/maria/ma_packrec.c:
  Indentation fixes
storage/maria/ma_page.c:
  Added MARIA_SHARE *share to a lot of places to make code simpler
  info->s -> share
  Updated arguments to page accessor functions
  Added check that we never write a key page without content (except in recovery where a key page may temporary be without content)
storage/maria/ma_preload.c:
  Updated arguments to page accessor functions
storage/maria/ma_range.c:
  Updated arguments to page accessor functions
storage/maria/ma_rkey.c:
  Indentation fixes
storage/maria/ma_rprev.c:
  Indentation fixes
storage/maria/ma_rt_index.c:
  Added MARIA_SHARE *share to a lot of places to make code simpler
  info->s -> share
  Updated arguments to page accessor functions
storage/maria/ma_rt_index.h:
  Updated arguments to page accessor functions
storage/maria/ma_rt_key.c:
  Added MARIA_SHARE *share to a lot of places to make code simpler
  info->s -> share
  Updated arguments to page accessor functions
storage/maria/ma_rt_mbr.c:
  Added MARIA_SHARE *share to a lot of places to make code simpler
  info->s -> share
  Updated arguments to page accessor functions
storage/maria/ma_rt_split.c:
  Added MARIA_SHARE *share to a lot of places to make code simpler
  info->s -> share
  Updated arguments to page accessor functions
storage/maria/ma_search.c:
  Added MARIA_SHARE *share to a lot of places to make code simpler
  info->s -> share
  Updated arguments to page accessor functions
storage/maria/ma_sort.c:
  Indentation fixes
storage/maria/ma_statrec.c:
  Indentation fixes
storage/maria/ma_test1.c:
  Added extra undo test
  Flush also keys in -u1, to ensure that the full log is flushed
storage/maria/ma_test2.c:
  Added extra undo test
  Flush also keys in -u1, to ensure that the full log is flushed
storage/maria/ma_test_recovery.expected:
  Updated results
storage/maria/ma_test_recovery:
  Added extra undo test
storage/maria/ma_update.c:
  Indentation fixes
storage/maria/ma_write.c:
  Added MARIA_SHARE *share to a lot of places to make code simpler
  info->s -> share
  Updated arguments to page accessor functions
  Prepare for quick mode for insert (don't balance page)
storage/maria/maria_chk.c:
  Added MARIA_SHARE *share to a lot of places to make code simpler
  info->s -> share
  Updated arguments to page accessor functions
storage/maria/maria_def.h:
  Updated arguments to page accessor functions
2007-12-10 02:32:00 +02:00

1059 lines
32 KiB
C

/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
Creates an index for a database by reading keys, sorting them and outputting
them in sorted order through MARIA_SORT_INFO functions.
*/
#include "ma_fulltext.h"
#if defined(MSDOS) || defined(__WIN__)
#include <fcntl.h>
#else
#include <stddef.h>
#endif
#include <queues.h>
/*
Sort tuning constants. The three #undef lines drop any definition of
these names made by an earlier header, so the values below are the ones
used in this file.
*/
#undef MIN_SORT_MEMORY
#undef MYF_RW
#undef DISK_BUFFER_SIZE
/* merge_many_buff() merges the runs in groups of this many */
#define MERGEBUFF 15
/* merge_many_buff() repeats its passes while the last run index is >= this */
#define MERGEBUFF2 31
/* Smallest sort buffer size the allocation loops in this file will try */
#define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD)
/* Flags for the my_pread() calls that read runs back from the temp file */
#define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
/* Cache size passed to open_cached_file() for the temporary files */
#define DISK_BUFFER_SIZE (IO_SIZE*16)
/*
Error reporting function defined outside this file
*/
extern void print_error _VARARGS((const char *fmt,...));
/* Functions defined in this file */
/* Key collection: reads keys and spills sorted runs to a temp file */
static ha_rows find_all_keys(MARIA_SORT_PARAM *info,uint keys,
uchar **sort_keys,
DYNAMIC_ARRAY *buffpek,int *maxbuffer,
IO_CACHE *tempfile,
IO_CACHE *tempfile_for_exceptions);
/* Run writer for fixed length keys; installed as info->write_keys */
static int write_keys(MARIA_SORT_PARAM *info, uchar **sort_keys,
uint count, BUFFPEK *buffpek,IO_CACHE *tempfile);
/* Stores one key, prefixed by its length, in the given temp file */
static int write_key(MARIA_SORT_PARAM *info, uchar *key,
IO_CACHE *tempfile);
/* Sorts the in-memory keys and passes each one to info->key_write */
static int write_index(MARIA_SORT_PARAM *info, uchar **sort_keys,
uint count);
/* Merge phase */
static int merge_many_buff(MARIA_SORT_PARAM *info,uint keys,
uchar **sort_keys,
BUFFPEK *buffpek,int *maxbuffer,
IO_CACHE *t_file);
/* Run reader for fixed length keys; installed as info->read_to_buffer */
static uint read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
uint sort_length);
static int merge_buffers(MARIA_SORT_PARAM *info,uint keys,
IO_CACHE *from_file, IO_CACHE *to_file,
uchar **sort_keys, BUFFPEK *lastbuff,
BUFFPEK *Fb, BUFFPEK *Tb);
static int merge_index(MARIA_SORT_PARAM *,uint, uchar **,BUFFPEK *, int,
IO_CACHE *);
/* Flushes and frees sort_info->ft_buf if one is allocated */
static int flush_maria_ft_buf(MARIA_SORT_PARAM *info);
/* Variants used when the key has the HA_VAR_LENGTH_KEY flag */
static int write_keys_varlen(MARIA_SORT_PARAM *info, uchar **sort_keys,
uint count, BUFFPEK *buffpek,
IO_CACHE *tempfile);
static uint read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
uint sort_length);
/* Merge output writers; one of the two is installed as info->write_key */
static int write_merge_key(MARIA_SORT_PARAM *info, IO_CACHE *to_file,
uchar *key, uint sort_length, uint count);
static int write_merge_key_varlen(MARIA_SORT_PARAM *info,
IO_CACHE *to_file, uchar *key,
uint sort_length, uint count);
/* Writes one key preceded by its uint16 length */
static inline int
my_var_write(MARIA_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs);
/*
Creates an index of sorted keys

SYNOPSIS
_ma_create_index_by_sort()
info Sort parameters
no_messages Set to 1 to suppress the progress lines printed to stdout
sortbuff_size Size of sort buffer to allocate; values below
MIN_SORT_MEMORY are raised to MIN_SORT_MEMORY

DESCRIPTION
Installs the fixed or variable length helper functions in info, sizes
and allocates the key buffer, reads all keys with find_all_keys() and
then either writes them directly (everything fitted in memory) or
merges the runs stored in the temporary file. Keys that find_all_keys()
put in the exceptions file are inserted last with _ma_ck_write().

RESULT
0 ok
-1 Error
*/
int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages,
size_t sortbuff_size)
{
int error,maxbuffer,skr;
size_t memavl,old_memavl;
uint keys,sort_length;
DYNAMIC_ARRAY buffpek;
ha_rows records;
uchar **sort_keys;
IO_CACHE tempfile, tempfile_for_exceptions;
DBUG_ENTER("_ma_create_index_by_sort");
DBUG_PRINT("enter",("sort_buff_size: %lu sort_length: %d max_records: %lu",
(ulong) sortbuff_size, info->key_length,
(ulong) info->sort_info->max_records));
/* Install the run writer, run reader and merge writer for the key format */
if (info->keyinfo->flag & HA_VAR_LENGTH_KEY)
{
info->write_keys= write_keys_varlen;
info->read_to_buffer=read_to_buffer_varlen;
info->write_key=write_merge_key_varlen;
}
else
{
info->write_keys= write_keys;
info->read_to_buffer=read_to_buffer;
info->write_key=write_merge_key;
}
/* Put everything the err: label releases into a known empty state */
my_b_clear(&tempfile);
my_b_clear(&tempfile_for_exceptions);
bzero((char*) &buffpek,sizeof(buffpek));
sort_keys= (uchar **) NULL; error= 1;
maxbuffer=1;
memavl=max(sortbuff_size,MIN_SORT_MEMORY);
records= info->sort_info->max_records;
sort_length= info->key_length;
LINT_INIT(keys);
/*
Find a buffer size that can be allocated. Every key slot costs
sort_length bytes plus one pointer.
*/
while (memavl >= MIN_SORT_MEMORY)
{
/* All of max_records+1 keys fit in memavl: size the buffer for them */
if ((records < UINT_MAX32) &&
((my_off_t) (records + 1) *
(sort_length + sizeof(char*)) <= (my_off_t) memavl))
keys= (uint)records+1;
else
/*
Otherwise reserve room for maxbuffer BUFFPEK entries, see how many keys
fit in the rest and recompute maxbuffer from that, until the value
stops changing.
*/
do
{
skr=maxbuffer;
if (memavl < sizeof(BUFFPEK)*(uint) maxbuffer ||
(keys=(memavl-sizeof(BUFFPEK)*(uint) maxbuffer)/
(sort_length+sizeof(char*))) <= 1 ||
keys < (uint) maxbuffer)
{
_ma_check_print_error(info->sort_info->param,
"maria_sort_buffer_size is too small");
goto err;
}
}
while ((maxbuffer= (int) (records/(keys-1)+1)) != skr);
/* HA_FT_MAXBYTELEN extra bytes are added after the key slots */
if ((sort_keys=(uchar**) my_malloc(keys*(sort_length+sizeof(char*))+
HA_FT_MAXBYTELEN, MYF(0))))
{
if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
maxbuffer/2))
{
my_free((uchar*) sort_keys,MYF(0));
sort_keys= 0;
}
else
break;
}
/*
Allocation failed: retry with 3/4 of the memory. MIN_SORT_MEMORY itself
is tried once before the loop gives up.
*/
old_memavl=memavl;
if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY)
memavl=MIN_SORT_MEMORY;
}
if (memavl < MIN_SORT_MEMORY)
{
_ma_check_print_error(info->sort_info->param, "Maria sort buffer"
" too small"); /* purecov: tested */
goto err; /* purecov: tested */
}
(*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
if (!no_messages)
printf(" - Searching for keys, allocating buffer for %d keys\n",keys);
if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
&tempfile,&tempfile_for_exceptions))
== HA_POS_ERROR)
goto err; /* purecov: tested */
/* maxbuffer == 0: nothing was written to tempfile, all keys are in memory */
if (maxbuffer == 0)
{
if (!no_messages)
printf(" - Dumping %lu keys\n", (ulong) records);
if (write_index(info,sort_keys, (uint) records))
goto err; /* purecov: inspected */
}
else
{
/*
The merge uses the buffer for key data only (no pointer array), so
recompute how many keys of sort_length bytes fit in it.
*/
keys=(keys*(sort_length+sizeof(char*)))/sort_length;
if (maxbuffer >= MERGEBUFF2)
{
if (!no_messages)
printf(" - Merging %lu keys\n", (ulong) records); /* purecov: tested */
if (merge_many_buff(info,keys,sort_keys,
dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile))
goto err; /* purecov: inspected */
}
/* Switch the temp file to reading for the final merge */
if (flush_io_cache(&tempfile) ||
reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
goto err; /* purecov: inspected */
if (!no_messages)
printf(" - Last merge and dumping keys\n"); /* purecov: tested */
if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *),
maxbuffer,&tempfile))
goto err; /* purecov: inspected */
}
if (flush_maria_ft_buf(info) || _ma_flush_pending_blocks(info))
goto err;
/* The exceptions file is only opened if find_all_keys() stored a key in it */
if (my_b_inited(&tempfile_for_exceptions))
{
MARIA_HA *idx=info->sort_info->info;
uint keyno=info->key;
uint key_length, ref_length=idx->s->rec_reflength;
if (!no_messages)
printf(" - Adding exceptions\n"); /* purecov: tested */
if (flush_io_cache(&tempfile_for_exceptions) ||
reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0))
goto err;
/*
Each entry is a uint length followed by that many key bytes; the key is
read into the start of the sort buffer. The loop ends at the first
read that fails.
*/
while (!my_b_read(&tempfile_for_exceptions,(uchar*)&key_length,
sizeof(key_length))
&& !my_b_read(&tempfile_for_exceptions,(uchar*)sort_keys,
(uint) key_length))
{
if (_ma_ck_write(idx,keyno,(uchar*) sort_keys,key_length-ref_length))
goto err;
}
}
error =0;
/* Common exit: release the buffer, the run array and both temp files */
err:
if (sort_keys)
my_free((uchar*) sort_keys,MYF(0));
delete_dynamic(&buffpek);
close_cached_file(&tempfile);
close_cached_file(&tempfile_for_exceptions);
DBUG_RETURN(error ? -1 : 0);
} /* _ma_create_index_by_sort */
/*
  Read all keys and collect them in the sort buffer, spilling sorted
  runs to a temporary file whenever the buffer fills up

  SYNOPSIS
    find_all_keys()
    info                     Sort parameters; supplies key_read, write_keys,
                             key_length and real_key_length
    keys                     Number of key slots in sort_keys
    sort_keys                Buffer: 'keys' pointers followed by key storage
    buffpek                  Array that gets one BUFFPEK per run written
    maxbuffer                Out: index of the last run, or 0 if no run was
                             written to tempfile
    tempfile                 Receives the sorted runs
    tempfile_for_exceptions  Receives keys longer than info->key_length

  RESULT
    Number of keys collected in sort_keys and tempfile (keys stored in the
    exceptions file are not counted)
    HA_POS_ERROR  on error, including failure to allocate a BUFFPEK entry
*/

static ha_rows find_all_keys(MARIA_SORT_PARAM *info, uint keys,
                             uchar **sort_keys, DYNAMIC_ARRAY *buffpek,
                             int *maxbuffer, IO_CACHE *tempfile,
                             IO_CACHE *tempfile_for_exceptions)
{
  int error;
  uint idx;
  BUFFPEK *run;
  DBUG_ENTER("find_all_keys");

  idx= error= 0;
  /* Key storage starts right after the array of 'keys' pointers */
  sort_keys[0]= (uchar*) (sort_keys+keys);

  while (!(error= (*info->key_read)(info, sort_keys[idx])))
  {
    if (info->real_key_length > info->key_length)
    {
      /* Too long for a slot: set it aside; the slot is reused */
      if (write_key(info, sort_keys[idx], tempfile_for_exceptions))
        DBUG_RETURN(HA_POS_ERROR);              /* purecov: inspected */
      continue;
    }

    if (++idx == keys)
    {
      /*
        Buffer full: write the first keys-1 keys as one run and carry the
        key just read over into slot 0. alloc_dynamic() returns NULL when
        it cannot grow the array, which must not reach write_keys().
      */
      if (!(run= (BUFFPEK*) alloc_dynamic(buffpek)) ||
          info->write_keys(info, sort_keys, idx-1, run, tempfile))
        DBUG_RETURN(HA_POS_ERROR);              /* purecov: inspected */

      sort_keys[0]= (uchar*) (sort_keys+keys);
      memcpy(sort_keys[0], sort_keys[idx-1], (size_t) info->key_length);
      idx= 1;
    }
    sort_keys[idx]= sort_keys[idx-1]+info->key_length;
  }

  if (error > 0)
    DBUG_RETURN(HA_POS_ERROR);  /* Aborted by get_key */ /* purecov: inspected */

  if (buffpek->elements)
  {
    /* Earlier runs exist, so the keys still in memory become the last run */
    if (!(run= (BUFFPEK*) alloc_dynamic(buffpek)) ||
        info->write_keys(info, sort_keys, idx, run, tempfile))
      DBUG_RETURN(HA_POS_ERROR);                /* purecov: inspected */
    *maxbuffer= buffpek->elements-1;
  }
  else
    *maxbuffer= 0;

  /* Widen before multiplying so that large tables do not wrap at 32 bits */
  DBUG_RETURN(((ha_rows) (*maxbuffer)) * (keys-1) + idx);
} /* find_all_keys */
#ifdef THREAD
/*
Thread entry point: read all keys for one index

arg is a MARIA_SORT_PARAM*. The function sizes and allocates the key
buffer, reads keys with sort_param->key_read until it returns non-zero
or sort_info->got_error is set, and writes full buffers as sorted runs
to sort_param->tempfile. Keys longer than key_length go to
sort_param->tempfile_for_exceptions.

On success sort_param->sort_keys, ->keys and ->sort_keys_length describe
the result. On failure sort_info->got_error is set and the key buffer,
run array and temp files are released.

In both cases the thread frees wordroot, detaches from any shared
IO_CACHE, decrements sort_info->threads_running (signalling
sort_info->cond when it reaches zero) and returns NULL.
*/
pthread_handler_t _ma_thr_find_all_keys(void *arg)
{
MARIA_SORT_PARAM *sort_param= (MARIA_SORT_PARAM*) arg;
int error;
size_t memavl,old_memavl;
uint sort_length;
ulong idx, maxbuffer, keys;
uchar **sort_keys=0;
LINT_INIT(keys);
error=1;
/*
NOTE(review): this jump reaches err: before the my_b_clear()/bzero()
calls below have run, so the cleanup there presumably relies on the
caller having zero-initialised sort_param - confirm.
*/
if (my_thread_init())
goto err;
{ /* Add extra block since DBUG_ENTER declare variables */
DBUG_ENTER("_ma_thr_find_all_keys");
DBUG_PRINT("enter", ("master: %d", sort_param->master));
if (sort_param->sort_info->got_error)
goto err;
/* Install the run writer, run reader and merge writer for the key format */
if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY)
{
sort_param->write_keys= write_keys_varlen;
sort_param->read_to_buffer= read_to_buffer_varlen;
sort_param->write_key= write_merge_key_varlen;
}
else
{
sort_param->write_keys= write_keys;
sort_param->read_to_buffer= read_to_buffer;
sort_param->write_key= write_merge_key;
}
my_b_clear(&sort_param->tempfile);
my_b_clear(&sort_param->tempfile_for_exceptions);
bzero((char*) &sort_param->buffpek,sizeof(sort_param->buffpek));
bzero((char*) &sort_param->unique, sizeof(sort_param->unique));
memavl= max(sort_param->sortbuff_size, MIN_SORT_MEMORY);
/*
idx holds the expected number of records during sizing and is reused as
the slot index when reading.
NOTE(review): max_records is cast to uint here - confirm the truncation
is intended for very large tables.
*/
idx= (uint)sort_param->sort_info->max_records;
sort_length= sort_param->key_length;
maxbuffer= 1;
/*
Find a buffer size that can be allocated. Every key slot costs
sort_length bytes plus one pointer.
*/
while (memavl >= MIN_SORT_MEMORY)
{
if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <= (my_off_t) memavl)
keys= idx+1;
else
{
ulong skr;
/*
Reserve room for maxbuffer BUFFPEK entries, see how many keys fit in
the rest and recompute maxbuffer from that, until it stops changing.
*/
do
{
skr= maxbuffer;
if (memavl < sizeof(BUFFPEK)*maxbuffer ||
(keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
(sort_length+sizeof(char*))) <= 1 ||
keys < maxbuffer)
{
_ma_check_print_error(sort_param->sort_info->param,
"maria_sort_buffer_size is too small");
goto err;
}
}
while ((maxbuffer= (int) (idx/(keys-1)+1)) != skr);
}
/* Fulltext keys get HA_FT_MAXBYTELEN extra bytes after the key slots */
if ((sort_keys= (uchar **)
my_malloc(keys*(sort_length+sizeof(char*))+
((sort_param->keyinfo->flag & HA_FULLTEXT) ?
HA_FT_MAXBYTELEN : 0), MYF(0))))
{
if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
maxbuffer, maxbuffer/2))
{
my_free((uchar*) sort_keys,MYF(0));
sort_keys= (uchar **) NULL; /* for err: label */
}
else
break;
}
/*
Allocation failed: retry with 3/4 of the memory. MIN_SORT_MEMORY itself
is tried once before the loop gives up.
*/
old_memavl= memavl;
if ((memavl= memavl/4*3) < MIN_SORT_MEMORY &&
old_memavl > MIN_SORT_MEMORY)
memavl= MIN_SORT_MEMORY;
}
if (memavl < MIN_SORT_MEMORY)
{
_ma_check_print_error(sort_param->sort_info->param,
"Maria sort buffer too small");
goto err; /* purecov: tested */
}
if (sort_param->sort_info->param->testflag & T_VERBOSE)
printf("Key %d - Allocating buffer for %lu keys\n",
sort_param->key+1, (ulong) keys);
sort_param->sort_keys= sort_keys;
idx= error= 0;
/* Key storage starts right after the array of 'keys' pointers */
sort_keys[0]= (uchar*) (sort_keys+keys);
DBUG_PRINT("info", ("reading keys"));
/* Stop as soon as any thread has reported an error or key_read is done */
while (!(error= sort_param->sort_info->got_error) &&
!(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
{
/* Too long for a slot: set the key aside; the slot is reused */
if (sort_param->real_key_length > sort_param->key_length)
{
if (write_key(sort_param,sort_keys[idx],
&sort_param->tempfile_for_exceptions))
goto err;
continue;
}
/*
Buffer full: write the first keys-1 keys as one run and carry the key
just read over into slot 0.
*/
if (++idx == keys)
{
if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
(BUFFPEK *)alloc_dynamic(&sort_param->
buffpek),
&sort_param->tempfile))
goto err;
sort_keys[0]= (uchar*) (sort_keys+keys);
memcpy(sort_keys[0], sort_keys[idx - 1],
(size_t) sort_param->key_length);
idx= 1;
}
sort_keys[idx]=sort_keys[idx - 1] + sort_param->key_length;
}
if (error > 0)
goto err;
/* Earlier runs exist, so the keys still in memory become the last run */
if (sort_param->buffpek.elements)
{
if (sort_param->write_keys(sort_param,sort_keys, idx,
(BUFFPEK *) alloc_dynamic(&sort_param->
buffpek),
&sort_param->tempfile))
goto err;
sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx;
}
else
sort_param->keys= idx;
sort_param->sort_keys_length= keys;
goto ok;
/* Failure: flag the error and release what this thread allocated */
err:
DBUG_PRINT("error", ("got some error"));
sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */
my_free((uchar*) sort_keys,MYF(MY_ALLOW_ZERO_PTR));
sort_param->sort_keys=0;
delete_dynamic(& sort_param->buffpek);
close_cached_file(&sort_param->tempfile);
close_cached_file(&sort_param->tempfile_for_exceptions);
/* Reached on both success and failure */
ok:
free_root(&sort_param->wordroot, MYF(0));
/*
Detach from the share if the writer is involved. Avoid others to
be blocked. This includes a flush of the write buffer. This will
also indicate EOF to the readers.
*/
if (sort_param->sort_info->info->rec_cache.share)
remove_io_thread(&sort_param->sort_info->info->rec_cache);
/* Readers detach from the share if any. Avoid others to be blocked. */
if (sort_param->read_cache.share)
remove_io_thread(&sort_param->read_cache);
/* Last thread to finish signals sort_info->cond */
pthread_mutex_lock(&sort_param->sort_info->mutex);
if (!--sort_param->sort_info->threads_running)
pthread_cond_signal(&sort_param->sort_info->cond);
pthread_mutex_unlock(&sort_param->sort_info->mutex);
DBUG_PRINT("exit", ("======== ending thread ========"));
}
my_thread_end();
return NULL;
}
/*
  Write the keys collected by the _ma_thr_find_all_keys() threads

  SYNOPSIS
    _ma_thr_write_keys()
    sort_param    Array of sort_info->total_keys sort parameters, one per
                  index

  DESCRIPTION
    First pass: every index whose keys all fitted in memory (no BUFFPEK
    entries) is sorted and written with write_index(). The in-memory key
    buffer and rec_buff of every index are freed in this pass.

    Second pass: for every index with runs on disk, the runs are merged
    through one merge buffer that is allocated on first use and shared by
    all indexes. After that the keys stored in the exceptions file are
    inserted with _ma_ck_write(). The run array and both temporary files
    of every index are released, also after an error.

  RESULT
    0      ok
    <> 0   Error (also when sort_info->got_error was already set)
*/

int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param)
{
  MARIA_SORT_INFO *sort_info= sort_param->sort_info;
  HA_CHECK *param= sort_info->param;
  ulong length, keys;
  double *rec_per_key_part= param->new_rec_per_key_part;
  int got_error= sort_info->got_error;
  uint i;
  MARIA_HA *info= sort_info->info;
  MARIA_SHARE *share= info->s;
  MARIA_SORT_PARAM *sinfo;
  uchar *mergebuf= 0;
  DBUG_ENTER("_ma_thr_write_keys");
  LINT_INIT(length);

  for (i= 0, sinfo= sort_param ;
       i < sort_info->total_keys ;
       i++, rec_per_key_part+= sinfo->keyinfo->keysegs, sinfo++)
  {
    if (!sinfo->sort_keys)
    {
      /* The reader thread for this index failed */
      got_error= 1;
      my_free(sinfo->rec_buff, MYF(MY_ALLOW_ZERO_PTR));
      continue;
    }
    if (!got_error)
    {
      maria_set_key_active(share->state.key_map, sinfo->key);

      if (!sinfo->buffpek.elements)
      {
        /* No runs on disk: everything for this index is in memory */
        if (param->testflag & T_VERBOSE)
        {
          printf("Key %d  - Dumping %u keys\n",sinfo->key+1, sinfo->keys);
          fflush(stdout);
        }
        if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) ||
            flush_maria_ft_buf(sinfo) || _ma_flush_pending_blocks(sinfo))
          got_error= 1;
      }
      if (!got_error && param->testflag & T_STATISTICS)
        maria_update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
                               param->stats_method ==
                               MI_STATS_METHOD_IGNORE_NULLS ?
                               sinfo->notnull : NULL,
                               (ulonglong) info->state->records);
    }
    my_free((uchar*) sinfo->sort_keys, MYF(0));
    my_free(sinfo->rec_buff, MYF(MY_ALLOW_ZERO_PTR));
    sinfo->sort_keys= 0;
  }

  /*
    The increment part of this loop releases the run array and temp files
    of each index; 'continue' still executes it, so cleanup also happens
    after an error.
  */
  for (i= 0, sinfo= sort_param ;
       i < sort_info->total_keys ;
       i++,
         delete_dynamic(&sinfo->buffpek),
         close_cached_file(&sinfo->tempfile),
         close_cached_file(&sinfo->tempfile_for_exceptions),
         sinfo++)
  {
    if (got_error)
      continue;

    if (sinfo->keyinfo->flag & HA_VAR_LENGTH_KEY)
    {
      sinfo->write_keys= write_keys_varlen;
      sinfo->read_to_buffer= read_to_buffer_varlen;
      sinfo->write_key= write_merge_key_varlen;
    }
    else
    {
      sinfo->write_keys= write_keys;
      sinfo->read_to_buffer= read_to_buffer;
      sinfo->write_key= write_merge_key;
    }

    if (sinfo->buffpek.elements)
    {
      uint maxbuffer= sinfo->buffpek.elements-1;

      if (!mergebuf)
      {
        /*
          Try smaller and smaller sizes until one can be allocated.
          'length' is only reduced after a failed attempt, so afterwards
          it is the real size of mergebuf.
        */
        length= param->sort_buffer_length;
        while (length >= MIN_SORT_MEMORY)
        {
          if ((mergebuf= my_malloc(length, MYF(0))))
            break;
          length= length*3/4;
        }
        if (!mergebuf)
        {
          got_error= 1;
          continue;
        }
      }
      keys= length/sinfo->key_length;

      if (maxbuffer >= MERGEBUFF2)
      {
        if (param->testflag & T_VERBOSE)
          printf("Key %d  - Merging %u keys\n",sinfo->key+1, sinfo->keys);
        if (merge_many_buff(sinfo, keys, (uchar **) mergebuf,
                            dynamic_element(&sinfo->buffpek, 0, BUFFPEK *),
                            (int*) &maxbuffer, &sinfo->tempfile))
        {
          got_error= 1;
          continue;
        }
      }
      /* Switch the temp file to reading for the final merge */
      if (flush_io_cache(&sinfo->tempfile) ||
          reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0))
      {
        got_error= 1;
        continue;
      }
      if (param->testflag & T_VERBOSE)
        printf("Key %d  - Last merge and dumping keys\n", sinfo->key+1);
      if (merge_index(sinfo, keys, (uchar**) mergebuf,
                      dynamic_element(&sinfo->buffpek,0,BUFFPEK *),
                      maxbuffer,&sinfo->tempfile) ||
          flush_maria_ft_buf(sinfo) ||
          _ma_flush_pending_blocks(sinfo))
      {
        got_error= 1;
        continue;
      }
    }

    /* The exceptions file is only opened if a too long key was stored */
    if (my_b_inited(&sinfo->tempfile_for_exceptions))
    {
      uint key_length;

      if (param->testflag & T_VERBOSE)
        printf("Key %d  - Dumping 'long' keys\n", sinfo->key+1);

      if (flush_io_cache(&sinfo->tempfile_for_exceptions) ||
          reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0))
      {
        got_error= 1;
        continue;
      }

      /* Each entry is a uint length followed by that many key bytes */
      while (!got_error &&
             !my_b_read(&sinfo->tempfile_for_exceptions,(uchar*)&key_length,
                        sizeof(key_length)))
      {
        uchar maria_ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10];
        if (key_length > sizeof(maria_ft_buf) ||
            my_b_read(&sinfo->tempfile_for_exceptions, (uchar*)maria_ft_buf,
                      (uint)key_length) ||
            _ma_ck_write(info, sinfo->key, maria_ft_buf,
                         key_length - share->rec_reflength))
          got_error= 1;
      }
    }
  }
  my_free((uchar*) mergebuf,MYF(MY_ALLOW_ZERO_PTR));
  DBUG_RETURN(got_error);
}
#endif /* THREAD */
/*
  Sort the keys in memory and write them to file for later merge

  SYNOPSIS
    write_keys()
    info        Sort parameters; supplies key_length, key_cmp and tmpdir
    sort_keys   Array of 'count' key pointers; reordered by the sort
    count       Number of keys to write
    buffpek     Run descriptor that gets file_pos and count. The callers
                pass the result of alloc_dynamic() directly, so this can
                be NULL when memory ran out.
    tempfile    Destination; opened on first use

  RESULT
    0   ok
    1   Error (buffpek is NULL, or the temp file could not be opened or
        written)
*/

static int write_keys(MARIA_SORT_PARAM *info, register uchar **sort_keys,
                      uint count, BUFFPEK *buffpek, IO_CACHE *tempfile)
{
  uchar **end;
  uint sort_length= info->key_length;
  DBUG_ENTER("write_keys");

  if (!buffpek)
    DBUG_RETURN(1);                             /* Out of memory */

  qsort2((uchar*) sort_keys,count,sizeof(uchar*),(qsort2_cmp) info->key_cmp,
         info);
  if (!my_b_inited(tempfile) &&
      open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
                       DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
    DBUG_RETURN(1);                             /* purecov: inspected */

  /* Remember where this run starts and how many keys it holds */
  buffpek->file_pos= my_b_tell(tempfile);
  buffpek->count= count;

  for (end= sort_keys+count ; sort_keys != end ; sort_keys++)
  {
    if (my_b_write(tempfile, *sort_keys, (uint) sort_length))
      DBUG_RETURN(1);                           /* purecov: inspected */
  }
  DBUG_RETURN(0);
} /* write_keys */
/*
  Write one variable length key to to_file: first the key length as
  returned by _ma_keylength(), stored as a uint16, then that many bytes
  of the key.

  Returns 0 on success, otherwise the non-zero result of the my_b_write()
  that failed.
*/

static inline int
my_var_write(MARIA_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs)
{
  uint16 key_len= _ma_keylength(info->keyinfo, bufs);
  int res;

  /* The following is safe as this is a local file */
  res= my_b_write(to_file, (uchar*) &key_len, sizeof(key_len));
  if (!res)
    res= my_b_write(to_file, bufs, (uint) key_len);
  return res;
}
/*
  Sort the keys in memory and write them, each prefixed by its length,
  to file for later merge

  SYNOPSIS
    write_keys_varlen()
    info        Sort parameters; supplies keyinfo, key_cmp and tmpdir
    sort_keys   Array of 'count' key pointers; reordered by the sort
    count       Number of keys to write
    buffpek     Run descriptor that gets file_pos and count. The callers
                pass the result of alloc_dynamic() directly, so this can
                be NULL when memory ran out.
    tempfile    Destination; opened on first use

  RESULT
    0      ok
    <> 0   Error (buffpek is NULL, or the temp file could not be opened
           or written)
*/

static int write_keys_varlen(MARIA_SORT_PARAM *info,
                             register uchar **sort_keys,
                             uint count, BUFFPEK *buffpek,
                             IO_CACHE *tempfile)
{
  uchar **end;
  int err;
  DBUG_ENTER("write_keys_varlen");

  if (!buffpek)
    DBUG_RETURN(1);                             /* Out of memory */

  qsort2((uchar*) sort_keys,count,sizeof(uchar*),(qsort2_cmp) info->key_cmp,
         info);
  if (!my_b_inited(tempfile) &&
      open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
                       DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
    DBUG_RETURN(1);                             /* purecov: inspected */

  /* Remember where this run starts and how many keys it holds */
  buffpek->file_pos= my_b_tell(tempfile);
  buffpek->count= count;

  for (end= sort_keys+count ; sort_keys != end ; sort_keys++)
  {
    if ((err= my_var_write(info,tempfile, *sort_keys)))
      DBUG_RETURN(err);
  }
  DBUG_RETURN(0);
} /* write_keys_varlen */
/*
  Append one key to tempfile as a uint length (info->real_key_length)
  followed by that many key bytes. The file is opened on first use.

  Returns 0 on success, 1 if the file could not be opened or written.
*/

static int write_key(MARIA_SORT_PARAM *info, uchar *key,
                     IO_CACHE *tempfile)
{
  uint stored_length= info->real_key_length;
  int failed;
  DBUG_ENTER("write_key");

  if (!my_b_inited(tempfile))
  {
    if (open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
                         DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
      DBUG_RETURN(1);
  }

  /* Length first, then the key; skip the key if the length failed */
  failed= my_b_write(tempfile, (uchar*) &stored_length,
                     sizeof(stored_length)) != 0;
  if (!failed)
    failed= my_b_write(tempfile, key, (uint) stored_length) != 0;
  DBUG_RETURN(failed);
} /* write_key */
/*
  Sort 'count' in-memory keys with info->key_cmp and hand them, in sorted
  order, to info->key_write.

  Returns 0 on success, -1 as soon as key_write reports an error.
*/

static int write_index(MARIA_SORT_PARAM *info,
                       register uchar **sort_keys,
                       register uint count)
{
  uchar **cur, **last;
  DBUG_ENTER("write_index");

  qsort2((uchar*) sort_keys,(size_t) count,sizeof(uchar*),
         (qsort2_cmp) info->key_cmp,info);

  for (cur= sort_keys, last= sort_keys+count ; cur != last ; cur++)
  {
    if ((*info->key_write)(info, *cur))
      DBUG_RETURN(-1);                          /* purecov: inspected */
  }
  DBUG_RETURN(0);
} /* write_index */
/*
  Merge runs until the index of the last run is below MERGEBUFF2

  SYNOPSIS
    merge_many_buff()
    info        Sort parameters
    keys        Number of keys that fit in sort_keys
    sort_keys   Work buffer passed on to merge_buffers()
    buffpek     Array of run descriptors; rewritten in place so that the
                first *maxbuffer+1 entries describe the merged runs
    maxbuffer   In/out: index of the last run
    t_file      In: file holding the runs. Out: file holding the merged
                runs (may be a different underlying file)

  DESCRIPTION
    Each pass reads from one file and writes to the other, merging
    MERGEBUFF runs at a time, with the remaining runs merged in one last
    call. The two files swap roles after every pass.

  RESULT
    0   ok
    1   Error; *maxbuffer is then still >= MERGEBUFF2
*/

static int merge_many_buff(MARIA_SORT_PARAM *info, uint keys,
                           uchar **sort_keys, BUFFPEK *buffpek,
                           int *maxbuffer, IO_CACHE *t_file)
{
  register int i;
  IO_CACHE t_file2, *from_file, *to_file, *temp;
  BUFFPEK *lastbuff;
  DBUG_ENTER("merge_many_buff");

  if (*maxbuffer < MERGEBUFF2)
    DBUG_RETURN(0);                             /* purecov: inspected */
  if (flush_io_cache(t_file) ||
      open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST",
                       DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
    DBUG_RETURN(1);                             /* purecov: inspected */

  from_file= t_file ; to_file= &t_file2;
  while (*maxbuffer >= MERGEBUFF2)
  {
    /* A cache that cannot be repositioned must not be merged from or to */
    if (reinit_io_cache(from_file,READ_CACHE,0L,0,0) ||
        reinit_io_cache(to_file,WRITE_CACHE,0L,0,0))
      goto cleanup;

    lastbuff= buffpek;
    for (i= 0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+= MERGEBUFF)
    {
      if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
                        buffpek+i,buffpek+i+MERGEBUFF-1))
        goto cleanup;
    }
    /* Whatever is left goes into one final run */
    if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
                      buffpek+i,buffpek+ *maxbuffer))
      break;                                    /* purecov: inspected */
    if (flush_io_cache(to_file))
      break;                                    /* purecov: inspected */
    temp= from_file; from_file= to_file; to_file= temp;
    *maxbuffer= (int) (lastbuff-buffpek)-1;
  }

cleanup:
  close_cached_file(to_file);                   /* This holds old result */
  if (to_file == t_file)
    *t_file= t_file2;                           /* Copy result file */

  DBUG_RETURN(*maxbuffer >= MERGEBUFF2);        /* Return 1 if interrupted */
} /* merge_many_buff */
/*
  Read the next chunk of fixed length keys of a run into its buffer

  SYNOPSIS
    read_to_buffer()
    fromfile      File to read from
    buffpek       Run to refill; at most max_keys keys are read to
                  buffpek->base, and key, file_pos, count and mem_count
                  are updated
    sort_length   Length of one key

  RESULT
    Number of bytes read (0 when the run has no keys left)
    (uint) -1     Error
*/

static uint read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
                           uint sort_length)
{
  uint nkeys= (uint) min((ha_rows) buffpek->max_keys,buffpek->count);
  uint nbytes= sort_length*nkeys;

  if (!nkeys)
    return 0;

  if (my_pread(fromfile->file, (uchar*) buffpek->base, nbytes,
               buffpek->file_pos, MYF_RW))
    return (uint) -1;                           /* purecov: inspected */

  buffpek->key= buffpek->base;
  buffpek->file_pos+= nbytes;                   /* New filepos */
  buffpek->count-= nkeys;
  buffpek->mem_count= nkeys;
  return nbytes;
} /* read_to_buffer */
/*
  Read the next chunk of variable length keys of a run into its buffer

  Each key is stored in the file as a uint16 length followed by the key
  bytes. In memory every key gets a slot of sort_length bytes, so the
  result is laid out like the fixed length case.

  Returns the number of buffer bytes filled (keys read * sort_length,
  0 when the run has no keys left) or (uint) -1 on read error.
*/

static uint read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
                                  uint sort_length)
{
  uint nkeys= (uint) min((ha_rows) buffpek->max_keys,buffpek->count);
  uint left;
  uint16 stored_length= 0;
  uchar *slot;

  if (!nkeys)
    return 0;

  slot= buffpek->base;
  for (left= nkeys ; left ; left--, slot+= sort_length)
  {
    /* Length prefix */
    if (my_pread(fromfile->file, (uchar*) &stored_length,
                 sizeof(stored_length), buffpek->file_pos, MYF_RW))
      return (uint) -1;
    buffpek->file_pos+= sizeof(stored_length);

    /* Key bytes */
    if (my_pread(fromfile->file, (uchar*) slot, stored_length,
                 buffpek->file_pos, MYF_RW))
      return (uint) -1;
    buffpek->file_pos+= stored_length;
  }

  buffpek->key= buffpek->base;
  buffpek->count-= nkeys;
  buffpek->mem_count= nkeys;
  return nkeys*sort_length;
} /* read_to_buffer_varlen */
/*
  Write 'count' variable length keys to to_file with my_var_write().
  The keys are read from consecutive slots of sort_length bytes starting
  at 'key'.

  Returns 0 on success, otherwise the error from the first failing write.
*/

static int write_merge_key_varlen(MARIA_SORT_PARAM *info,
                                  IO_CACHE *to_file, uchar* key,
                                  uint sort_length, uint count)
{
  int err= 0;
  uchar *slot= key;

  while (count-- && !(err= my_var_write(info, to_file, slot)))
    slot+= sort_length;
  return err;
}
/*
  Write 'count' fixed length keys, stored back to back from 'key', to
  to_file in a single my_b_write() call. Returns the result of that call.
*/

static int write_merge_key(MARIA_SORT_PARAM *info __attribute__((unused)),
                           IO_CACHE *to_file, uchar *key,
                           uint sort_length, uint count)
{
  size_t nbytes= (size_t) sort_length*count;

  return my_b_write(to_file, key, nbytes);
}
/*
  Merge the runs Fb..Tb (inclusive) into one sorted stream

  SYNOPSIS
    merge_buffers()
    info        Sort parameters; supplies key_length, key_cmp,
                read_to_buffer, write_key and key_write
    keys        Number of keys of key_length bytes that fit in sort_keys
    from_file   File holding the runs
    to_file     Destination for the merged run, or 0 to pass every key
                to info->key_write instead
    sort_keys   Work memory; divided evenly between the runs
    lastbuff    Descriptor that is set up to describe the merged run
    Fb, Tb      First and last run to merge

  DESCRIPTION
    A queue ordered by info->key_cmp on the current key of each run picks
    the next key to output. When a run's chunk in memory is used up, it
    is refilled from the file. A run that is exhausted is removed and its
    memory is given to a run whose buffer is adjacent. The last
    remaining run gets the whole buffer and is copied straight through.

  RESULT
    0      ok
    <> 0   Error, or the operation was killed
*/

static int NEAR_F
merge_buffers(MARIA_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
              IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff,
              BUFFPEK *Fb, BUFFPEK *Tb)
{
  int error;
  uint sort_length,maxcount;
  ha_rows count;
  my_off_t to_start_filepos;
  uchar *strpos;
  BUFFPEK *buffpek,**refpek;
  QUEUE queue;
  volatile int *killed= _ma_killed_ptr(info->sort_info->param);
  DBUG_ENTER("merge_buffers");

  count= error= 0;
  maxcount= keys/((uint) (Tb-Fb) +1);
  LINT_INIT(to_start_filepos);
  if (to_file)
    to_start_filepos= my_b_tell(to_file);
  strpos= (uchar*) sort_keys;
  sort_length= info->key_length;

  if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0,
                 (int (*)(void*, uchar *,uchar*)) info->key_cmp,
                 (void*) info))
    DBUG_RETURN(1);                             /* purecov: inspected */

  /* Give every run its share of the buffer and read its first chunk */
  for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
  {
    count+= buffpek->count;
    buffpek->base= strpos;
    buffpek->max_keys= maxcount;
    error= (int) info->read_to_buffer(from_file,buffpek,sort_length);
    if (error == -1)
      goto err;                                 /* purecov: inspected */
    /* Only advance once the read is known to have succeeded */
    strpos+= (uint) error;
    queue_insert(&queue,(uchar*) buffpek);
  }

  while (queue.elements > 1)
  {
    for (;;)
    {
      if (*killed)
      {
        error= 1; goto err;
      }
      buffpek= (BUFFPEK*) queue_top(&queue);
      if (to_file)
      {
        if (info->write_key(info,to_file,(uchar*) buffpek->key,
                            (uint) sort_length,1))
        {
          error= 1; goto err;                   /* purecov: inspected */
        }
      }
      else
      {
        if ((*info->key_write)(info,(void*) buffpek->key))
        {
          error= 1; goto err;                   /* purecov: inspected */
        }
      }
      buffpek->key+= sort_length;
      if (! --buffpek->mem_count)
      {
        /*
          Chunk used up: refill. A failed read has to be caught here,
          before the run is put back in the queue with no valid keys.
        */
        error= (int) info->read_to_buffer(from_file,buffpek,sort_length);
        if (error == -1)
          goto err;                             /* purecov: inspected */
        if (!error)
        {
          uchar *base= buffpek->base;
          uint max_keys= buffpek->max_keys;

          VOID(queue_remove(&queue,0));

          /* Put room used by buffer to use in other buffer */
          for (refpek= (BUFFPEK**) &queue_top(&queue);
               refpek <= (BUFFPEK**) &queue_end(&queue);
               refpek++)
          {
            buffpek= *refpek;
            if (buffpek->base+buffpek->max_keys*sort_length == base)
            {
              buffpek->max_keys+= max_keys;
              break;
            }
            else if (base+max_keys*sort_length == buffpek->base)
            {
              buffpek->base= base;
              buffpek->max_keys+= max_keys;
              break;
            }
          }
          break;                /* One buffer have been removed */
        }
      }
      queue_replaced(&queue);   /* Top element has been replaced */
    }
  }

  /* One run left: it can use the whole buffer and needs no comparisons */
  buffpek= (BUFFPEK*) queue_top(&queue);
  buffpek->base= (uchar*) sort_keys;
  buffpek->max_keys= keys;
  do
  {
    if (to_file)
    {
      if (info->write_key(info,to_file,(uchar*) buffpek->key,
                          sort_length,buffpek->mem_count))
      {
        error= 1; goto err;                     /* purecov: inspected */
      }
    }
    else
    {
      register uchar *end;
      strpos= buffpek->key;
      for (end= strpos+buffpek->mem_count*sort_length;
           strpos != end ;
           strpos+= sort_length)
      {
        if ((*info->key_write)(info, (uchar*) strpos))
        {
          error= 1; goto err;                   /* purecov: inspected */
        }
      }
    }
  }
  while ((error= (int) info->read_to_buffer(from_file,buffpek,sort_length)) !=
         -1 && error != 0);

  lastbuff->count= count;
  if (to_file)
    lastbuff->file_pos= to_start_filepos;
err:
  delete_queue(&queue);
  DBUG_RETURN(error);
} /* merge_buffers */
/*
  Final merge: merge runs buffpek[0..maxbuffer] from tempfile and pass
  every key to info->key_write (merge_buffers() is called without an
  output file).

  Returns 0 on success, 1 on error.
*/

static int NEAR_F
merge_index(MARIA_SORT_PARAM *info, uint keys, uchar **sort_keys,
            BUFFPEK *buffpek, int maxbuffer, IO_CACHE *tempfile)
{
  int failed;
  DBUG_ENTER("merge_index");

  failed= merge_buffers(info, keys, tempfile, (IO_CACHE*) 0, sort_keys,
                        buffpek, buffpek, buffpek+maxbuffer) != 0;
  DBUG_RETURN(failed);                          /* purecov: inspected */
} /* merge_index */
/*
  Flush and free info->sort_info->ft_buf if one is allocated

  The buffer is freed and the pointer reset even if the flush fails.
  Returns 0 if there was no buffer, otherwise the result of
  _ma_sort_ft_buf_flush().
*/

static int flush_maria_ft_buf(MARIA_SORT_PARAM *info)
{
  int flush_result;

  if (!info->sort_info->ft_buf)
    return 0;

  flush_result= _ma_sort_ft_buf_flush(info);
  my_free((uchar*)info->sort_info->ft_buf, MYF(0));
  info->sort_info->ft_buf= 0;
  return flush_result;
}