diff --git a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp index fb2401bdd9e..b491e00ba65 100644 --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp @@ -1117,7 +1117,7 @@ NdbEventBuffer::nextEvent() m_available_data.remove_first(); // add it to used list - m_used_data.append(data); + m_used_data.append_used_data(data); #ifdef VM_TRACE op->m_data_done_count++; @@ -1144,6 +1144,10 @@ NdbEventBuffer::nextEvent() (void)tBlob->atNextEvent(); tBlob = tBlob->theNext; } + EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops(); + while (gci_ops && op->getGCI() > gci_ops->m_gci) + gci_ops = m_available_data.next_gci_ops(); + assert(gci_ops && (op->getGCI() == gci_ops->m_gci)); DBUG_RETURN_EVENT(op->m_facade); } // the next event belonged to an event op that is no @@ -1158,15 +1162,21 @@ NdbEventBuffer::nextEvent() #ifdef VM_TRACE m_latest_command= m_latest_command_save; #endif + + // free all "per gci unique" collected operations + EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops(); + while (gci_ops) + gci_ops = m_available_data.next_gci_ops(); DBUG_RETURN_EVENT(0); } NdbEventOperationImpl* NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types) { - if (*iter < m_available_data.m_gci_op_count) + EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops(); + if (*iter < gci_ops->m_gci_op_count) { - EventBufData_list::Gci_op g = m_available_data.m_gci_op_list[(*iter)++]; + EventBufData_list::Gci_op g = gci_ops->m_gci_op_list[(*iter)++]; if (event_types != NULL) *event_types = g.event_types; return g.op; @@ -1318,7 +1328,7 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep) #ifdef VM_TRACE assert(bucket->m_data.m_count); #endif - m_complete_data.m_data.append(bucket->m_data); + m_complete_data.m_data.append_list(&bucket->m_data, gci); } reportStatus(); bzero(bucket, sizeof(Gci_container)); @@ -1389,7 +1399,7 @@ NdbEventBuffer::complete_outof_order_gcis() #ifdef VM_TRACE assert(bucket->m_data.m_count); #endif - m_complete_data.m_data.append(bucket->m_data); + m_complete_data.m_data.append_list(&bucket->m_data, start_gci); #ifdef VM_TRACE ndbout_c(" moved %lld rows -> %lld", bucket->m_data.m_count, m_complete_data.m_data.m_count); @@ -1599,7 +1609,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, data->m_event_op = op; if (! is_blob_event || ! is_data_event) { - bucket->m_data.append(data); + bucket->m_data.append_data(data); } else { @@ -1615,7 +1625,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, if (ret != 0) // main event was created { main_data->m_event_op = op->theMainOp; - bucket->m_data.append(main_data); + bucket->m_data.append_data(main_data); if (use_hash) { main_data->m_pkhash = main_hpos.pkhash; @@ -2097,7 +2107,7 @@ NdbEventBuffer::move_data() if (!m_complete_data.m_data.is_empty()) { // move this list to last in m_available_data - m_available_data.append(m_complete_data.m_data); + m_available_data.append_list(&m_complete_data.m_data, 0); bzero(&m_complete_data, sizeof(m_complete_data)); } @@ -2160,6 +2170,19 @@ NdbEventBuffer::free_list(EventBufData_list &list) list.m_count = list.m_sz = 0; } +void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci) +{ + move_gci_ops(list, gci); + + if (m_tail) + m_tail->m_next= list->m_head; + else + m_head= list->m_head; + m_tail= list->m_tail; + m_count+= list->m_count; + m_sz+= list->m_sz; +} + void EventBufData_list::add_gci_op(Gci_op g) { @@ -2188,6 +2211,44 @@ EventBufData_list::add_gci_op(Gci_op g) } } +void +EventBufData_list::move_gci_ops(EventBufData_list *list, Uint64 gci) +{ + assert(!m_is_not_multi_list); + if (!list->m_is_not_multi_list) + { + assert(gci == 0); + if (m_gci_ops_list_tail) + m_gci_ops_list_tail->m_next = list->m_gci_ops_list; + else + { + m_gci_ops_list = list->m_gci_ops_list; + } + m_gci_ops_list_tail = list->m_gci_ops_list_tail; + goto end; + } + { + Gci_ops *new_gci_ops = new Gci_ops; + if (m_gci_ops_list_tail) + m_gci_ops_list_tail->m_next = new_gci_ops; + else + { + assert(m_gci_ops_list == 0); + m_gci_ops_list = new_gci_ops; + } + m_gci_ops_list_tail = new_gci_ops; + + new_gci_ops->m_gci_op_list = list->m_gci_op_list; + new_gci_ops->m_gci_op_count = list->m_gci_op_count; + new_gci_ops->m_gci = gci; + new_gci_ops->m_next = 0; + } +end: + list->m_gci_op_list = 0; + list->m_gci_ops_list_tail = 0; + list->m_gci_op_alloc = 0; +} + NdbEventOperation* NdbEventBuffer::createEventOperation(const char* eventName, NdbError &theError) diff --git a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp index 3e7d313fe02..9ce85a61361 100644 --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp @@ -68,8 +68,12 @@ public: ~EventBufData_list(); void remove_first(); - void append(EventBufData *data); - void append(const EventBufData_list &list); + // append data and insert data into Gci_op list with add_gci_op + void append_data(EventBufData *data); + // append data and insert data but ignore Gci_op list + void append_used_data(EventBufData *data); + // append list to another, will call move_gci_ops + void append_list(EventBufData_list *list, Uint64 gci); int is_empty(); @@ -77,13 +81,60 @@ public: unsigned m_count; unsigned m_sz; - // distinct ops per gci (assume no hash needed) - struct Gci_op { NdbEventOperationImpl* op; Uint32 event_types; }; - Gci_op* m_gci_op_list; - Uint32 m_gci_op_count; - Uint32 m_gci_op_alloc; + /* + distinct ops per gci (assume no hash needed) + + list may be in 2 versions + + 1. single list with on gci only + - one linear array + Gci_op *m_gci_op_list; + Uint32 m_gci_op_count; + Uint32 m_gci_op_alloc != 0; + + 2. multi list with several gcis + - linked list of gci's + - one linear array per gci + Gci_ops *m_gci_ops_list; + Gci_ops *m_gci_ops_list_tail; + Uint32 m_is_not_multi_list == 0; + + */ + struct Gci_op // 1 + 2 + { + NdbEventOperationImpl* op; + Uint32 event_types; + }; + struct Gci_ops // 2 + { + Uint64 m_gci; + Gci_op *m_gci_op_list; + Gci_ops *m_next; + Uint32 m_gci_op_count; + }; + union + { + Gci_op *m_gci_op_list; // 1 + Gci_ops *m_gci_ops_list; // 2 + }; + union + { + Uint32 m_gci_op_count; // 1 + Gci_ops *m_gci_ops_list_tail;// 2 + }; + union + { + Uint32 m_gci_op_alloc; // 1 + Uint32 m_is_not_multi_list; // 2 + }; + Gci_ops *first_gci_ops(); + Gci_ops *next_gci_ops(); private: + // case 1 above; add Gci_op to single list void add_gci_op(Gci_op g); + // case 2 above; move single list or multi list from + // one list to another + void move_gci_ops(EventBufData_list *list, Uint64 gci); }; inline @@ -92,7 +143,7 @@ EventBufData_list::EventBufData_list() m_count(0), m_sz(0), m_gci_op_list(NULL), - m_gci_op_count(0), + m_gci_ops_list_tail(0), m_gci_op_alloc(0) { } @@ -100,7 +151,14 @@ EventBufData_list::EventBufData_list() inline EventBufData_list::~EventBufData_list() { - delete [] m_gci_op_list; + if (m_is_not_multi_list) + delete [] m_gci_op_list; + else + { + Gci_ops *op = first_gci_ops(); + while (op) + op = next_gci_ops(); + } } inline @@ -120,11 +178,8 @@ void EventBufData_list::remove_first() } inline -void EventBufData_list::append(EventBufData *data) +void EventBufData_list::append_used_data(EventBufData *data) { - Gci_op g = { data->m_event_op, 1 << (Uint32)data->sdata->operation }; - add_gci_op(g); - data->m_next= 0; if (m_tail) m_tail->m_next= data; @@ -143,19 +198,33 @@ void EventBufData_list::append(EventBufData *data) } inline -void EventBufData_list::append(const EventBufData_list &list) +void EventBufData_list::append_data(EventBufData *data) { - Uint32 i; - for (i = 0; i < list.m_gci_op_count; i++) - add_gci_op(list.m_gci_op_list[i]); + Gci_op g = { data->m_event_op, 1 << (Uint32)data->sdata->operation }; + add_gci_op(g); - if (m_tail) - m_tail->m_next= list.m_head; - else - m_head= list.m_head; - m_tail= list.m_tail; - m_count+= list.m_count; - m_sz+= list.m_sz; + append_used_data(data); +} + +inline EventBufData_list::Gci_ops * +EventBufData_list::first_gci_ops() +{ + assert(!m_is_not_multi_list); + return m_gci_ops_list; +} + +inline EventBufData_list::Gci_ops * +EventBufData_list::next_gci_ops() +{ + assert(!m_is_not_multi_list); + Gci_ops *first = m_gci_ops_list; + m_gci_ops_list = first->m_next; + if (first->m_gci_op_list) + delete [] first->m_gci_op_list; + delete first; + if (m_gci_ops_list == 0) + m_gci_ops_list_tail = 0; + return m_gci_ops_list; } // GCI bucket has also a hash over data, with key event op, table PK.