/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /* Cashing of files with only does (sequential) read or writes of fixed- length records. A read isn't allowed to go over file-length. A read is ok if it ends at file-length and next read can try to read after file-length (and get a EOF-error). Possibly use of asyncronic io. macros for read and writes for faster io. Used instead of FILE when reading or writing whole files. This will make mf_rec_cache obsolete. One can change info->pos_in_file to a higher value to skip bytes in file if also info->rc_pos is set to info->rc_end. If called through open_cached_file(), then the temporary file will only be created if a write exeeds the file buffer or if one calls flush_io_cache(). */ #define MAP_TO_USE_RAID #include "mysql_priv.h" #include #ifdef HAVE_AIOWAIT #include static void my_aiowait(my_aio_result *result); #endif /* if cachesize == 0 then use default cachesize (from s-file) */ /* returns 0 if we have enough memory */ extern "C" { int init_io_cache(IO_CACHE *info, File file, uint cachesize, enum cache_type type, my_off_t seek_offset, pbool use_async_io, myf cache_myflags) { uint min_cache; DBUG_ENTER("init_io_cache"); DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset)); /* There is no file in net_reading */ info->file= file; if (!cachesize) if (! (cachesize= my_default_record_cache_size)) DBUG_RETURN(1); /* No cache requested */ min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2; if (type == READ_CACHE) { /* Assume file isn't growing */ my_off_t file_pos,end_of_file; if ((file_pos=my_tell(file,MYF(0)) == MY_FILEPOS_ERROR)) DBUG_RETURN(1); end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0)); if (end_of_file < seek_offset) end_of_file=seek_offset; VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0))); if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1) { cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1; use_async_io=0; /* No nead to use async */ } } if ((int) type < (int) READ_NET) { for (;;) { cachesize=(uint) ((ulong) (cachesize + min_cache-1) & (ulong) ~(min_cache-1)); if (cachesize < min_cache) cachesize = min_cache; if ((info->buffer= (byte*) my_malloc(cachesize, MYF((cache_myflags & ~ MY_WME) | (cachesize == min_cache ? MY_WME : 0)))) != 0) break; /* Enough memory found */ if (cachesize == min_cache) DBUG_RETURN(2); /* Can't alloc cache */ cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */ } } else info->buffer=0; DBUG_PRINT("info",("init_io_cache: cachesize = %u",cachesize)); info->pos_in_file= seek_offset; info->read_length=info->buffer_length=cachesize; info->seek_not_done= test(file >= 0 && type != READ_FIFO && type != READ_NET); info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP); info->rc_request_pos=info->rc_pos=info->buffer; if (type == READ_CACHE || type == READ_NET || type == READ_FIFO) { info->rc_end=info->buffer; /* Nothing in cache */ } else /* type == WRITE_CACHE */ { info->rc_end=info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1)); } /* end_of_file may be changed by user later */ info->end_of_file= ((type == READ_NET || type == READ_FIFO ) ? 0 : MY_FILEPOS_ERROR); info->type=type; info->error=0; info->read_function=(type == READ_NET) ? _my_b_net_read : _my_b_read; /* net | file */ #ifdef HAVE_AIOWAIT if (use_async_io && ! my_disable_async_io) { DBUG_PRINT("info",("Using async io")); info->read_length/=2; info->read_function=_my_b_async_read; } info->inited=info->aio_result.pending=0; #endif DBUG_RETURN(0); } /* init_io_cache */ /* Wait until current request is ready */ #ifdef HAVE_AIOWAIT static void my_aiowait(my_aio_result *result) { if (result->pending) { struct aio_result_t *tmp; for (;;) { if ((int) (tmp=aiowait((struct timeval *) 0)) == -1) { if (errno == EINTR) continue; DBUG_PRINT("error",("No aio request, error: %d",errno)); result->pending=0; /* Assume everythings is ok */ break; } ((my_aio_result*) tmp)->pending=0; if ((my_aio_result*) tmp == result) break; } } return; } #endif /* Use this to reset cache to start or other type */ /* Some simple optimizing is done when reinit in current buffer */ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, my_off_t seek_offset, pbool use_async_io __attribute__((unused)), pbool clear_cache) { DBUG_ENTER("reinit_io_cache"); info->seek_not_done= test(info->file >= 0); /* Seek not done */ if (!clear_cache && seek_offset >= info->pos_in_file && seek_offset <= info->pos_in_file + (uint) (info->rc_end - info->rc_request_pos)) { /* use current buffer */ if (info->type == WRITE_CACHE && type == READ_CACHE) { info->rc_end=info->rc_pos; info->end_of_file=my_b_tell(info); } else if (info->type == READ_CACHE && type == WRITE_CACHE) info->rc_end=info->buffer+info->buffer_length; info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file); #ifdef HAVE_AIOWAIT my_aiowait(&info->aio_result); /* Wait for outstanding req */ #endif } else { if (info->type == WRITE_CACHE && type == READ_CACHE) info->end_of_file=my_b_tell(info); if (flush_io_cache(info)) DBUG_RETURN(1); info->pos_in_file=seek_offset; info->rc_request_pos=info->rc_pos=info->buffer; if (type == READ_CACHE || type == READ_NET || type == READ_FIFO) { info->rc_end=info->buffer; /* Nothing in cache */ } else { info->rc_end=info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1)); info->end_of_file= ((type == READ_NET || type == READ_FIFO) ? 0 : MY_FILEPOS_ERROR); } } info->type=type; info->error=0; info->read_function=(type == READ_NET) ? _my_b_net_read : _my_b_read; #ifdef HAVE_AIOWAIT if (type != READ_NET) { if (use_async_io && ! my_disable_async_io && ((ulong) info->buffer_length < (ulong) (info->end_of_file - seek_offset))) { info->read_length=info->buffer_length/2; info->read_function=_my_b_async_read; } } info->inited=0; #endif DBUG_RETURN(0); } /* init_io_cache */ /* Read buffered. Returns 1 if can't read requested characters */ /* Returns 0 if record read */ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) { uint length,diff_length,left_length; my_off_t max_length, pos_in_file; memcpy(Buffer,info->rc_pos, (size_t) (left_length=(uint) (info->rc_end-info->rc_pos))); Buffer+=left_length; Count-=left_length; pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer); if (info->seek_not_done) { /* File touched, do seek */ VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); info->seek_not_done=0; } diff_length=(uint) (pos_in_file & (IO_SIZE-1)); if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length))) { /* Fill first intern buffer */ uint read_length; if (info->end_of_file == pos_in_file) { /* End of file */ info->error=(int) left_length; return 1; } length=(Count & (uint) ~(IO_SIZE-1))-diff_length; if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags)) != (uint) length) { info->error= read_length == (uint) -1 ? -1 : (int) (read_length+left_length); return 1; } Count-=length; Buffer+=length; pos_in_file+=length; left_length+=length; diff_length=0; } max_length=info->read_length-diff_length; if (info->type != READ_FIFO && (info->end_of_file - pos_in_file) < max_length) max_length = info->end_of_file - pos_in_file; if (!max_length) { if (Count) { info->error= left_length; /* We only got this many char */ return 1; } length=0; /* Didn't read any chars */ } else if ((length=my_read(info->file,info->buffer,(uint) max_length, info->myflags)) < Count || length == (uint) -1) { if (length != (uint) -1) memcpy(Buffer,info->buffer,(size_t) length); info->error= length == (uint) -1 ? -1 : (int) (length+left_length); return 1; } info->rc_pos=info->buffer+Count; info->rc_end=info->buffer+length; info->pos_in_file=pos_in_file; memcpy(Buffer,info->buffer,(size_t) Count); return 0; } /* ** Read buffered from the net. ** Returns 1 if can't read requested characters ** Returns 0 if record read */ int _my_b_net_read(register IO_CACHE *info, byte *Buffer, uint Count __attribute__((unused))) { int read_length; NET *net= &(current_thd)->net; if (info->end_of_file) return 1; /* because my_b_get (no _) takes 1 byte at a time */ read_length=my_net_read(net); if (read_length == (int) packet_error) { info->error=(uint) -1; return 1; } if (read_length == 0) { /* End of file from client */ info->end_of_file = 1; return 1; } /* to set up stuff for my_b_get (no _) */ info->rc_end = (info->rc_pos = (byte*) net->read_pos) + read_length; Buffer[0] = info->rc_pos[0]; /* length is always 1 */ info->rc_pos++; return 0; } #ifdef HAVE_AIOWAIT int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count) { uint length,read_length,diff_length,left_length,use_length,org_Count; my_off_t max_length; my_off_t next_pos_in_file; byte *read_buffer; memcpy(Buffer,info->rc_pos, (size_t) (left_length=(uint) (info->rc_end-info->rc_pos))); Buffer+=left_length; org_Count=Count; Count-=left_length; if (info->inited) { /* wait for read block */ info->inited=0; /* No more block to read */ my_aiowait(&info->aio_result); /* Wait for outstanding req */ if (info->aio_result.result.aio_errno) { if (info->myflags & MY_WME) my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG), my_filename(info->file), info->aio_result.result.aio_errno); my_errno=info->aio_result.result.aio_errno; info->error= -1; return(1); } if (! (read_length = (uint) info->aio_result.result.aio_return) || read_length == (uint) -1) { my_errno=0; /* For testing */ info->error= (read_length == (uint) -1 ? -1 : (int) (read_length+left_length)); return(1); } info->pos_in_file+=(uint) (info->rc_end - info->rc_request_pos); if (info->rc_request_pos != info->buffer) info->rc_request_pos=info->buffer; else info->rc_request_pos=info->buffer+info->read_length; info->rc_pos=info->rc_request_pos; next_pos_in_file=info->aio_read_pos+read_length; /* Check if pos_in_file is changed (_ni_read_cache may have skipped some bytes) */ if (info->aio_read_pos < info->pos_in_file) { /* Fix if skipped bytes */ if (info->aio_read_pos + read_length < info->pos_in_file) { read_length=0; /* Skipp block */ next_pos_in_file=info->pos_in_file; } else { my_off_t offset= (info->pos_in_file - info->aio_read_pos); info->pos_in_file=info->aio_read_pos; /* Whe are here */ info->rc_pos=info->rc_request_pos+offset; read_length-=offset; /* Bytes left from rc_pos */ } } #ifndef DBUG_OFF if (info->aio_read_pos > info->pos_in_file) { my_errno=EINVAL; return(info->read_length= -1); } #endif /* Copy found bytes to buffer */ length=min(Count,read_length); memcpy(Buffer,info->rc_pos,(size_t) length); Buffer+=length; Count-=length; left_length+=length; info->rc_end=info->rc_pos+read_length; info->rc_pos+=length; } else next_pos_in_file=(info->pos_in_file+ (uint) (info->rc_end - info->rc_request_pos)); /* If reading large blocks, or first read or read with skipp */ if (Count) { if (next_pos_in_file == info->end_of_file) { info->error=(int) (read_length+left_length); return 1; } VOID(my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))); read_length=IO_SIZE*2- (uint) (next_pos_in_file & (IO_SIZE-1)); if (Count < read_length) { /* Small block, read to cache */ if ((read_length=my_read(info->file,info->rc_request_pos, read_length, info->myflags)) == (uint) -1) return info->error= -1; use_length=min(Count,read_length); memcpy(Buffer,info->rc_request_pos,(size_t) use_length); info->rc_pos=info->rc_request_pos+Count; info->rc_end=info->rc_request_pos+read_length; info->pos_in_file=next_pos_in_file; /* Start of block in cache */ next_pos_in_file+=read_length; if (Count != use_length) { /* Didn't find hole block */ if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count) my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG), my_filename(info->file),my_errno); info->error=(int) (read_length+left_length); return 1; } } else { /* Big block, don't cache it */ if ((read_length=my_read(info->file,Buffer,(uint) Count,info->myflags)) != Count) { info->error= read_length == (uint) -1 ? -1 : read_length+left_length; return 1; } info->rc_pos=info->rc_end=info->rc_request_pos; info->pos_in_file=(next_pos_in_file+=Count); } } /* Read next block with asyncronic io */ max_length=info->end_of_file - next_pos_in_file; diff_length=(next_pos_in_file & (IO_SIZE-1)); if (max_length > (my_off_t) info->read_length - diff_length) max_length= (my_off_t) info->read_length - diff_length; if (info->rc_request_pos != info->buffer) read_buffer=info->buffer; else read_buffer=info->buffer+info->read_length; info->aio_read_pos=next_pos_in_file; if (max_length) { info->aio_result.result.aio_errno=AIO_INPROGRESS; /* Marker for test */ DBUG_PRINT("aioread",("filepos: %ld length: %ld", (ulong) next_pos_in_file,(ulong) max_length)); if (aioread(info->file,read_buffer,(int) max_length, (my_off_t) next_pos_in_file,MY_SEEK_SET, &info->aio_result.result)) { /* Skipp async io */ my_errno=errno; DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped", errno, info->aio_result.result.aio_errno)); if (info->rc_request_pos != info->buffer) { bmove(info->buffer,info->rc_request_pos, (uint) (info->rc_end - info->rc_pos)); info->rc_request_pos=info->buffer; info->rc_pos-=info->read_length; info->rc_end-=info->read_length; } info->read_length=info->buffer_length; /* Use hole buffer */ info->read_function=_my_b_read; /* Use normal IO_READ next */ } else info->inited=info->aio_result.pending=1; } return 0; /* Block read, async in use */ } /* _my_b_async_read */ #endif /* Read one byte when buffer is empty */ int _my_b_get(IO_CACHE *info) { byte buff; if ((*(info)->read_function)(info,&buff,1)) return my_b_EOF; return (int) (uchar) buff; } /* Returns != 0 if error on write */ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count) { uint rest_length,length; rest_length=(uint) (info->rc_end - info->rc_pos); memcpy(info->rc_pos,Buffer,(size_t) rest_length); Buffer+=rest_length; Count-=rest_length; info->rc_pos+=rest_length; if (flush_io_cache(info)) return 1; if (Count >= IO_SIZE) { /* Fill first intern buffer */ length=Count & (uint) ~(IO_SIZE-1); if (info->seek_not_done) { /* File touched, do seek */ VOID(my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0))); info->seek_not_done=0; } if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP)) return info->error= -1; Count-=length; Buffer+=length; info->pos_in_file+=length; } memcpy(info->rc_pos,Buffer,(size_t) Count); info->rc_pos+=Count; return 0; } /* Write a block to disk where part of the data may be inside the record buffer. As all write calls to the data goes through the cache, we will never get a seek over the end of the buffer */ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count, my_off_t pos) { uint length; int error=0; if (pos < info->pos_in_file) { /* Of no overlap, write everything without buffering */ if (pos + Count <= info->pos_in_file) return my_pwrite(info->file, Buffer, Count, pos, info->myflags | MY_NABP); /* Write the part of the block that is before buffer */ length= (uint) (info->pos_in_file - pos); if (my_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP)) info->error=error=-1; Buffer+=length; pos+= length; Count-= length; } /* Check if we want to write inside the used part of the buffer.*/ length= (uint) (info->rc_end - info->buffer); if (pos < info->pos_in_file + length) { uint offset= (uint) (pos - info->pos_in_file); length-=offset; if (length > Count) length=Count; memcpy(info->buffer+offset, Buffer, length); Buffer+=length; Count-= length; /* Fix length of buffer if the new data was larger */ if (info->buffer+length > info->rc_pos) info->rc_pos=info->buffer+length; if (!Count) return (error); } /* Write at the end of the current buffer; This is the normal case */ if (_my_b_write(info, Buffer, Count)) error= -1; return error; } /* Flush write cache */ int flush_io_cache(IO_CACHE *info) { uint length; DBUG_ENTER("flush_io_cache"); if (info->type == WRITE_CACHE) { if (info->file == -1) { if (real_open_cached_file(info)) DBUG_RETURN((info->error= -1)); } if (info->rc_pos != info->buffer) { length=(uint) (info->rc_pos - info->buffer); if (info->seek_not_done) { /* File touched, do seek */ VOID(my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0))); info->seek_not_done=0; } info->rc_pos=info->buffer; info->pos_in_file+=length; info->rc_end=(info->buffer+info->buffer_length- (info->pos_in_file & (IO_SIZE-1))); if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP)) DBUG_RETURN((info->error= -1)); DBUG_RETURN(0); } } #ifdef HAVE_AIOWAIT else if (info->type != READ_NET) { my_aiowait(&info->aio_result); /* Wait for outstanding req */ info->inited=0; } #endif DBUG_RETURN(0); } int end_io_cache(IO_CACHE *info) { int error=0; DBUG_ENTER("end_io_cache"); if (info->buffer) { if (info->file != -1) /* File doesn't exist */ error=flush_io_cache(info); my_free((gptr) info->buffer,MYF(MY_WME)); info->buffer=info->rc_pos=(byte*) 0; } DBUG_RETURN(error); } /* end_io_cache */ }