mariadb/ft/partitioned_counter.cc
Leif Walsh bd63072bdf closes #5485 #5368 clean up cmake and get darwin portability:
for 5485:
 - remove icc handling stuff from cmake
 - clean up remaining cmake files
 - create libtokudb_static.a in a cleaner way using cmake OBJECT libs (requires cmake 2.8.9)
 - use POSITION_INDEPENDENT_CODE property instead of manually setting -fPIC
for 5368:
 - fix compilation failures due to small differences between gcc and clang, mostly in tests
 - use toku_fileids_are_equal instead of memcmp to compare fileids (closes #5505)
 - create dummy implementation of partitioned_counter for osx (quick fix for, and closes #5506)
 - add mutex->valid bit under TOKU_PTHREAD_DEBUG
 - initialize mutex of DB_TXN created during recovery for 2PC (closes #5507)


git-svn-id: file:///svn/toku/tokudb@48024 c7de825b-a66e-492c-adef-691d508d4ae1
2013-04-17 00:01:07 -04:00

373 lines
15 KiB
C++

/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <valgrind/helgrind.h>
#include <sys/types.h>
#include <pthread.h>
#include "memory.h"
#include "partitioned_counter.h"
#include "doubly_linked_list.h"
#include "growable_array.h"
#ifdef __APPLE__
// TODO(leif): The __thread declspec is broken in ways I don't understand
// on Darwin. Partitioned counters use them and it would be prohibitive
// to tease them apart before a week after 6.5.0, so instead, we're just
// not going to use them in the most brutal way possible. This is a
// terrible implementation of the API in partitioned_counter.h but it
// should be correct enough to release a non-performant version on OSX for
// development. Soon, we need to either make portable partitioned
// counters, or we need to do this disabling in a portable way.
// Darwin fallback representation of a partitioned counter: the whole counter
// is one shared 64-bit value, with no thread-local parts.
struct partitioned_counter {
uint64_t v; // Running total; increment_partitioned_counter adds to it with __sync_fetch_and_add.
};
PARTITIONED_COUNTER create_partitioned_counter(void)
// Effect: Allocate a new counter with XCALLOC and return it to the caller.
// Note: XCALLOC is presumably the zero-filling allocator from memory.h, which
//  would make the counter start at 0 -- confirm against memory.h.
{
    PARTITIONED_COUNTER XCALLOC(fresh_counter);
    return fresh_counter;
}
void destroy_partitioned_counter(PARTITIONED_COUNTER counter)
// Effect: Release the storage of counter with toku_free.  The counter must
//  not be used afterwards.
{
    toku_free(counter);
}
void increment_partitioned_counter(PARTITIONED_COUNTER counter, uint64_t delta)
// Effect: Add delta to the counter's single shared value using the
//  __sync_fetch_and_add builtin.  The previous value is not needed and is
//  discarded.
{
    uint64_t *const total = &counter->v;
    (void) __sync_fetch_and_add(total, delta);
}
uint64_t read_partitioned_counter(PARTITIONED_COUNTER counter)
// Effect: Return the current value of the counter.
// Implementation note: this is a plain load of counter->v; no lock is taken.
{
    const uint64_t snapshot = counter->v;
    return snapshot;
}
void partitioned_counters_init(void)
// Effect: None.  The Darwin fallback keeps no global state, so there is
//  nothing to set up; the function exists only to satisfy the API.
{
}
void partitioned_counters_destroy(void)
// Effect: None.  The Darwin fallback keeps no global state, so there is
//  nothing to tear down; the function exists only to satisfy the API.
{
}
#else // __APPLE__
//******************************************************************************
//
// Representation: The representation of a partitioned counter comprises a
// sum, called sum_of_dead; an index, called the pc_key, which indexes into a
// thread-local array to find a thread-local part of the counter; and a
// linked list of thread-local parts.
//
// There is also a linked list, for each thread that has a thread-local part
// of any counter, of all the thread-local parts of all the counters.
//
// There is a pthread_key which gives us a hook to clean up thread-local
// state when a thread terminates. For each thread-local part of a counter
// that the thread has, we add in the thread-local sum into the sum_of_dead.
//
// Finally there is a list of all the thread-local arrays so that when we
// destroy the partitioned counter before the threads are done, we can find
// and destroy the thread_local_arrays before destroying the pthread_key.
//
// Abstraction function: The sum is represented by the sum of sum_of_dead and
// the sums of the thread-local parts of the counter.
//
// Representation invariant: Every thread-local part is in the linked list of
// the thread-local parts of its counter, as well as in the linked list of
// the counters of the thread.
//
//******************************************************************************
//******************************************************************************
// The mutex for the PARTITIONED_COUNTER
// We have a single mutex for all the counters because
// (a) the mutex is obtained infrequently, and
// (b) it helps us avoid race conditions when destroying the counters.
// The alternative that I couldn't make work is to have a mutex per counter.
// But the problem is that the counter can be destroyed before threads
// terminate, or maybe a thread terminates before the counter is destroyed.
// If the counter is destroyed first, then the mutex is no longer available.
//******************************************************************************
// The single mutex shared by every partitioned counter.
static pthread_mutex_t partitioned_counter_mutex = PTHREAD_MUTEX_INITIALIZER;

static void pc_lock (void)
// Effect: Acquire partitioned_counter_mutex.  Asserts that the acquire
//  succeeded.  The pthread call is kept outside the assert so that it still
//  runs if assertions are compiled out.
{
    const int rc = pthread_mutex_lock(&partitioned_counter_mutex);
    assert(rc == 0);
}
static void pc_unlock (void)
// Effect: Release partitioned_counter_mutex.  Asserts that the release
//  succeeded.  The pthread call is kept outside the assert so that it still
//  runs if assertions are compiled out.
{
    const int rc = pthread_mutex_unlock(&partitioned_counter_mutex);
    assert(rc == 0);
}
//******************************************************************************
// Key creation primitives.
//******************************************************************************
static void pk_create (pthread_key_t *key, void (*destructor)(void*))
// Effect: Create a pthread key in *key whose per-thread values are handed to
//  destructor.  Asserts that pthread_key_create succeeded.
{
    const int rc = pthread_key_create(key, destructor);
    assert(rc == 0);
}
static void pk_delete (pthread_key_t key)
// Effect: Delete the pthread key.  Asserts that pthread_key_delete succeeded.
{
    const int rc = pthread_key_delete(key);
    assert(rc == 0);
}
static void pk_setspecific (pthread_key_t key, const void *value)
// Effect: Bind value to key for the calling thread.  Asserts that
//  pthread_setspecific succeeded.
{
    const int rc = pthread_setspecific(key, value);
    assert(rc == 0);
}
//******************************************************************************
// The counter itself.
// The thread local part of a counter, comprising the thread-local sum, a pointer
// to the partitioned_counter, a pointer to the thread_local list head, and two
// linked lists. One of the lists is all the thread-local parts that belong to
// the same counter, and the other is all the thread-local parts that belong to
// the same thread.
// Forward declaration: partitioned_counter's list holds pointers to local_counter.
struct local_counter;
// The shared part of a counter.  Its value is sum_of_dead plus the sum fields
// of every local_counter linked from ll_counter_head (see read_partitioned_counter).
struct partitioned_counter {
uint64_t sum_of_dead; // The sum of all thread-local counts from threads that have terminated.
uint64_t pc_key; // A unique integer among all counters that have been created but not yet destroyed; also the index of this counter's slot in each thread-local array.
DoublyLinkedList<struct local_counter *> ll_counter_head; // A linked list of all the thread-local information for this counter.
};
// One thread's part of one partitioned counter.  Allocated by
// get_or_alloc_thread_local_counter; freed either when the owning counter is
// destroyed or when the thread's destructor runs.
struct local_counter {
uint64_t sum; // The thread-local sum.
PARTITIONED_COUNTER owner_pc; // The partitioned counter that this is part of.
GrowableArray<struct local_counter *> *thread_local_array; // The thread local array for this thread holds this local_counter at offset owner_pc->pc_key.
LinkedListElement<struct local_counter *> ll_in_counter; // Element for the doubly-linked list of thread-local information for this PARTITIONED_COUNTER.
};
// Try to get it into one cache line by aligning it.
// NOTE(review): no alignment attribute is actually applied to these
// declarations -- confirm whether the remark above is still intended.
// Per-thread array of this thread's local counters, indexed by pc_key; a NULL
// entry means this thread has no part of that counter.
static __thread GrowableArray<struct local_counter *> thread_local_array;
// True once this thread has initialized thread_local_array and registered it
// in all_thread_local_arrays (see get_or_alloc_thread_local_counter).
static __thread bool thread_local_array_inited = false;
// List of every thread's thread_local_array, so partitioned_counters_destroy
// can deinit them all.  Modified with the pc mutex held.
DoublyLinkedList<GrowableArray<struct local_counter *> *> all_thread_local_arrays;
// This thread's list element for all_thread_local_arrays.
__thread LinkedListElement<GrowableArray<struct local_counter *> *> thread_local_ll_elt;
// I want this to be static, but I have to use hidden visibility instead because it's a friend function.
static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me);
static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me __attribute__((__unused__)))
// Effect: This function is called whenever a thread terminates using the
// destructor of the thread_destructor_key (defined below). First grab the
// lock, then go through all the partitioned counters and removes the part that
// is local to this thread. We don't actually need the contents of the
// thread_destructor_key except to cause this function to run. The content of
// the key is a static string, so don't try to free it.
{
pc_lock();
for (size_t i=0; i<thread_local_array.get_size(); i++) {
struct local_counter *lc = thread_local_array.fetch_unchecked(i);
if (lc==NULL) continue;
PARTITIONED_COUNTER owner = lc->owner_pc;
owner->sum_of_dead += lc->sum;
owner->ll_counter_head.remove(&lc->ll_in_counter);
toku_free(lc);
}
all_thread_local_arrays.remove(&thread_local_ll_elt);
thread_local_array_inited = false;
thread_local_array.deinit();
pc_unlock();
}
//******************************************************************************
// We employ a system-wide pthread_key simply to get a notification when a
// thread terminates.  The key will simply contain a constant string (it's
// "dont care", but it doesn't matter what it is, as long as it's not NULL).
// We need a constructor function to set up the pthread_key.  We used a
// constructor function instead of a C++ constructor because that's what we are
// used to, rather than because it's necessarily better.  Whenever a thread
// tries to increment a partitioned_counter for the first time, it calls
// pthread_setspecific for the thread_destructor_key.  It's OK if the key gets
// setspecific multiple times, it's always the same value.  When a thread (that
// has created a thread-local part of any partitioned counter) terminates, the
// destroy_thread_local_part_of_partitioned_counters will run.  It may run
// before or after other pthread_key destructors, but the thread-local
// variables it uses (e.g. thread_local_array) are still present until the
// thread is completely done running.
//******************************************************************************
static pthread_key_t thread_destructor_key;
//******************************************************************************
// We don't like using up pthread_keys (macos provides only 128 of them),
// so we built our own.
// counters_in_use[i] is true while counter number i is allocated (see
// allocate_counter and free_counter); counters_in_use_size is the number of
// entries in that array.  destroy_counters frees the array and resets both.
//******************************************************************************
bool *counters_in_use = NULL;
uint64_t counters_in_use_size = 0;
static uint64_t allocate_counter (void)
// Effect: Reserve an unused counter number and return it.  The lowest free
//  number is reused if there is one; otherwise the counters_in_use table is
//  grown (to 1 entry when empty, else doubled) and the first new entry is used.
// Requires: The pc mutex is held before calling.
{
    // First choice: recycle a slot that an earlier counter gave back.
    uint64_t slot = 0;
    while (slot < counters_in_use_size) {
        if (!counters_in_use[slot]) {
            counters_in_use[slot] = true;
            return slot;
        }
        slot++;
    }

    // Every slot is taken (or the table is empty), so grow the table.
    const uint64_t prev_size = counters_in_use_size;
    counters_in_use_size = (prev_size == 0) ? 1 : prev_size * 2;
    XREALLOC_N(counters_in_use_size, counters_in_use);

    // The newly added tail is uninitialized; mark all of it as free.
    for (uint64_t j = prev_size; j < counters_in_use_size; j++) {
        counters_in_use[j] = false;
    }

    assert(prev_size < counters_in_use_size);
    counters_in_use[prev_size] = true;
    return prev_size;
}
static void free_counter(uint64_t counternum)
// Effect: Mark counter number counternum as unused so allocate_counter can
//  hand it out again.  Asserts that the number is in range and currently in use.
// Requires: The pc mutex is held before calling.
{
    assert(counternum < counters_in_use_size);
    bool *const in_use = &counters_in_use[counternum];
    assert(*in_use);
    *in_use = false;
}
static void destroy_counters (void)
// Effect: Free the counters_in_use table and reset it to the empty state
//  (NULL pointer, size 0).
{
    toku_free(counters_in_use);
    counters_in_use_size = 0;
    counters_in_use = NULL;
}
//******************************************************************************
// Now for the code that actually creates a counter.
//******************************************************************************
PARTITIONED_COUNTER create_partitioned_counter(void)
// Effect: Create a counter, initialized to zero.
// Returns: The new counter; release it with destroy_partitioned_counter().
// Implementation note: allocate_counter() requires the pc mutex, because it
//  scans and may reallocate the shared counters_in_use table, which
//  destroy_partitioned_counter() also updates under that mutex.  The lock is
//  therefore taken around the allocation so that concurrent creates and
//  destroys cannot race on the table.  The mutex is private to this file, so
//  no caller can already hold it.
{
    PARTITIONED_COUNTER XMALLOC(result);
    result->sum_of_dead = 0;
    result->ll_counter_head.init();
    pc_lock();
    result->pc_key = allocate_counter();
    pc_unlock();
    return result;
}
void destroy_partitioned_counter(PARTITIONED_COUNTER pc)
// Effect: Destroy the counter.  No operations on this counter are permitted after.
// Implementation note: Everything happens under the single global pc mutex,
//  so the thread-local parts owned by other threads can be freed here too.
{
    pc_lock();
    const uint64_t key = pc->pc_key;
    for (LinkedListElement<struct local_counter *> *elt; pc->ll_counter_head.pop(&elt); ) {
        // elt is already off the counter's list; also clear the owning
        // thread's array slot so that thread no longer sees the freed part.
        struct local_counter *part = elt->get_container();
        assert(part->owner_pc == pc);
        part->thread_local_array->store_unchecked(key, NULL);
        toku_free(part);
    }
    toku_free(pc);
    // key was copied out of pc above, so it is still valid after the free.
    free_counter(key);
    pc_unlock();
}
static inline struct local_counter *get_thread_local_counter(uint64_t pc_key, GrowableArray<struct local_counter *> *a)
// Effect: Return the entry of a at index pc_key, or NULL when pc_key is past
//  the end of the array.  The stored entry may itself be NULL.
{
    return (pc_key < a->get_size()) ? a->fetch_unchecked(pc_key) : NULL;
}
static struct local_counter *get_or_alloc_thread_local_counter(PARTITIONED_COUNTER pc)
// Effect: Return the calling thread's local part of pc, creating and
//  registering it first if this thread does not have one yet.
// Concurrency: Only the owning thread modifies thread_local_array, except that
//  destroy_partitioned_counter stores NULL into the slot of a counter being
//  destroyed.  That is not expected to race, because no thread should still
//  be using a counter while it is destroyed.
{
    const uint64_t key = pc->pc_key;
    struct local_counter *part = get_thread_local_counter(key, &thread_local_array);
    if (__builtin_expect(part != NULL, 1)) {
        // Common case: this thread already has its part; no lock needed.
        return part;
    }

    // Allocate before taking the pc lock; the rest of the setup needs the lock.
    XMALLOC(part);
    pc_lock();

    // On this thread's first local part, arrange for the thread destructor to
    // run at thread exit, and publish the thread's array in the global list.
    if (!thread_local_array_inited) {
        pk_setspecific(thread_destructor_key, "dont care");
        thread_local_array_inited = true;
        thread_local_array.init();
        all_thread_local_arrays.insert(&thread_local_ll_elt, &thread_local_array);
    }

    part->sum = 0;
    // Increments of sum are deliberately unsynchronized, so tell helgrind not
    // to report them.
    HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&part->sum, sizeof(part->sum));
    part->owner_pc = pc;
    part->thread_local_array = &thread_local_array;

    // Pad the array with NULLs until slot `key` exists, then install the part.
    while (thread_local_array.get_size() <= key) {
        thread_local_array.push(NULL);
    }
    thread_local_array.store_unchecked(key, part);
    pc->ll_counter_head.insert(&part->ll_in_counter, part);

    pc_unlock();
    return part;
}
void increment_partitioned_counter(PARTITIONED_COUNTER pc, uint64_t amount)
// Effect: Increment the counter by amount.
// Requires: No overflows.  This is a 64-bit unsigned counter.
// Implementation note: Only the calling thread's local sum is touched, and the
//  addition itself takes no lock.
{
    get_or_alloc_thread_local_counter(pc)->sum += amount;
}
static int sumit(struct local_counter *lc, uint64_t *sum)
// Effect: Iteration callback that adds lc's thread-local sum into *sum.
// Returns: Always 0; read_partitioned_counter asserts that the iteration
//  result is 0.
{
    *sum = *sum + lc->sum;
    return 0;
}
uint64_t read_partitioned_counter(PARTITIONED_COUNTER pc)
// Effect: Return the current value of the counter.
// Implementation note: With the pc mutex held, start from sum_of_dead and add
//  the local sum of every thread-local part linked to this counter.
{
    pc_lock();
    uint64_t total = pc->sum_of_dead;
    const int rc = pc->ll_counter_head.iterate<uint64_t *>(sumit, &total);
    assert(rc == 0);
    pc_unlock();
    return total;
}
void partitioned_counters_init(void)
// Effect: Initialize any partitioned counters data structures that must be
//  set up before any partitioned counters run: create thread_destructor_key
//  with the thread-exit destructor, and initialize the list of all
//  thread-local arrays.
{
    pk_create(&thread_destructor_key,
              destroy_thread_local_part_of_partitioned_counters);
    all_thread_local_arrays.init();
}
void partitioned_counters_destroy(void)
// Effect: Destroy any partitioned counters data structures.
{
pc_lock();
LinkedListElement<GrowableArray<struct local_counter *> *> *a_ll;
while (all_thread_local_arrays.pop(&a_ll)) {
a_ll->get_container()->deinit();
}
pk_delete(thread_destructor_key);
destroy_counters();
pc_unlock();
}
#endif // __APPLE__