mariadb/storage/ibmdb2i/db2i_ioBuffers.h
Michael Widenius b2abd1cb0c Fixed typos
Patch provided by Dolf Schimmel
2011-01-07 13:45:59 +02:00

416 lines
12 KiB
C++

/*
Licensed Materials - Property of IBM
DB2 Storage Engine Enablement
Copyright IBM Corporation 2007,2008
All rights reserved
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
(a) Redistributions of source code must retain this list of conditions, the
copyright notice in section {d} below, and the disclaimer following this
list of conditions.
(b) Redistributions in binary form must reproduce this list of conditions, the
copyright notice in section (d) below, and the disclaimer following this
list of conditions, in the documentation and/or other materials provided
with the distribution.
(c) The name of IBM may not be used to endorse or promote products derived from
this software without specific prior written permission.
(d) The text of the required copyright notice is:
Licensed Materials - Property of IBM
DB2 Storage Engine Enablement
Copyright IBM Corporation 2007,2008
All rights reserved
THIS SOFTWARE IS PROVIDED BY IBM CORPORATION "AS IS" AND ANY EXPRESS OR IMPLIED
WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
SHALL IBM CORPORATION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
OF SUCH DAMAGE.
*/
/**
@file db2i_ioBuffers.h
@brief Buffer classes used for interacting with QMYSE read/write buffers.
*/
#include "db2i_validatedPointer.h"
#include "mysql_priv.h"
#include <sys/stat.h>
#include <fcntl.h>
#include <as400_types.h>
// Needed for compilers which do not include fstatx in standard headers.
extern "C" int fstatx(int, struct stat *, int, int);
/**
Basic row buffer
Provides the basic structure and methods needed for communicating
with QMYSE I/O APIs.
@details All QMYSE I/O apis use a buffer that is structured as two integer
row counts (max and used) and storage for some number of rows. The row counts
are both input and output for the API, and their usage depends on the
particular API invoked. This class encapsulates that buffer definition.
*/
class IORowBuffer
{
public:
IORowBuffer() : allocSize(0), rowLength(0) {;}
~IORowBuffer() { freeBuf(); }
ValidatedPointer<char>& ptr() { return data; }
/**
Sets up the buffer to hold the size indicated.
@param rowLen length of the rows that will be stored in this buffer
@param nullMapOffset position of null map within each row
@param size buffer size requested
*/
void allocBuf(uint32 rowLen, uint16 nullMapOffset, uint32 size)
{
nullOffset = nullMapOffset;
uint32 newSize = size + sizeof(BufferHdr_t);
// If the internal structure of the row is changing, we need to
// remember this and notify the subclasses via initAfterAllocate();
bool formatChanged = ((size/rowLen) != rowCapacity);
if (newSize > allocSize)
{
this->freeBuf();
data.alloc(newSize);
if (likely((void*)data))
allocSize = newSize;
}
if (likely((void*)data))
{
DBUG_ASSERT((uint64)(void*)data % 16 == 0);
rowLength = rowLen;
rowCapacity = size / rowLength;
initAfterAllocate(formatChanged);
}
else
{
allocSize = 0;
rowCapacity = 0;
}
DBUG_PRINT("db2i_ioBuffers::allocBuf",("rowCapacity = %d", rowCapacity));
}
void zeroBuf()
{
memset(data, 0, allocSize);
}
void freeBuf()
{
if (likely(allocSize))
{
prepForFree();
DBUG_PRINT("IORowBuffer::freeBuf",("Freeing 0x%p", (char*)data));
data.dealloc();
}
}
char* getRowN(uint32 n)
{
if (unlikely(n >= getRowCapacity()))
return NULL;
return (char*)data + sizeof(BufferHdr_t) + (rowLength * n);
};
uint32 getRowCapacity() const {return rowCapacity;}
uint32 getRowNullOffset() const {return nullOffset;}
uint32 getRowLength() const {return rowLength;}
protected:
/**
Called prior to freeing buffer storage so that subclasses can do
any required cleanup
*/
virtual void prepForFree()
{
allocSize = 0;
rowCapacity = 0;
}
/**
Called after buffer storage so that subclasses can do any required setup.
*/
virtual void initAfterAllocate(bool sizeChanged) { return;}
ValidatedPointer<char> data;
uint32 allocSize;
uint32 rowCapacity;
uint32 rowLength;
uint16 nullOffset;
uint32& usedRows() const { return ((BufferHdr_t*)(char*)data)->UsedRowCnt; }
uint32& maxRows() const {return ((BufferHdr_t*)(char*)data)->MaxRowCnt; }
};
/**
Write buffer
Implements methods for inserting data into a row buffer for use with the
QMY_WRITE and QMY_UPDATE APIs.
@details The max row count defines how many rows are in the buffer. The used
row count is updated by QMYSE to indicate how many rows have been
successfully written.
*/
class IOWriteBuffer : public IORowBuffer
{
public:
bool endOfBuffer() const {return (maxRows() == getRowCapacity());}
char* addRow()
{
return getRowN(maxRows()++);
}
void resetAfterWrite()
{
maxRows() = 0;
}
void deleteRow()
{
--maxRows();
}
uint32 rowCount() const {return maxRows();}
uint32 rowsWritten() const {return usedRows()-1;}
private:
void initAfterAllocate(bool sizeChanged) {maxRows() = 0; usedRows() = 0;}
};
/**
Read buffer
Implements methods for reading data from and managing a row buffer for use
with the QMY_READ APIs. This is primarily for use with metainformation queries.
*/
class IOReadBuffer : public IORowBuffer
{
public:
IOReadBuffer() {;}
IOReadBuffer(uint32 rows, uint32 rowLength)
{
allocBuf(rows, 0, rows * rowLength);
maxRows() = rows;
}
uint32 rowCount() {return usedRows();}
void setRowsToProcess(uint32 rows) { maxRows() = rows; }
};
/**
Read buffer
Implements methods for reading data from and managing a row buffer for use
with the QMY_READ APIs.
@details This class supports both sync and async read modes. The max row
count defines the number of rows that are requested to be read. The used row
count defines how many rows have been read. Sync mode is reasonably
straightforward, but async mode has a complex system of communicating with
QMYSE that is optimized for low latency. In async mode, the used row count is
updated continuously by QMYSE as rows are read. At the same time, messages are
sent to the associated pipe indicating that a row has been read. As long as
the internal read cursor lags behind the used row count, the pipe is never
consulted. But if the internal read cursor "catches up to" the used row count,
then we block on the pipe until we find a message indicating that a new row
has been read or that an error has occurred.
*/
class IOAsyncReadBuffer : public IOReadBuffer
{
public:
IOAsyncReadBuffer() :
file(0), readIsAsync(false), msgPipe(QMY_REUSE), bridge(NULL)
{
}
~IOAsyncReadBuffer()
{
interruptRead();
rrnList.dealloc();
}
/**
Signal read operation complete
Indicates that the storage engine requires no more data from the table.
Must be called between calls to newReadRequest().
*/
void endRead()
{
#ifndef DBUG_OFF
if (readCursor < rowCount())
DBUG_PRINT("PERF:",("Wasting %d buffered rows!\n", rowCount() - readCursor));
#endif
interruptRead();
file = 0;
bridge = NULL;
}
/**
Update data that may change on each read operation
*/
void update(char newAccessIntent,
bool* newReleaseRowNeeded,
char commitLvl)
{
accessIntent = newAccessIntent;
releaseRowNeeded = newReleaseRowNeeded;
commitLevel = commitLvl;
}
/**
Read the next row in the table.
Return a pointer to the next row in the table, where "next" is defined
by the orientation.
@param orientation
@param[out] rrn The relative record number of the row returned. Not reliable
if NULL is returned by this function.
@return Pointer to the row. Null if no more rows are available or an error
occurred.
*/
char* readNextRow(char orientation, uint32& rrn)
{
DBUG_PRINT("db2i_ioBuffers::readNextRow", ("readCursor: %d, filledRows: %d, rc: %d", readCursor, rowCount(), rc));
while (readCursor >= rowCount() && !rc)
{
if (!readIsAsync)
loadNewRows(orientation);
else
pollNextRow(orientation);
}
if (readCursor >= rowCount())
return NULL;
rrn = rrnList[readCursor];
return getRowN(readCursor++);
}
/**
Retrieve the return code generated by the last operation.
@return The return code, translated to the appropriate HA_ERR_*
value if possible.
*/
int32 lastrc()
{
return db2i_ileBridge::translateErrorCode(rc);
}
void rewind()
{
readCursor = 0;
rc = 0;
usedRows() = 0;
}
bool reachedEOD() { return EOD; }
void newReadRequest(FILE_HANDLE infile,
char orientation,
uint32 rowsToBuffer,
bool useAsync,
ILEMemHandle key,
int keyLength,
int keyParts);
private:
/**
End any running async read operation.
*/
void interruptRead()
{
closePipe();
if (file && readIsAsync && (rc == 0) && (rowCount() < getRowCapacity()))
{
DBUG_PRINT("IOReadBuffer::interruptRead", ("PERF: Interrupting %d", (uint32)file));
getBridge()->readInterrupt(file);
}
}
void closePipe()
{
if (msgPipe != QMY_REUSE)
{
DBUG_PRINT("db2i_ioBuffers::closePipe", ("Closing pipe %d", msgPipe));
close(msgPipe);
msgPipe = QMY_REUSE;
}
}
/**
Get a pointer to the active ILE bridge.
Getting the bridge pointer is (relatively) expensive, so we cache
it off for each operation.
*/
db2i_ileBridge* getBridge()
{
if (unlikely(bridge == NULL))
{
bridge = db2i_ileBridge::getBridgeForThread();
}
return bridge;
}
void drainPipe();
void pollNextRow(char orientation);
void prepForFree();
void initAfterAllocate(bool sizeChanged);
void loadNewRows(char orientation);
uint32 readCursor; // Read position within buffer
int32 rc; // Last return code received
ValidatedPointer<uint32> rrnList; // Receiver for list of rrns
char accessIntent; // The access intent for this read
char commitLevel; // What isolation level should be used
char EOD; // Whether end-of-data was hit
char readIsAsync; // Are reads to be done asynchronously?
bool* releaseRowNeeded;
/* Does the caller need to release the current row when finished reading */
FILE_HANDLE file; // The file to be read
int msgPipe;
/* The read descriptor of the pipe used to pass messages during async reads */
db2i_ileBridge* bridge; // Cached pointer to bridge
uint32 rowsToBlock; // Number of rows to request
enum
{
ConsumedFullBufferMsg,
PendingFullBufferMsg,
Untouched
} pipeState;
/* The state of the async read message pipe */
};