From 230daf29c14d6e17cb22c6045c6ac4166cea9685 Mon Sep 17 00:00:00 2001 From: Vlad Lesin Date: Fri, 10 Jan 2025 19:17:06 +0300 Subject: [PATCH] MDEV-34877 Port "Bug #11745929 Change lock priority so that the transaction holding S-lock gets X-lock first" fix from MySQL to MariaDB Code review fixes. --- .../r/avoid_deadlock_with_blocked.result | 70 +++++++++++ .../innodb/t/avoid_deadlock_with_blocked.test | 111 ++++++++++++------ storage/innobase/include/hash0hash.h | 16 +-- storage/innobase/include/lock0lock.h | 4 +- storage/innobase/include/lock0types.h | 34 +++--- storage/innobase/lock/lock0lock.cc | 78 ++++++------ storage/innobase/lock/lock0prdt.cc | 16 +-- 7 files changed, 206 insertions(+), 123 deletions(-) diff --git a/mysql-test/suite/innodb/r/avoid_deadlock_with_blocked.result b/mysql-test/suite/innodb/r/avoid_deadlock_with_blocked.result index 880e5229d3c..65cc4a9714a 100644 --- a/mysql-test/suite/innodb/r/avoid_deadlock_with_blocked.result +++ b/mysql-test/suite/innodb/r/avoid_deadlock_with_blocked.result @@ -6,6 +6,14 @@ connect con3,localhost,root,,; connection default; CREATE TABLE t1 (id INT PRIMARY KEY) ENGINE=InnoDB STATS_PERSISTENT=0; INSERT INTO t1 (id) VALUES (1); +# Simplest scenario: +# , +# , , +# Before MDEV-34877: +# , , +# After MDEV-34877: +# , , +# Expected: instead of deadlocking, the con1's request should ingore con2's connection con1; BEGIN; SELECT * FROM t1 LOCK IN SHARE MODE; @@ -25,6 +33,44 @@ connection con2; id 1 COMMIT; +# The scenario when we bypass X<-S pair: +# , +# , , +# , , +# , , , +connection con1; +BEGIN; +SELECT * FROM t1 LOCK IN SHARE MODE; +id +1 +connection con2; +BEGIN; +SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait'; +SELECT * FROM t1 FOR UPDATE; +connection con3; +SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait'; +BEGIN; +SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait'; +SELECT * FROM t1 LOCK IN SHARE MODE;; +connection con1; +SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait'; +SELECT * FROM t1 FOR UPDATE; +id +1 +COMMIT; +connection con2; +id +1 +COMMIT; +connection con3; +id +1 +COMMIT; +# A variant of the above scenario: +# , +# , , +# , , +# Expected: a deadlock, as INSERT INTENTION should not overtake locks on gap, to not slice them connection con1; BEGIN; SELECT * FROM t1 WHERE id=1 FOR UPDATE; @@ -41,6 +87,13 @@ ROLLBACK; connection con2; ERROR 40001: Deadlock found when trying to get lock; try restarting transaction COMMIT; +# More complicated scenario: +# , +# , , +# , , +# , , , +# , , +# Expected: a deadlock, as INSERT INTENTION should not overtake locks on gap, to not slice them connection con1; BEGIN; SELECT * FROM t1 LOCK IN SHARE MODE; @@ -66,6 +119,15 @@ connection con1; ROLLBACK; connection con3; ERROR 40001: Deadlock found when trying to get lock; try restarting transaction +# More complicated scenario. +# , +# , , +# , , +# , , , +# Before MDEV-34877: +# , , +# After MDEV-34877: +# , , connection con1; BEGIN; SELECT * FROM t1 LOCK IN SHARE MODE; @@ -95,6 +157,14 @@ connection con3; id 1 COMMIT; +# A secenario, where con1 has to bypass two transactions: +# +# +# +# Before MDEV-34877: +# +# After MDEV-34877: +# connection con1; BEGIN; SELECT * FROM t1 LOCK IN SHARE MODE; diff --git a/mysql-test/suite/innodb/t/avoid_deadlock_with_blocked.test b/mysql-test/suite/innodb/t/avoid_deadlock_with_blocked.test index 00f7ece9de5..5a2d088137b 100644 --- a/mysql-test/suite/innodb/t/avoid_deadlock_with_blocked.test +++ b/mysql-test/suite/innodb/t/avoid_deadlock_with_blocked.test @@ -16,14 +16,15 @@ START TRANSACTION WITH CONSISTENT SNAPSHOT; --connection default CREATE TABLE t1 (id INT PRIMARY KEY) ENGINE=InnoDB STATS_PERSISTENT=0; INSERT INTO t1 (id) VALUES (1); -# Simplest scenario: -# , -# , , -# Before MDEV-34877: -# , , -# After MDEV-34877: -# , , -# Expected: instead of deadlocking, the con1's request should ingore con2's + +--echo # Simplest scenario: +--echo # , +--echo # , , +--echo # Before MDEV-34877: +--echo # , , +--echo # After MDEV-34877: +--echo # , , +--echo # Expected: instead of deadlocking, the con1's request should ingore con2's --connection con1 BEGIN; @@ -43,11 +44,46 @@ INSERT INTO t1 (id) VALUES (1); --reap COMMIT; -# A variant of the above scenario: -# , -# , , -# , , -# Expected: a deadlock, as INSERT INTENTION should not overtake locks on gap, to not slice them +--echo # The scenario when we bypass X<-S pair: +--echo # , +--echo # , , +--echo # , , +--echo # , , , + +--connection con1 + BEGIN; + SELECT * FROM t1 LOCK IN SHARE MODE; + +--connection con2 + BEGIN; + SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait'; + --send SELECT * FROM t1 FOR UPDATE + +--connection con3 + SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait'; + BEGIN; + SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait'; + --send SELECT * FROM t1 LOCK IN SHARE MODE; + +--connection con1 + SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait'; + SELECT * FROM t1 FOR UPDATE; + COMMIT; + +--connection con2 + --reap + COMMIT; + +--connection con3 + --reap + COMMIT; + +# +--echo # A variant of the above scenario: +--echo # , +--echo # , , +--echo # , , +--echo # Expected: a deadlock, as INSERT INTENTION should not overtake locks on gap, to not slice them --connection con1 BEGIN; SELECT * FROM t1 WHERE id=1 FOR UPDATE; @@ -67,13 +103,13 @@ INSERT INTO t1 (id) VALUES (1); --reap COMMIT; -# More complicated scenario: -# , -# , , -# , , -# , , , -# , , -# Expected: a deadlock, as INSERT INTENTION should not overtake locks on gap, to not slice them +--echo # More complicated scenario: +--echo # , +--echo # , , +--echo # , , +--echo # , , , +--echo # , , +--echo # Expected: a deadlock, as INSERT INTENTION should not overtake locks on gap, to not slice them --connection con1 BEGIN; @@ -102,20 +138,19 @@ INSERT INTO t1 (id) VALUES (1); --reap ROLLBACK; - --connection con3 --error ER_LOCK_DEADLOCK --reap -# More complicated scenario. -# , -# , , -# , , -# , , , -# Before MDEV-34877: -# , , -# After MDEV-34877: -# , , +--echo # More complicated scenario. +--echo # , +--echo # , , +--echo # , , +--echo # , , , +--echo # Before MDEV-34877: +--echo # , , +--echo # After MDEV-34877: +--echo # , , --connection con1 @@ -149,14 +184,14 @@ INSERT INTO t1 (id) VALUES (1); --reap COMMIT; -# A secenario, where con1 has to bypass two transactions: -# -# -# -# Before MDEV-34877: -# -# After MDEV-34877: -# +--echo # A secenario, where con1 has to bypass two transactions: +--echo # +--echo # +--echo # +--echo # Before MDEV-34877: +--echo # +--echo # After MDEV-34877: +--echo # --connection con1 BEGIN; SELECT * FROM t1 LOCK IN SHARE MODE; diff --git a/storage/innobase/include/hash0hash.h b/storage/innobase/include/hash0hash.h index c8534024593..4fc21a90f1f 100644 --- a/storage/innobase/include/hash0hash.h +++ b/storage/innobase/include/hash0hash.h @@ -107,25 +107,11 @@ public: @param element the being-removed element @param next the next-element pointer in T */ template - void remove(T &element, T *T::*next) noexcept + void remove(const T &element, T *T::*next) noexcept { remove(search(next, [&element](const T *p){return p==&element;}), next); } - /** Delete an element. - @tparam T type of the element - @param remove the being-removed element - @param next the next-element pointer in T */ - template - void remove(const T &remove, T *T::*next) - { - T *prev; - for (prev= static_cast(node); prev && prev->*next != &remove; - prev= prev->*next); - ut_a(prev); - prev->*next= remove.*next; - } - /** Insert an element after another. @tparam T type of the element @param after the element after which to insert diff --git a/storage/innobase/include/lock0lock.h b/storage/innobase/include/lock0lock.h index b1baf3b6d09..182df448be6 100644 --- a/storage/innobase/include/lock0lock.h +++ b/storage/innobase/include/lock0lock.h @@ -60,7 +60,7 @@ struct conflicting_lock_info { lock must be inserted into linked list of locks for the certain cell of record locks hash table. */ lock_t *insert_after; - /** Bypassed lock */ + /** Fisrst bypassed lock */ ut_d(const lock_t *bypassed;) }; @@ -1180,7 +1180,7 @@ without checking for deadlocks or conflicts. @param[in] holds_trx_mutex whether the caller holds trx->mutex @return created lock */ lock_t* -lock_rec_create_low( +lock_rec_create( const conflicting_lock_info &c_lock_info, unsigned type_mode, const page_id_t page_id, diff --git a/storage/innobase/include/lock0types.h b/storage/innobase/include/lock0types.h index 8e4ceeee3a6..cf07b818dcf 100644 --- a/storage/innobase/include/lock0types.h +++ b/storage/innobase/include/lock0types.h @@ -232,32 +232,36 @@ struct ib_lock_t return(static_cast(type_mode & LOCK_MODE_MASK)); } - bool is_rec_granted_exclusive_not_gap() const + static bool is_rec_exclusive_not_gap(unsigned type_mode) { + ut_ad(!(type_mode & LOCK_INSERT_INTENTION) || (type_mode & LOCK_GAP)); return (type_mode & (LOCK_MODE_MASK | LOCK_GAP)) == LOCK_X; } - static inline bool is_rec_granted_X_not_ii_gap(unsigned type_mode) + bool is_rec_exclusive_not_gap() const { - return (type_mode & (LOCK_INSERT_INTENTION | LOCK_GAP | - LOCK_MODE_MASK)) == LOCK_X; - } - - bool is_rec_granted_X_not_ii_gap() const { - return is_rec_granted_X_not_ii_gap(type_mode); + ut_ad(!is_table()); + return is_rec_exclusive_not_gap(type_mode); } /** Checks if a lock suits for bypassing. @param blocking_trx transaction for which the lock is checked - @param has_s_lock_or_stronger if the transaction already holds not gap - and not insert intention S-lock or - stronger for the same heap_no as the - current lock @return true if lock suits, false otherwise */ - inline bool can_be_bypassed(const trx_t *blocking_trx, - bool has_s_lock_or_stronger) const; + inline bool can_be_bypassed(bool has_s_lock_or_stronger) const + { + /* We don't neet do check supremum bit in the lock's bitmap here, + because the function is always called after checking for + bypasse_mode, which already contains check for supremum. */ + ut_ad(!is_insert_intention() || is_gap()); + /* We don't need to check + trx->lock.wait_trx == blocking_trx && mode() == LOCK_X + condition here because there can be the following case: + S1 X2(waits for S1) S3(waits for X2), + bypassing X1 must not conflict with S3. */ + return has_s_lock_or_stronger && is_waiting() && !is_gap(); + } - /** Print the lock object into the given output stream. + /** Print the lock object into the given output stream. @param[in,out] out the output stream @return the given output stream. */ std::ostream& print(std::ostream& out) const; diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc index 1006ea85d10..f91fce0793b 100644 --- a/storage/innobase/lock/lock0lock.cc +++ b/storage/innobase/lock/lock0lock.cc @@ -1215,18 +1215,12 @@ func_exit: } #endif /* WITH_WSREP */ -inline bool lock_t::can_be_bypassed(const trx_t *blocking_trx, - bool has_s_lock_or_stronger) const +static inline bool lock_rec_can_be_bypassing(const trx_t *trx, + const lock_t *lock) { - /* There is no need to lock lock_sys.wait_mutex to check - trx->lock.wait_trx here because the current function is executed under - the cell latch, and trx->lock.wait_trx transaction can change wait_trx - field only under the cell latch, wait_trx trx_t object can not be - deinitialized before releasing all its locks, and during releasing the - locks the cell latch will also be requested. So while the cell latch - is held, lock->trx->lock.wait_trx can't be changed. */ - return has_s_lock_or_stronger && is_waiting() && - trx->lock.wait_trx == blocking_trx && is_rec_granted_X_not_ii_gap(); + ut_ad(!lock->is_insert_intention() || lock->is_gap()); + return lock->trx == trx && !(lock->type_mode & (LOCK_WAIT | LOCK_GAP)) && + lock_mode_stronger_or_eq(lock->mode(), LOCK_S); } /** Checks if some other transaction has a conflicting explicit lock request @@ -1246,7 +1240,7 @@ lock_rec_other_has_conflicting(unsigned mode, const hash_cell_t &cell, const trx_t *trx) { bool is_supremum= (heap_no == PAGE_HEAP_NO_SUPREMUM); - bool bypass_mode= !is_supremum && lock_t::is_rec_granted_X_not_ii_gap(mode); + bool bypass_mode= !is_supremum && lock_t::is_rec_exclusive_not_gap(mode); bool has_s_lock_or_stronger= false; const lock_t *insert_after= nullptr; ut_d(const lock_t *bypassed= nullptr;) @@ -1255,15 +1249,13 @@ lock_rec_other_has_conflicting(unsigned mode, const hash_cell_t &cell, for (lock_t *lock= lock_sys_t::get_first(cell, id, heap_no); lock; lock= lock_rec_get_next(heap_no, lock)) { - if (bypass_mode && lock->trx == trx && !lock->is_gap() && - !lock->is_waiting() && !lock->is_insert_intention() && - lock_mode_stronger_or_eq(lock->mode(), LOCK_S)) + if (bypass_mode && lock_rec_can_be_bypassing(trx, lock)) { has_s_lock_or_stronger= true; } else if (lock_rec_has_to_wait(trx, mode, lock, is_supremum)) { - if (!bypass_mode || !lock->can_be_bypassed(trx, has_s_lock_or_stronger)) + if (!bypass_mode || !lock->can_be_bypassed(has_s_lock_or_stronger)) return {lock, nullptr, ut_d(nullptr)}; /* Store the first lock to bypass to invoke lock_rec_find_similar_on_page() only for the locks which precede all @@ -1373,7 +1365,7 @@ static void lock_rec_queue_validate_bypass(const lock_t *checked_lock, auto mode = checked_lock->type_mode; const trx_t *trx = checked_lock->trx; bool is_supremum= (heap_no == PAGE_HEAP_NO_SUPREMUM); - if (is_supremum || !lock_t::is_rec_granted_X_not_ii_gap(mode)) + if (is_supremum || !lock_t::is_rec_exclusive_not_gap(mode)) return; const lock_t *has_s_lock_or_stronger= nullptr; const lock_t *bypassed= nullptr; @@ -1381,9 +1373,7 @@ static void lock_rec_queue_validate_bypass(const lock_t *checked_lock, for (lock_t *lock= lock_sys_t::get_first(*cell, page_id, heap_no); lock; lock= lock_rec_get_next(heap_no, lock)) { - if (lock->trx == trx && !lock->is_gap() && - !lock->is_waiting() && !lock->is_insert_intention() && - lock_mode_stronger_or_eq(lock->mode(), LOCK_S)) + if (lock_rec_can_be_bypassing(trx, lock)) { ut_ad(!bypassed || lock!=checked_lock); has_s_lock_or_stronger= lock; @@ -1391,7 +1381,7 @@ static void lock_rec_queue_validate_bypass(const lock_t *checked_lock, } if (lock_rec_has_to_wait(trx, mode, lock, is_supremum)) { - if (!lock->can_be_bypassed(trx, has_s_lock_or_stronger)) + if (!lock->can_be_bypassed(has_s_lock_or_stronger)) return; bypassed = lock; } @@ -1457,7 +1447,7 @@ without checking for deadlocks or conflicts. @param[in] holds_trx_mutex whether the caller holds trx->mutex @return created lock */ lock_t* -lock_rec_create_low( +lock_rec_create( const conflicting_lock_info &c_lock_info, unsigned type_mode, const page_id_t page_id, @@ -1638,7 +1628,7 @@ lock_rec_enqueue_waiting( /* Enqueue the lock request that will wait to be granted, note that we already own the trx mutex. */ - lock_t* lock = lock_rec_create_low( + lock_t* lock = lock_rec_create( c_lock_info, type_mode | LOCK_WAIT, id, page, heap_no, index, trx, true); @@ -1772,22 +1762,19 @@ static void lock_rec_add_to_queue(const conflicting_lock_info &c_lock_info, goto create; } else if (lock_t *first_lock = lock_sys_t::get_first(cell, id)) { bool bypass_mode= !is_supremum - && lock_t::is_rec_granted_X_not_ii_gap(type_mode); + && lock_t::is_rec_exclusive_not_gap(type_mode); bool has_s_lock_or_stronger= false; for (lock_t* lock = first_lock;;) { if (!lock_rec_get_nth_bit(lock, heap_no)) goto cont; - if (bypass_mode && lock->trx == trx && !lock->is_gap() - && !lock->is_waiting() - && !lock->is_insert_intention() - && lock_mode_stronger_or_eq(lock->mode(), LOCK_S)) + if (bypass_mode && lock_rec_can_be_bypassing(trx, lock)) { has_s_lock_or_stronger= true; } /* There can be several locks suited for bypassing, skip them all */ else if (lock->is_waiting() && - (!bypass_mode || !lock->can_be_bypassed(trx, + (!bypass_mode || !lock->can_be_bypassed( has_s_lock_or_stronger))) goto create; cont: @@ -1828,9 +1815,9 @@ create: because we should be moving an existing waiting lock request. */ ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx); - lock_rec_create_low(c_lock_info, - type_mode, id, page, heap_no, index, trx, - caller_owns_trx_mutex); + lock_rec_create(c_lock_info, + type_mode, id, page, heap_no, index, trx, + caller_owns_trx_mutex); } /** A helper function for lock_rec_lock_slow(), which grants a Next Key Lock @@ -2015,8 +2002,8 @@ lock_rec_lock( /* Simplified and faster path for the most common cases */ if (!impl) - lock_rec_create_low(null_c_lock_info, mode, id, block->page.frame, heap_no, - index, trx, false); + lock_rec_create(null_c_lock_info, mode, id, block->page.frame, heap_no, + index, trx, false); return DB_SUCCESS_LOCKED_REC; } @@ -2039,7 +2026,7 @@ lock_rec_has_to_wait_in_queue(const hash_cell_t &cell, const lock_t *wait_lock) heap_no= lock_rec_find_set_bit(wait_lock); bool is_supremum= (heap_no == PAGE_HEAP_NO_SUPREMUM); bool bypass_mode= - !is_supremum && wait_lock->is_rec_granted_X_not_ii_gap(); + !is_supremum && wait_lock->is_rec_exclusive_not_gap(); bool has_s_lock_or_stronger= false; const lock_t *insert_after= nullptr; ut_d(const lock_t *bypassed= nullptr); @@ -2059,19 +2046,15 @@ lock_rec_has_to_wait_in_queue(const hash_cell_t &cell, const lock_t *wait_lock) const byte *p= (const byte *) &lock[1]; if (heap_no >= lock_rec_get_n_bits(lock) || !(p[bit_offset] & bit_mask)) continue; - if (bypass_mode && lock->trx == trx && !lock->is_gap() && - !lock->is_waiting() && !lock->is_insert_intention() && - lock_mode_stronger_or_eq(lock->mode(), LOCK_S)) + if (bypass_mode && lock_rec_can_be_bypassing(trx, lock)) { has_s_lock_or_stronger= true; } else if (lock_has_to_wait(wait_lock, lock)) { - if (!bypass_mode || !lock->can_be_bypassed(trx, has_s_lock_or_stronger)) + if (!bypass_mode || !lock->can_be_bypassed(has_s_lock_or_stronger)) return {lock, nullptr, ut_d(nullptr)}; - /* Store the first lock to bypass to invoke - lock_rec_find_similar_on_page() only for the locks which precede all - bypassed locks. */ + /* Store only the first lock to bypass. */ ut_d(if (!bypassed) bypassed= lock;) /* There can be several locks to bypass, insert bypassing lock just @@ -2839,7 +2822,8 @@ lock_rec_move( ut_ad(!lock_sys_t::get_first(donator_cell, donator_id, donator_heap_no)); - ut_d(lock_rec_queue_validate_bypass(lock_sys_t::get_first(receiver_cell, + ut_d(lock_rec_queue_validate_bypass(lock_sys_t::get_first( + receiver_cell, receiver_id, receiver_heap_no), receiver_heap_no)); } @@ -4915,8 +4899,10 @@ reiterate: { ut_ad(!lock->index->table->is_temporary()); bool supremum_bit= lock_rec_get_nth_bit(lock, PAGE_HEAP_NO_SUPREMUM); + /* if XA is being prepared, it must not own waiting locks */ + ut_ad(!lock->is_waiting()); bool rec_granted_exclusive_not_gap= - lock->is_rec_granted_exclusive_not_gap(); + lock->is_rec_exclusive_not_gap(); if (UNIV_UNLIKELY(lock->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE))) continue; /* SPATIAL INDEX locking is broken. */ const auto fold = lock->un_member.rec_lock.page_id.fold(); @@ -5089,7 +5075,9 @@ reiterate: if (!lock->is_table()) { ut_ad(!lock->index->table->is_temporary()); - if (!lock->is_rec_granted_exclusive_not_gap()) + /* if XA is being prepared, it must not own waiting locks */ + ut_ad(!lock->is_waiting()); + if (!lock->is_rec_exclusive_not_gap()) lock_rec_dequeue_from_page(lock, false); else if (UNIV_UNLIKELY(lock->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE))) diff --git a/storage/innobase/lock/lock0prdt.cc b/storage/innobase/lock/lock0prdt.cc index 4b99d43e48e..d955cf6a5ad 100644 --- a/storage/innobase/lock/lock0prdt.cc +++ b/storage/innobase/lock/lock0prdt.cc @@ -472,7 +472,7 @@ create: because we should be moving an existing waiting lock request. */ ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx); - lock_t* lock = lock_rec_create_low(null_c_lock_info, + lock_t* lock = lock_rec_create(null_c_lock_info, type_mode, block->page.id(), block->page.frame, PRDT_HEAPNO, index, trx, caller_owns_trx_mutex); @@ -738,10 +738,10 @@ lock_prdt_lock( lock_t* lock = lock_sys_t::get_first(g.cell(), id); if (lock == NULL) { - lock = lock_rec_create_low( - null_c_lock_info, - prdt_mode, block->page.id(), block->page.frame, - PRDT_HEAPNO, index, trx, FALSE); + lock = lock_rec_create(null_c_lock_info, + prdt_mode, block->page.id(), + block->page.frame, PRDT_HEAPNO, index, + trx, FALSE); status = LOCK_REC_SUCCESS_CREATED; } else { @@ -831,9 +831,9 @@ lock_place_prdt_page_lock( } if (lock == NULL) { - lock = lock_rec_create_low(null_c_lock_info, - mode, page_id, NULL, PRDT_HEAPNO, - index, trx, FALSE); + lock = lock_rec_create(null_c_lock_info, + mode, page_id, NULL, PRDT_HEAPNO, + index, trx, FALSE); #ifdef PRDT_DIAG printf("GIS_DIAGNOSTIC: page lock %d\n", (int) page_no);