diff --git a/ndb/src/ndbapi/TransporterFacade.cpp b/ndb/src/ndbapi/TransporterFacade.cpp index bc24110ea14..0377cf0c84b 100644 --- a/ndb/src/ndbapi/TransporterFacade.cpp +++ b/ndb/src/ndbapi/TransporterFacade.cpp @@ -475,7 +475,8 @@ TransporterFacade::TransporterFacade() : theTransporterRegistry(0), theStopReceive(0), theSendThread(NULL), - theReceiveThread(NULL) + theReceiveThread(NULL), + m_fragmented_signal_id(0) { theOwnId = 0; @@ -833,6 +834,105 @@ TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){ return (ss == SEND_OK ? 0 : -1); } +#define CHUNK_SZ 100u +int +TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, + LinearSectionPtr ptr[3], Uint32 secs){ + NdbApiSignal tmp_signal(*(SignalHeader*)aSignal); + LinearSectionPtr tmp_ptr[3]; + Uint32 unique_id= m_fragmented_signal_id++; // next unique id + unsigned i; + for (i= 0; i < secs; i++) + tmp_ptr[i]= ptr[i]; + + unsigned start_i= 0; + unsigned chunk_sz= 0; + unsigned fragment_info= 0; + Uint32 *tmp_data= tmp_signal.getDataPtrSend(); + for (i= 0; i < secs;) { + unsigned save_sz= tmp_ptr[i].sz; + tmp_data[i-start_i]= i; + if (chunk_sz + save_sz > CHUNK_SZ) { + // truncate + unsigned send_sz= CHUNK_SZ - chunk_sz; + tmp_ptr[i].sz= send_sz; + if (fragment_info < 2) + fragment_info++; + + // send tmp_signal + tmp_data[i-start_i+1]= unique_id; + tmp_signal.setLength(i-start_i+2); + tmp_signal.m_fragmentInfo= fragment_info; + // do prepare send + { + int ret; + if(getIsNodeSendable(aNode) == true){ + SendStatus ss = theTransporterRegistry->prepareSend + (&tmp_signal, + 1, // JBB + tmp_signal.getDataPtrSend(), + aNode, + &ptr[start_i]); + assert(ss != SEND_MESSAGE_TOO_BIG); + ret = (ss == SEND_OK ? 0 : -1); + } else + ret = -1; + if (ret != SEND_OK) + return ret; + } + + // setup variables for next signal + start_i= i; + chunk_sz= 0; + tmp_ptr[i].sz= save_sz-send_sz; + tmp_ptr[i].p+= send_sz; + } + else + { + chunk_sz+= save_sz; + i++; + } + } + + unsigned a_sz= aSignal->getLength(); + + if (fragment_info > 0) { + // update the original signal to include section info + Uint32 *a_data= aSignal->getDataPtrSend(); + unsigned tmp_sz= i-start_i; + memcpy(a_data+a_sz, + tmp_data, + tmp_sz*sizeof(Uint32)); + a_data[a_sz+tmp_sz]= unique_id; + aSignal->setLength(a_sz+tmp_sz+1); + + // send last fragment + aSignal->m_fragmentInfo= 3; + aSignal->m_noOfSections= i-start_i; + } else { + aSignal->m_noOfSections= secs; + } + + // send aSignal + int ret; + if(getIsNodeSendable(aNode) == true){ + SendStatus ss = theTransporterRegistry->prepareSend + (aSignal, + 1, // JBB + aSignal->getDataPtrSend(), + aNode, + &ptr[start_i]); + assert(ss != SEND_MESSAGE_TOO_BIG); + ret = (ss == SEND_OK ? 0 : -1); + } else + ret = -1; + aSignal->m_noOfSections = 0; + aSignal->m_fragmentInfo = 0; + aSignal->setLength(a_sz); + return ret; +} + +#if 0 int TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, LinearSectionPtr ptr[3], Uint32 secs){ @@ -864,7 +964,7 @@ TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, aSignal->m_noOfSections = 0; return -1; } - +#endif int @@ -886,11 +986,10 @@ TransporterFacade::sendFragmentedSignalUnCond(NdbApiSignal* aSignal, aSignal->theSendersBlockRef = tmp; } #endif - SendStatus ss = theTransporterRegistry->prepareSend(aSignal, - 1, // JBB - aSignal->getDataPtrSend(), - aNode, - ptr); + SendStatus ss = + theTransporterRegistry->prepareSend(aSignal, 1, // JBB + aSignal->getDataPtrSend(), + aNode, ptr); assert(ss != SEND_MESSAGE_TOO_BIG); aSignal->m_noOfSections = 0; return (ss == SEND_OK ? 0 : -1); diff --git a/ndb/src/ndbapi/TransporterFacade.hpp b/ndb/src/ndbapi/TransporterFacade.hpp index 5f473975585..b288e2ee8e6 100644 --- a/ndb/src/ndbapi/TransporterFacade.hpp +++ b/ndb/src/ndbapi/TransporterFacade.hpp @@ -224,7 +224,8 @@ private: } m_threads; Uint32 m_max_trans_id; - + Uint32 m_fragmented_signal_id; + /** * execute function */