refs #5155 refs #5308 closes #5309 merge new omt api functions to main (still unused), use templated omt for TOKUTXN->open_fts, OMT becomes a wrapper for omt<OMTVALUE>, other misc fixes

git-svn-id: file:///svn/toku/tokudb@46448 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
Yoni Fogel 2013-04-17 00:01:02 -04:00
parent 0dea54f5c6
commit fffce5d49f
18 changed files with 1212 additions and 1094 deletions

View file

@ -76,7 +76,6 @@ endmacro(set_ldflags_if_supported)
## disable some warnings
set_cflags_if_supported(
-Wno-missing-field-initializers
-Wno-error=strict-overflow
-Wstrict-null-sentinel
-Winit-self
-Wswitch

View file

@ -2664,10 +2664,8 @@ int toku_cachetable_unpin_and_remove (
}
static int
set_filenum_in_array(OMTVALUE hv, uint32_t index, void*arrayv) {
FILENUM *array = (FILENUM *) arrayv;
FT h = (FT) hv;
array[index] = toku_cachefile_filenum(h->cf);
set_filenum_in_array(const FT &ft, const uint32_t index, FILENUM *const array) {
array[index] = toku_cachefile_filenum(ft->cf);
return 0;
}
@ -2679,7 +2677,7 @@ log_open_txn (const TOKUTXN &txn, const uint32_t UU(index), CACHETABLE *const ct
CACHETABLE ct = *ctp;
TOKULOGGER logger = txn->logger;
FILENUMS open_filenums;
uint32_t num_filenums = toku_omt_size(txn->open_fts);
uint32_t num_filenums = txn->open_fts.size();
FILENUM array[num_filenums];
if (toku_txn_is_read_only(txn)) {
goto cleanup;
@ -2691,7 +2689,8 @@ log_open_txn (const TOKUTXN &txn, const uint32_t UU(index), CACHETABLE *const ct
open_filenums.num = num_filenums;
open_filenums.filenums = array;
//Fill in open_filenums
r = toku_omt_iterate(txn->open_fts, set_filenum_in_array, array);
r = txn->open_fts.iterate<FILENUM, set_filenum_in_array>(array);
invariant(r==0);
switch (toku_txn_get_state(txn)) {
case TOKUTXN_LIVE:{

View file

@ -831,7 +831,9 @@ ftleaf_split(
}
curr_src_bn_index++;
assert(B->n_children - curr_dest_bn_index == node->n_children - curr_src_bn_index);
invariant(B->n_children >= curr_dest_bn_index);
invariant(node->n_children >= curr_src_bn_index);
invariant(B->n_children - curr_dest_bn_index == node->n_children - curr_src_bn_index);
// move the rest of the basement nodes
for ( ; curr_src_bn_index < node->n_children; curr_src_bn_index++, curr_dest_bn_index++) {
destroy_basement_node(BLB(B, curr_dest_bn_index));

View file

@ -294,7 +294,8 @@ static inline void set_BNC(FTNODE node, int i, NONLEAF_CHILDINFO nl) {
p->u.nonleaf = nl;
}
static inline BASEMENTNODE BLB(FTNODE node, int i) {
assert(0<=i && i<node->n_children);
assert(i<node->n_children);
assert(0<=i);
FTNODE_CHILD_POINTER p = node->bp[i].ptr;
assert(p.tag==BCT_LEAF);
return p.u.leaf;

View file

@ -20,7 +20,9 @@
#include <string.h>
#include <dirent.h>
#include "txn_manager.h"
#include "omt-tmpl.h"
using namespace toku;
// Locking for the logger
// For most purposes we use the big ydb lock.
// To log: grab the buf lock
@ -163,7 +165,7 @@ struct tokutxn {
toku_mutex_t txn_lock;
// Protected by the txn lock:
OMT open_fts; // a collection of the fts that we touched. Indexed by filenum.
omt<FT> open_fts; // a collection of the fts that we touched. Indexed by filenum.
struct txn_roll_info roll_info; // Info used to manage rollback entries
// Protected by the txn manager lock:

File diff suppressed because it is too large Load diff

View file

@ -68,8 +68,130 @@ namespace toku {
* If you are storing structs, you may want to be able to get a pointer to the data actually stored in the omt (see find_zero). To do this, use the second template parameter:
* typedef omt<struct foo, struct foo *> foo_omt_t;
*/
namespace omt_internal {
/**
 * Reference to a subtree of an omt, stored as an index into the node array.
 * This is the generic form (no mark support): all 32 bits hold the index and
 * UINT32_MAX is reserved to mean "no subtree".
 * The type has no constructors; callers must call set_to_null() or
 * set_index() before reading it.
 */
template<bool subtree_supports_marks>
class subtree_templated {
public:
    // Sentinel index meaning "this subtree is empty".
    static const uint32_t NODE_NULL = UINT32_MAX;

    // Returns the raw stored index (NODE_NULL if the subtree is empty).
    uint32_t get_index() const {
        return m_index;
    }

    // True iff this reference currently denotes the empty subtree.
    bool is_null() const {
        return get_index() == NODE_NULL;
    }

    // Makes this reference denote the empty subtree.
    void set_to_null() {
        m_index = NODE_NULL;
    }

    // Points this reference at node `index`; the sentinel itself is rejected.
    void set_index(uint32_t index) {
        invariant(index != NODE_NULL);
        m_index = index;
    }

private:
    uint32_t m_index;
} __attribute__((__packed__,aligned(4)));
/**
 * Mark-supporting specialization of the subtree reference.
 * The top bit of the 32-bit word is "stolen" as a flag; the low 31 bits hold
 * the node index.  NODE_NULL is therefore INT32_MAX (all 31 index bits set).
 * Changing the index never disturbs the flag bit, and vice versa.
 */
template<>
class subtree_templated<true> {
private:
    // Top bit: the stolen flag.  Remaining bits: the node index.
    static const uint32_t MASK_BIT = ((uint32_t)1) << 31;
    static const uint32_t MASK_INDEX = ~MASK_BIT;

    uint32_t m_bitfield;

    // Replaces the index bits while keeping the current flag bit.
    void set_index_internal(uint32_t new_index) {
        const uint32_t kept_flag = m_bitfield & MASK_BIT;
        m_bitfield = kept_flag | new_index;
    }

public:
    // Sentinel index meaning "this subtree is empty".
    static const uint32_t NODE_NULL = INT32_MAX;

    // Returns the index portion only (flag bit masked off).
    uint32_t get_index() const {
        return m_bitfield & MASK_INDEX;
    }

    // True iff the index portion equals the sentinel.
    bool is_null() const {
        return get_index() == NODE_NULL;
    }

    // Makes this reference denote the empty subtree; the flag bit is kept.
    void set_to_null() {
        set_index_internal(NODE_NULL);
    }

    // Points this reference at node `index`; must be strictly below the sentinel.
    void set_index(uint32_t index) {
        invariant(index < NODE_NULL);
        set_index_internal(index);
    }

    // Reads the stolen flag bit.
    bool get_bit() const {
        return 0 != (m_bitfield & MASK_BIT);
    }

    // Sets the stolen flag bit, leaving the index untouched.
    void enable_bit() {
        m_bitfield |= MASK_BIT;
    }

    // Clears the stolen flag bit, leaving the index untouched.
    void disable_bit() {
        m_bitfield &= MASK_INDEX;
    }
} __attribute__((__packed__)) ;
/**
 * One node of the omt's weight-balanced tree (generic form, no marks).
 */
template<typename omtdata_t, bool subtree_supports_marks>
class omt_node_templated {
public:
    uint32_t weight;                                  // size of the subtree rooted here, this node included
    subtree_templated<subtree_supports_marks> left;   // left child reference
    subtree_templated<subtree_supports_marks> right;  // right child reference
    omtdata_t value;                                  // payload stored in this node

    // Without mark support there are no stolen bits, so this is a no-op.
    // It exists in both node implementations because callers cannot
    // select on the template parameter with a "static if".
    void clear_stolen_bits() {}
} __attribute__((__packed__,aligned(4)));
/**
 * Mark-supporting node of the omt's tree.
 * Two flags are kept in the stolen bits of the child references:
 *   - "marked"      lives in the stolen bit of `left`
 *   - "marks below" lives in the stolen bit of `right`
 */
template<typename omtdata_t>
class omt_node_templated<omtdata_t, true> {
public:
    uint32_t weight;                // size of the subtree rooted here, this node included
    subtree_templated<true> left;   // left child reference (+ "marked" flag)
    subtree_templated<true> right;  // right child reference (+ "marks below" flag)
    omtdata_t value;                // payload stored in this node

    // --- "marked" flag, carried by `left` ---
    bool get_marked() const {
        return left.get_bit();
    }
    void set_marked_bit() {
        left.enable_bit();
    }
    void unset_marked_bit() {
        left.disable_bit();
    }

    // --- "marks below" flag, carried by `right` ---
    bool get_marks_below() const {
        return right.get_bit();
    }
    void set_marks_below_bit() {
        // This function can be called by multiple threads.
        // Testing before writing reduces cache invalidation.
        if (get_marks_below()) {
            return;
        }
        right.enable_bit();
    }
    void unset_marks_below_bit() {
        right.disable_bit();
    }

    // Clears both flags, leaving the child indexes untouched.
    void clear_stolen_bits() {
        unset_marked_bit();
        unset_marks_below_bit();
    }
} __attribute__((__packed__,aligned(4)));
}
template<typename omtdata_t,
typename omtdataout_t=omtdata_t>
typename omtdataout_t=omtdata_t,
bool supports_marks=false>
class omt {
public:
/**
@ -268,6 +390,50 @@ public:
int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
int iterate_on_range(const uint32_t left, const uint32_t right, iterate_extra_t *const iterate_extra) const;
/**
* Effect: Iterate over the values of the omt, and mark the nodes that are visited.
* Other than the marks, this behaves the same as iterate_on_range.
* Requires: supports_marks == true
* Performance: time=O(i+\log N) where i is the number of times f is called, and N is the number of elements in the omt.
* Notes:
* This function MAY be called concurrently by multiple threads, but
* not concurrently with any other non-const function.
*/
template<typename iterate_extra_t,
int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
int iterate_and_mark_range(const uint32_t left, const uint32_t right, iterate_extra_t *const iterate_extra);
/**
* Effect: Iterate over the values of the omt, from left to right, calling f on each value whose node has been marked.
* Other than the marks, this behaves the same as iterate.
* Requires: supports_marks == true
* Performance: time=O(i+\log N) where i is the number of times f is called, and N is the number of elements in the omt.
*/
template<typename iterate_extra_t,
int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
int iterate_over_marked(iterate_extra_t *const iterate_extra) const;
/**
* Effect: Delete all elements from the omt whose nodes have been marked.
* Requires: supports_marks == true
* Performance: time=O(N + i\log N) where i is the number of marked elements; this could (and should) be made faster.
*/
void delete_all_marked(void);
/**
* Effect: Verify that the internal state of the marks in the tree is self-consistent.
* Crashes the system if the marks are in a bad state.
* Requires: supports_marks == true
* Performance: time=O(N)
* Notes:
* Even though this is a const function, it requires exclusive access.
* Rationale:
* The current implementation of the marks relies on a sort of
* "cache" bit representing the state of bits below it in the tree.
* This allows glass-box testing that these bits are correct.
*/
void verify_marks_consistent(void) const;
/**
* Effect: Iterate over the values of the omt, from left to right, calling f on each value.
* The first argument passed to f is a pointer to the value stored in the omt.
@ -388,16 +554,9 @@ public:
private:
typedef uint32_t node_idx;
enum {
NODE_NULL = UINT32_MAX
};
struct omt_node {
uint32_t weight;
node_idx left;
node_idx right;
omtdata_t value;
} __attribute__((__packed__));
typedef omt_internal::subtree_templated<supports_marks> subtree;
typedef omt_internal::omt_node_templated<omtdata_t, supports_marks> omt_node;
static_assert(std::is_pod<subtree>::value, "not POD");
struct omt_array {
uint32_t start_idx;
@ -406,9 +565,9 @@ private:
};
struct omt_tree {
node_idx root;
node_idx free_idx;
struct omt_node *nodes;
subtree root;
uint32_t free_idx;
omt_node *nodes;
};
bool is_array;
@ -423,7 +582,7 @@ private:
void create_internal(const uint32_t new_capacity);
uint32_t nweight(const node_idx idx) const;
uint32_t nweight(const subtree &subtree) const;
node_idx node_malloc(void);
@ -432,27 +591,27 @@ private:
void maybe_resize_array(const uint32_t n);
__attribute__((nonnull))
void fill_array_with_subtree_values(omtdata_t *const array, const node_idx tree_idx) const;
void fill_array_with_subtree_values(omtdata_t *const array, const subtree &subtree) const;
void convert_to_array(void);
__attribute__((nonnull))
void rebuild_from_sorted_array(node_idx *const n_idxp, const omtdata_t *const values, const uint32_t numvalues);
void rebuild_from_sorted_array(subtree *const subtree, const omtdata_t *const values, const uint32_t numvalues);
void convert_to_tree(void);
void maybe_resize_or_convert(const uint32_t n);
bool will_need_rebalance(const node_idx n_idx, const int leftmod, const int rightmod) const;
bool will_need_rebalance(const subtree &subtree, const int leftmod, const int rightmod) const;
__attribute__((nonnull))
void insert_internal(node_idx *const n_idxp, const omtdata_t &value, const uint32_t idx, node_idx **const rebalance_idx);
void insert_internal(subtree *const subtreep, const omtdata_t &value, const uint32_t idx, subtree **const rebalance_subtree);
void set_at_internal_array(const omtdata_t &value, const uint32_t idx);
void set_at_internal(const node_idx n_idx, const omtdata_t &value, const uint32_t idx);
void set_at_internal(const subtree &subtree, const omtdata_t &value, const uint32_t idx);
void delete_internal(node_idx *const n_idxp, const uint32_t idx, omt_node *const copyn, node_idx **const rebalance_idx);
void delete_internal(subtree *const subtreep, const uint32_t idx, omt_node *const copyn, subtree **const rebalance_subtree);
template<typename iterate_extra_t,
int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
@ -462,7 +621,7 @@ private:
template<typename iterate_extra_t,
int (*f)(omtdata_t *, const uint32_t, iterate_extra_t *const)>
void iterate_ptr_internal(const uint32_t left, const uint32_t right,
const node_idx n_idx, const uint32_t idx,
const subtree &subtree, const uint32_t idx,
iterate_extra_t *const iterate_extra);
template<typename iterate_extra_t,
@ -473,21 +632,34 @@ private:
template<typename iterate_extra_t,
int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
int iterate_internal(const uint32_t left, const uint32_t right,
const node_idx n_idx, const uint32_t idx,
const subtree &subtree, const uint32_t idx,
iterate_extra_t *const iterate_extra) const;
template<typename iterate_extra_t,
int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
int iterate_and_mark_range_internal(const uint32_t left, const uint32_t right,
const subtree &subtree, const uint32_t idx,
iterate_extra_t *const iterate_extra);
template<typename iterate_extra_t,
int (*f)(const omtdata_t &, const uint32_t, iterate_extra_t *const)>
int iterate_over_marked_internal(const subtree &subtree, const uint32_t idx,
iterate_extra_t *const iterate_extra) const;
uint32_t verify_marks_consistent_internal(const subtree &subtree, const bool allow_marks) const;
void fetch_internal_array(const uint32_t i, omtdataout_t *value) const;
void fetch_internal(const node_idx idx, const uint32_t i, omtdataout_t *value) const;
void fetch_internal(const subtree &subtree, const uint32_t i, omtdataout_t *value) const;
__attribute__((nonnull))
void fill_array_with_subtree_idxs(node_idx *const array, const node_idx tree_idx) const;
void fill_array_with_subtree_idxs(node_idx *const array, const subtree &subtree) const;
__attribute__((nonnull))
void rebuild_subtree_from_idxs(node_idx *const n_idxp, const node_idx *const idxs, const uint32_t numvalues);
void rebuild_subtree_from_idxs(subtree *const subtree, const node_idx *const idxs, const uint32_t numvalues);
__attribute__((nonnull))
void rebalance(node_idx *const n_idxp);
void rebalance(subtree *const subtree);
__attribute__((nonnull))
static void copyout(omtdata_t *const out, const omt_node *const n);
@ -507,7 +679,7 @@ private:
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
int find_internal_zero(const node_idx n_idx, const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const;
int find_internal_zero(const subtree &subtree, const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const;
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
@ -515,7 +687,7 @@ private:
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
int find_internal_plus(const node_idx n_idx, const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const;
int find_internal_plus(const subtree &subtree, const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const;
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
@ -523,7 +695,7 @@ private:
template<typename omtcmp_t,
int (*h)(const omtdata_t &, const omtcmp_t &)>
int find_internal_minus(const node_idx n_idx, const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const;
int find_internal_minus(const subtree &subtree, const omtcmp_t &extra, omtdataout_t *value, uint32_t *const idxp) const;
};
} // namespace toku

808
ft/omt.cc
View file

@ -5,809 +5,143 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h>
#include <ctype.h>
#include <errno.h>
#if defined(HAVE_MALLOC_H)
# include <malloc.h>
#elif defined(HAVE_SYS_MALLOC_H)
# include <sys/malloc.h>
#endif
#include <stdio.h>
#include <stdlib.h>
//#include <toku_assert.h>
#include <memory.h>
//#include <stdint.h>
#include <string.h>
#include "toku_assert.h"
#include "memory.h"
//#include <errno.h>
#include <db.h>
#include "omt.h"
#include "fttypes.h"
typedef uint32_t node_idx;
static const node_idx NODE_NULL = UINT32_MAX;
typedef struct omt_node *OMT_NODE;
struct omt_node {
uint32_t weight; /* Size of subtree rooted at this node (including this one). */
node_idx left; /* Index of left subtree. */
node_idx right; /* Index of right subtree. */
OMTVALUE value; /* The value stored in the node. */
} __attribute__((__packed__));
// Array representation of an omt: a contiguous run of values inside `values`.
struct omt_array {
uint32_t start_idx; // offset within `values` of the first live element
uint32_t num_values; // number of live elements, starting at start_idx
OMTVALUE *values; // backing storage
};
// Tree representation of an omt: nodes live in one array and refer to each
// other by index (NODE_NULL meaning "no child").
struct omt_tree {
node_idx root; // index of the root node, or NODE_NULL for an empty tree
OMT_NODE nodes; // backing array of nodes
node_idx free_idx; // next never-used slot in `nodes`; handed out by omt_node_malloc
};
// An order-maintenance tree.  It is stored either as a flat array or as a
// tree; `is_array` says which member of the union `i` is currently valid.
struct omt {
bool is_array; // true: use i.a, false: use i.t
uint32_t capacity; // number of slots allocated in i.a.values or i.t.nodes
union {
struct omt_array a;
struct omt_tree t;
} i;
};
// Allocates an omt header in array mode holding zero values.
// Neither `capacity` nor the values array is set up here; the caller
// is responsible for both.  Returns 0 on success, ENOMEM if the
// allocation yields NULL.
static inline int
omt_create_no_array(OMT *omtp) {
    OMT XMALLOC(fresh);
    if (fresh == NULL) {
        return ENOMEM;
    }
    fresh->i.a.start_idx = 0;
    fresh->i.a.num_values = 0;
    fresh->is_array = true;
    *omtp = fresh;
    return 0;
}
// Creates an empty array-mode omt with room for twice `num_starting_nodes`
// values (the request is raised to at least 2 first, so capacity >= 4).
// Returns 0 on success or the error from omt_create_no_array.
static int omt_create_internal(OMT *omtp, uint32_t num_starting_nodes) {
    OMT fresh;
    const int rc = omt_create_no_array(&fresh);
    if (rc != 0) {
        return rc;
    }
    const uint32_t wanted = (num_starting_nodes < 2) ? 2 : num_starting_nodes;
    fresh->capacity = 2 * wanted;
    XMALLOC_N(fresh->capacity, fresh->i.a.values);
    *omtp = fresh;
    return 0;
}
int
toku_omt_create_steal_sorted_array(OMT *omtp, OMTVALUE **valuesp, uint32_t numvalues, uint32_t capacity) {
if (numvalues>capacity || !*valuesp) return EINVAL;
int r = omt_create_no_array(omtp);
if (r) return r;
OMT result = *omtp;
result->capacity = capacity;
result->i.a.num_values = numvalues;
result->i.a.values = *valuesp;
*valuesp = NULL; //Remove caller's reference.
OMT XMALLOC(omt);
omt->create_steal_sorted_array(valuesp, numvalues, capacity);
*omtp = omt;
return 0;
}
// Weight (element count) of the subtree rooted at `idx`; an empty
// subtree (NODE_NULL) weighs 0.
static inline uint32_t nweight(OMT omt, node_idx idx) {
    return (idx == NODE_NULL) ? 0 : omt->i.t.nodes[idx].weight;
}
// Number of values stored in the omt, whichever representation is active.
static inline uint32_t omt_size(OMT omt) {
    if (omt->is_array) {
        return omt->i.a.num_values;
    }
    return nweight(omt, omt->i.t.root);
}
// Hands out the next never-used slot of the node array and advances
// the free cursor.  Asserts that a slot is still available.
static inline node_idx omt_node_malloc(OMT omt) {
    assert(omt->i.t.free_idx < omt->capacity);
    const node_idx claimed = omt->i.t.free_idx;
    omt->i.t.free_idx = claimed + 1;
    return claimed;
}
// Releases a node slot.  Nothing is actually reclaimed here: the function
// only sanity-checks that `idx` lies within the node array.
static inline void omt_node_free(OMT omt, node_idx idx) {
    assert(omt->capacity > idx);
}
// Writes the values of the subtree rooted at `tree_idx` into `array`
// in in-order (sorted) sequence.  `array` must have room for the
// subtree's weight.
static inline void fill_array_with_subtree_values(OMT omt, OMTVALUE *array, node_idx tree_idx) {
    if (tree_idx != NODE_NULL) {
        OMT_NODE node = &omt->i.t.nodes[tree_idx];
        const uint32_t left_count = nweight(omt, node->left);
        fill_array_with_subtree_values(omt, array, node->left);
        array[left_count] = node->value;
        fill_array_with_subtree_values(omt, array + left_count + 1, node->right);
    }
}
// Builds a balanced subtree from `numvalues` sorted values, allocating a
// fresh node per value, and stores the subtree's root index in *n_idxp.
// The middle element becomes the root; the split is:
//   numvalues=4 -> mid=2: left = values[0..1],  right = values[3..3]
//   numvalues=3 -> mid=1: left = values[0..0],  right = values[2..2]
//   numvalues=2 -> mid=1: left = values[0..0],  right = empty
//   numvalues=1 -> mid=0: left = empty,         right = empty
static inline void rebuild_from_sorted_array(OMT omt, node_idx *n_idxp,
                                             OMTVALUE *values, uint32_t numvalues) {
    if (numvalues == 0) {
        *n_idxp = NODE_NULL;
        return;
    }
    const uint32_t mid = numvalues / 2;
    const node_idx fresh_idx = omt_node_malloc(omt);
    OMT_NODE fresh = &omt->i.t.nodes[fresh_idx];
    fresh->weight = numvalues;
    fresh->value = values[mid];
    // Publish the new root before recursing so the second call can be a tail call.
    *n_idxp = fresh_idx;
    rebuild_from_sorted_array(omt, &fresh->left, values, mid);
    rebuild_from_sorted_array(omt, &fresh->right, values + mid + 1, numvalues - (mid + 1));
}
// Array mode only: makes sure the values array suits `n` elements.
// A new array of max(4, 2*n) slots is allocated (and the live values moved
// to its front) when either the room after start_idx is below n, or the
// current capacity is at least twice the new size.  Otherwise nothing happens.
// Returns 0 on success, or the errno-derived error if allocation yields NULL.
static inline int maybe_resize_array(OMT omt, uint32_t n) {
    const uint32_t target = (n <= 2) ? 4 : 2 * n;
    const uint32_t usable = omt->capacity - omt->i.a.start_idx;
    const bool too_small = usable < n;
    const bool too_big = omt->capacity / 2 >= target;
    if (!too_small && !too_big) {
        return 0;
    }
    OMTVALUE *XMALLOC_N(target, replacement);
    if (replacement == NULL) {
        return get_error_errno();
    }
    memcpy(replacement, omt->i.a.values + omt->i.a.start_idx,
           omt->i.a.num_values * sizeof(*replacement));
    omt->i.a.start_idx = 0;
    omt->capacity = target;
    toku_free(omt->i.a.values);
    omt->i.a.values = replacement;
    return 0;
}
// Switches an array-mode omt to tree mode (no-op if already a tree).
// A node array of max(4, 2*size) slots is allocated, a balanced tree is
// built from the sorted values, and the old values array is freed.
// Returns 0 on success, or the errno-derived error if allocation yields NULL.
static int omt_convert_to_tree(OMT omt) {
    if (!omt->is_array) {
        return 0;
    }
    const uint32_t count = omt_size(omt);
    uint32_t slots = count * 2;
    if (slots < 4) {
        slots = 4;
    }
    OMT_NODE XMALLOC_N(slots, fresh_nodes);
    if (fresh_nodes == NULL) {
        return get_error_errno();
    }
    // The array and tree views share a union: capture the array fields
    // before any tree field is written.
    OMTVALUE *const old_block = omt->i.a.values;
    OMTVALUE *const live_values = old_block + omt->i.a.start_idx;
    omt->is_array = false;
    omt->i.t.nodes = fresh_nodes;
    omt->capacity = slots;
    omt->i.t.free_idx = 0;
    omt->i.t.root = NODE_NULL;
    rebuild_from_sorted_array(omt, &omt->i.t.root, live_values, count);
    toku_free(old_block);
    return 0;
}
// Switches a tree-mode omt to array mode (no-op if already an array).
// The values are flattened in order into a new array of max(4, 2*size)
// slots and the node array is freed.
// Returns 0 on success, or the errno-derived error if allocation yields NULL
// (in which case the omt is left unchanged).
static int omt_convert_to_array(OMT omt) {
    if (omt->is_array) {
        return 0;
    }
    const uint32_t count = omt_size(omt);
    uint32_t slots = 2 * count;
    if (slots < 4) {
        slots = 4;
    }
    OMTVALUE *XMALLOC_N(slots, flat);
    if (flat == NULL) {
        return get_error_errno();
    }
    fill_array_with_subtree_values(omt, flat, omt->i.t.root);
    toku_free(omt->i.t.nodes);
    omt->is_array = true;
    omt->capacity = slots;
    omt->i.a.num_values = count;
    omt->i.a.values = flat;
    omt->i.a.start_idx = 0;
    return 0;
}
// Prepares the omt to hold `n` elements.
// Array mode: defers to maybe_resize_array.
// Tree mode: the node array is never resized in place; instead the omt is
// flattened to an array (the rebuild pauses at the array form) when any of
// these hold:
//   - the node array is too large (capacity/2 >= max(4, 2*n));
//   - the element count is growing and no free slot remains;
//   - the node array is smaller than n.
static inline int maybe_resize_or_convert(OMT omt, uint32_t n) {
    if (omt->is_array) {
        return maybe_resize_array(omt, n);
    }
    const uint32_t target = (n <= 2) ? 4 : 2 * n;
    const uint32_t live = nweight(omt, omt->i.t.root);
    const bool oversized = omt->capacity / 2 >= target;
    const bool exhausted = omt->i.t.free_idx >= omt->capacity && live < n;
    const bool undersized = omt->capacity < n;
    if (oversized || exhausted || undersized) {
        return omt_convert_to_array(omt);
    }
    return 0;
}
// Writes the node indexes of the subtree rooted at `tree_idx` into `array`
// in in-order sequence.  `array` must have room for the subtree's weight.
static inline void fill_array_with_subtree_idxs(OMT omt, node_idx *array, node_idx tree_idx) {
    if (tree_idx != NODE_NULL) {
        OMT_NODE node = &omt->i.t.nodes[tree_idx];
        const uint32_t left_count = nweight(omt, node->left);
        fill_array_with_subtree_idxs(omt, array, node->left);
        array[left_count] = tree_idx;
        fill_array_with_subtree_idxs(omt, array + left_count + 1, node->right);
    }
}
/* Relinks already-existing nodes (listed in sorted order in `idxs`) into a
 * balanced subtree and stores its root index in *n_idxp.  No node is
 * allocated and no value is touched; only weights and child links change.
 * Used for rebalancing. */
static inline void rebuild_subtree_from_idxs(OMT omt, node_idx *n_idxp, node_idx *idxs,
                                             uint32_t numvalues) {
    if (numvalues == 0) {
        *n_idxp = NODE_NULL;
        return;
    }
    const uint32_t mid = numvalues / 2;
    const node_idx root_idx = idxs[mid];
    OMT_NODE root = &omt->i.t.nodes[root_idx];
    root->weight = numvalues;
    // root->value is already in place.
    rebuild_subtree_from_idxs(omt, &root->left, idxs, mid);
    rebuild_subtree_from_idxs(omt, &root->right, idxs + mid + 1, numvalues - (mid + 1));
    *n_idxp = root_idx;
}
// Rebalances the subtree whose root index is stored at *n_idxp.
// If that subtree is the whole tree, the omt is converted to array form
// instead.  Otherwise the subtree's node indexes are collected in order into
// a scratch array and the nodes are relinked into a balanced shape.
// The scratch array is carved out of the unused tail of the node array when
// it fits there; only otherwise is it heap-allocated (and freed afterwards).
static inline void rebalance(OMT omt, node_idx *n_idxp) {
node_idx idx = *n_idxp;
if (idx==omt->i.t.root) {
//Try to convert to an array.
//If this fails, (malloc) nothing will have changed.
//In the failure case we continue on to the standard rebalance
//algorithm.
int r = omt_convert_to_array(omt);
if (r==0) return;
}
OMT_NODE n = omt->i.t.nodes+idx;
node_idx *tmp_array;
// Bytes needed for one node_idx per node in the subtree...
size_t mem_needed = n->weight*sizeof(*tmp_array);
// ...versus bytes available in the never-used slots past free_idx.
size_t mem_free = (omt->capacity-omt->i.t.free_idx)*sizeof(*omt->i.t.nodes);
bool malloced;
if (mem_needed<=mem_free) {
//There is sufficient free space at the end of the nodes array
//to hold enough node indexes to rebalance.
malloced = false;
tmp_array = (node_idx*)(omt->i.t.nodes+omt->i.t.free_idx);
}
else {
malloced = true;
XMALLOC_N(n->weight, tmp_array);
if (tmp_array==NULL) return; //Don't rebalance. Still a working tree.
}
fill_array_with_subtree_idxs(omt, tmp_array, idx);
rebuild_subtree_from_idxs(omt, n_idxp, tmp_array, n->weight);
if (malloced) toku_free(tmp_array);
}
// Predicts whether the node at `n_idx` would be out of balance once its left
// and right subtree weights are adjusted by `leftmod` and `rightmod`.
// A node is unbalanced when one side (plus one) is lighter than half of the
// other side (plus two): one of the 1's accounts for the root, the other
// turns the halving into ceil(n/2).  An empty subtree never needs rebalancing.
static inline bool will_need_rebalance(OMT omt, node_idx n_idx, int leftmod, int rightmod) {
    if (n_idx == NODE_NULL) {
        return false;
    }
    OMT_NODE node = &omt->i.t.nodes[n_idx];
    const uint32_t left_w = nweight(omt, node->left) + leftmod;
    const uint32_t right_w = nweight(omt, node->right) + rightmod;
    const bool left_too_light = 1 + left_w < (1 + 1 + right_w) / 2;
    const bool right_too_light = 1 + right_w < (1 + 1 + left_w) / 2;
    return left_too_light || right_too_light;
}
// Inserts `value` at position `index` within the subtree whose root index is
// stored at *n_idxp, updating weights along the descent.
// *rebalance_idx must be NULL on entry to the top-level call; on return it
// points at the link of the first node met on the way down (i.e. the one
// closest to the root) that the insertion leaves unbalanced, or stays NULL.
// The caller is expected to rebalance that subtree.
static inline void insert_internal(OMT omt, node_idx *n_idxp, OMTVALUE value, uint32_t index, node_idx **rebalance_idx) {
if (*n_idxp==NODE_NULL) {
// Reached an empty slot: create the leaf here.
assert(index==0);
node_idx newidx = omt_node_malloc(omt);
OMT_NODE newnode = omt->i.t.nodes+newidx;
newnode->weight = 1;
newnode->left = NODE_NULL;
newnode->right = NODE_NULL;
newnode->value = value;
*n_idxp = newidx;
} else {
node_idx idx = *n_idxp;
OMT_NODE n = omt->i.t.nodes+idx;
n->weight++;
if (index <= nweight(omt, n->left)) {
// New element lands in the left subtree.
if (*rebalance_idx==NULL && will_need_rebalance(omt, idx, 1, 0)) {
*rebalance_idx = n_idxp;
}
insert_internal(omt, &n->left, value, index, rebalance_idx);
} else {
// New element lands in the right subtree; re-base the index past
// the left subtree and this node.
if (*rebalance_idx==NULL && will_need_rebalance(omt, idx, 0, 1)) {
*rebalance_idx = n_idxp;
}
uint32_t sub_index = index-nweight(omt, n->left)-1;
insert_internal(omt, &n->right, value, sub_index, rebalance_idx);
}
}
}
// Array mode: overwrites the element at logical position `index`.
// No bounds check is performed here.
static inline void set_at_internal_array(OMT omt, OMTVALUE v, uint32_t index) {
    OMTVALUE *const slot = &omt->i.a.values[omt->i.a.start_idx + index];
    *slot = v;
}
// Tree mode: overwrites the element at position `index` within the subtree
// rooted at `n_idx`.  Walks down from the root, re-basing `index` whenever
// it steps into a right subtree; asserts if it runs off the tree.
static inline void set_at_internal(OMT omt, node_idx n_idx, OMTVALUE v, uint32_t index) {
    for (;;) {
        assert(n_idx != NODE_NULL);
        OMT_NODE node = &omt->i.t.nodes[n_idx];
        const uint32_t left_w = nweight(omt, node->left);
        if (index == left_w) {
            node->value = v;
            return;
        }
        if (index < left_w) {
            n_idx = node->left;
        } else {
            index -= left_w + 1;
            n_idx = node->right;
        }
    }
}
// Removes the element at position `index` from the subtree whose root index
// is stored at *n_idxp, updating weights along the descent.
// *rebalance_idx must be NULL on entry to the top-level call; on return it
// points at the link of the first node met on the way down that the deletion
// leaves unbalanced, or stays NULL.
// *vp receives the value of the node that is physically unlinked.  Note that
// when the target node has two children, the node unlinked is its successor:
// the successor's value is moved into the target node and *vp is not written
// on that path.
static inline void delete_internal(OMT omt, node_idx *n_idxp, uint32_t index, OMTVALUE *vp, node_idx **rebalance_idx) {
assert(*n_idxp!=NODE_NULL);
OMT_NODE n = omt->i.t.nodes+*n_idxp;
if (index < nweight(omt, n->left)) {
// Target is in the left subtree.
n->weight--;
if (*rebalance_idx==NULL && will_need_rebalance(omt, *n_idxp, -1, 0)) {
*rebalance_idx = n_idxp;
}
delete_internal(omt, &n->left, index, vp, rebalance_idx);
} else if (index == nweight(omt, n->left)) {
// This node is the target.
if (n->left==NODE_NULL) {
// No left child: splice the right child into this node's place.
uint32_t idx = *n_idxp;
*n_idxp = n->right;
*vp = n->value;
omt_node_free(omt, idx);
} else if (n->right==NODE_NULL) {
// No right child: splice the left child into this node's place.
uint32_t idx = *n_idxp;
*n_idxp = n->left;
*vp = n->value;
omt_node_free(omt, idx);
} else {
OMTVALUE zv;
// delete the successor of index, get the value, and store it here.
if (*rebalance_idx==NULL && will_need_rebalance(omt, *n_idxp, 0, -1)) {
*rebalance_idx = n_idxp;
}
delete_internal(omt, &n->right, 0, &zv, rebalance_idx);
n->value = zv;
n->weight--;
}
} else {
// Target is in the right subtree; re-base the index past the left
// subtree and this node.
n->weight--;
if (*rebalance_idx==NULL && will_need_rebalance(omt, *n_idxp, 0, -1)) {
*rebalance_idx = n_idxp;
}
delete_internal(omt, &n->right, index-nweight(omt, n->left)-1, vp, rebalance_idx);
}
}
// Array mode: copies the element at logical position `i` into *v.
// No bounds check is performed here.
static inline void fetch_internal_array(OMT V, uint32_t i, OMTVALUE *v) {
    const uint32_t physical = V->i.a.start_idx + i;
    *v = V->i.a.values[physical];
}
// Tree mode: copies the element at position `i` within the subtree rooted at
// `idx` into *v.  Walks down from the root, re-basing `i` whenever it steps
// into a right subtree.  The caller must guarantee `i` is in range.
static inline void fetch_internal(OMT V, node_idx idx, uint32_t i, OMTVALUE *v) {
    for (;;) {
        OMT_NODE node = &V->i.t.nodes[idx];
        const uint32_t left_w = nweight(V, node->left);
        if (i == left_w) {
            *v = node->value;
            return;
        }
        if (i < left_w) {
            idx = node->left;
        } else {
            i -= left_w + 1;
            idx = node->right;
        }
    }
}
// Array mode: calls f(value, position, v) for each position in [left, right),
// in increasing order.  Stops at, and returns, the first nonzero result of f;
// returns 0 if every call returned 0.
static inline int iterate_internal_array(OMT omt,
                                         uint32_t left, uint32_t right,
                                         int (*f)(OMTVALUE, uint32_t, void*), void*v) {
    for (uint32_t pos = left; pos < right; pos++) {
        const int rc = f(omt->i.a.values[pos + omt->i.a.start_idx], pos, v);
        if (rc != 0) {
            return rc;
        }
    }
    return 0;
}
// Tree mode: in-order traversal of the subtree rooted at `n_idx`, calling
// f(value, position, v) for every element whose position lies in
// [left, right).  `idx` is the position of the subtree's first element.
// Subtrees that cannot intersect the range are not visited.
// Stops at, and returns, the first nonzero result of f; otherwise returns 0.
static inline int iterate_internal(OMT omt, uint32_t left, uint32_t right,
                                   node_idx n_idx, uint32_t idx,
                                   int (*f)(OMTVALUE, uint32_t, void*), void*v) {
    if (n_idx == NODE_NULL) {
        return 0;
    }
    OMT_NODE node = &omt->i.t.nodes[n_idx];
    const uint32_t here = idx + nweight(omt, node->left);
    if (left < here) {
        const int rc = iterate_internal(omt, left, right, node->left, idx, f, v);
        if (rc != 0) {
            return rc;
        }
    }
    if (left <= here && here < right) {
        const int rc = f(node->value, here, v);
        if (rc != 0) {
            return rc;
        }
    }
    if (here + 1 < right) {
        return iterate_internal(omt, left, right, node->right, here + 1, f, v);
    }
    return 0;
}
// Array mode: binary search for the leftmost element on which h returns 0.
// h(value, extra) must be monotone over the array (negative, then zero,
// then positive).
// On a hit: returns 0, stores the element in *value (if value != NULL) and
// its logical position in *index.
// On a miss: returns DB_NOTFOUND and stores in *index the position of the
// first element with h > 0, or num_values if there is none.
static inline int find_internal_zero_array(OMT omt, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, uint32_t *index) {
    uint32_t min = omt->i.a.start_idx;
    uint32_t limit = omt->i.a.start_idx + omt->i.a.num_values;
    uint32_t best_pos = NODE_NULL;   // leftmost physical slot seen with h > 0
    uint32_t best_zero = NODE_NULL;  // leftmost physical slot seen with h == 0
    while (min != limit) {
        // min <= limit always holds, so this form cannot wrap around,
        // unlike (min + limit) / 2 on very large arrays.
        uint32_t mid = min + (limit - min) / 2;
        int hv = h(omt->i.a.values[mid], extra);
        if (hv < 0) {
            min = mid + 1;
        }
        else if (hv > 0) {
            best_pos = mid;
            limit = mid;
        }
        else {
            best_zero = mid;
            limit = mid;
        }
    }
    if (best_zero != NODE_NULL) {
        //Found a zero
        if (value != NULL) *value = omt->i.a.values[best_zero];
        *index = best_zero - omt->i.a.start_idx;
        return 0;
    }
    if (best_pos != NODE_NULL) *index = best_pos - omt->i.a.start_idx;
    else *index = omt->i.a.num_values;
    return DB_NOTFOUND;
}
// Tree mode: finds the leftmost element of the subtree rooted at `n_idx` on
// which h returns 0.
// On a hit: returns 0, stores the element in *value (if value != NULL) and
// its position within this subtree in *index.
// On a miss: returns DB_NOTFOUND; *index is still written (starting from 0
// at an empty subtree and advanced past every node skipped on the left).
// requires: index!=NULL
static inline int find_internal_zero(OMT omt, node_idx n_idx, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, uint32_t *index)
{
    if (n_idx == NODE_NULL) {
        *index = 0;
        return DB_NOTFOUND;
    }
    OMT_NODE node = &omt->i.t.nodes[n_idx];
    const int hv = h(node->value, extra);
    if (hv > 0) {
        // Any zero lies strictly to the left.
        return find_internal_zero(omt, node->left, h, extra, value, index);
    }
    if (hv < 0) {
        // Any zero lies strictly to the right; shift the reported position.
        const int rc = find_internal_zero(omt, node->right, h, extra, value, index);
        *index += nweight(omt, node->left) + 1;
        return rc;
    }
    // This node is a zero, but an earlier one may exist on the left.
    int rc = find_internal_zero(omt, node->left, h, extra, value, index);
    if (rc == DB_NOTFOUND) {
        *index = nweight(omt, node->left);
        if (value != NULL) {
            *value = node->value;
        }
        rc = 0;
    }
    return rc;
}
// Array mode: binary search for the leftmost element on which h returns a
// positive number.  h(value, extra) must be monotone over the array.
// On a hit: returns 0, stores the element in *value (if value != NULL) and
// its logical position in *index.
// On a miss: returns DB_NOTFOUND and leaves *value and *index untouched.
static inline int find_internal_plus_array(OMT omt, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, uint32_t *index) {
    uint32_t min = omt->i.a.start_idx;
    uint32_t limit = omt->i.a.start_idx + omt->i.a.num_values;
    uint32_t best = NODE_NULL;  // leftmost physical slot seen with h > 0
    while (min != limit) {
        // min <= limit always holds, so this form cannot wrap around,
        // unlike (min + limit) / 2 on very large arrays.
        uint32_t mid = min + (limit - min) / 2;
        int hv = h(omt->i.a.values[mid], extra);
        if (hv > 0) {
            best = mid;
            limit = mid;
        }
        else {
            min = mid + 1;
        }
    }
    if (best == NODE_NULL) return DB_NOTFOUND;
    if (value != NULL) *value = omt->i.a.values[best];
    *index = best - omt->i.a.start_idx;
    return 0;
}
// Array mode: binary search for the rightmost element on which h returns a
// negative number.  h(value, extra) must be monotone over the array.
// On a hit: returns 0, stores the element in *value (if value != NULL) and
// its logical position in *index.
// On a miss: returns DB_NOTFOUND and leaves *value and *index untouched.
static inline int find_internal_minus_array(OMT omt, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, uint32_t *index) {
    uint32_t min = omt->i.a.start_idx;
    uint32_t limit = omt->i.a.start_idx + omt->i.a.num_values;
    uint32_t best = NODE_NULL;  // rightmost physical slot seen with h < 0
    while (min != limit) {
        // min <= limit always holds, so this form cannot wrap around,
        // unlike (min + limit) / 2 on very large arrays.
        uint32_t mid = min + (limit - min) / 2;
        int hv = h(omt->i.a.values[mid], extra);
        if (hv < 0) {
            best = mid;
            min = mid + 1;
        }
        else {
            limit = mid;
        }
    }
    if (best == NODE_NULL) return DB_NOTFOUND;
    if (value != NULL) *value = omt->i.a.values[best];
    *index = best - omt->i.a.start_idx;
    return 0;
}
// Direction < 0: find the largest i such that h(V_i, extra) < 0, searching
// the subtree rooted at `n_idx`.
// On a hit: returns 0, stores the element in *value (if value != NULL) and
// its position within this subtree in *index.
// On a miss: returns DB_NOTFOUND and leaves *value and *index untouched.
// requires: index!=NULL
static inline int find_internal_minus(OMT omt, node_idx n_idx, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, uint32_t *index)
{
    if (n_idx == NODE_NULL) {
        return DB_NOTFOUND;
    }
    OMT_NODE node = &omt->i.t.nodes[n_idx];
    const int hv = h(node->value, extra);
    if (hv >= 0) {
        // This node and everything to its right are out; look left.
        return find_internal_minus(omt, node->left, h, extra, value, index);
    }
    // This node qualifies; prefer a later qualifying node on the right.
    int rc = find_internal_minus(omt, node->right, h, extra, value, index);
    if (rc == 0) {
        *index += nweight(omt, node->left) + 1;
    } else if (rc == DB_NOTFOUND) {
        *index = nweight(omt, node->left);
        if (value != NULL) {
            *value = node->value;
        }
        rc = 0;
    }
    return rc;
}
// Direction > 0: find the smallest i such that h(V_i, extra) > 0, searching
// the subtree rooted at `n_idx`.
// On a hit: returns 0, stores the element in *value (if value != NULL) and
// its position within this subtree in *index.
// On a miss: returns DB_NOTFOUND and leaves *value and *index untouched.
// requires: index!=NULL
static inline int find_internal_plus(OMT omt, node_idx n_idx, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, uint32_t *index)
{
    if (n_idx == NODE_NULL) {
        return DB_NOTFOUND;
    }
    OMT_NODE node = &omt->i.t.nodes[n_idx];
    const int hv = h(node->value, extra);
    if (hv <= 0) {
        // This node and everything to its left are out; look right and
        // shift the reported position past them.
        const int rc = find_internal_plus(omt, node->right, h, extra, value, index);
        if (rc == 0) {
            *index += nweight(omt, node->left) + 1;
        }
        return rc;
    }
    // This node qualifies; prefer an earlier qualifying node on the left.
    int rc = find_internal_plus(omt, node->left, h, extra, value, index);
    if (rc == DB_NOTFOUND) {
        *index = nweight(omt, node->left);
        if (value != NULL) {
            *value = node->value;
        }
        rc = 0;
    }
    return rc;
}
//TODO: Put all omt API functions here.
int toku_omt_create (OMT *omtp) {
return omt_create_internal(omtp, 2);
OMT XMALLOC(omt);
omt->create();
*omtp = omt;
return 0;
}
void toku_omt_destroy(OMT *omtp) {
OMT omt=*omtp;
if (omt->is_array) toku_free(omt->i.a.values);
else toku_free(omt->i.t.nodes);
omt->destroy();
toku_free(omt);
*omtp=NULL;
}
uint32_t toku_omt_size(OMT V) {
return omt_size(V);
return V->size();
}
int toku_omt_create_from_sorted_array(OMT *omtp, OMTVALUE *values, uint32_t numvalues) {
OMT omt = NULL;
int r;
if ((r = omt_create_internal(&omt, numvalues))) return r;
memcpy(omt->i.a.values, values, numvalues*sizeof(*values));
omt->i.a.num_values = numvalues;
OMT XMALLOC(omt);
omt->create_from_sorted_array(values, numvalues);
*omtp=omt;
return 0;
}
int toku_omt_insert_at(OMT omt, OMTVALUE value, uint32_t index) {
    // Effect: forward to omt->insert_at(value, index).
    // Returns: whatever insert_at() returns.
    return omt->insert_at(value, index);
}
int toku_omt_set_at (OMT omt, OMTVALUE value, uint32_t index) {
    // Effect: forward to omt->set_at(value, index).
    // Returns: whatever set_at() returns.
    return omt->set_at(value, index);
}
int toku_omt_delete_at(OMT omt, uint32_t index) {
    // Effect: forward to omt->delete_at(index).
    // Returns: whatever delete_at() returns.
    return omt->delete_at(index);
}
int toku_omt_fetch(OMT omt, uint32_t i, OMTVALUE *v) {
    // Effect: forward to omt->fetch(i, v).
    // Returns: whatever fetch() returns.
    return omt->fetch(i, v);
}
struct functor {
int (*f)(OMTVALUE, uint32_t, void *);
void *v;
};
static_assert(std::is_pod<functor>::value, "not POD");
int call_functor(const OMTVALUE &v, uint32_t idx, functor *const ftor);
int call_functor(const OMTVALUE &v, uint32_t idx, functor *const ftor) {
return ftor->f(const_cast<OMTVALUE>(v), idx, ftor->v);
}
int toku_omt_iterate(OMT omt, int (*f)(OMTVALUE, uint32_t, void*), void*v) {
    // Effect: wrap f and v in a functor and run omt->iterate with call_functor
    //  as the per-element callback.
    // Returns: whatever iterate() returns.
    struct functor ftor = { .f = f, .v = v };
    return omt->iterate<functor, call_functor>(&ftor);
}
int toku_omt_iterate_on_range(OMT omt, uint32_t left, uint32_t right, int (*f)(OMTVALUE, uint32_t, void*), void*v) {
    // Effect: wrap f and v in a functor and run omt->iterate_on_range over
    //  (left, right) with call_functor as the per-element callback.
    // Returns: whatever iterate_on_range() returns.
    struct functor ftor = { .f = f, .v = v };
    return omt->iterate_on_range<functor, call_functor>(left, right, &ftor);
}
struct heftor {
int (*h)(OMTVALUE, void *v);
void *v;
};
static_assert(std::is_pod<heftor>::value, "not POD");
int call_heftor(const OMTVALUE &v, const heftor &htor);
int call_heftor(const OMTVALUE &v, const heftor &htor) {
return htor.h(const_cast<OMTVALUE>(v), htor.v);
}
int toku_omt_insert(OMT omt, OMTVALUE value, int(*h)(OMTVALUE, void*v), void *v, uint32_t *index) {
    // Effect: wrap h and v in a heftor and forward to omt->insert with
    //  call_heftor as the comparison callback.
    // Returns: whatever insert() returns.
    struct heftor htor = { .h = h, .v = v };
    return omt->insert<heftor, call_heftor>(value, htor, index);
}
int toku_omt_find_zero(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, uint32_t *index) {
    // Effect: wrap h and extra in a heftor and forward to V->find_zero with
    //  call_heftor as the comparison callback.
    // Returns: whatever find_zero() returns.
    struct heftor htor = { .h = h, .v = extra };
    return V->find_zero<heftor, call_heftor>(htor, value, index);
}
int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int direction, OMTVALUE *value, uint32_t *index) {
    // Effect: wrap h and extra in a heftor and forward to V->find with
    //  call_heftor as the comparison callback; direction is passed through
    //  unchanged.
    // Returns: whatever find() returns.
    struct heftor htor = { .h = h, .v = extra };
    return V->find<heftor, call_heftor>(htor, direction, value, index);
}
int toku_omt_split_at(OMT omt, OMT *newomtp, uint32_t index) {
    // Effect: allocate a fresh OMT handle and ask omt->split_at to split into it
    //  at index.
    //  On success (r == 0) the new handle is returned through *newomtp.
    //  On failure the freshly allocated handle is freed and *newomtp is left
    //  untouched.
    // Returns: whatever split_at() returns.
    OMT XMALLOC(newomt);
    int r = omt->split_at(newomt, index);
    if (r != 0) {
        toku_free(newomt);
    } else {
        *newomtp = newomt;
    }
    return r;
}
int toku_omt_merge(OMT leftomt, OMT rightomt, OMT *newomtp) {
    // Effect: allocate a fresh OMT handle, call merge(leftomt, rightomt) on it,
    //  then free the two input handles with toku_free and return the new
    //  handle through *newomtp.
    // The inputs must still be intact when merge() runs, so they are released
    //  only afterwards.
    // NOTE(review): only the handles are freed here; presumably merge() takes
    //  over or releases the inputs' storage - confirm against omt-tmpl.h.
    // Returns: always 0.
    OMT XMALLOC(newomt);
    newomt->merge(leftomt, rightomt);
    toku_free(leftomt);
    toku_free(rightomt);
    *newomtp = newomt;
    return 0;
}
int toku_omt_clone_noptr(OMT *dest, OMT src) {
    // Effect: allocate a fresh OMT handle, initialize it with clone(*src), and
    //  return it through *dest.
    // Returns: always 0.
    OMT XMALLOC(omt);
    omt->clone(*src);
    *dest = omt;
    return 0;
}
void toku_omt_clear(OMT omt) {
    // Effect: forward to omt->clear().
    omt->clear();
}
size_t toku_omt_memory_size (OMT omt) {
    // Effect: return the value of omt->memory_size().
    return omt->memory_size();
}

View file

@ -57,8 +57,9 @@
// The programming API:
//typedef struct value *OMTVALUE; // A slight improvement over using void*.
#include "omt-tmpl.h"
typedef void *OMTVALUE;
typedef struct omt *OMT;
typedef struct toku::omt<OMTVALUE> *OMT;
int toku_omt_create (OMT *omtp);
@ -321,7 +322,7 @@ void toku_omt_clear(OMT omt);
// Note: Will not reallocate or resize any memory, since returning void precludes calling malloc.
// Performance: time=O(1)
unsigned long toku_omt_memory_size (OMT omt);
size_t toku_omt_memory_size (OMT omt);
// Effect: Return the size (in bytes) of the omt, as it resides in main memory. Don't include any of the OMTVALUES.

View file

@ -128,12 +128,10 @@ done:
return 0;
}
static int find_ft_from_filenum (OMTVALUE v, void *filenumvp) {
FILENUM *CAST_FROM_VOIDP(filenump, filenumvp);
FT CAST_FROM_VOIDP(h, v);
static int find_ft_from_filenum (const FT &h, const FILENUM &filenum) {
FILENUM thisfnum = toku_cachefile_filenum(h->cf);
if (thisfnum.fileid<filenump->fileid) return -1;
if (thisfnum.fileid>filenump->fileid) return +1;
if (thisfnum.fileid<filenum.fileid) return -1;
if (thisfnum.fileid>filenum.fileid) return +1;
return 0;
}
@ -155,12 +153,10 @@ static int do_insertion (enum ft_msg_type type, FILENUM filenum, BYTESTRING key,
}
assert(r==0);
OMTVALUE hv;
hv=NULL;
r = toku_omt_find_zero(txn->open_fts, find_ft_from_filenum, &filenum, &hv, NULL);
assert(r==0);
FT h;
CAST_FROM_VOIDP(h, hv);
h = NULL;
r = txn->open_fts.find_zero<FILENUM, find_ft_from_filenum>(filenum, &h, NULL);
assert(r==0);
if (oplsn.lsn != 0) { // if we are executing the recovery algorithm
LSN treelsn = toku_ft_checkpoint_lsn(h);
@ -530,12 +526,10 @@ toku_rollback_change_fdescriptor(FILENUM filenum,
// noted it,
assert(r == 0);
OMTVALUE ftv;
ftv = NULL;
r = toku_omt_find_zero(txn->open_fts, find_ft_from_filenum, &filenum, &ftv, NULL);
assert(r == 0);
FT ft;
CAST_FROM_VOIDP(ft, ftv);
ft = NULL;
r = txn->open_fts.find_zero<FILENUM, find_ft_from_filenum>(filenum, &ft, NULL);
assert(r == 0);
DESCRIPTOR_S d;
toku_fill_dbt(&d.dbt, old_descriptor.data, old_descriptor.len);

View file

@ -41,10 +41,8 @@ int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
}
static int
note_ft_used_in_txns_parent(OMTVALUE ftv, uint32_t UU(index), void *txnv) {
TOKUTXN CAST_FROM_VOIDP(child, txnv);
note_ft_used_in_txns_parent(const FT &ft, uint32_t UU(index), TOKUTXN const child) {
TOKUTXN parent = child->parent;
FT CAST_FROM_VOIDP(ft, ftv);
toku_txn_maybe_note_ft(parent, ft);
if (ft->txnid_that_created_or_locked_when_empty == toku_txn_get_txnid(child)) {
//Pass magic "no rollback needed" flag to parent.
@ -194,7 +192,7 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
}
// Note the open brts, the omts must be merged
r = toku_omt_iterate(txn->open_fts, note_ft_used_in_txns_parent, txn);
r = txn->open_fts.iterate<struct tokutxn, note_ft_used_in_txns_parent>(txn);
assert(r==0);
// Merge the list of headers that must be checkpointed before commit

View file

@ -127,9 +127,7 @@ void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
}
}
static int find_filenum (OMTVALUE v, void *hv) {
FT CAST_FROM_VOIDP(h, v);
FT CAST_FROM_VOIDP(hfind, hv);
static int find_filenum (const FT &h, const FT &hfind) {
FILENUM fnum = toku_cachefile_filenum(h->cf);
FILENUM fnumfind = toku_cachefile_filenum(hfind->cf);
if (fnum.fileid<fnumfind.fileid) return -1;
@ -140,15 +138,15 @@ static int find_filenum (OMTVALUE v, void *hv) {
//Notify a transaction that it has touched a brt.
void toku_txn_maybe_note_ft (TOKUTXN txn, FT ft) {
toku_txn_lock(txn);
OMTVALUE ftv;
FT ftv;
uint32_t idx;
int r = toku_omt_find_zero(txn->open_fts, find_filenum, ft, &ftv, &idx);
int r = txn->open_fts.find_zero<FT, find_filenum>(ft, &ftv, &idx);
if (r == 0) {
// already there
assert((FT) ftv == ft);
assert(ftv == ft);
goto exit;
}
r = toku_omt_insert_at(txn->open_fts, ft, idx);
r = txn->open_fts.insert_at(ft, idx);
assert_zero(r);
// TODO(leif): if there's anything that locks the reflock and then
// the txn lock, this may deadlock, because it grabs the reflock.

414
ft/tests/marked-omt-test.cc Normal file
View file

@ -0,0 +1,414 @@
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: omt-test.cc 46193 2012-07-26 17:12:18Z yfogel $"
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include <toku_portability.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <db.h>
#include <string.h>
#include <algorithm>
#include "memory.h"
#include "toku_pthread.h"
#include "toku_random.h"
#include "rwlock.h"
#include "omt-tmpl.h"
#include "test.h"
namespace toku {
namespace test {
// Map an index to the value the tests store at that index (index + 300).
static inline uint32_t fudge(const uint32_t x) {
    const uint32_t offset = 300;
    return offset + x;
}
// Inverse of fudge: recover the index from a stored value (value - 300).
static inline uint32_t defudge(const uint32_t fx) {
    const uint32_t offset = 300;
    return fx - offset;
}
// Iteration callback: checks that the element at idx holds fudge(idx).
// The extra argument is ignored.  Always returns 0.
static int test_iterator(const uint32_t &v, const uint32_t idx, bool *const UU(unused)) {
    const uint32_t decoded = defudge(v);
    invariant(decoded == idx);
    return 0;
}
// Iteration callback used before any deletion: the element must hold
// fudge(idx) and idx must fall in the first half of its group of ten.
// Records the visit in called[idx].  Always returns 0.
static int check_iterator_before(const uint32_t &v, const uint32_t idx, bool *const called) {
    const uint32_t decoded = defudge(v);
    invariant(decoded == idx);
    invariant(idx % 10 < 5);
    called[idx] = true;
    return 0;
}
// Iteration callback used after deletion: the element's original index
// (defudge(v)) must fall in the second half of its group of ten.
// Records the visit in called[] under that original index, not under idx.
// Always returns 0.
static int check_iterator_after(const uint32_t &v, const uint32_t UU(idx), bool *const called) {
    const uint32_t original_idx = defudge(v);
    invariant(original_idx % 10 >= 5);
    called[original_idx] = true;
    return 0;
}
// Iteration callback that must never be invoked: aborts the process on the
// first call.  The tests pass it to iterate_over_marked after
// delete_all_marked to assert that no marked elements remain.
static int die(const uint32_t &UU(v), const uint32_t UU(idx), void *const UU(unused)) {
abort();
return 0; // hahaha
}
// Single-threaded check of the marking operations on an omt of nelts elements.
// Marks the first five indexes of every group of ten, verifies that
// iterate_over_marked visits exactly those, deletes all marked elements, and
// verifies that exactly the other half remains and that nothing is marked.
static void run_test(uint32_t nelts) {
assert(nelts % 10 == 0); // run_test depends on nelts being a multiple of 10
omt<uint32_t, uint32_t, true> omt;
omt.create();
omt.verify_marks_consistent();
// Element i holds fudge(i) so callbacks can cross-check value against index.
for (uint32_t i = 0; i < nelts; ++i) {
omt.insert_at(fudge(i), i);
}
omt.verify_marks_consistent();
int r;
// Mark indexes [10*i, 10*i+5) for every group of ten.
for (uint32_t i = 0; i < nelts / 10; ++i) {
r = omt.iterate_and_mark_range<bool, test_iterator>(i * 10, i * 10 + 5, nullptr);
invariant_zero(r);
omt.verify_marks_consistent();
}
// called[i] records whether the marked-iteration callback saw index i.
bool called[nelts];
ZERO_ARRAY(called);
r = omt.iterate_over_marked<bool, check_iterator_before>(called);
invariant_zero(r);
for (uint32_t i = 0; i < nelts; ++i) {
if (i % 10 < 5) {
invariant(called[i]);
} else {
invariant(!called[i]);
}
}
omt.verify_marks_consistent();
invariant(omt.size() == nelts);
omt.delete_all_marked();
omt.verify_marks_consistent();
// Exactly half of the elements were marked, so half must remain.
invariant(omt.size() * 2 == nelts);
// die aborts if called, so this asserts that no marks survive the delete.
r = omt.iterate_over_marked<void, die>(nullptr);
invariant_zero(r);
ZERO_ARRAY(called);
// check_iterator_after indexes called[] by the element's original index.
r = omt.iterate<bool, check_iterator_after>(called);
invariant_zero(r);
omt.verify_marks_consistent();
for (uint32_t i = 0; i < nelts; ++i) {
if (i % 10 < 5) {
invariant(!called[i]);
} else {
invariant(called[i]);
}
}
omt.destroy();
}
// The omt instantiation shared by the stress test; the same template arguments
// as the omt used in run_test, on which the marking operations are called.
typedef omt<uint32_t, uint32_t, true> stress_omt;
// Three-way comparison of v against target:
// -1 when v < target, 0 when equal, +1 when v > target.
static int int_heaviside(const uint32_t &v, const uint32_t &target) {
    if (v < target) {
        return -1;
    }
    if (v > target) {
        return 1;
    }
    return 0;
}
// State shared by every thread of the stress test.
struct stress_shared {
// The omt that marker threads mark and the deleter thread deletes from.
stress_omt *omt;
// Polled by marker threads as their loop condition; cleared by the delete worker when it finishes.
volatile bool running;
// Taken for reading by marker threads and for writing by the delete worker.
struct rwlock lock;
// Held around every acquire/release of lock.
toku_mutex_t mutex;
// Number of marker threads; also scales the range length in generate_range.
int num_marker_threads;
};
// Per-marker-thread state.  rand_read and rand_write are seeded identically in
// stress_test so the deleter can replay the ranges the marker generated.
struct reader_extra {
// Thread number; only used in the verbose report.
int tid;
stress_shared *shared;
// Number of mark operations the marker thread has performed so far.
uint64_t iterations;
// Number of those operations the deleter has already replayed.
uint64_t last_iteration;
// State buffers handed to myinitstate_r for rand_read and rand_write respectively.
char buf_read[8];
char buf_write[8];
// Generator consumed by the marker thread (stress_mark_worker).
struct random_data rand_read;
// Generator consumed by the replay (simulate_reader_marks_on_array).
struct random_data rand_write;
};
// Draw a pseudo-random index range [*begin, *limit) inside the omt.
// The maximum range length is nelts / 1000 / num_marker_threads + 1, clamped to
// [5, 1000] and then shortened so the range cannot run past the end of the omt.
// *limit may equal *begin (an empty range).
// Consumes exactly two values' worth of rand_choices draws from rng, so two
// generators seeded identically produce identical ranges as long as the omt
// size is the same at each call.
// Requires: the omt holds at least 3 elements (rand_choices needs >= 2 choices).
static void generate_range(struct random_data *rng, const struct stress_shared &shared, uint32_t *begin, uint32_t *limit) {
    const uint32_t nelts = shared.omt->size();
    double range_limit_d = nelts;
    range_limit_d /= 1000;
    range_limit_d /= shared.num_marker_threads;
    range_limit_d += 1;
    uint32_t range_limit = static_cast<uint32_t>(range_limit_d);
    if (range_limit < 5) {
        range_limit = 5;
    }
    if (range_limit > 1000) {
        range_limit = 1000;
    }
    // Draw begin from [0, nelts - 2].  If begin could be nelts - 1 the clamp
    // below would leave range_limit == 1, which rand_choices rejects with an
    // invariant failure (it requires at least 2 choices).
    *begin = rand_choices(rng, nelts - 1);
    if (*begin + range_limit > nelts) {
        range_limit = nelts - *begin;
    }
    *limit = *begin + rand_choices(rng, range_limit);
}
// Half-open index range [begin, limit) handed to mark_read_iterator, which
// asserts that every visited index lies inside it.
struct pair {
uint32_t begin;
uint32_t limit;
};
static int mark_read_iterator(const uint32_t &UU(v), const uint32_t idx, struct pair * const pair) {
invariant(defudge(v) == idx);
invariant(idx >= pair->begin);
invariant(idx < pair->limit);
return 0;
}
// Marker thread body.  extrav points at this thread's reader_extra.
// Until shared.running is cleared, repeatedly takes the rwlock for reading,
// marks one pseudo-random range drawn from rand_read, counts the iteration,
// and releases the lock.  Always returns nullptr.
static void *stress_mark_worker(void *extrav) {
struct reader_extra *CAST_FROM_VOIDP(extra, extrav);
struct stress_shared &shared = *extra->shared;
toku_mutex_t &mutex = shared.mutex;
while (shared.running) {
// The mutex is held only while acquiring the read lock, not while marking.
toku_mutex_lock(&mutex);
rwlock_read_lock(&shared.lock, &mutex);
toku_mutex_unlock(&mutex);
struct pair range;
generate_range(&extra->rand_read, shared, &range.begin, &range.limit);
shared.omt->iterate_and_mark_range<pair, mark_read_iterator>(range.begin, range.limit, &range);
// Counted while the read lock is still held; the deleter reads it under the write lock.
++extra->iterations;
toku_mutex_lock(&mutex);
rwlock_read_unlock(&shared.lock);
toku_mutex_unlock(&mutex);
usleep(1);
}
return nullptr;
}
template<typename T>
class array_ftor {
int m_count;
T *m_array;
public:
array_ftor(int size) : m_count(0) {
XMALLOC_N(size, m_array);
}
~array_ftor() {
toku_free(m_array);
}
void operator() (const T &x) { m_array[m_count++] = x; }
template<class callback_t>
void iterate(callback_t &cb) const {
for (int i = 0; i < m_count; ++i) {
cb(m_array[i]);
}
}
};
// Iteration callback that appends each visited value to the array_ftor passed
// as extra.  The index is ignored.  Always returns 0.
static int use_array_ftor(const uint32_t &v, const uint32_t UU(idx), array_ftor<uint32_t> *const fp) {
    (*fp)(v);
    return 0;
}
class inserter {
stress_omt *m_omt;
public:
inserter(stress_omt *omt) : m_omt(omt) {}
void operator() (const uint32_t &x) {
m_omt->insert<uint32_t, int_heaviside>(x, x, nullptr);
}
};
/*
* split the range evenly/not evenly between marker threads
* context tells it the range
* context also holds iteration number
*
* N threads
* N 'contexts' holds iteration number, seed
*
* create rng based on seed
* loop:
* generate random range. Mark that range, increment iteration number
*
*
*
*
* for each context
* create rng based on context->last_seed
* loop (iteration number times)
* mark (in array) random range
* context->last_seed := context->seed
* check the array and the omt
*
*/
// Replay, on a plain bool array, the ranges one marker thread has marked since
// the previous replay.  Draws the ranges from reader->rand_write and advances
// reader->last_iteration until it catches up with reader->iterations.
static void simulate_reader_marks_on_array(struct reader_extra *const reader, const struct stress_shared &shared, bool *const should_be_marked) {
    if (verbose) {
        const uint64_t pending = reader->iterations - reader->last_iteration;
        fprintf(stderr, "thread %d ran %" PRIu64 " iterations\n", reader->tid, pending);
    }
    while (reader->last_iteration < reader->iterations) {
        uint32_t begin;
        uint32_t limit;
        generate_range(&reader->rand_write, shared, &begin, &limit);
        for (uint32_t pos = begin; pos < limit; ++pos) {
            should_be_marked[pos] = true;
        }
        ++reader->last_iteration;
    }
}
// Callback for iterate_over_marked: checks that the element at idx holds
// fudge(idx) and records idx as marked.  Always returns 0.
static int copy_marks(const uint32_t &v, const uint32_t idx, bool * const is_marked) {
    const uint32_t decoded = defudge(v);
    invariant(decoded == idx);
    is_marked[idx] = true;
    return 0;
}
// Return how many of the first n entries of bools are true.
static inline uint32_t count_true(const bool *const bools, uint32_t n) {
    uint32_t total = 0;
    const bool *const end = bools + n;
    for (const bool *p = bools; p != end; ++p) {
        total += *p ? 1 : 0;
    }
    return total;
}
static void stress_deleter(struct reader_extra *const readers, int num_marker_threads, stress_omt *omt) {
// Verify (iterate_over_marked) agrees exactly with iterate_and_mark_range (multithreaded)
stress_shared &shared = *readers[0].shared;
bool should_be_marked[omt->size()];
ZERO_ARRAY(should_be_marked);
for (int i = 0; i < num_marker_threads; i++) {
simulate_reader_marks_on_array(&readers[i], shared, should_be_marked);
}
bool is_marked_according_to_iterate[omt->size()];
ZERO_ARRAY(is_marked_according_to_iterate);
omt->verify_marks_consistent();
omt->iterate_over_marked<bool, copy_marks>(&is_marked_according_to_iterate[0]);
omt->verify_marks_consistent();
invariant(!memcmp(should_be_marked, is_marked_according_to_iterate, sizeof(should_be_marked)));
if (verbose) {
double frac_marked = count_true(should_be_marked, omt->size());
frac_marked /= omt->size();
fprintf(stderr, "Marked: %0.4f\n", frac_marked);
omt->verify_marks_consistent();
}
array_ftor<uint32_t> aftor(omt->size());
omt->iterate_over_marked<array_ftor<uint32_t>, use_array_ftor>(&aftor);
omt->delete_all_marked();
omt->verify_marks_consistent();
omt->iterate_over_marked<void, die>(nullptr);
inserter ins(omt);
aftor.iterate(ins);
omt->verify_marks_consistent();
}
// Deleter thread body.  extrav points at the array of reader_extra for all
// marker threads.  Runs 20 rounds of: sleep, take the write lock, run
// stress_deleter, release the lock.  Afterwards clears shared.running, which
// is the marker threads' loop condition.  Always returns nullptr.
static void *stress_delete_worker(void *extrav) {
reader_extra *CAST_FROM_VOIDP(readers, extrav);
stress_shared &shared = *readers[0].shared;
int num_marker_threads = shared.num_marker_threads;
toku_mutex_t &mutex = shared.mutex;
const double repetitions = 20;
for (int i = 0; i < repetitions; ++i) {
// sleep 0.0075s - 0.15s (7500us for i == 0, growing linearly to 150000us for i == 19)
// early iterations sleep for a short time
// later iterations sleep longer
int sleep_for = 1000 * 100 * (1.5 * (i+1) / repetitions);
usleep(sleep_for);
// The mutex is held only while acquiring/releasing the write lock.
toku_mutex_lock(&mutex);
rwlock_write_lock(&shared.lock, &mutex);
toku_mutex_unlock(&mutex);
stress_deleter(readers, num_marker_threads, shared.omt);
toku_mutex_lock(&mutex);
rwlock_write_unlock(&shared.lock);
toku_mutex_unlock(&mutex);
}
// Sets running to false (if it is still true).
__sync_bool_compare_and_swap(&shared.running, true, false);
return nullptr;
}
// Multithreaded stress test: fills an omt with nelts elements, starts five
// marker threads and one deleter thread on it, waits for all of them to
// finish, and tears everything down.
static void stress_test(int nelts) {
stress_omt omt;
omt.create();
// Element i holds fudge(i) so callbacks can cross-check value against index.
for (int i = 0; i < nelts; ++i) {
omt.insert_at(fudge(i), i);
}
const int num_marker_threads = 5;
struct stress_shared extra;
extra.omt = &omt;
toku_mutex_init(&extra.mutex, NULL);
rwlock_init(&extra.lock);
extra.running = true;
extra.num_marker_threads = num_marker_threads;
struct reader_extra readers[num_marker_threads];
// Also zeroes iterations/last_iteration and the random_data structs before they are initialized.
ZERO_ARRAY(readers);
srandom(time(NULL));
toku_pthread_t marker_threads[num_marker_threads];
for (int i = 0; i < num_marker_threads; ++i) {
struct reader_extra &reader = readers[i];
reader.tid = i;
reader.shared = &extra;
int r;
// Both generators get the same seed so the deleter can replay the marker's ranges.
int seed = random();
r = myinitstate_r(seed, reader.buf_read, 8, &reader.rand_read);
invariant_zero(r);
r = myinitstate_r(seed, reader.buf_write, 8, &reader.rand_write);
invariant_zero(r);
toku_pthread_create(&marker_threads[i], NULL, stress_mark_worker, &reader);
}
// The deleter receives the whole readers array, not a single element.
toku_pthread_t deleter_thread;
toku_pthread_create(&deleter_thread, NULL, stress_delete_worker, &readers[0]);
toku_pthread_join(deleter_thread, NULL);
for (int i = 0; i < num_marker_threads; ++i) {
toku_pthread_join(marker_threads[i], NULL);
}
rwlock_destroy(&extra.lock);
toku_mutex_destroy(&extra.mutex);
omt.destroy();
}
} // end namespace test
} // end namespace toku
// Test entry point: parses the standard test arguments, runs the
// single-threaded test at sizes 10, 20, 40, 80 and 9000, then runs the
// multithreaded stress test on 100000 elements.  Returns 0.
int test_main(int argc, const char *argv[]) {
    default_parse_args(argc, argv);

    int nelts = 10;
    while (nelts <= 80) {
        toku::test::run_test(nelts);
        nelts *= 2;
    }

    toku::test::run_test(9000);

    toku::test::stress_test(1000 * 100);

    return 0;
}

View file

@ -214,7 +214,7 @@ default_parse_args (int argc, const char *argv[]) {
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0],"-v")==0) {
verbose=1;
++verbose;
} else if (strcmp(argv[0],"-q")==0) {
verbose=0;
} else {

View file

@ -127,11 +127,8 @@ toku_txn_create_txn (
}
assert(logger->rollback_cachefile);
OMT open_fts;
{
int r = toku_omt_create(&open_fts);
assert_zero(r);
}
omt<FT> open_fts;
open_fts.create();
struct txn_roll_info roll_info = {
.num_rollback_nodes = 0,
@ -401,13 +398,10 @@ void toku_txn_close_txn(TOKUTXN txn) {
toku_txn_destroy_txn(txn);
}
static int remove_txn (OMTVALUE hv, uint32_t UU(idx), void *txnv)
static int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const txn)
// Effect: This function is called on every open FT that a transaction used.
// This function removes the transaction from that FT.
{
FT CAST_FROM_VOIDP(h, hv);
TOKUTXN CAST_FROM_VOIDP(txn, txnv);
if (txn->txnid64==h->txnid_that_created_or_locked_when_empty) {
h->txnid_that_created_or_locked_when_empty = TXNID_NONE;
}
@ -421,7 +415,7 @@ static int remove_txn (OMTVALUE hv, uint32_t UU(idx), void *txnv)
// for every BRT in txn, remove it.
static void note_txn_closing (TOKUTXN txn) {
    // Effect: run remove_txn once on every FT in txn->open_fts, passing txn as
    //  the extra argument.
    txn->open_fts.iterate<struct tokutxn, remove_txn>(txn);
}
void toku_txn_complete_txn(TOKUTXN txn) {
@ -437,9 +431,7 @@ void toku_txn_complete_txn(TOKUTXN txn) {
}
void toku_txn_destroy_txn(TOKUTXN txn) {
if (txn->open_fts) {
toku_omt_destroy(&txn->open_fts);
}
txn->open_fts.destroy();
xids_destroy(&txn->xids);
toku_mutex_destroy(&txn->txn_lock);
toku_free(txn);
@ -528,7 +520,7 @@ toku_txn_is_read_only(TOKUTXN txn) {
// Did no work.
invariant(txn->roll_info.num_rollentries == 0);
invariant(txn->do_fsync_lsn.lsn == ZERO_LSN.lsn);
invariant(toku_omt_size(txn->open_fts) == 0);
invariant(txn->open_fts.size() == 0);
invariant(txn->num_pin==0);
return true;
}

92
portability/toku_random.h Normal file
View file

@ -0,0 +1,92 @@
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: omt-test.cc 46193 2012-07-26 17:12:18Z yfogel $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ifndef TOKU_RANDOM_H
#define TOKU_RANDOM_H
#include <config.h>
#include <toku_portability.h>
#include <toku_assert.h>
#include <stdlib.h>
#include <stdint.h>
#include <limits.h>
namespace toku {
#if defined(HAVE_RANDOM_R)
// Definition of randu62 and randu64 assume myrandom_r generates 31 low-order bits
static_assert(RAND_MAX == INT32_MAX, "Unexpected RAND_MAX");
// Thin wrapper over initstate_r for platforms that provide random_r.
// Returns initstate_r's result unchanged.
static inline int
myinitstate_r(unsigned int seed, char *statebuf, size_t statelen, struct random_data *buf)
{
    const int rc = initstate_r(seed, statebuf, statelen, buf);
    return rc;
}
// Draw the next value from buf with random_r.
// A nonzero result from random_r trips lazy_assert_zero.
static inline int32_t
myrandom_r(struct random_data *buf)
{
    int32_t result;
    const int rc = random_r(buf, &result);
    lazy_assert_zero(rc);
    return result;
}
#elif defined(HAVE_NRAND48)
// Generator state for the nrand48 fallback: the three 16-bit words nrand48
// operates on.
struct random_data {
    unsigned short xsubi[3];
};
// Seed the nrand48 state from a 32-bit seed:
//   xsubi[0] = high 16 bits, xsubi[1] = low 16 bits, xsubi[2] = middle 16 bits.
// statebuf and statelen exist only to match the random_r-based variant's
// signature and are ignored.  Always returns 0.
static inline int
myinitstate_r(unsigned int seed, char *statebuf, size_t statelen, struct random_data *buf)
{
    (void) statebuf;
    (void) statelen;
    buf->xsubi[0] = (seed & 0xffff0000) >> 16;
    // Each word is written exactly once; xsubi[1] must not be left unset.
    buf->xsubi[1] = (seed & 0x0000ffff);
    buf->xsubi[2] = (seed & 0x00ffff00) >> 8;
    return 0;
}
// Draw the next value from buf with nrand48, converted to int32_t.
static inline int32_t
myrandom_r(struct random_data *buf)
{
    return nrand48(buf->xsubi);
}
#else
# error "no suitable reentrant random function available (checked random_r and nrand48)"
#endif
// Combine two myrandom_r draws into one value: the first draw supplies the low
// bits and the second is shifted left by 31 and OR-ed in above it.
static inline uint64_t
randu62(struct random_data *buf)
{
    const uint64_t low = myrandom_r(buf);
    const uint64_t high = myrandom_r(buf);
    return (high << 31) | low;
}
// Extend a randu62 value to 64 bits: the two lowest bits of one more
// myrandom_r draw are placed in bit positions 62 and 63.
static inline uint64_t
randu64(struct random_data *buf)
{
    const uint64_t low62 = randu62(buf);
    const uint64_t extra = myrandom_r(buf);
    const uint64_t top2 = (extra & 0x3) << 62;
    return low62 | top2;
}
// Return a value in [0, choices) drawn from buf by rejection sampling:
// each draw is masked down to the smallest (power of two) - 1 that covers
// choices, and draws that land at or above choices are discarded.
// Requires: 2 <= choices < INT32_MAX (enforced with invariants).
static inline uint32_t
rand_choices(struct random_data *buf, uint32_t choices) {
    invariant(choices >= 2);
    invariant(choices < INT32_MAX);
    // Smallest power of two that is >= choices, minus one.
    uint32_t mask = 2;
    while (mask < choices) {
        mask <<= 1;
    }
    mask -= 1;

    for (;;) {
        const uint32_t candidate = myrandom_r(buf) & mask;
        if (candidate < choices) {
            return candidate;
        }
    }
}
} // end namespace toku
#endif // TOKU_RANDOM_H

View file

@ -335,7 +335,7 @@ default_parse_args (int argc, char * const argv[]) {
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0],"-v")==0) {
verbose=1;
++verbose;
} else if (strcmp(argv[0],"-q")==0) {
verbose=0;
} else {

View file

@ -29,52 +29,9 @@
#include <valgrind/drd.h>
#include <math.h>
#if defined(HAVE_RANDOM_R)
static inline int32_t
myrandom_r(struct random_data *buf)
{
int32_t x;
int r = random_r(buf, &x);
CKERR(r);
return x;
}
#elif defined(HAVE_NRAND48)
struct random_data {
unsigned short xsubi[3];
};
static int
initstate_r(unsigned int seed, char *UU(statebuf), size_t UU(statelen), struct random_data *buf)
{
buf->xsubi[0] = (seed & 0xffff0000) >> 16;
buf->xsubi[0] = (seed & 0x0000ffff);
buf->xsubi[2] = (seed & 0x00ffff00) >> 8;
return 0;
}
static inline int32_t
myrandom_r(struct random_data *buf)
{
int32_t x = nrand48(buf->xsubi);
return x;
}
#else
# error "no suitable reentrant random function available (checked random_r and nrand48)"
#endif
#include "toku_random.h"
static inline uint64_t
randu62(struct random_data *buf)
{
uint64_t a = myrandom_r(buf);
uint64_t b = myrandom_r(buf);
return (a | (b << 31));
}
static inline uint64_t
randu64(struct random_data *buf)
{
uint64_t r62 = randu62(buf);
uint64_t c = myrandom_r(buf);
return (r62 | ((c & 0x3) << 62));
}
using namespace toku;
#if !defined(HAVE_MEMALIGN)
# if defined(HAVE_VALLOC)
@ -527,7 +484,7 @@ static void *worker(void *arg_v) {
struct random_data random_data;
ZERO_STRUCT(random_data);
char *XCALLOC_N(8, random_buf);
r = initstate_r(random(), random_buf, 8, &random_data);
r = myinitstate_r(random(), random_buf, 8, &random_data);
assert_zero(r);
arg->random_data = &random_data;
DB_ENV *env = arg->env;