Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb
This commit is contained in:
unknown 2006-06-28 17:12:18 +02:00
commit 523a8c0ccd
26 changed files with 417 additions and 125 deletions

View file

@ -188,6 +188,19 @@ ENGINE NDB;
CREATE INDEX b_i on t1(b);
CREATE INDEX bc_i on t1(b, c);
DROP TABLE t1;
CREATE TABLESPACE ts2
ADD DATAFILE 'datafile3.dat'
USE LOGFILE GROUP lg1
INITIAL_SIZE 1M
ENGINE NDB;
ALTER TABLESPACE ts1
DROP DATAFILE 'datafile3.dat'
ENGINE NDB;
ERROR HY000: Failed to alter: NO SUCH FILE
ALTER TABLESPACE ts2
DROP DATAFILE 'datafile2.dat'
ENGINE NDB;
ERROR HY000: Failed to alter: NO SUCH FILE
ALTER TABLESPACE ts1
DROP DATAFILE 'datafile2.dat'
ENGINE NDB;
@ -196,6 +209,11 @@ DROP DATAFILE 'datafile.dat'
ENGINE NDB;
DROP TABLESPACE ts1
ENGINE NDB;
ALTER TABLESPACE ts2
DROP DATAFILE 'datafile3.dat'
ENGINE NDB;
DROP TABLESPACE ts2
ENGINE NDB;
DROP LOGFILE GROUP lg1
ENGINE NDB;
**** End = And No = ****

View file

@ -280,6 +280,25 @@ CREATE INDEX bc_i on t1(b, c);
DROP TABLE t1;
# bug#20053
CREATE TABLESPACE ts2
ADD DATAFILE 'datafile3.dat'
USE LOGFILE GROUP lg1
INITIAL_SIZE 1M
ENGINE NDB;
--error ER_ALTER_FILEGROUP_FAILED
ALTER TABLESPACE ts1
DROP DATAFILE 'datafile3.dat'
ENGINE NDB;
--error ER_ALTER_FILEGROUP_FAILED
ALTER TABLESPACE ts2
DROP DATAFILE 'datafile2.dat'
ENGINE NDB;
# bug#20053
ALTER TABLESPACE ts1
DROP DATAFILE 'datafile2.dat'
ENGINE NDB;
@ -291,6 +310,13 @@ ENGINE NDB;
DROP TABLESPACE ts1
ENGINE NDB;
ALTER TABLESPACE ts2
DROP DATAFILE 'datafile3.dat'
ENGINE NDB;
DROP TABLESPACE ts2
ENGINE NDB;
DROP LOGFILE GROUP lg1
ENGINE NDB;

View file

@ -186,8 +186,8 @@ static int update_status_variables(Ndb_cluster_connection *c)
SHOW_VAR ndb_status_variables[]= {
{"cluster_node_id", (char*) &ndb_cluster_node_id, SHOW_LONG},
{"connected_host", (char*) &ndb_connected_host, SHOW_CHAR_PTR},
{"connected_port", (char*) &ndb_connected_port, SHOW_LONG},
{"config_from_host", (char*) &ndb_connected_host, SHOW_CHAR_PTR},
{"config_from_port", (char*) &ndb_connected_port, SHOW_LONG},
// {"number_of_replicas", (char*) &ndb_number_of_replicas, SHOW_LONG},
{"number_of_storage_nodes",(char*) &ndb_number_of_storage_nodes, SHOW_LONG},
{NullS, NullS, SHOW_LONG}
@ -10006,7 +10006,7 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info)
}
NdbError err;
NDBDICT *dict = ndb->getDictionary();
NDBDICT *dict= ndb->getDictionary();
int error;
const char * errmsg;
LINT_INIT(errmsg);
@ -10070,9 +10070,12 @@ int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info)
}
else if(info->ts_alter_tablespace_type == ALTER_TABLESPACE_DROP_FILE)
{
NdbDictionary::Datafile df = dict->getDatafile(0,
info->data_file_name);
if (strcmp(df.getPath(), info->data_file_name) == 0)
NdbDictionary::Tablespace ts= dict->getTablespace(info->tablespace_name);
NdbDictionary::Datafile df= dict->getDatafile(0, info->data_file_name);
NdbDictionary::ObjectId objid;
df.getTablespaceId(&objid);
if (ts.getObjectId() == objid.getObjectId() &&
strcmp(df.getPath(), info->data_file_name) == 0)
{
errmsg= " DROP DATAFILE";
if (dict->dropDatafile(df))
@ -10401,10 +10404,12 @@ static int ndbcluster_fill_files_table(THD *thd, TABLE_LIST *tables,
table->field[c++]->set_null(); // TABLE_NAME
// LOGFILE_GROUP_NAME
NdbDictionary::ObjectId objid;
uf.getLogfileGroupId(&objid);
table->field[c++]->store(uf.getLogfileGroup(),
strlen(uf.getLogfileGroup()),
system_charset_info);
table->field[c++]->store(uf.getLogfileGroupId()); // LOGFILE_GROUP_NUMBER
table->field[c++]->store(objid.getObjectId()); // LOGFILE_GROUP_NUMBER
table->field[c++]->store(ndbcluster_hton_name,
ndbcluster_hton_name_length,
system_charset_info); // ENGINE

View file

@ -159,7 +159,8 @@ struct CreateFileRef {
InvalidFilegroupVersion = 754,
FilenameAlreadyExists = 760,
OutOfFileRecords = 751,
InvalidFileType = 750
InvalidFileType = 750,
NotSupportedWhenDiskless = 775
};
Uint32 senderData;

View file

@ -45,7 +45,8 @@ public:
CopyFragRefError = 5,
TestStopOnError = 6,
CopySubscriptionRef = 7,
CopySubscriberRef = 8
CopySubscriberRef = 8,
StartFragRefError = 9
};
Uint32 errorRef;

View file

@ -184,7 +184,7 @@ public:
virtual int getObjectId() const;
private:
friend class Dictionary;
friend class NdbDictObjectImpl;
class NdbDictObjectImpl & m_impl;
};
@ -1469,11 +1469,11 @@ public:
void setSize(Uint64);
Uint64 getSize() const;
Uint64 getFree() const;
void setTablespace(const char * name);
void setTablespace(const class Tablespace &);
const char * getTablespace() const;
Uint32 getTablespaceId() const;
void getTablespaceId(ObjectId * dst) const;
void setNode(Uint32 nodeId);
Uint32 getNode() const;
@ -1516,7 +1516,7 @@ public:
void setLogfileGroup(const char * name);
void setLogfileGroup(const class LogfileGroup &);
const char * getLogfileGroup() const;
Uint32 getLogfileGroupId() const;
void getLogfileGroupId(ObjectId * dst) const;
void setNode(Uint32 nodeId);
Uint32 getNode() const;

View file

@ -42,7 +42,9 @@ public:
* @param parallel No of fragments to scan in parallel (0=max)
*/
virtual int readTuples(LockMode lock_mode = LM_Read,
Uint32 scan_flags = 0, Uint32 parallel = 0);
Uint32 scan_flags = 0,
Uint32 parallel = 0,
Uint32 batch = 0);
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/**
@ -70,7 +72,7 @@ public:
(SF_ReadRangeNo & -(Int32)read_range_no) |
(SF_KeyInfo & -(Int32)keyinfo);
return readTuples(lock_mode, scan_flags, parallel);
return readTuples(lock_mode, scan_flags, parallel, batch);
}
#endif

View file

@ -58,7 +58,9 @@ public:
*/
virtual
int readTuples(LockMode lock_mode = LM_Read,
Uint32 scan_flags = 0, Uint32 parallel = 0);
Uint32 scan_flags = 0,
Uint32 parallel = 0,
Uint32 batch = 0);
#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
/**

View file

@ -9537,7 +9537,14 @@ Dbdict::createEventComplete_RT_USER_GET(Signal* signal,
NodeReceiverGroup rg(DBDICT, c_aliveNodes);
RequestTracker & p = evntRecPtr.p->m_reqTracker;
p.init<CreateEvntRef>(c_counterMgr, rg, GSN_CREATE_EVNT_REF, evntRecPtr.i);
if (!p.init<CreateEvntRef>(c_counterMgr, rg, GSN_CREATE_EVNT_REF,
evntRecPtr.i))
{
jam();
evntRecPtr.p->m_errorCode = 701;
createEvent_sendReply(signal, evntRecPtr);
return;
}
sendSignal(rg, GSN_CREATE_EVNT_REQ, signal, CreateEvntReq::SignalLength, JBB);
}
@ -9825,8 +9832,12 @@ void Dbdict::execSUB_START_REQ(Signal* signal)
return;
}
OpSubEventPtr subbPtr;
Uint32 errCode = 0;
if (!c_opSubEvent.seize(subbPtr)) {
errCode = SubStartRef::Busy;
busy:
SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
{ // fix
Uint32 subcriberRef = ((SubStartReq*)signal->getDataPtr())->subscriberRef;
ref->subscriberRef = subcriberRef;
@ -9836,7 +9847,7 @@ void Dbdict::execSUB_START_REQ(Signal* signal)
// ret->setErrorLine(__LINE__);
// ret->setErrorNode(reference());
ref->senderRef = reference();
ref->errorCode = SubStartRef::Busy;
ref->errorCode = errCode;
sendSignal(origSenderRef, GSN_SUB_START_REF, signal,
SubStartRef::SignalLength2, JBB);
@ -9859,7 +9870,12 @@ void Dbdict::execSUB_START_REQ(Signal* signal)
subbPtr.p->m_senderRef = origSenderRef; // not sure if API sets correctly
NodeReceiverGroup rg(DBDICT, c_aliveNodes);
RequestTracker & p = subbPtr.p->m_reqTracker;
p.init<SubStartRef>(c_counterMgr, rg, GSN_SUB_START_REF, subbPtr.i);
if (!p.init<SubStartRef>(c_counterMgr, rg, GSN_SUB_START_REF, subbPtr.i))
{
c_opSubEvent.release(subbPtr);
errCode = SubStartRef::Busy;
goto busy;
}
SubStartReq* req = (SubStartReq*) signal->getDataPtrSend();
@ -10049,14 +10065,17 @@ void Dbdict::execSUB_STOP_REQ(Signal* signal)
return;
}
OpSubEventPtr subbPtr;
Uint32 errCode = 0;
if (!c_opSubEvent.seize(subbPtr)) {
errCode = SubStopRef::Busy;
busy:
SubStopRef * ref = (SubStopRef *)signal->getDataPtrSend();
jam();
// ret->setErrorCode(SubStartRef::SeizeError);
// ret->setErrorLine(__LINE__);
// ret->setErrorNode(reference());
ref->senderRef = reference();
ref->errorCode = SubStopRef::Busy;
ref->errorCode = errCode;
sendSignal(origSenderRef, GSN_SUB_STOP_REF, signal,
SubStopRef::SignalLength, JBB);
@ -10081,10 +10100,16 @@ void Dbdict::execSUB_STOP_REQ(Signal* signal)
subbPtr.p->m_senderRef = origSenderRef; // not sure if API sets correctly
NodeReceiverGroup rg(DBDICT, c_aliveNodes);
RequestTracker & p = subbPtr.p->m_reqTracker;
p.init<SubStopRef>(c_counterMgr, rg, GSN_SUB_STOP_REF, subbPtr.i);
if (!p.init<SubStopRef>(c_counterMgr, rg, GSN_SUB_STOP_REF, subbPtr.i))
{
jam();
c_opSubEvent.release(subbPtr);
errCode = SubStopRef::Busy;
goto busy;
}
SubStopReq* req = (SubStopReq*) signal->getDataPtrSend();
req->senderRef = reference();
req->senderData = subbPtr.i;
@ -10374,9 +10399,14 @@ Dbdict::dropEventUTIL_EXECUTE_READ(Signal* signal,
NodeReceiverGroup rg(DBDICT, c_aliveNodes);
RequestTracker & p = evntRecPtr.p->m_reqTracker;
p.init<SubRemoveRef>(c_counterMgr, rg, GSN_SUB_REMOVE_REF,
evntRecPtr.i);
if (!p.init<SubRemoveRef>(c_counterMgr, rg, GSN_SUB_REMOVE_REF,
evntRecPtr.i))
{
evntRecPtr.p->m_errorCode = 701;
dropEvent_sendReply(signal, evntRecPtr);
return;
}
SubRemoveReq* req = (SubRemoveReq*) signal->getDataPtrSend();
req->senderRef = reference();
@ -15483,6 +15513,17 @@ Dbdict::create_file_prepare_start(Signal* signal, SchemaOp* op){
break;
}
{
Uint32 dl;
const ndb_mgm_configuration_iterator * p =
m_ctx.m_config.getOwnConfigIterator();
if(!ndb_mgm_get_int_parameter(p, CFG_DB_DISCLESS, &dl) && dl)
{
op->m_errorCode = CreateFileRef::NotSupportedWhenDiskless;
break;
}
}
// Loop through all filenames...
if(!c_obj_pool.seize(obj_ptr)){
op->m_errorCode = CreateTableRef::NoMoreTableRecords;

View file

@ -683,6 +683,7 @@ private:
void execGETGCIREQ(Signal *);
void execDIH_RESTARTREQ(Signal *);
void execSTART_RECCONF(Signal *);
void execSTART_FRAGREF(Signal *);
void execSTART_FRAGCONF(Signal *);
void execADD_FRAGCONF(Signal *);
void execADD_FRAGREF(Signal *);

View file

@ -257,6 +257,9 @@ Dbdih::Dbdih(Block_context& ctx):
addRecSignal(GSN_DICT_LOCK_CONF, &Dbdih::execDICT_LOCK_CONF);
addRecSignal(GSN_DICT_LOCK_REF, &Dbdih::execDICT_LOCK_REF);
addRecSignal(GSN_START_FRAGREF,
&Dbdih::execSTART_FRAGREF);
apiConnectRecord = 0;
connectRecord = 0;
fileRecord = 0;

View file

@ -1107,6 +1107,26 @@ void Dbdih::execSTART_FRAGCONF(Signal* signal)
return;
}//Dbdih::execSTART_FRAGCONF()
void Dbdih::execSTART_FRAGREF(Signal* signal)
{
jamEntry();
/**
* Kill starting node
*/
Uint32 errCode = signal->theData[1];
Uint32 nodeId = signal->theData[2];
SystemError * const sysErr = (SystemError*)&signal->theData[0];
sysErr->errorCode = SystemError::StartFragRefError;
sysErr->errorRef = reference();
sysErr->data1 = errCode;
sysErr->data2 = 0;
sendSignal(calcNdbCntrBlockRef(nodeId), GSN_SYSTEM_ERROR, signal,
SystemError::SignalLength, JBB);
return;
}//Dbdih::execSTART_FRAGCONF()
void Dbdih::execSTART_MEREF(Signal* signal)
{
jamEntry();

View file

@ -8073,15 +8073,15 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes = 0;
closeScanLab(signal);
} else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
jam();
closeScanLab(signal);
return;
} else if (scanptr.p->check_scan_batch_completed() &&
scanptr.p->scanLockHold != ZTRUE) {
jam();
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
sendScanFragConf(signal, ZFALSE);
} else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
jam();
closeScanLab(signal);
return;
} else {
jam();
/*

View file

@ -205,6 +205,13 @@ void Ndbcntr::execSYSTEM_ERROR(Signal* signal)
killingNode, data1);
break;
case SystemError::StartFragRefError:
BaseString::snprintf(buf, sizeof(buf),
"Node %d killed this node because "
"it replied StartFragRef error code: %u.",
killingNode, data1);
break;
case SystemError::CopySubscriptionRef:
BaseString::snprintf(buf, sizeof(buf),
"Node %d killed this node because "

View file

@ -2480,7 +2480,8 @@ Suma::execSUB_STOP_REQ(Signal* signal){
TablePtr tabPtr;
tabPtr.i = subPtr.p->m_table_ptrI;
if (!(tabPtr.p = c_tables.getPtr(tabPtr.i)) ||
if (tabPtr.i == RNIL ||
!(tabPtr.p = c_tables.getPtr(tabPtr.i)) ||
tabPtr.p->m_tableId != subPtr.p->m_tableId)
{
jam();

View file

@ -26,12 +26,12 @@ public:
void init() { m_confs.clear(); m_nRefs = 0; }
template<typename SignalClass>
void init(SafeCounterManager& mgr,
bool init(SafeCounterManager& mgr,
NodeReceiverGroup rg, Uint16 GSN, Uint32 senderData)
{
init();
SafeCounter tmp(mgr, m_sc);
tmp.init<SignalClass>(rg, GSN, senderData);
return tmp.init<SignalClass>(rg, GSN, senderData);
}
bool ignoreRef(SafeCounterManager& mgr, Uint32 nodeId)

View file

@ -230,10 +230,13 @@ inline
bool
SafeCounter::init(NodeReceiverGroup rg, Uint16 GSN, Uint32 senderData){
bool b = init<Ref>(rg.m_block, GSN, senderData);
m_nodes = rg.m_nodes;
m_count = m_nodes.count();
return b;
if (init<Ref>(rg.m_block, GSN, senderData))
{
m_nodes = rg.m_nodes;
m_count = m_nodes.count();
return true;
}
return false;
}
template<typename Ref>
@ -241,10 +244,13 @@ inline
bool
SafeCounter::init(NodeReceiverGroup rg, Uint32 senderData){
bool b = init<Ref>(rg.m_block, Ref::GSN, senderData);
m_nodes = rg.m_nodes;
m_count = m_nodes.count();
return b;
if (init<Ref>(rg.m_block, Ref::GSN, senderData))
{
m_nodes = rg.m_nodes;
m_count = m_nodes.count();
return true;
}
return false;
}
inline

View file

@ -30,6 +30,7 @@ extern my_bool opt_core;
#define MAX_LINE_LENGTH 255
#define KEY_INTERNAL 0
#define MAX_INT_RNIL 0xfffffeff
#define MAX_PORT_NO 65535
#define _STR_VALUE(x) #x
#define STR_VALUE(x) _STR_VALUE(x)
@ -422,7 +423,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT,
UNDEFINED,
"1",
STR_VALUE(MAX_INT_RNIL) },
STR_VALUE(MAX_PORT_NO) },
{
CFG_DB_NO_REPLICAS,
@ -877,7 +878,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
false,
ConfigInfo::CI_INT,
"8",
"1",
"3",
STR_VALUE(MAX_INT_RNIL) },
{
@ -1510,7 +1511,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT,
NDB_PORT,
"0",
STR_VALUE(MAX_INT_RNIL) },
STR_VALUE(MAX_PORT_NO) },
{
KEY_INTERNAL,
@ -1522,7 +1523,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT,
UNDEFINED,
"0",
STR_VALUE(MAX_INT_RNIL) },
STR_VALUE(MAX_PORT_NO) },
{
CFG_NODE_ARBIT_RANK,
@ -1664,7 +1665,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT,
MANDATORY,
"0",
STR_VALUE(MAX_INT_RNIL) },
STR_VALUE(MAX_PORT_NO) },
{
CFG_TCP_SEND_BUFFER_SIZE,
@ -1770,7 +1771,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT,
MANDATORY,
"0",
STR_VALUE(MAX_INT_RNIL) },
STR_VALUE(MAX_PORT_NO) },
{
CFG_SHM_SIGNUM,
@ -1992,7 +1993,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT,
MANDATORY,
"0",
STR_VALUE(MAX_INT_RNIL) },
STR_VALUE(MAX_PORT_NO) },
{
CFG_SCI_HOST1_ID_0,

View file

@ -1236,9 +1236,14 @@ NdbDictionary::Datafile::getTablespace() const {
return m_impl.m_filegroup_name.c_str();
}
Uint32
NdbDictionary::Datafile::getTablespaceId() const {
return m_impl.m_filegroup_id;
void
NdbDictionary::Datafile::getTablespaceId(NdbDictionary::ObjectId* dst) const
{
if (dst)
{
NdbDictObjectImpl::getImpl(* dst).m_id = m_impl.m_filegroup_id;
NdbDictObjectImpl::getImpl(* dst).m_version = m_impl.m_filegroup_version;
}
}
NdbDictionary::Object::Status
@ -1322,9 +1327,14 @@ NdbDictionary::Undofile::getLogfileGroup() const {
return m_impl.m_filegroup_name.c_str();
}
Uint32
NdbDictionary::Undofile::getLogfileGroupId() const {
return m_impl.m_filegroup_id;
void
NdbDictionary::Undofile::getLogfileGroupId(NdbDictionary::ObjectId * dst)const
{
if (dst)
{
NdbDictObjectImpl::getImpl(* dst).m_id = m_impl.m_filegroup_id;
NdbDictObjectImpl::getImpl(* dst).m_version = m_impl.m_filegroup_version;
}
}
NdbDictionary::Object::Status
@ -1841,7 +1851,8 @@ NdbDictionary::Dictionary::createLogfileGroup(const LogfileGroup & lg,
ObjectId * obj)
{
return m_impl.createLogfileGroup(NdbLogfileGroupImpl::getImpl(lg),
obj ? &obj->m_impl : 0);
obj ?
& NdbDictObjectImpl::getImpl(* obj) : 0);
}
int
@ -1864,7 +1875,8 @@ NdbDictionary::Dictionary::createTablespace(const Tablespace & lg,
ObjectId * obj)
{
return m_impl.createTablespace(NdbTablespaceImpl::getImpl(lg),
obj ? &obj->m_impl : 0);
obj ?
& NdbDictObjectImpl::getImpl(* obj) : 0);
}
int
@ -1899,7 +1911,7 @@ NdbDictionary::Dictionary::createDatafile(const Datafile & df,
{
return m_impl.createDatafile(NdbDatafileImpl::getImpl(df),
force,
obj ? &obj->m_impl : 0);
obj ? & NdbDictObjectImpl::getImpl(* obj) : 0);
}
int
@ -1925,7 +1937,7 @@ NdbDictionary::Dictionary::createUndofile(const Undofile & df,
{
return m_impl.createUndofile(NdbUndofileImpl::getImpl(df),
force,
obj ? &obj->m_impl : 0);
obj ? & NdbDictObjectImpl::getImpl(* obj) : 0);
}
int

View file

@ -46,14 +46,22 @@ public:
NdbDictionary::Object::Status m_status;
bool change();
static NdbDictObjectImpl & getImpl(NdbDictionary::ObjectId & t) {
return t.m_impl;
}
static const NdbDictObjectImpl & getImpl(const NdbDictionary::ObjectId & t){
return t.m_impl;
}
protected:
friend class NdbDictionary::ObjectId;
NdbDictObjectImpl(NdbDictionary::Object::Type type) :
m_type(type),
m_status(NdbDictionary::Object::New) {
m_id = -1;
}
friend class NdbDictionary::ObjectId;
};
/**

View file

@ -121,7 +121,15 @@ NdbReceiver::calculate_batch_size(Uint32 key_size,
* no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
* batch.
*/
batch_byte_size= max_batch_byte_size;
if (batch_size == 0)
{
batch_byte_size= max_batch_byte_size;
}
else
{
batch_byte_size= batch_size * tot_size;
}
if (batch_byte_size * parallelism > max_scan_batch_size) {
batch_byte_size= max_scan_batch_size / parallelism;
}

View file

@ -119,7 +119,8 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection)
int
NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
Uint32 scan_flags,
Uint32 parallel)
Uint32 parallel,
Uint32 batch)
{
m_ordered = m_descending = false;
Uint32 fragCount = m_currentTable->m_fragmentCount;
@ -191,8 +192,11 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
tupScan = false;
}
theParallelism = parallel;
if (rangeScan && (scan_flags & SF_OrderBy))
parallel = fragCount;
theParallelism = parallel;
if(fix_receivers(parallel) == -1){
setErrorCodeAbort(4000);
return -1;
@ -211,6 +215,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
req->tableSchemaVersion = m_accessTable->m_version;
req->storedProcId = 0xFFFF;
req->buddyConPtr = theNdbCon->theBuddyConPtr;
req->first_batch_size = batch; // Save user specified batch size
Uint32 reqInfo = 0;
ScanTabReq::setParallelism(reqInfo, parallel);
@ -768,13 +773,14 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
* The number of records sent by each LQH is calculated and the kernel
* is informed of this number by updating the SCAN_TABREQ signal
*/
Uint32 batch_size, batch_byte_size, first_batch_size;
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
Uint32 batch_size = req->first_batch_size; // User specified
Uint32 batch_byte_size, first_batch_size;
theReceiver.calculate_batch_size(key_size,
theParallelism,
batch_size,
batch_byte_size,
first_batch_size);
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
ScanTabReq::setScanBatch(req->requestInfo, batch_size);
req->batch_byte_size= batch_byte_size;
req->first_batch_size= first_batch_size;
@ -1268,13 +1274,14 @@ NdbIndexScanOperation::getKeyFromSCANTABREQ(Uint32* data, Uint32 size)
int
NdbIndexScanOperation::readTuples(LockMode lm,
Uint32 scan_flags,
Uint32 parallel)
Uint32 parallel,
Uint32 batch)
{
const bool order_by = scan_flags & SF_OrderBy;
const bool order_desc = scan_flags & SF_Descending;
const bool read_range_no = scan_flags & SF_ReadRangeNo;
int res = NdbScanOperation::readTuples(lm, scan_flags, 0);
int res = NdbScanOperation::readTuples(lm, scan_flags, parallel, batch);
if(!res && read_range_no)
{
m_read_range_no = 1;
@ -1567,13 +1574,68 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend,
return -1;
}
bool holdLock = false;
if (theSCAN_TABREQ)
{
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
holdLock = ScanTabReq::getHoldLockFlag(req->requestInfo);
}
/**
* When using locks, force close of scan directly
*/
if (holdLock && theError.code == 0 &&
(m_sent_receivers_count + m_conf_receivers_count + m_api_receivers_count))
{
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
Uint32* theData = tSignal.getDataPtrSend();
Uint64 transId = theNdbCon->theTransactionId;
theData[0] = theNdbCon->theTCConPtr;
theData[1] = 1;
theData[2] = transId;
theData[3] = (Uint32) (transId >> 32);
tSignal.setLength(4);
int ret = tp->sendSignal(&tSignal, nodeId);
if (ret)
{
setErrorCode(4008);
return -1;
}
/**
* If no receiver is outstanding...
* set it to 1 as execCLOSE_SCAN_REP resets it
*/
m_sent_receivers_count = m_sent_receivers_count ? m_sent_receivers_count : 1;
while(theError.code == 0 && (m_sent_receivers_count + m_conf_receivers_count))
{
int return_code = poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId, forceSend);
switch(return_code){
case 0:
break;
case -1:
setErrorCode(4008);
case -2:
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
theNdbCon->theReleaseOnClose = true;
return -1;
}
}
return 0;
}
/**
* Wait for outstanding
*/
while(theError.code == 0 && m_sent_receivers_count)
{
int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
false);
int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId, forceSend);
switch(return_code){
case 0:
break;

View file

@ -420,6 +420,7 @@ ErrorBundle ErrorCodes[] = {
{ 1514, DMEC, SE, "Currently there is a limit of one logfile group" },
{ 773, DMEC, SE, "Out of string memory, please modify StringMemory config parameter" },
{ 775, DMEC, SE, "Create file is not supported when Diskless=1" },
/**
* FunctionNotImplemented

View file

@ -1151,70 +1151,76 @@ runScanVariants(NDBT_Context* ctx, NDBT_Step* step)
{
for(int flags = 0; flags < 4; flags++)
{
for (int par = 0; par < 16; par += 1 + (rand() % 3))
for (int batch = 0; batch < 100; batch += (1 + batch + (batch >> 3)))
{
bool disk = flags & 1;
bool tups = flags & 2;
g_info << "lm: " << lm
<< " disk: " << disk
<< " tup scan: " << tups
<< " par: " << par
<< endl;
NdbConnection* pCon = pNdb->startTransaction();
NdbScanOperation* pOp = pCon->getNdbScanOperation(pTab->getName());
if (pOp == NULL) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
if( pOp->readTuples((NdbOperation::LockMode)lm,
tups ? NdbScanOperation::SF_TupScan : 0,
par) != 0)
for (int par = 0; par < 16; par += 1 + (rand() % 3))
{
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
int check = pOp->interpret_exit_ok();
if( check == -1 ) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
// Define attributes to read
bool found_disk = false;
for(int a = 0; a<pTab->getNoOfColumns(); a++){
if (pTab->getColumn(a)->getStorageType() == NdbDictionary::Column::StorageTypeDisk)
{
found_disk = true;
if (!disk)
continue;
}
bool disk = flags & 1;
bool tups = flags & 2;
g_info << "lm: " << lm
<< " disk: " << disk
<< " tup scan: " << tups
<< " par: " << par
<< " batch: " << batch
<< endl;
if((pOp->getValue(pTab->getColumn(a)->getName())) == 0) {
NdbConnection* pCon = pNdb->startTransaction();
NdbScanOperation* pOp = pCon->getNdbScanOperation(pTab->getName());
if (pOp == NULL) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
}
if (! (disk && !found_disk))
{
check = pCon->execute(NoCommit);
if( pOp->readTuples((NdbOperation::LockMode)lm,
tups ? NdbScanOperation::SF_TupScan : 0,
par,
batch) != 0)
{
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
int check = pOp->interpret_exit_ok();
if( check == -1 ) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
int res;
int row = 0;
while((res = pOp->nextResult()) == 0);
// Define attributes to read
bool found_disk = false;
for(int a = 0; a<pTab->getNoOfColumns(); a++){
if (pTab->getColumn(a)->getStorageType() ==
NdbDictionary::Column::StorageTypeDisk)
{
found_disk = true;
if (!disk)
continue;
}
if((pOp->getValue(pTab->getColumn(a)->getName())) == 0) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
}
if (! (disk && !found_disk))
{
check = pCon->execute(NoCommit);
if( check == -1 ) {
ERR(pCon->getNdbError());
return NDBT_FAILED;
}
int res;
int row = 0;
while((res = pOp->nextResult()) == 0);
}
pCon->close();
}
pCon->close();
}
}
}
return NDBT_OK;
}

View file

@ -1559,6 +1559,56 @@ static int runCreateDropNR(NDBT_Context* ctx, NDBT_Step* step)
DBUG_RETURN(result);
}
static
int
runSubscribeUnsubscribe(NDBT_Context* ctx, NDBT_Step* step)
{
char buf[1024];
const NdbDictionary::Table & tab = * ctx->getTab();
sprintf(buf, "%s_EVENT", tab.getName());
Ndb* ndb = GETNDB(step);
int loops = 5 * ctx->getNumLoops();
while (--loops)
{
NdbEventOperation *pOp= ndb->createEventOperation(buf);
if (pOp == 0)
{
g_err << "createEventOperation: "
<< ndb->getNdbError().code << " "
<< ndb->getNdbError().message << endl;
return NDBT_FAILED;
}
int n_columns= tab.getNoOfColumns();
for (int j = 0; j < n_columns; j++)
{
pOp->getValue(tab.getColumn(j)->getName());
pOp->getPreValue(tab.getColumn(j)->getName());
}
if ( pOp->execute() )
{
g_err << "pOp->execute(): "
<< pOp->getNdbError().code << " "
<< pOp->getNdbError().message << endl;
ndb->dropEventOperation(pOp);
return NDBT_FAILED;
}
if (ndb->dropEventOperation(pOp))
{
g_err << "pOp->execute(): "
<< ndb->getNdbError().code << " "
<< ndb->getNdbError().message << endl;
return NDBT_FAILED;
}
}
return NDBT_OK;
}
NDBT_TESTSUITE(test_event);
TESTCASE("BasicEventOperation",
"Verify that we can listen to Events"
@ -1673,6 +1723,13 @@ TESTCASE("CreateDropNR",
"NOTE! No errors are allowed!" ){
FINALIZER(runCreateDropNR);
}
TESTCASE("SubscribeUnsubscribe",
"A bunch of threads doing subscribe/unsubscribe in loop"
"NOTE! No errors are allowed!" ){
INITIALIZER(runCreateEvent);
STEPS(runSubscribeUnsubscribe, 16);
FINALIZER(runDropEvent);
}
NDBT_TESTSUITE_END(test_event);
int main(int argc, const char** argv){

View file

@ -533,9 +533,11 @@ BackupRestore::object(Uint32 type, const void * ptr)
if (!m_no_restore_disk)
{
NdbDictionary::Datafile old(*(NdbDictionary::Datafile*)ptr);
NdbDictionary::Tablespace * ts = m_tablespaces[old.getTablespaceId()];
NdbDictionary::ObjectId objid;
old.getTablespaceId(&objid);
NdbDictionary::Tablespace * ts = m_tablespaces[objid.getObjectId()];
debug << "Connecting datafile " << old.getPath()
<< " to tablespace: oldid: " << old.getTablespaceId()
<< " to tablespace: oldid: " << objid.getObjectId()
<< " newid: " << ts->getObjectId() << endl;
old.setTablespace(* ts);
info << "Creating datafile \"" << old.getPath() << "\"..." << flush;
@ -554,10 +556,11 @@ BackupRestore::object(Uint32 type, const void * ptr)
if (!m_no_restore_disk)
{
NdbDictionary::Undofile old(*(NdbDictionary::Undofile*)ptr);
NdbDictionary::LogfileGroup * lg =
m_logfilegroups[old.getLogfileGroupId()];
NdbDictionary::ObjectId objid;
old.getLogfileGroupId(&objid);
NdbDictionary::LogfileGroup * lg = m_logfilegroups[objid.getObjectId()];
debug << "Connecting undofile " << old.getPath()
<< " to logfile group: oldid: " << old.getLogfileGroupId()
<< " to logfile group: oldid: " << objid.getObjectId()
<< " newid: " << lg->getObjectId()
<< " " << (void*)lg << endl;
old.setLogfileGroup(* lg);