mirror of
				https://github.com/MariaDB/server.git
				synced 2025-10-31 19:06:14 +01:00 
			
		
		
		
	 55db59f16d
			
		
	
	
	55db59f16d
	
	
	
		
			
			PFS_atomic class contains wrappers around my_atomic_* operations, which
are macros to GNU atomic operations (__atomic_*). Due to different
implementations of compilers, clang may encounter errors when compiling
on x86_32 architecture.
The following functions are replaced with C++ std::atomic type in
performance schema code base:
  - PFS_atomic::store_*()
      -> my_atomic_store*
        -> __atomic_store_n()
    => std::atomic<T>::store()
  - PFS_atomic::load_*()
      -> my_atomic_load*
        -> __atomic_load_n()
    => std::atomic<T>::load()
  - PFS_atomic::add_*()
      -> my_atomic_add*
        -> __atomic_fetch_add()
    => std::atomic<T>::fetch_add()
  - PFS_atomic::cas_*()
    -> my_atomic_cas*
      -> __atomic_compare_exchange_n()
    => std::atomic<T>::compare_exchange_strong()
and PFS_atomic class could be dropped completely.
Note that in the wrapper memory order passed to original GNU atomic
extensions are hard-coded as `__ATOMIC_SEQ_CST`, which is equivalent to
`std::memory_order_seq_cst` in C++, and is the default parameter for
std::atomic_* functions.
All new code of the whole pull request, including one or several files
that are either new files or modified ones, are contributed under the
BSD-new license. I am contributing on behalf of my employer Amazon Web
Services.
		
	
			
		
			
				
	
	
		
			1626 lines
		
	
	
	
		
			40 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1626 lines
		
	
	
	
		
			40 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (c) 2014, 2023, Oracle and/or its affiliates.
 | |
| 
 | |
|   This program is free software; you can redistribute it and/or modify
 | |
|   it under the terms of the GNU General Public License, version 2.0,
 | |
|   as published by the Free Software Foundation.
 | |
| 
 | |
|   This program is also distributed with certain software (including
 | |
|   but not limited to OpenSSL) that is licensed under separate terms,
 | |
|   as designated in a particular file or component or in included license
 | |
|   documentation.  The authors of MySQL hereby grant you an additional
 | |
|   permission to link the program and your derivative works with the
 | |
|   separately licensed software that they have included with MySQL.
 | |
| 
 | |
|   This program is distributed in the hope that it will be useful,
 | |
|   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|   GNU General Public License, version 2.0, for more details.
 | |
| 
 | |
|   You should have received a copy of the GNU General Public License
 | |
|   along with this program; if not, write to the Free Software Foundation,
 | |
|   51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
 | |
| 
 | |
| #ifndef PFS_BUFFER_CONTAINER_H
 | |
| #define PFS_BUFFER_CONTAINER_H
 | |
| 
 | |
| #include "my_global.h"
 | |
| #include "pfs.h" // PSI_COUNT_VOLATILITY
 | |
| #include "pfs_lock.h"
 | |
| #include "pfs_instr.h"
 | |
| #include "pfs_setup_actor.h"
 | |
| #include "pfs_setup_object.h"
 | |
| #include "pfs_program.h"
 | |
| #include "pfs_prepared_stmt.h"
 | |
| #include "pfs_builtin_memory.h"
 | |
| 
 | |
| #define USE_SCALABLE
 | |
| 
 | |
| class PFS_opaque_container_page;
 | |
| class PFS_opaque_container;
 | |
| 
 | |
| struct PFS_builtin_memory_class;
 | |
| 
 | |
| template <class T>
 | |
| class PFS_buffer_const_iterator;
 | |
| 
 | |
| template <class T>
 | |
| class PFS_buffer_processor;
 | |
| 
 | |
| template <class T, class U, class V>
 | |
| class PFS_buffer_iterator;
 | |
| 
 | |
| template <class T, int PFS_PAGE_SIZE, int PFS_PAGE_COUNT, class U, class V>
 | |
| class PFS_buffer_scalable_iterator;
 | |
| 
 | |
| template <class T>
 | |
| class PFS_buffer_default_array;
 | |
| 
 | |
| template <class T>
 | |
| class PFS_buffer_default_allocator;
 | |
| 
 | |
| template <class T, class U, class V>
 | |
| class PFS_buffer_container;
 | |
| 
 | |
| template <class T, int PFS_PAGE_SIZE, int PFS_PAGE_COUNT, class U, class V>
 | |
| class PFS_buffer_scalable_container;
 | |
| 
 | |
| template <class B, int COUNT>
 | |
| class PFS_partitioned_buffer_scalable_iterator;
 | |
| 
 | |
| template <class B, int COUNT>
 | |
| class PFS_partitioned_buffer_scalable_container;
 | |
| 
 | |
| 
 | |
| template <class T>
 | |
| class PFS_buffer_default_array
 | |
| {
 | |
| public:
 | |
|   typedef T value_type;
 | |
| 
 | |
|   value_type *allocate(pfs_dirty_state *dirty_state)
 | |
|   {
 | |
|     uint index;
 | |
|     uint monotonic;
 | |
|     uint monotonic_max;
 | |
|     value_type *pfs;
 | |
| 
 | |
|     if (m_full)
 | |
|       return NULL;
 | |
| 
 | |
|     monotonic= m_monotonic.m_u32.fetch_add(1);
 | |
|     monotonic_max= monotonic + static_cast<uint>(m_max);
 | |
| 
 | |
|     while (monotonic < monotonic_max)
 | |
|     {
 | |
|       index= monotonic % m_max;
 | |
|       pfs= m_ptr + index;
 | |
| 
 | |
|       if (pfs->m_lock.free_to_dirty(dirty_state))
 | |
|       {
 | |
|         return pfs;
 | |
|       }
 | |
|       monotonic= m_monotonic.m_u32.fetch_add(1);
 | |
| 
 | |
|     }
 | |
| 
 | |
|     m_full= true;
 | |
|     return NULL;
 | |
|   }
 | |
| 
 | |
|   void deallocate(value_type *pfs)
 | |
|   {
 | |
|     pfs->m_lock.allocated_to_free();
 | |
|     m_full= false;
 | |
|   }
 | |
| 
 | |
|   T* get_first()
 | |
|   {
 | |
|     return m_ptr;
 | |
|   }
 | |
| 
 | |
|   T* get_last()
 | |
|   {
 | |
|     return m_ptr + m_max;
 | |
|   }
 | |
| 
 | |
|   bool m_full;
 | |
|   PFS_cacheline_uint32 m_monotonic;
 | |
|   T * m_ptr;
 | |
|   size_t m_max;
 | |
|   /** Container. */
 | |
|   PFS_opaque_container *m_container;
 | |
| };
 | |
| 
 | |
| template <class T>
 | |
| class PFS_buffer_default_allocator
 | |
| {
 | |
| public:
 | |
|   typedef PFS_buffer_default_array<T> array_type;
 | |
| 
 | |
|   PFS_buffer_default_allocator(PFS_builtin_memory_class *klass)
 | |
|     : m_builtin_class(klass)
 | |
|   {}
 | |
| 
 | |
|   int alloc_array(array_type *array)
 | |
|   {
 | |
|     array->m_ptr= NULL;
 | |
|     array->m_full= true;
 | |
|     array->m_monotonic.m_u32= 0;
 | |
| 
 | |
|     if (array->m_max > 0)
 | |
|     {
 | |
|       array->m_ptr= PFS_MALLOC_ARRAY(m_builtin_class,
 | |
|                                      array->m_max, sizeof(T), T, MYF(MY_ZEROFILL));
 | |
|       if (array->m_ptr == NULL)
 | |
|         return 1;
 | |
|       array->m_full= false;
 | |
|     }
 | |
|     return 0;
 | |
|   }
 | |
| 
 | |
|   void free_array(array_type *array)
 | |
|   {
 | |
|     assert(array->m_max > 0);
 | |
| 
 | |
|     PFS_FREE_ARRAY(m_builtin_class,
 | |
|                    array->m_max, sizeof(T), array->m_ptr);
 | |
|     array->m_ptr= NULL;
 | |
|   }
 | |
| 
 | |
| private:
 | |
|   PFS_builtin_memory_class *m_builtin_class;
 | |
| };
 | |
| 
 | |
| template <class T,
 | |
|           class U = PFS_buffer_default_array<T>,
 | |
|           class V = PFS_buffer_default_allocator<T> >
 | |
| class PFS_buffer_container
 | |
| {
 | |
| public:
 | |
|   friend class PFS_buffer_iterator<T, U, V>;
 | |
| 
 | |
|   typedef T value_type;
 | |
|   typedef U array_type;
 | |
|   typedef V allocator_type;
 | |
|   typedef PFS_buffer_const_iterator<T> const_iterator_type;
 | |
|   typedef PFS_buffer_iterator<T, U, V> iterator_type;
 | |
|   typedef PFS_buffer_processor<T> processor_type;
 | |
|   typedef void (*function_type)(value_type *);
 | |
| 
 | |
|   PFS_buffer_container(allocator_type *allocator)
 | |
|   {
 | |
|     m_array.m_full= true;
 | |
|     m_array.m_ptr= NULL;
 | |
|     m_array.m_max= 0;
 | |
|     m_array.m_monotonic.m_u32= 0;
 | |
|     m_lost= 0;
 | |
|     m_max= 0;
 | |
|     m_allocator= allocator;
 | |
|   }
 | |
| 
 | |
|   int init(ulong max_size)
 | |
|   {
 | |
|     if (max_size > 0)
 | |
|     {
 | |
|       m_array.m_max= max_size;
 | |
|       int rc= m_allocator->alloc_array(& m_array);
 | |
|       if (rc != 0)
 | |
|       {
 | |
|         m_allocator->free_array(& m_array);
 | |
|         return 1;
 | |
|       }
 | |
|       m_max= max_size;
 | |
|       m_array.m_full= false;
 | |
|     }
 | |
|     return 0;
 | |
|   }
 | |
| 
 | |
|   void cleanup()
 | |
|   {
 | |
|     m_allocator->free_array(& m_array);
 | |
|   }
 | |
| 
 | |
|   ulong get_row_count() const
 | |
|   {
 | |
|     return m_max;
 | |
|   }
 | |
| 
 | |
|   ulong get_row_size() const
 | |
|   {
 | |
|     return sizeof(value_type);
 | |
|   }
 | |
| 
 | |
|   ulong get_memory() const
 | |
|   {
 | |
|     return get_row_count() * get_row_size();
 | |
|   }
 | |
| 
 | |
|   value_type *allocate(pfs_dirty_state *dirty_state)
 | |
|   {
 | |
|     value_type *pfs;
 | |
| 
 | |
|     pfs= m_array.allocate(dirty_state, m_max);
 | |
|     if (pfs == NULL)
 | |
|     {
 | |
|       m_lost++;
 | |
|     }
 | |
| 
 | |
|     return pfs;
 | |
|   }
 | |
| 
 | |
|   void deallocate(value_type *pfs)
 | |
|   {
 | |
|     m_array.deallocate(pfs);
 | |
|   }
 | |
| 
 | |
|   iterator_type iterate()
 | |
|   {
 | |
|     return PFS_buffer_iterator<T, U, V>(this, 0);
 | |
|   }
 | |
| 
 | |
|   iterator_type iterate(uint index)
 | |
|   {
 | |
|     assert(index <= m_max);
 | |
|     return PFS_buffer_iterator<T, U, V>(this, index);
 | |
|   }
 | |
| 
 | |
|   void apply(function_type fct)
 | |
|   {
 | |
|     value_type *pfs= m_array.get_first();
 | |
|     value_type *pfs_last= m_array.get_last();
 | |
| 
 | |
|     while (pfs < pfs_last)
 | |
|     {
 | |
|       if (pfs->m_lock.is_populated())
 | |
|       {
 | |
|         fct(pfs);
 | |
|       }
 | |
|       pfs++;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void apply_all(function_type fct)
 | |
|   {
 | |
|     value_type *pfs= m_array.get_first();
 | |
|     value_type *pfs_last= m_array.get_last();
 | |
| 
 | |
|     while (pfs < pfs_last)
 | |
|     {
 | |
|       fct(pfs);
 | |
|       pfs++;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void apply(processor_type & proc)
 | |
|   {
 | |
|     value_type *pfs= m_array.get_first();
 | |
|     value_type *pfs_last= m_array.get_last();
 | |
| 
 | |
|     while (pfs < pfs_last)
 | |
|     {
 | |
|       if (pfs->m_lock.is_populated())
 | |
|       {
 | |
|         proc(pfs);
 | |
|       }
 | |
|       pfs++;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void apply_all(processor_type & proc)
 | |
|   {
 | |
|     value_type *pfs= m_array.get_first();
 | |
|     value_type *pfs_last= m_array.get_last();
 | |
| 
 | |
|     while (pfs < pfs_last)
 | |
|     {
 | |
|       proc(pfs);
 | |
|       pfs++;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   inline value_type* get(uint index)
 | |
|   {
 | |
|     assert(index < m_max);
 | |
| 
 | |
|     value_type *pfs= m_array.m_ptr + index;
 | |
|     if (pfs->m_lock.is_populated())
 | |
|     {
 | |
|       return pfs;
 | |
|     }
 | |
| 
 | |
|     return NULL;
 | |
|   }
 | |
| 
 | |
|   value_type* get(uint index, bool *has_more)
 | |
|   {
 | |
|     if (index >= m_max)
 | |
|     {
 | |
|       *has_more= false;
 | |
|       return NULL;
 | |
|     }
 | |
| 
 | |
|     *has_more= true;
 | |
|     return get(index);
 | |
|   }
 | |
| 
 | |
|   value_type *sanitize(value_type *unsafe)
 | |
|   {
 | |
|     intptr offset;
 | |
|     value_type *pfs= m_array.get_first();
 | |
|     value_type *pfs_last= m_array.get_last();
 | |
| 
 | |
|     if ((pfs <= unsafe) &&
 | |
|         (unsafe < pfs_last))
 | |
|     {
 | |
|       offset= ((intptr) unsafe - (intptr) pfs) % sizeof(value_type);
 | |
|       if (offset == 0)
 | |
|         return unsafe;
 | |
|     }
 | |
| 
 | |
|     return NULL;
 | |
|   }
 | |
| 
 | |
|   ulong m_lost;
 | |
| 
 | |
| private:
 | |
|   value_type* scan_next(uint & index, uint * found_index)
 | |
|   {
 | |
|     assert(index <= m_max);
 | |
| 
 | |
|     value_type *pfs_first= m_array.get_first();
 | |
|     value_type *pfs= pfs_first + index;
 | |
|     value_type *pfs_last= m_array.get_last();
 | |
| 
 | |
|     while (pfs < pfs_last)
 | |
|     {
 | |
|       if (pfs->m_lock.is_populated())
 | |
|       {
 | |
|         uint found= pfs - pfs_first;
 | |
|         *found_index= found;
 | |
|         index= found + 1;
 | |
|         return pfs;
 | |
|       }
 | |
|       pfs++;
 | |
|     }
 | |
| 
 | |
|     index= m_max;
 | |
|     return NULL;
 | |
|   }
 | |
| 
 | |
|   ulong m_max;
 | |
|   array_type m_array;
 | |
|   allocator_type *m_allocator;
 | |
| };
 | |
| 
 | |
| template <class T,
 | |
|           int PFS_PAGE_SIZE,
 | |
|           int PFS_PAGE_COUNT,
 | |
|           class U = PFS_buffer_default_array<T>,
 | |
|           class V = PFS_buffer_default_allocator<T> >
 | |
| class PFS_buffer_scalable_container
 | |
| {
 | |
| public:
 | |
|   friend class PFS_buffer_scalable_iterator<T, PFS_PAGE_SIZE, PFS_PAGE_COUNT, U, V>;
 | |
| 
 | |
|   /**
 | |
|     Type of elements in the buffer.
 | |
|     The following attributes are required:
 | |
|     - pfs_lock m_lock
 | |
|     - PFS_opaque_container_page *m_page
 | |
|   */
 | |
|   typedef T value_type;
 | |
|   /**
 | |
|     Type of pages in the buffer.
 | |
|     The following attributes are required:
 | |
|     - PFS_opaque_container *m_container
 | |
|   */
 | |
|   typedef U array_type;
 | |
|   typedef V allocator_type;
 | |
|   /** This container type */
 | |
|   typedef PFS_buffer_scalable_container<T, PFS_PAGE_SIZE, PFS_PAGE_COUNT, U, V> container_type;
 | |
|   typedef PFS_buffer_const_iterator<T> const_iterator_type;
 | |
|   typedef PFS_buffer_scalable_iterator<T, PFS_PAGE_SIZE, PFS_PAGE_COUNT, U, V> iterator_type;
 | |
|   typedef PFS_buffer_processor<T> processor_type;
 | |
|   typedef void (*function_type)(value_type *);
 | |
| 
 | |
|   static const size_t MAX_SIZE= PFS_PAGE_SIZE*PFS_PAGE_COUNT;
 | |
| 
 | |
|   PFS_buffer_scalable_container(allocator_type *allocator)
 | |
|   {
 | |
|     m_allocator= allocator;
 | |
|     m_initialized= false;
 | |
|     m_lost= 0;
 | |
|   }
 | |
| 
 | |
|   int init(long max_size)
 | |
|   {
 | |
|     int i;
 | |
| 
 | |
|     m_initialized= true;
 | |
|     m_full= true;
 | |
|     m_max= PFS_PAGE_COUNT * PFS_PAGE_SIZE;
 | |
|     m_max_page_count= PFS_PAGE_COUNT;
 | |
|     m_last_page_size= PFS_PAGE_SIZE;
 | |
|     m_lost= 0;
 | |
|     m_monotonic.m_u32= 0;
 | |
|     m_max_page_index.m_u32= 0;
 | |
| 
 | |
|     for (i=0 ; i < PFS_PAGE_COUNT; i++)
 | |
|     {
 | |
|       m_pages[i]= NULL;
 | |
|     }
 | |
| 
 | |
|     if (max_size == 0)
 | |
|     {
 | |
|       /* No allocation. */
 | |
|       m_max_page_count= 0;
 | |
|     }
 | |
|     else if (max_size > 0)
 | |
|     {
 | |
|       if (max_size % PFS_PAGE_SIZE == 0)
 | |
|       {
 | |
|         m_max_page_count= max_size / PFS_PAGE_SIZE;
 | |
|       }
 | |
|       else
 | |
|       {
 | |
|         m_max_page_count= max_size / PFS_PAGE_SIZE + 1;
 | |
|         m_last_page_size= max_size % PFS_PAGE_SIZE;
 | |
|       }
 | |
|       /* Bounded allocation. */
 | |
|       m_full= false;
 | |
| 
 | |
|       if (m_max_page_count > PFS_PAGE_COUNT)
 | |
|       {
 | |
|         m_max_page_count= PFS_PAGE_COUNT;
 | |
|         m_last_page_size= PFS_PAGE_SIZE;
 | |
|       }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       /* max_size = -1 means unbounded allocation */
 | |
|       m_full= false;
 | |
|     }
 | |
| 
 | |
|     assert(m_max_page_count <= PFS_PAGE_COUNT);
 | |
|     assert(0 < m_last_page_size);
 | |
|     assert(m_last_page_size <= PFS_PAGE_SIZE);
 | |
| 
 | |
|     pthread_mutex_init(& m_critical_section, NULL);
 | |
|     return 0;
 | |
|   }
 | |
| 
 | |
|   void cleanup()
 | |
|   {
 | |
|     int i;
 | |
|     array_type *page;
 | |
| 
 | |
|     if (! m_initialized)
 | |
|       return;
 | |
| 
 | |
|     pthread_mutex_lock(& m_critical_section);
 | |
| 
 | |
|     for (i=0 ; i < PFS_PAGE_COUNT; i++)
 | |
|     {
 | |
|       page= m_pages[i];
 | |
|       if (page != NULL)
 | |
|       {
 | |
|         m_allocator->free_array(page);
 | |
|         delete page;
 | |
|         m_pages[i]= NULL;
 | |
|       }
 | |
|     }
 | |
|     pthread_mutex_unlock(& m_critical_section);
 | |
| 
 | |
|     pthread_mutex_destroy(& m_critical_section);
 | |
| 
 | |
|     m_initialized= false;
 | |
|   }
 | |
| 
 | |
|   ulong get_row_count()
 | |
|   {
 | |
|     ulong page_count= m_max_page_index.m_u32.load();
 | |
| 
 | |
|     return page_count * PFS_PAGE_SIZE;
 | |
|   }
 | |
| 
 | |
|   ulong get_row_size() const
 | |
|   {
 | |
|     return sizeof(value_type);
 | |
|   }
 | |
| 
 | |
|   ulong get_memory()
 | |
|   {
 | |
|     return get_row_count() * get_row_size();
 | |
|   }
 | |
| 
 | |
|   value_type *allocate(pfs_dirty_state *dirty_state)
 | |
|   {
 | |
|     if (m_full)
 | |
|     {
 | |
|       m_lost++;
 | |
|       return NULL;
 | |
|     }
 | |
| 
 | |
|     uint index;
 | |
|     uint monotonic;
 | |
|     uint monotonic_max;
 | |
|     uint current_page_count;
 | |
|     value_type *pfs;
 | |
|     array_type *array;
 | |
| 
 | |
|     void *addr;
 | |
|     void * volatile * typed_addr;
 | |
|     void *ptr;
 | |
| 
 | |
|     /*
 | |
|       1: Try to find an available record within the existing pages
 | |
|     */
 | |
|     current_page_count= m_max_page_index.m_u32.load();
 | |
| 
 | |
|     if (current_page_count != 0)
 | |
|     {
 | |
|       monotonic= m_monotonic.m_u32.load();
 | |
|       monotonic_max= monotonic + current_page_count;
 | |
| 
 | |
|       while (monotonic < monotonic_max)
 | |
|       {
 | |
|         /*
 | |
|           Scan in the [0 .. current_page_count - 1] range,
 | |
|           in parallel with m_monotonic (see below)
 | |
|         */
 | |
|         index= monotonic % current_page_count;
 | |
| 
 | |
|         /* Atomic Load, array= m_pages[index] */
 | |
|         addr= & m_pages[index];
 | |
|         typed_addr= static_cast<void * volatile *>(addr);
 | |
|         ptr= my_atomic_loadptr(typed_addr);
 | |
|         array= static_cast<array_type *>(ptr);
 | |
| 
 | |
|         if (array != NULL)
 | |
|         {
 | |
|           pfs= array->allocate(dirty_state);
 | |
|           if (pfs != NULL)
 | |
|           {
 | |
|             /* Keep a pointer to the parent page, for deallocate(). */
 | |
|             pfs->m_page= reinterpret_cast<PFS_opaque_container_page *> (array);
 | |
|             return pfs;
 | |
|           }
 | |
|         }
 | |
| 
 | |
|         /*
 | |
|           Parallel scans collaborate to increase
 | |
|           the common monotonic scan counter.
 | |
| 
 | |
|           Note that when all the existing page are full,
 | |
|           one thread will eventually add a new page,
 | |
|           and cause m_max_page_index to increase,
 | |
|           which fools all the modulo logic for scans already in progress,
 | |
|           because the monotonic counter is not folded to the same place
 | |
|           (sometime modulo N, sometime modulo N+1).
 | |
| 
 | |
|           This is actually ok: since all the pages are full anyway,
 | |
|           there is nothing to miss, so better increase the monotonic
 | |
|           counter faster and then move on to the detection of new pages,
 | |
|           in part 2: below.
 | |
|         */
 | |
|         monotonic= m_monotonic.m_u32.fetch_add(1);
 | |
|       };
 | |
|     }
 | |
| 
 | |
|     /*
 | |
|       2: Try to add a new page, beyond the m_max_page_index limit
 | |
|     */
 | |
|     while (current_page_count < m_max_page_count)
 | |
|     {
 | |
|       /* Peek for pages added by collaborating threads */
 | |
| 
 | |
|       /* (2-a) Atomic Load, array= m_pages[current_page_count] */
 | |
|       addr= & m_pages[current_page_count];
 | |
|       typed_addr= static_cast<void * volatile *>(addr);
 | |
|       ptr= my_atomic_loadptr(typed_addr);
 | |
|       array= static_cast<array_type *>(ptr);
 | |
| 
 | |
|       if (array == NULL)
 | |
|       {
 | |
|         // ==================================================================
 | |
|         // BEGIN CRITICAL SECTION -- buffer expand
 | |
|         // ==================================================================
 | |
| 
 | |
|         /*
 | |
|           On a fresh started server, buffers are typically empty.
 | |
|           When a sudden load spike is seen by the server,
 | |
|           multiple threads may want to expand the buffer at the same time.
 | |
| 
 | |
|           Using a compare and swap to allow multiple pages to be added,
 | |
|           possibly freeing duplicate pages on collisions,
 | |
|           does not work well because the amount of code involved
 | |
|           when creating a new page can be significant (PFS_thread),
 | |
|           causing MANY collisions between (2-b) and (2-d).
 | |
| 
 | |
|           A huge number of collisions (which can happen when thousands
 | |
|           of new connections hits the server after a restart)
 | |
|           leads to a huge memory consumption, and to OOM.
 | |
| 
 | |
|           To mitigate this, we use here a mutex,
 | |
|           to enforce that only ONE page is added at a time,
 | |
|           so that scaling the buffer happens in a predictable
 | |
|           and controlled manner.
 | |
|         */
 | |
|         pthread_mutex_lock(& m_critical_section);
 | |
| 
 | |
|         /*
 | |
|           Peek again for pages added by collaborating threads,
 | |
|           this time as the only thread allowed to expand the buffer
 | |
|         */
 | |
| 
 | |
|         /* (2-b) Atomic Load, array= m_pages[current_page_count] */
 | |
| 
 | |
|         ptr= my_atomic_loadptr(typed_addr);
 | |
|         array= static_cast<array_type *>(ptr);
 | |
| 
 | |
|         if (array == NULL)
 | |
|         {
 | |
|           /* (2-c) Found no page, allocate a new one */
 | |
|           array= new array_type();
 | |
|           builtin_memory_scalable_buffer.count_alloc(sizeof (array_type));
 | |
| 
 | |
|           array->m_max= get_page_logical_size(current_page_count);
 | |
|           int rc= m_allocator->alloc_array(array);
 | |
|           if (rc != 0)
 | |
|           {
 | |
|             m_allocator->free_array(array);
 | |
|             delete array;
 | |
|             builtin_memory_scalable_buffer.count_free(sizeof (array_type));
 | |
|             m_lost++;
 | |
|             pthread_mutex_unlock(& m_critical_section);
 | |
|             return NULL;
 | |
|           }
 | |
| 
 | |
|           /* Keep a pointer to this container, for static_deallocate(). */
 | |
|           array->m_container= reinterpret_cast<PFS_opaque_container *> (this);
 | |
| 
 | |
|           /* (2-d) Atomic STORE, m_pages[current_page_count] = array  */
 | |
|           ptr= array;
 | |
|           my_atomic_storeptr(typed_addr, ptr);
 | |
| 
 | |
|           /* Advertise the new page */
 | |
|           m_max_page_index.m_u32.fetch_add(1);
 | |
|         }
 | |
| 
 | |
|         pthread_mutex_unlock(& m_critical_section);
 | |
| 
 | |
|         // ==================================================================
 | |
|         // END CRITICAL SECTION -- buffer expand
 | |
|         // ==================================================================
 | |
|       }
 | |
| 
 | |
|       assert(array != NULL);
 | |
|       pfs= array->allocate(dirty_state);
 | |
|       if (pfs != NULL)
 | |
|       {
 | |
|         /* Keep a pointer to the parent page, for deallocate(). */
 | |
|         pfs->m_page= reinterpret_cast<PFS_opaque_container_page *> (array);
 | |
|         return pfs;
 | |
|       }
 | |
| 
 | |
|       current_page_count++;
 | |
|     }
 | |
| 
 | |
|     m_lost++;
 | |
|     m_full= true;
 | |
|     return NULL;
 | |
|   }
 | |
| 
 | |
|   void deallocate(value_type *safe_pfs)
 | |
|   {
 | |
|     /* Find the containing page */
 | |
|     PFS_opaque_container_page *opaque_page= safe_pfs->m_page;
 | |
|     array_type *page= reinterpret_cast<array_type *> (opaque_page);
 | |
| 
 | |
|     /* Mark the object free */
 | |
|     safe_pfs->m_lock.allocated_to_free();
 | |
| 
 | |
|     /* Flag the containing page as not full. */
 | |
|     page->m_full= false;
 | |
| 
 | |
|     /* Flag the overall container as not full. */
 | |
|     m_full= false;
 | |
|   }
 | |
| 
 | |
|   static void static_deallocate(value_type *safe_pfs)
 | |
|   {
 | |
|     /* Find the containing page */
 | |
|     PFS_opaque_container_page *opaque_page= safe_pfs->m_page;
 | |
|     array_type *page= reinterpret_cast<array_type *> (opaque_page);
 | |
| 
 | |
|     /* Mark the object free */
 | |
|     safe_pfs->m_lock.allocated_to_free();
 | |
| 
 | |
|     /* Flag the containing page as not full. */
 | |
|     page->m_full= false;
 | |
| 
 | |
|     /* Find the containing buffer */
 | |
|     PFS_opaque_container *opaque_container= page->m_container;
 | |
|     PFS_buffer_scalable_container *container;
 | |
|     container= reinterpret_cast<container_type *> (opaque_container);
 | |
| 
 | |
|     /* Flag the overall container as not full. */
 | |
|     container->m_full= false;
 | |
|   }
 | |
| 
 | |
|   iterator_type iterate()
 | |
|   {
 | |
|     return PFS_buffer_scalable_iterator<T, PFS_PAGE_SIZE, PFS_PAGE_COUNT, U, V>(this, 0);
 | |
|   }
 | |
| 
 | |
|   iterator_type iterate(uint index)
 | |
|   {
 | |
|     assert(index <= m_max);
 | |
|     return PFS_buffer_scalable_iterator<T, PFS_PAGE_SIZE, PFS_PAGE_COUNT, U, V>(this, index);
 | |
|   }
 | |
| 
 | |
|   void apply(function_type fct)
 | |
|   {
 | |
|     uint i;
 | |
|     array_type *page;
 | |
|     value_type *pfs;
 | |
|     value_type *pfs_last;
 | |
| 
 | |
|     for (i=0 ; i < PFS_PAGE_COUNT; i++)
 | |
|     {
 | |
|       page= m_pages[i];
 | |
|       if (page != NULL)
 | |
|       {
 | |
|         pfs= page->get_first();
 | |
|         pfs_last= page->get_last();
 | |
| 
 | |
|         while (pfs < pfs_last)
 | |
|         {
 | |
|           if (pfs->m_lock.is_populated())
 | |
|           {
 | |
|             fct(pfs);
 | |
|           }
 | |
|           pfs++;
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void apply_all(function_type fct)
 | |
|   {
 | |
|     uint i;
 | |
|     array_type *page;
 | |
|     value_type *pfs;
 | |
|     value_type *pfs_last;
 | |
| 
 | |
|     for (i=0 ; i < PFS_PAGE_COUNT; i++)
 | |
|     {
 | |
|       page= m_pages[i];
 | |
|       if (page != NULL)
 | |
|       {
 | |
|         pfs= page->get_first();
 | |
|         pfs_last= page->get_last();
 | |
| 
 | |
|         while (pfs < pfs_last)
 | |
|         {
 | |
|           fct(pfs);
 | |
|           pfs++;
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void apply(processor_type & proc)
 | |
|   {
 | |
|     uint i;
 | |
|     array_type *page;
 | |
|     value_type *pfs;
 | |
|     value_type *pfs_last;
 | |
| 
 | |
|     for (i=0 ; i < PFS_PAGE_COUNT; i++)
 | |
|     {
 | |
|       page= m_pages[i];
 | |
|       if (page != NULL)
 | |
|       {
 | |
|         pfs= page->get_first();
 | |
|         pfs_last= page->get_last();
 | |
| 
 | |
|         while (pfs < pfs_last)
 | |
|         {
 | |
|           if (pfs->m_lock.is_populated())
 | |
|           {
 | |
|             proc(pfs);
 | |
|           }
 | |
|           pfs++;
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void apply_all(processor_type & proc)
 | |
|   {
 | |
|     uint i;
 | |
|     array_type *page;
 | |
|     value_type *pfs;
 | |
|     value_type *pfs_last;
 | |
| 
 | |
|     for (i=0 ; i < PFS_PAGE_COUNT; i++)
 | |
|     {
 | |
|       page= m_pages[i];
 | |
|       if (page != NULL)
 | |
|       {
 | |
|         pfs= page->get_first();
 | |
|         pfs_last= page->get_last();
 | |
| 
 | |
|         while (pfs < pfs_last)
 | |
|         {
 | |
|           proc(pfs);
 | |
|           pfs++;
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   value_type* get(uint index)
 | |
|   {
 | |
|     assert(index < m_max);
 | |
| 
 | |
|     uint index_1= index / PFS_PAGE_SIZE;
 | |
|     array_type *page= m_pages[index_1];
 | |
|     if (page != NULL)
 | |
|     {
 | |
|       uint index_2= index % PFS_PAGE_SIZE;
 | |
| 
 | |
|       if (index_2 >= page->m_max)
 | |
|       {
 | |
|         return NULL;
 | |
|       }
 | |
| 
 | |
|       value_type *pfs= page->m_ptr + index_2;
 | |
| 
 | |
|       if (pfs->m_lock.is_populated())
 | |
|       {
 | |
|         return pfs;
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     return NULL;
 | |
|   }
 | |
| 
 | |
|   value_type* get(uint index, bool *has_more)
 | |
|   {
 | |
|     if (index >= m_max)
 | |
|     {
 | |
|       *has_more= false;
 | |
|       return NULL;
 | |
|     }
 | |
| 
 | |
|     uint index_1= index / PFS_PAGE_SIZE;
 | |
|     array_type *page= m_pages[index_1];
 | |
| 
 | |
|     if (page == NULL)
 | |
|     {
 | |
|       *has_more= false;
 | |
|       return NULL;
 | |
|     }
 | |
| 
 | |
|     uint index_2= index % PFS_PAGE_SIZE;
 | |
| 
 | |
|     if (index_2 >= page->m_max)
 | |
|     {
 | |
|       *has_more= false;
 | |
|       return NULL;
 | |
|     }
 | |
| 
 | |
|     *has_more= true;
 | |
|     value_type *pfs= page->m_ptr + index_2;
 | |
| 
 | |
|     if (pfs->m_lock.is_populated())
 | |
|     {
 | |
|       return pfs;
 | |
|     }
 | |
| 
 | |
|     return NULL;
 | |
|   }
 | |
| 
 | |
|   value_type *sanitize(value_type *unsafe)
 | |
|   {
 | |
|     intptr offset;
 | |
|     uint i;
 | |
|     array_type *page;
 | |
|     value_type *pfs;
 | |
|     value_type *pfs_last;
 | |
| 
 | |
|     for (i=0 ; i < PFS_PAGE_COUNT; i++)
 | |
|     {
 | |
|       page= m_pages[i];
 | |
|       if (page != NULL)
 | |
|       {
 | |
|         pfs= page->get_first();
 | |
|         pfs_last= page->get_last();
 | |
| 
 | |
|         if ((pfs <= unsafe) &&
 | |
|             (unsafe < pfs_last))
 | |
|         {
 | |
|           offset= ((intptr) unsafe - (intptr) pfs) % sizeof(value_type);
 | |
|           if (offset == 0)
 | |
|             return unsafe;
 | |
|         }
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     return NULL;
 | |
|   }
 | |
| 
 | |
|   ulong m_lost;
 | |
| 
 | |
| private:
 | |
| 
 | |
|   uint get_page_logical_size(uint page_index)
 | |
|   {
 | |
|     if (page_index + 1 < m_max_page_count)
 | |
|       return PFS_PAGE_SIZE;
 | |
|     assert(page_index + 1 == m_max_page_count);
 | |
|     return m_last_page_size;
 | |
|   }
 | |
| 
 | |
|   value_type* scan_next(uint & index, uint * found_index)
 | |
|   {
 | |
|     assert(index <= m_max);
 | |
| 
 | |
|     uint index_1= index / PFS_PAGE_SIZE;
 | |
|     uint index_2= index % PFS_PAGE_SIZE;
 | |
|     array_type *page;
 | |
|     value_type *pfs_first;
 | |
|     value_type *pfs;
 | |
|     value_type *pfs_last;
 | |
| 
 | |
|     while (index_1 < PFS_PAGE_COUNT)
 | |
|     {
 | |
|       page= m_pages[index_1];
 | |
| 
 | |
|       if (page == NULL)
 | |
|       {
 | |
|         index= static_cast<uint>(m_max);
 | |
|         return NULL;
 | |
|       }
 | |
| 
 | |
|       pfs_first= page->get_first();
 | |
|       pfs= pfs_first + index_2;
 | |
|       pfs_last= page->get_last();
 | |
| 
 | |
|       while (pfs < pfs_last)
 | |
|       {
 | |
|         if (pfs->m_lock.is_populated())
 | |
|         {
 | |
|           uint found= index_1 * PFS_PAGE_SIZE + static_cast<uint>(pfs - pfs_first);
 | |
|           *found_index= found;
 | |
|           index= found + 1;
 | |
|           return pfs;
 | |
|         }
 | |
|         pfs++;
 | |
|       }
 | |
| 
 | |
|       index_1++;
 | |
|       index_2= 0;
 | |
|     }
 | |
| 
 | |
|     index= static_cast<uint>(m_max);
 | |
|     return NULL;
 | |
|   }
 | |
| 
 | |
|   bool m_initialized;
 | |
|   bool m_full;
 | |
|   size_t m_max;
 | |
|   PFS_cacheline_uint32 m_monotonic;
 | |
|   PFS_cacheline_uint32 m_max_page_index;
 | |
|   ulong m_max_page_count;
 | |
|   ulong m_last_page_size;
 | |
|   array_type * m_pages[PFS_PAGE_COUNT];
 | |
|   allocator_type *m_allocator;
 | |
|   pthread_mutex_t m_critical_section;
 | |
| };
 | |
| 
 | |
| template <class T, class U, class V>
 | |
| class PFS_buffer_iterator
 | |
| {
 | |
|   friend class PFS_buffer_container<T, U, V>;
 | |
| 
 | |
|   typedef T value_type;
 | |
|   typedef PFS_buffer_container<T, U, V> container_type;
 | |
| 
 | |
| public:
 | |
|   value_type* scan_next()
 | |
|   {
 | |
|     uint unused;
 | |
|     return m_container->scan_next(m_index, & unused);
 | |
|   }
 | |
| 
 | |
|   value_type* scan_next(uint * found_index)
 | |
|   {
 | |
|     return m_container->scan_next(m_index, found_index);
 | |
|   }
 | |
| 
 | |
| private:
 | |
|   PFS_buffer_iterator(container_type *container, uint index)
 | |
|     : m_container(container),
 | |
|       m_index(index)
 | |
|   {}
 | |
| 
 | |
|   container_type *m_container;
 | |
|   uint m_index;
 | |
| };
 | |
| 
 | |
| template <class T, int page_size, int page_count, class U, class V>
 | |
| class PFS_buffer_scalable_iterator
 | |
| {
 | |
|   friend class PFS_buffer_scalable_container<T, page_size, page_count, U, V>;
 | |
| 
 | |
|   typedef T value_type;
 | |
|   typedef PFS_buffer_scalable_container<T, page_size, page_count, U, V> container_type;
 | |
| 
 | |
| public:
 | |
|   value_type* scan_next()
 | |
|   {
 | |
|     uint unused;
 | |
|     return m_container->scan_next(m_index, & unused);
 | |
|   }
 | |
| 
 | |
|   value_type* scan_next(uint * found_index)
 | |
|   {
 | |
|     return m_container->scan_next(m_index, found_index);
 | |
|   }
 | |
| 
 | |
| private:
 | |
|   PFS_buffer_scalable_iterator(container_type *container, uint index)
 | |
|     : m_container(container),
 | |
|       m_index(index)
 | |
|   {}
 | |
| 
 | |
|   container_type *m_container;
 | |
|   uint m_index;
 | |
| };
 | |
| 
 | |
| template <class T>
 | |
| class PFS_buffer_processor
 | |
| {
 | |
| public:
 | |
|   virtual ~PFS_buffer_processor()= default;
 | |
|   virtual void operator()(T *element) = 0;
 | |
| };
 | |
| 
 | |
| template <class B, int PFS_PARTITION_COUNT>
 | |
| class PFS_partitioned_buffer_scalable_container
 | |
| {
 | |
| public:
 | |
|   friend class PFS_partitioned_buffer_scalable_iterator<B, PFS_PARTITION_COUNT>;
 | |
| 
 | |
|   typedef typename B::value_type value_type;
 | |
|   typedef typename B::allocator_type allocator_type;
 | |
|   typedef PFS_partitioned_buffer_scalable_iterator<B, PFS_PARTITION_COUNT> iterator_type;
 | |
|   typedef typename B::iterator_type sub_iterator_type;
 | |
|   typedef typename B::processor_type processor_type;
 | |
|   typedef typename B::function_type function_type;
 | |
| 
 | |
|   PFS_partitioned_buffer_scalable_container(allocator_type *allocator)
 | |
|   {
 | |
|     for (int i=0 ; i < PFS_PARTITION_COUNT; i++)
 | |
|     {
 | |
|       m_partitions[i]= new B(allocator);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   ~PFS_partitioned_buffer_scalable_container()
 | |
|   {
 | |
|     for (int i=0 ; i < PFS_PARTITION_COUNT; i++)
 | |
|     {
 | |
|       delete m_partitions[i];
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   int init(long max_size)
 | |
|   {
 | |
|     int rc= 0;
 | |
|     // FIXME: we have max_size * PFS_PARTITION_COUNT here
 | |
|     for (int i=0 ; i < PFS_PARTITION_COUNT; i++)
 | |
|     {
 | |
|       rc|= m_partitions[i]->init(max_size);
 | |
|     }
 | |
|     return rc;
 | |
|   }
 | |
| 
 | |
|   void cleanup()
 | |
|   {
 | |
|     for (int i=0 ; i < PFS_PARTITION_COUNT; i++)
 | |
|     {
 | |
|       m_partitions[i]->cleanup();
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   ulong get_row_count() const
 | |
|   {
 | |
|     ulong sum= 0;
 | |
| 
 | |
|     for (int i=0; i < PFS_PARTITION_COUNT; i++)
 | |
|     {
 | |
|       sum += m_partitions[i]->get_row_count();
 | |
|     }
 | |
| 
 | |
|     return sum;
 | |
|   }
 | |
| 
 | |
|   ulong get_row_size() const
 | |
|   {
 | |
|     return sizeof(value_type);
 | |
|   }
 | |
| 
 | |
|   ulong get_memory() const
 | |
|   {
 | |
|     ulong sum= 0;
 | |
| 
 | |
|     for (int i=0; i < PFS_PARTITION_COUNT; i++)
 | |
|     {
 | |
|       sum += m_partitions[i]->get_memory();
 | |
|     }
 | |
| 
 | |
|     return sum;
 | |
|   }
 | |
| 
 | |
|   long get_lost_counter()
 | |
|   {
 | |
|     long sum= 0;
 | |
| 
 | |
|     for (int i=0; i < PFS_PARTITION_COUNT; i++)
 | |
|     {
 | |
|       sum += m_partitions[i]->m_lost;
 | |
|     }
 | |
| 
 | |
|     return sum;
 | |
|   }
 | |
| 
 | |
|   value_type *allocate(pfs_dirty_state *dirty_state, uint partition)
 | |
|   {
 | |
|     assert(partition < PFS_PARTITION_COUNT);
 | |
| 
 | |
|     return m_partitions[partition]->allocate(dirty_state);
 | |
|   }
 | |
| 
 | |
|   void deallocate(value_type *safe_pfs)
 | |
|   {
 | |
|     /*
 | |
|       One issue here is that we do not know which partition
 | |
|       the record belongs to.
 | |
|       Each record points to the parent page,
 | |
|       and each page points to the parent buffer,
 | |
|       so using static_deallocate here,
 | |
|       which will find the correct partition by itself.
 | |
|     */
 | |
|     B::static_deallocate(safe_pfs);
 | |
|   }
 | |
| 
 | |
|   iterator_type iterate()
 | |
|   {
 | |
|     return iterator_type(this, 0, 0);
 | |
|   }
 | |
| 
 | |
|   iterator_type iterate(uint user_index)
 | |
|   {
 | |
|     uint partition_index;
 | |
|     uint sub_index;
 | |
|     unpack_index(user_index, &partition_index, &sub_index);
 | |
|     return iterator_type(this, partition_index, sub_index);
 | |
|   }
 | |
| 
 | |
|   void apply(function_type fct)
 | |
|   {
 | |
|     for (int i=0; i < PFS_PARTITION_COUNT; i++)
 | |
|     {
 | |
|       m_partitions[i]->apply(fct);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void apply_all(function_type fct)
 | |
|   {
 | |
|     for (int i=0; i < PFS_PARTITION_COUNT; i++)
 | |
|     {
 | |
|       m_partitions[i]->apply_all(fct);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void apply(processor_type & proc)
 | |
|   {
 | |
|     for (int i=0; i < PFS_PARTITION_COUNT; i++)
 | |
|     {
 | |
|       m_partitions[i]->apply(proc);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void apply_all(processor_type & proc)
 | |
|   {
 | |
|     for (int i=0; i < PFS_PARTITION_COUNT; i++)
 | |
|     {
 | |
|       m_partitions[i]->apply_all(proc);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   value_type* get(uint user_index)
 | |
|   {
 | |
|     uint partition_index;
 | |
|     uint sub_index;
 | |
|     unpack_index(user_index, &partition_index, &sub_index);
 | |
| 
 | |
|     if (partition_index >= PFS_PARTITION_COUNT)
 | |
|     {
 | |
|       return NULL;
 | |
|     }
 | |
| 
 | |
|     return m_partitions[partition_index]->get(sub_index);
 | |
|   }
 | |
| 
 | |
|   value_type* get(uint user_index, bool *has_more)
 | |
|   {
 | |
|     uint partition_index;
 | |
|     uint sub_index;
 | |
|     unpack_index(user_index, &partition_index, &sub_index);
 | |
| 
 | |
|     if (partition_index >= PFS_PARTITION_COUNT)
 | |
|     {
 | |
|       *has_more= false;
 | |
|       return NULL;
 | |
|     }
 | |
| 
 | |
|     *has_more= true;
 | |
|     return m_partitions[partition_index]->get(sub_index);
 | |
|   }
 | |
| 
 | |
|   value_type *sanitize(value_type *unsafe)
 | |
|   {
 | |
|     value_type *safe= NULL;
 | |
| 
 | |
|     for (int i=0; i < PFS_PARTITION_COUNT; i++)
 | |
|     {
 | |
|       safe= m_partitions[i]->sanitize(unsafe);
 | |
|       if (safe != NULL)
 | |
|       {
 | |
|         return safe;
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     return safe;
 | |
|   }
 | |
| 
 | |
| private:
 | |
|   static void pack_index(uint partition_index, uint sub_index, uint *user_index)
 | |
|   {
 | |
|     /* 2^8 = 256 partitions max */
 | |
|     compile_time_assert(PFS_PARTITION_COUNT <= (1 << 8));
 | |
|     /* 2^24 = 16777216 max per partitioned buffer. */
 | |
|     compile_time_assert((B::MAX_SIZE) <= (1 << 24));
 | |
| 
 | |
|     *user_index= (partition_index << 24) + sub_index;
 | |
|   }
 | |
| 
 | |
|   static void unpack_index(uint user_index, uint *partition_index, uint *sub_index)
 | |
|   {
 | |
|     *partition_index= user_index >> 24;
 | |
|     *sub_index= user_index & 0x00FFFFFF;
 | |
|   }
 | |
| 
 | |
|   value_type* scan_next(uint & partition_index, uint & sub_index, uint * found_partition, uint * found_sub_index)
 | |
|   {
 | |
|     value_type *record= NULL;
 | |
|     assert(partition_index < PFS_PARTITION_COUNT);
 | |
| 
 | |
|     while (partition_index < PFS_PARTITION_COUNT)
 | |
|     {
 | |
|       sub_iterator_type sub_iterator= m_partitions[partition_index]->iterate(sub_index);
 | |
|       record= sub_iterator.scan_next(found_sub_index);
 | |
|       if (record != NULL)
 | |
|       {
 | |
|         *found_partition= partition_index;
 | |
|         sub_index= *found_sub_index + 1;
 | |
|         return record;
 | |
|       }
 | |
| 
 | |
|       partition_index++;
 | |
|       sub_index= 0;
 | |
|     }
 | |
| 
 | |
|     *found_partition= PFS_PARTITION_COUNT;
 | |
|     *found_sub_index= 0;
 | |
|     sub_index= 0;
 | |
|     return NULL;
 | |
|   }
 | |
| 
 | |
|   B *m_partitions[PFS_PARTITION_COUNT];
 | |
| };
 | |
| 
 | |
| template <class B, int PFS_PARTITION_COUNT>
 | |
| class PFS_partitioned_buffer_scalable_iterator
 | |
| {
 | |
| public:
 | |
|   friend class PFS_partitioned_buffer_scalable_container<B, PFS_PARTITION_COUNT>;
 | |
| 
 | |
|   typedef typename B::value_type value_type;
 | |
|   typedef PFS_partitioned_buffer_scalable_container<B, PFS_PARTITION_COUNT> container_type;
 | |
| 
 | |
|   value_type* scan_next()
 | |
|   {
 | |
|     uint unused_partition;
 | |
|     uint unused_sub_index;
 | |
|     return m_container->scan_next(m_partition, m_sub_index, & unused_partition, & unused_sub_index);
 | |
|   }
 | |
| 
 | |
|   value_type* scan_next(uint *found_user_index)
 | |
|   {
 | |
|     uint found_partition;
 | |
|     uint found_sub_index;
 | |
|     value_type *record;
 | |
|     record=  m_container->scan_next(m_partition, m_sub_index, &found_partition, &found_sub_index);
 | |
|     container_type::pack_index(found_partition, found_sub_index, found_user_index);
 | |
|     return record;
 | |
|   }
 | |
| 
 | |
| private:
 | |
|   PFS_partitioned_buffer_scalable_iterator(container_type *container, uint partition, uint sub_index)
 | |
|     : m_container(container),
 | |
|       m_partition(partition),
 | |
|       m_sub_index(sub_index)
 | |
|   {}
 | |
| 
 | |
|   container_type *m_container;
 | |
|   uint m_partition;
 | |
|   uint m_sub_index;
 | |
| };
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_mutex, 1024, 1024> PFS_mutex_basic_container;
 | |
| typedef PFS_partitioned_buffer_scalable_container<PFS_mutex_basic_container, PSI_COUNT_VOLATILITY> PFS_mutex_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_mutex> PFS_mutex_container;
 | |
| #endif
 | |
| typedef PFS_mutex_container::iterator_type PFS_mutex_iterator;
 | |
| extern PFS_mutex_container global_mutex_container;
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_rwlock, 1024, 1024> PFS_rwlock_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_rwlock> PFS_rwlock_container;
 | |
| #endif
 | |
| typedef PFS_rwlock_container::iterator_type PFS_rwlock_iterator;
 | |
| extern PFS_rwlock_container global_rwlock_container;
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_cond, 256, 256> PFS_cond_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_cond> PFS_cond_container;
 | |
| #endif
 | |
| typedef PFS_cond_container::iterator_type PFS_cond_iterator;
 | |
| extern PFS_cond_container global_cond_container;
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_file, 4 * 1024, 4 * 1024> PFS_file_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_file> PFS_file_container;
 | |
| #endif
 | |
| typedef PFS_file_container::iterator_type PFS_file_iterator;
 | |
| extern PFS_file_container global_file_container;
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_socket, 256, 256> PFS_socket_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_socket> PFS_socket_container;
 | |
| #endif
 | |
| typedef PFS_socket_container::iterator_type PFS_socket_iterator;
 | |
| extern PFS_socket_container global_socket_container;
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_metadata_lock, 1024, 1024> PFS_mdl_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_metadata_lock> PFS_mdl_container;
 | |
| #endif
 | |
| typedef PFS_mdl_container::iterator_type PFS_mdl_iterator;
 | |
| extern PFS_mdl_container global_mdl_container;
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_setup_actor, 128, 1024> PFS_setup_actor_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_setup_actor> PFS_setup_actor_container;
 | |
| #endif
 | |
| typedef PFS_setup_actor_container::iterator_type PFS_setup_actor_iterator;
 | |
| extern PFS_setup_actor_container global_setup_actor_container;
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_setup_object, 128, 1024> PFS_setup_object_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_setup_object> PFS_setup_object_container;
 | |
| #endif
 | |
| typedef PFS_setup_object_container::iterator_type PFS_setup_object_iterator;
 | |
| extern PFS_setup_object_container global_setup_object_container;
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_table, 1024, 1024> PFS_table_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_table> PFS_table_container;
 | |
| #endif
 | |
| typedef PFS_table_container::iterator_type PFS_table_iterator;
 | |
| extern PFS_table_container global_table_container;
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_table_share, 4 * 1024, 4 * 1024> PFS_table_share_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_table_share> PFS_table_share_container;
 | |
| #endif
 | |
| typedef PFS_table_share_container::iterator_type PFS_table_share_iterator;
 | |
| extern PFS_table_share_container global_table_share_container;
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_table_share_index, 8 * 1024, 8 * 1024> PFS_table_share_index_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_table_share_index> PFS_table_share_index_container;
 | |
| #endif
 | |
| typedef PFS_table_share_index_container::iterator_type PFS_table_share_index_iterator;
 | |
| extern PFS_table_share_index_container global_table_share_index_container;
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_table_share_lock, 4 * 1024, 4 * 1024> PFS_table_share_lock_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_table_share_lock> PFS_table_share_lock_container;
 | |
| #endif
 | |
| typedef PFS_table_share_lock_container::iterator_type PFS_table_share_lock_iterator;
 | |
| extern PFS_table_share_lock_container global_table_share_lock_container;
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_program, 1024, 1024> PFS_program_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_program> PFS_program_container;
 | |
| #endif
 | |
| typedef PFS_program_container::iterator_type PFS_program_iterator;
 | |
| extern PFS_program_container global_program_container;
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_prepared_stmt, 1024, 1024> PFS_prepared_stmt_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_prepared_stmt> PFS_prepared_stmt_container;
 | |
| #endif
 | |
| typedef PFS_prepared_stmt_container::iterator_type PFS_prepared_stmt_iterator;
 | |
| extern PFS_prepared_stmt_container global_prepared_stmt_container;
 | |
| 
 | |
| class PFS_account_array : public PFS_buffer_default_array<PFS_account>
 | |
| {
 | |
| public:
 | |
|   PFS_single_stat *m_instr_class_waits_array;
 | |
|   PFS_stage_stat *m_instr_class_stages_array;
 | |
|   PFS_statement_stat *m_instr_class_statements_array;
 | |
|   PFS_transaction_stat *m_instr_class_transactions_array;
 | |
|   PFS_memory_stat *m_instr_class_memory_array;
 | |
| };
 | |
| 
 | |
| class PFS_account_allocator
 | |
| {
 | |
| public:
 | |
|   int alloc_array(PFS_account_array *array);
 | |
|   void free_array(PFS_account_array *array);
 | |
| };
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_account,
 | |
|                                       128,
 | |
|                                       128,
 | |
|                                       PFS_account_array,
 | |
|                                       PFS_account_allocator> PFS_account_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_account,
 | |
|                              PFS_account_array,
 | |
|                              PFS_account_allocator> PFS_account_container;
 | |
| #endif
 | |
| typedef PFS_account_container::iterator_type PFS_account_iterator;
 | |
| extern PFS_account_container global_account_container;
 | |
| 
 | |
| class PFS_host_array : public PFS_buffer_default_array<PFS_host>
 | |
| {
 | |
| public:
 | |
|   PFS_single_stat *m_instr_class_waits_array;
 | |
|   PFS_stage_stat *m_instr_class_stages_array;
 | |
|   PFS_statement_stat *m_instr_class_statements_array;
 | |
|   PFS_transaction_stat *m_instr_class_transactions_array;
 | |
|   PFS_memory_stat *m_instr_class_memory_array;
 | |
| };
 | |
| 
 | |
| class PFS_host_allocator
 | |
| {
 | |
| public:
 | |
|   int alloc_array(PFS_host_array *array);
 | |
|   void free_array(PFS_host_array *array);
 | |
| };
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_host,
 | |
|                                       128,
 | |
|                                       128,
 | |
|                                       PFS_host_array,
 | |
|                                       PFS_host_allocator> PFS_host_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_host,
 | |
|                              PFS_host_array,
 | |
|                              PFS_host_allocator> PFS_host_container;
 | |
| #endif
 | |
| typedef PFS_host_container::iterator_type PFS_host_iterator;
 | |
| extern PFS_host_container global_host_container;
 | |
| 
 | |
| class PFS_thread_array : public PFS_buffer_default_array<PFS_thread>
 | |
| {
 | |
| public:
 | |
|   PFS_single_stat *m_instr_class_waits_array;
 | |
|   PFS_stage_stat *m_instr_class_stages_array;
 | |
|   PFS_statement_stat *m_instr_class_statements_array;
 | |
|   PFS_transaction_stat *m_instr_class_transactions_array;
 | |
|   PFS_memory_stat *m_instr_class_memory_array;
 | |
| 
 | |
|   PFS_events_waits *m_waits_history_array;
 | |
|   PFS_events_stages *m_stages_history_array;
 | |
|   PFS_events_statements *m_statements_history_array;
 | |
|   PFS_events_statements *m_statements_stack_array;
 | |
|   PFS_events_transactions *m_transactions_history_array;
 | |
|   char *m_session_connect_attrs_array;
 | |
| 
 | |
|   char *m_current_stmts_text_array;
 | |
|   char *m_history_stmts_text_array;
 | |
|   unsigned char *m_current_stmts_digest_token_array;
 | |
|   unsigned char *m_history_stmts_digest_token_array;
 | |
| };
 | |
| 
 | |
| class PFS_thread_allocator
 | |
| {
 | |
| public:
 | |
|   int alloc_array(PFS_thread_array *array);
 | |
|   void free_array(PFS_thread_array *array);
 | |
| };
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_thread,
 | |
|                                       256,
 | |
|                                       256,
 | |
|                                       PFS_thread_array,
 | |
|                                       PFS_thread_allocator> PFS_thread_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_thread,
 | |
|                              PFS_thread_array,
 | |
|                              PFS_thread_allocator> PFS_thread_container;
 | |
| #endif
 | |
| typedef PFS_thread_container::iterator_type PFS_thread_iterator;
 | |
| extern PFS_thread_container global_thread_container;
 | |
| 
 | |
| class PFS_user_array : public PFS_buffer_default_array<PFS_user>
 | |
| {
 | |
| public:
 | |
|   PFS_single_stat *m_instr_class_waits_array;
 | |
|   PFS_stage_stat *m_instr_class_stages_array;
 | |
|   PFS_statement_stat *m_instr_class_statements_array;
 | |
|   PFS_transaction_stat *m_instr_class_transactions_array;
 | |
|   PFS_memory_stat *m_instr_class_memory_array;
 | |
| };
 | |
| 
 | |
| class PFS_user_allocator
 | |
| {
 | |
| public:
 | |
|   int alloc_array(PFS_user_array *array);
 | |
|   void free_array(PFS_user_array *array);
 | |
| };
 | |
| 
 | |
| #ifdef USE_SCALABLE
 | |
| typedef PFS_buffer_scalable_container<PFS_user,
 | |
|                                       128,
 | |
|                                       128,
 | |
|                                       PFS_user_array,
 | |
|                                       PFS_user_allocator> PFS_user_container;
 | |
| #else
 | |
| typedef PFS_buffer_container<PFS_user,
 | |
|                              PFS_user_array,
 | |
|                              PFS_user_allocator> PFS_user_container;
 | |
| #endif
 | |
| typedef PFS_user_container::iterator_type PFS_user_iterator;
 | |
| extern PFS_user_container global_user_container;
 | |
| 
 | |
| #endif
 | |
| 
 |