mirror of
https://github.com/MariaDB/server.git
synced 2025-01-19 05:22:25 +01:00
ndb - add OM_AUTO_SYNC ta make sure os-kernel does not buffer too much
add sync-flag to FsAppendReq storage/ndb/include/kernel/signaldata/FsAppendReq.hpp: Add sync flag to FsAppend storage/ndb/include/kernel/signaldata/FsOpenReq.hpp: Add auto sync flag to FSOPEN storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp: Add append_synch and auto sync storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp: Add variables for auto sync storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp: Add append_sync and auto sync
This commit is contained in:
parent
9f820cf2ce
commit
f6e3b15d25
5 changed files with 43 additions and 24 deletions
|
@ -39,7 +39,7 @@ class FsAppendReq {
|
||||||
friend bool printFSAPPENDREQ(FILE * output, const Uint32 * theData,
|
friend bool printFSAPPENDREQ(FILE * output, const Uint32 * theData,
|
||||||
Uint32 len, Uint16 receiverBlockNo);
|
Uint32 len, Uint16 receiverBlockNo);
|
||||||
public:
|
public:
|
||||||
STATIC_CONST( SignalLength = 6 );
|
STATIC_CONST( SignalLength = 7 );
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
@ -52,6 +52,7 @@ private:
|
||||||
UintR varIndex; // DATA 3
|
UintR varIndex; // DATA 3
|
||||||
UintR offset; // DATA 4
|
UintR offset; // DATA 4
|
||||||
UintR size; // DATA 5
|
UintR size; // DATA 5
|
||||||
|
UintR synch_flag; // DATA 6
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -53,7 +53,7 @@ public:
|
||||||
/**
|
/**
|
||||||
* Length of signal
|
* Length of signal
|
||||||
*/
|
*/
|
||||||
STATIC_CONST( SignalLength = 10 );
|
STATIC_CONST( SignalLength = 11 );
|
||||||
SECTION( FILENAME = 0 );
|
SECTION( FILENAME = 0 );
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -69,6 +69,7 @@ private:
|
||||||
Uint32 page_size;
|
Uint32 page_size;
|
||||||
Uint32 file_size_hi;
|
Uint32 file_size_hi;
|
||||||
Uint32 file_size_lo;
|
Uint32 file_size_lo;
|
||||||
|
Uint32 auto_sync_size; // In bytes
|
||||||
|
|
||||||
STATIC_CONST( OM_READONLY = 0 );
|
STATIC_CONST( OM_READONLY = 0 );
|
||||||
STATIC_CONST( OM_WRITEONLY = 1 );
|
STATIC_CONST( OM_WRITEONLY = 1 );
|
||||||
|
@ -80,10 +81,10 @@ private:
|
||||||
STATIC_CONST( OM_TRUNCATE = 0x200 );
|
STATIC_CONST( OM_TRUNCATE = 0x200 );
|
||||||
STATIC_CONST( OM_AUTOSYNC = 0x400 );
|
STATIC_CONST( OM_AUTOSYNC = 0x400 );
|
||||||
|
|
||||||
STATIC_CONST( OM_CREATE_IF_NONE = 0x0400 );
|
STATIC_CONST( OM_CREATE_IF_NONE = 0x0800 );
|
||||||
STATIC_CONST( OM_INIT = 0x0800 ); //
|
STATIC_CONST( OM_INIT = 0x1000 ); //
|
||||||
STATIC_CONST( OM_CHECK_SIZE = 0x1000 );
|
STATIC_CONST( OM_CHECK_SIZE = 0x2000 );
|
||||||
STATIC_CONST( OM_DIRECT = 0x2000 );
|
STATIC_CONST( OM_DIRECT = 0x4000 );
|
||||||
|
|
||||||
enum Suffixes {
|
enum Suffixes {
|
||||||
S_DATA = 0,
|
S_DATA = 0,
|
||||||
|
|
|
@ -219,6 +219,10 @@ AsyncFile::run()
|
||||||
case Request:: append:
|
case Request:: append:
|
||||||
appendReq(request);
|
appendReq(request);
|
||||||
break;
|
break;
|
||||||
|
case Request:: append_synch:
|
||||||
|
appendReq(request);
|
||||||
|
syncReq(request);
|
||||||
|
break;
|
||||||
case Request::rmrf:
|
case Request::rmrf:
|
||||||
rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory);
|
rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory);
|
||||||
break;
|
break;
|
||||||
|
@ -246,9 +250,8 @@ extern Uint32 Global_syncFreq;
|
||||||
|
|
||||||
void AsyncFile::openReq(Request* request)
|
void AsyncFile::openReq(Request* request)
|
||||||
{
|
{
|
||||||
m_openedWithSync = false;
|
m_auto_sync_freq = 0;
|
||||||
m_syncFrequency = 0;
|
m_write_wo_sync = 0;
|
||||||
m_syncCount= 0;
|
|
||||||
|
|
||||||
// for open.flags, see signal FSOPENREQ
|
// for open.flags, see signal FSOPENREQ
|
||||||
#ifdef NDB_WIN32
|
#ifdef NDB_WIN32
|
||||||
|
@ -329,7 +332,7 @@ void AsyncFile::openReq(Request* request)
|
||||||
|
|
||||||
if (flags & FsOpenReq::OM_AUTOSYNC)
|
if (flags & FsOpenReq::OM_AUTOSYNC)
|
||||||
{
|
{
|
||||||
m_syncFrequency = 1024*1024; // Hard coded to 1M
|
m_auto_sync_freq = request->par.open.auto_sync_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (flags & FsOpenReq::OM_APPEND){
|
if (flags & FsOpenReq::OM_APPEND){
|
||||||
|
@ -429,7 +432,7 @@ no_odirect:
|
||||||
{
|
{
|
||||||
request->error = errno;
|
request->error = errno;
|
||||||
}
|
}
|
||||||
else if(buf.st_size != request->par.open.file_size)
|
else if((Uint64)buf.st_size != request->par.open.file_size)
|
||||||
{
|
{
|
||||||
request->error = FsRef::fsErrInvalidFileSize;
|
request->error = FsRef::fsErrInvalidFileSize;
|
||||||
}
|
}
|
||||||
|
@ -737,6 +740,10 @@ AsyncFile::writeReq( Request * request)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} // while(write_not_complete)
|
} // while(write_not_complete)
|
||||||
|
|
||||||
|
if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq){
|
||||||
|
syncReq(request);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
|
@ -746,6 +753,8 @@ AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset,
|
||||||
size_t bytes_to_write = chunk_size;
|
size_t bytes_to_write = chunk_size;
|
||||||
int return_value;
|
int return_value;
|
||||||
|
|
||||||
|
m_write_wo_sync += size;
|
||||||
|
|
||||||
#ifdef NDB_WIN32
|
#ifdef NDB_WIN32
|
||||||
DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
|
DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
|
||||||
if(dwSFP != offset) {
|
if(dwSFP != offset) {
|
||||||
|
@ -805,7 +814,6 @@ AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset,
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
m_syncCount+= bytes_written;
|
|
||||||
buf += bytes_written;
|
buf += bytes_written;
|
||||||
size -= bytes_written;
|
size -= bytes_written;
|
||||||
offset += bytes_written;
|
offset += bytes_written;
|
||||||
|
@ -856,8 +864,7 @@ bool AsyncFile::isOpen(){
|
||||||
void
|
void
|
||||||
AsyncFile::syncReq(Request * request)
|
AsyncFile::syncReq(Request * request)
|
||||||
{
|
{
|
||||||
if(m_openedWithSync ||
|
if(m_auto_sync_freq && m_write_wo_sync == 0){
|
||||||
m_syncCount == 0){
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
#ifdef NDB_WIN32
|
#ifdef NDB_WIN32
|
||||||
|
@ -871,7 +878,7 @@ AsyncFile::syncReq(Request * request)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
m_syncCount = 0;
|
m_write_wo_sync = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
@ -880,7 +887,7 @@ AsyncFile::appendReq(Request * request){
|
||||||
const char * buf = request->par.append.buf;
|
const char * buf = request->par.append.buf;
|
||||||
Uint32 size = request->par.append.size;
|
Uint32 size = request->par.append.size;
|
||||||
|
|
||||||
m_syncCount += size;
|
m_write_wo_sync += size;
|
||||||
|
|
||||||
#ifdef NDB_WIN32
|
#ifdef NDB_WIN32
|
||||||
DWORD dwWritten = 0;
|
DWORD dwWritten = 0;
|
||||||
|
@ -912,7 +919,7 @@ AsyncFile::appendReq(Request * request){
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if(m_syncFrequency != 0 && m_syncCount > m_syncFrequency){
|
if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq){
|
||||||
syncReq(request);
|
syncReq(request);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -123,6 +123,7 @@ public:
|
||||||
sync,
|
sync,
|
||||||
end,
|
end,
|
||||||
append,
|
append,
|
||||||
|
append_synch,
|
||||||
rmrf,
|
rmrf,
|
||||||
readPartial
|
readPartial
|
||||||
};
|
};
|
||||||
|
@ -132,6 +133,7 @@ public:
|
||||||
Uint32 flags;
|
Uint32 flags;
|
||||||
Uint32 page_size;
|
Uint32 page_size;
|
||||||
Uint64 file_size;
|
Uint64 file_size;
|
||||||
|
Uint32 auto_sync_size;
|
||||||
} open;
|
} open;
|
||||||
struct {
|
struct {
|
||||||
int numberOfPages;
|
int numberOfPages;
|
||||||
|
@ -232,9 +234,8 @@ private:
|
||||||
int theWriteBufferSize;
|
int theWriteBufferSize;
|
||||||
char* theWriteBuffer;
|
char* theWriteBuffer;
|
||||||
|
|
||||||
bool m_openedWithSync;
|
size_t m_write_wo_sync; // Writes wo/ sync
|
||||||
Uint32 m_syncCount;
|
size_t m_auto_sync_freq; // Auto sync freq in bytes
|
||||||
Uint32 m_syncFrequency;
|
|
||||||
public:
|
public:
|
||||||
SimulatedBlock& m_fs;
|
SimulatedBlock& m_fs;
|
||||||
Ptr<GlobalPage> m_page_ptr;
|
Ptr<GlobalPage> m_page_ptr;
|
||||||
|
|
|
@ -230,6 +230,7 @@ Ndbfs::execFSOPENREQ(Signal* signal)
|
||||||
request->par.open.file_size = fsOpenReq->file_size_hi;
|
request->par.open.file_size = fsOpenReq->file_size_hi;
|
||||||
request->par.open.file_size <<= 32;
|
request->par.open.file_size <<= 32;
|
||||||
request->par.open.file_size |= fsOpenReq->file_size_lo;
|
request->par.open.file_size |= fsOpenReq->file_size_lo;
|
||||||
|
request->par.open.auto_sync_size = fsOpenReq->auto_sync_size;
|
||||||
|
|
||||||
ndbrequire(forward(file, request));
|
ndbrequire(forward(file, request));
|
||||||
}
|
}
|
||||||
|
@ -567,6 +568,7 @@ Ndbfs::execFSAPPENDREQ(Signal * signal)
|
||||||
const Uint32 tSz = myBaseAddrRef->nrr;
|
const Uint32 tSz = myBaseAddrRef->nrr;
|
||||||
const Uint32 offset = fsReq->offset;
|
const Uint32 offset = fsReq->offset;
|
||||||
const Uint32 size = fsReq->size;
|
const Uint32 size = fsReq->size;
|
||||||
|
const Uint32 synch_flag = fsReq->synch_flag;
|
||||||
Request *request = theRequestPool->get();
|
Request *request = theRequestPool->get();
|
||||||
|
|
||||||
if (openFile == NULL) {
|
if (openFile == NULL) {
|
||||||
|
@ -596,12 +598,15 @@ Ndbfs::execFSAPPENDREQ(Signal * signal)
|
||||||
request->error = 0;
|
request->error = 0;
|
||||||
request->set(userRef, userPointer, filePointer);
|
request->set(userRef, userPointer, filePointer);
|
||||||
request->file = openFile;
|
request->file = openFile;
|
||||||
request->action = Request::append;
|
|
||||||
request->theTrace = signal->getTrace();
|
request->theTrace = signal->getTrace();
|
||||||
|
|
||||||
request->par.append.buf = (const char *)(tWA + offset);
|
request->par.append.buf = (const char *)(tWA + offset);
|
||||||
request->par.append.size = size << 2;
|
request->par.append.size = size << 2;
|
||||||
|
|
||||||
|
if (!synch_flag)
|
||||||
|
request->action = Request::append;
|
||||||
|
else
|
||||||
|
request->action = Request::append_synch;
|
||||||
ndbrequire(forward(openFile, request));
|
ndbrequire(forward(openFile, request));
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
@ -744,7 +749,9 @@ Ndbfs::report(Request * request, Signal* signal)
|
||||||
sendSignal(ref, GSN_FSSYNCREF, signal, FsRef::SignalLength, JBB);
|
sendSignal(ref, GSN_FSSYNCREF, signal, FsRef::SignalLength, JBB);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case Request::append: {
|
case Request::append:
|
||||||
|
case Request::append_synch:
|
||||||
|
{
|
||||||
jam();
|
jam();
|
||||||
sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB);
|
sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB);
|
||||||
break;
|
break;
|
||||||
|
@ -814,7 +821,9 @@ Ndbfs::report(Request * request, Signal* signal)
|
||||||
sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBB);
|
sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBB);
|
||||||
break;
|
break;
|
||||||
}//case
|
}//case
|
||||||
case Request::append: {
|
case Request::append:
|
||||||
|
case Request::append_synch:
|
||||||
|
{
|
||||||
jam();
|
jam();
|
||||||
signal->theData[1] = request->par.append.size;
|
signal->theData[1] = request->par.append.size;
|
||||||
sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBB);
|
sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBB);
|
||||||
|
|
Loading…
Reference in a new issue