fixes #5797 cilk artifacts removed

git-svn-id: file:///svn/toku/tokudb@51213 c7de825b-a66e-492c-adef-691d508d4ae1
This commit is contained in:
John Esmet 2013-04-17 00:01:24 -04:00 committed by Yoni Fogel
parent 00ac50e3e9
commit 1fd162242d
8 changed files with 22 additions and 92 deletions

View file

@ -66,14 +66,12 @@ static void enq (DBUFIO_FILESET bfs, struct dbufio_file *f) {
static void panic (DBUFIO_FILESET bfs, int r) { static void panic (DBUFIO_FILESET bfs, int r) {
if (bfs->panic) return; if (bfs->panic) return;
// may need a cilk fake mutex here to convince the race detector that it's OK.
bfs->panic_errno = r; // Don't really care about a race on this variable... Writes to it are atomic, so at least one good panic reason will be stored. bfs->panic_errno = r; // Don't really care about a race on this variable... Writes to it are atomic, so at least one good panic reason will be stored.
bfs->panic = true; bfs->panic = true;
return; return;
} }
static bool paniced (DBUFIO_FILESET bfs) { static bool paniced (DBUFIO_FILESET bfs) {
// may need a cilk fake mutex here to convince the race detector that it's OK.
return bfs->panic; return bfs->panic;
} }

View file

@ -139,16 +139,6 @@ basement nodes, bulk fetch, and partial fetch:
#include <stdint.h> #include <stdint.h>
#if defined(HAVE_CILK)
#include <cilk/cilk.h>
#define cilk_worker_count (__cilkrts_get_nworkers())
#else
#define cilk_spawn
#define cilk_sync
#define cilk_for for
#define cilk_worker_count 1
#endif
static const uint32_t this_version = FT_LAYOUT_VERSION; static const uint32_t this_version = FT_LAYOUT_VERSION;
/* Status is intended for display to humans to help understand system behavior. /* Status is intended for display to humans to help understand system behavior.
@ -4617,7 +4607,6 @@ __attribute__((nonnull))
void void
toku_move_ftnode_messages_to_stale(FT ft, FTNODE node) { toku_move_ftnode_messages_to_stale(FT ft, FTNODE node) {
invariant(node->height > 0); invariant(node->height > 0);
// TODO: could be cilkified
for (int i = 0; i < node->n_children; ++i) { for (int i = 0; i < node->n_children; ++i) {
if (BP_STATE(node, i) != PT_AVAIL) { if (BP_STATE(node, i) != PT_AVAIL) {
continue; continue;

View file

@ -8,16 +8,6 @@
#include "ft.h" #include "ft.h"
#include "ft-internal.h" #include "ft-internal.h"
#if defined(HAVE_CILK)
#include <cilk/cilk.h>
#define cilk_worker_count (__cilkrts_get_nworkers())
#else
#define cilk_spawn
#define cilk_sync
#define cilk_for for
#define cilk_worker_count 1
#endif
// not version-sensitive because we only serialize a descriptor using the current layout_version // not version-sensitive because we only serialize a descriptor using the current layout_version
uint32_t uint32_t
toku_serialize_descriptor_size(const DESCRIPTOR desc) { toku_serialize_descriptor_size(const DESCRIPTOR desc) {

View file

@ -11,17 +11,6 @@
#include <util/sort.h> #include <util/sort.h>
#include <util/threadpool.h> #include <util/threadpool.h>
#if defined(HAVE_CILK)
#include <cilk/cilk.h>
#define cilk_worker_count (__cilkrts_get_nworkers())
#else
#define cilk_spawn
#define cilk_sync
#define cilk_for for
#define cilk_worker_count 1
#endif
static FT_UPGRADE_STATUS_S ft_upgrade_status; static FT_UPGRADE_STATUS_S ft_upgrade_status;
#define UPGRADE_STATUS_INIT(k,t,l) { \ #define UPGRADE_STATUS_INIT(k,t,l) { \
@ -2266,9 +2255,8 @@ deserialize_ftnode_from_rbuf(
// for partitions staying compressed, create sub_block // for partitions staying compressed, create sub_block
setup_ftnode_partitions(node, bfe, true); setup_ftnode_partitions(node, bfe, true);
// Previously, this code was a for loop with spawns inside and a sync at the end. // This loop is parallelizeable, since we don't have a dependency on the work done so far.
// But now the loop is parallelizeable since we don't have a dependency on the work done so far. for (int i = 0; i < node->n_children; i++) {
cilk_for (int i = 0; i < node->n_children; i++) {
uint32_t curr_offset = BP_START(*ndd,i); uint32_t curr_offset = BP_START(*ndd,i);
uint32_t curr_size = BP_SIZE(*ndd,i); uint32_t curr_size = BP_SIZE(*ndd,i);
// the compressed, serialized partitions start at where rb is currently pointing, // the compressed, serialized partitions start at where rb is currently pointing,

View file

@ -27,20 +27,6 @@
#include "log-internal.h" #include "log-internal.h"
#include "ft.h" #include "ft.h"
#if defined(__cilkplusplus)
#error DISABLING CILK ARTS CILK
#endif
#if defined(HAVE_CILK)
#include <cilk/cilk.h>
#define cilk_worker_count (__cilkrts_get_nworkers())
#else
#define cilk_spawn
#define cilk_sync
#define cilk_for for
#define cilk_worker_count 1
#endif
static size_t (*os_fwrite_fun)(const void *,size_t,size_t,FILE*)=NULL; static size_t (*os_fwrite_fun)(const void *,size_t,size_t,FILE*)=NULL;
void ft_loader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) { void ft_loader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) {
os_fwrite_fun=fwrite_fun; os_fwrite_fun=fwrite_fun;
@ -426,7 +412,7 @@ static unsigned ft_loader_get_fractal_workers_count(FTLOADER bl) {
static void ft_loader_set_fractal_workers_count(FTLOADER bl) { static void ft_loader_set_fractal_workers_count(FTLOADER bl) {
ft_loader_lock(bl); ft_loader_lock(bl);
if (bl->fractal_workers == 0) if (bl->fractal_workers == 0)
bl->fractal_workers = cilk_worker_count; bl->fractal_workers = 1;
ft_loader_unlock(bl); ft_loader_unlock(bl);
} }
@ -924,16 +910,11 @@ static int finish_primary_rows_internal (FTLOADER bl)
int *MALLOC_N(bl->N, ra); int *MALLOC_N(bl->N, ra);
if (ra==NULL) return get_error_errno(); if (ra==NULL) return get_error_errno();
#if defined(HAVE_CILK) for (int i = 0; i < bl->N; i++) {
#pragma cilk grainsize = 1
#endif
cilk_for (int i = 0; i < bl->N; i++) {
//printf("%s:%d extractor finishing index %d with %ld rows\n", __FILE__, __LINE__, i, rows->n_rows); //printf("%s:%d extractor finishing index %d with %ld rows\n", __FILE__, __LINE__, i, rows->n_rows);
ra[i] = sort_and_write_rows(bl->rows[i], &(bl->fs[i]), bl, i, bl->dbs[i], bl->bt_compare_funs[i]); ra[i] = sort_and_write_rows(bl->rows[i], &(bl->fs[i]), bl, i, bl->dbs[i], bl->bt_compare_funs[i]);
zero_rowset(&bl->rows[i]); zero_rowset(&bl->rows[i]);
} }
// Implicit cilk_sync after that cilk_for loop.
// accept any of the error codes (in this case, the last one). // accept any of the error codes (in this case, the last one).
int r = 0; int r = 0;
@ -1078,14 +1059,9 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_ro
// if primary_rowset is NULL then treat it as empty. // if primary_rowset is NULL then treat it as empty.
{ {
int error_count = 0; int error_count = 0;
// cilk++ bug int error_codes[bl-N];
int *XMALLOC_N(bl->N, error_codes); int *XMALLOC_N(bl->N, error_codes);
// Do parallelize this loop with cilk_grainsize = 1 so that every iteration will run in parallel. for (int i = 0; i < bl->N; i++) {
#if defined(HAVE_CILK)
#pragma cilk grainsize = 1
#endif
cilk_for (int i = 0; i < bl->N; i++) {
unsigned int klimit,vlimit; // maximum row sizes. unsigned int klimit,vlimit; // maximum row sizes.
toku_ft_get_maximum_advised_key_value_lengths(&klimit, &vlimit); toku_ft_get_maximum_advised_key_value_lengths(&klimit, &vlimit);
@ -1321,7 +1297,6 @@ static int merge_row_arrays (struct row dest[/*an+bn*/], struct row a[/*an*/], i
FTLOADER bl, FTLOADER bl,
struct rowset *rowset) struct rowset *rowset)
/* Effect: Given two sorted arrays of rows, a and b, merge them using the comparison function, and write them into dest. /* Effect: Given two sorted arrays of rows, a and b, merge them using the comparison function, and write them into dest.
* This function is a cilk function with parallelism, and is suitable for use in a mergesort.
* Arguments: * Arguments:
* dest write the rows here * dest write the rows here
* a,b the rows being merged * a,b the rows being merged
@ -1346,9 +1321,8 @@ static int merge_row_arrays (struct row dest[/*an+bn*/], struct row a[/*an*/], i
if (r!=0) return r; // for example if we found a duplicate, called the error_callback, and now we return an error code. if (r!=0) return r; // for example if we found a duplicate, called the error_callback, and now we return an error code.
} }
int ra, rb; int ra, rb;
ra = cilk_spawn merge_row_arrays(dest, a, a2, b, b2, which_db, dest_db, compare, bl, rowset); ra = merge_row_arrays(dest, a, a2, b, b2, which_db, dest_db, compare, bl, rowset);
rb = merge_row_arrays(dest+a2+b2, a+a2, an-a2, b+b2, bn-b2, which_db, dest_db, compare, bl, rowset); rb = merge_row_arrays(dest+a2+b2, a+a2, an-a2, b+b2, bn-b2, which_db, dest_db, compare, bl, rowset);
cilk_sync;
if (ra!=0) return ra; if (ra!=0) return ra;
else return rb; else return rb;
} }
@ -1365,12 +1339,11 @@ int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_d
if (n<=1) return 0; // base case is sorted if (n<=1) return 0; // base case is sorted
int mid = n/2; int mid = n/2;
int r1, r2; int r1, r2;
r1 = cilk_spawn mergesort_row_array (rows, mid, which_db, dest_db, compare, bl, rowset); r1 = mergesort_row_array (rows, mid, which_db, dest_db, compare, bl, rowset);
// Don't spawn this one explicitly // Don't spawn this one explicitly
r2 = mergesort_row_array (rows+mid, n-mid, which_db, dest_db, compare, bl, rowset); r2 = mergesort_row_array (rows+mid, n-mid, which_db, dest_db, compare, bl, rowset);
cilk_sync;
if (r1!=0) return r1; if (r1!=0) return r1;
if (r2!=0) return r2; if (r2!=0) return r2;
@ -1458,14 +1431,12 @@ static int extend_fileset (FTLOADER bl, struct merge_fileset *fs, FIDX*ffile)
} }
// RFP maybe this should be buried in the ft_loader struct // RFP maybe this should be buried in the ft_loader struct
// This was previously a cilk lock, but now we need it to work for pthreads too.
static toku_mutex_t update_progress_lock = TOKU_MUTEX_INITIALIZER; static toku_mutex_t update_progress_lock = TOKU_MUTEX_INITIALIZER;
static int update_progress (int N, static int update_progress (int N,
FTLOADER bl, FTLOADER bl,
const char *UU(message)) const char *UU(message))
{ {
// Need a lock here because of cilk and also the various pthreads.
// Must protect the increment and the call to the poll_function. // Must protect the increment and the call to the poll_function.
toku_mutex_lock(&update_progress_lock); toku_mutex_lock(&update_progress_lock);
bl->progress+=N; bl->progress+=N;
@ -2278,7 +2249,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
int rr = queue_deq(q, &item, NULL, NULL); int rr = queue_deq(q, &item, NULL, NULL);
if (rr == EOF) break; if (rr == EOF) break;
if (rr != 0) { if (rr != 0) {
ft_loader_set_panic(bl, rr, true); // error after cilk sync ft_loader_set_panic(bl, rr, true);
break; break;
} }
} }
@ -2313,12 +2284,12 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
invariant(maxkey.data != NULL); invariant(maxkey.data != NULL);
if ((r = bl_write_dbt(&maxkey, pivots_stream, NULL, bl))) { if ((r = bl_write_dbt(&maxkey, pivots_stream, NULL, bl))) {
ft_loader_set_panic(bl, r, true); // error after cilk sync ft_loader_set_panic(bl, r, true);
if (result == 0) result = r; if (result == 0) result = r;
break; break;
} }
cilk_spawn finish_leafnode(&out, lbuf, progress_this_node, bl, target_basementnodesize, target_compression_method); finish_leafnode(&out, lbuf, progress_this_node, bl, target_basementnodesize, target_compression_method);
lbuf = NULL; lbuf = NULL;
r = allocate_block(&out, &lblock); r = allocate_block(&out, &lblock);
@ -2361,7 +2332,6 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
} }
} }
cilk_sync;
if (result == 0) { if (result == 0) {
result = ft_loader_get_error(&bl->error_callback); // if there were any prior errors then exit result = ft_loader_get_error(&bl->error_callback); // if there were any prior errors then exit
@ -3014,7 +2984,7 @@ static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, st
result = r; result = r;
break; break;
} else { } else {
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); // frees all the data structures that go into making the node. write_nonleaf_node(bl, out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); // frees all the data structures that go into making the node.
n_subtrees_used += n_per_block; n_subtrees_used += n_per_block;
} }
} }
@ -3037,7 +3007,7 @@ static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, st
if (r) { if (r) {
result = r; result = r;
} else { } else {
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_first, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); write_nonleaf_node(bl, out, blocknum_of_new_node, n_first, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method);
n_blocks_left -= n_first; n_blocks_left -= n_first;
n_subtrees_used += n_first; n_subtrees_used += n_first;
} }
@ -3056,14 +3026,13 @@ static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, st
if (r) { if (r) {
result = r; result = r;
} else { } else {
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); write_nonleaf_node(bl, out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method);
n_subtrees_used += n_blocks_left; n_subtrees_used += n_blocks_left;
} }
} }
if (result == 0) if (result == 0)
invariant(n_subtrees_used == sts->n_subtrees); invariant(n_subtrees_used == sts->n_subtrees);
cilk_sync;
if (result == 0) // pick up write_nonleaf_node errors if (result == 0) // pick up write_nonleaf_node errors
result = ft_loader_get_error(&bl->error_callback); result = ft_loader_get_error(&bl->error_callback);

View file

@ -71,7 +71,6 @@ typedef struct pair_attr_s {
} PAIR_ATTR; } PAIR_ATTR;
static inline PAIR_ATTR make_pair_attr(long size) { static inline PAIR_ATTR make_pair_attr(long size) {
#if 1 || (!defined(__cplusplus) && !defined(__cilkplusplus))
PAIR_ATTR result={ PAIR_ATTR result={
.size = size, .size = size,
.nonleaf_size = 0, .nonleaf_size = 0,
@ -80,9 +79,6 @@ static inline PAIR_ATTR make_pair_attr(long size) {
.cache_pressure_size = 0, .cache_pressure_size = 0,
.is_valid = true .is_valid = true
}; };
#else
PAIR_ATTR result = {size, 0, 0, 0, 0, true};
#endif
return result; return result;
} }

View file

@ -129,7 +129,7 @@ typedef struct
#endif #endif
#if defined (__cplusplus) || defined (__cilkplusplus) #if defined (__cplusplus)
extern "C" { extern "C" {
#endif #endif
@ -140,7 +140,7 @@ size_t qlz_compress(const void *source, char *destination, size_t size, qlz_stat
size_t qlz_decompress(const char *source, void *destination, qlz_state_decompress *state); size_t qlz_decompress(const char *source, void *destination, qlz_state_decompress *state);
int qlz_get_setting(int setting); int qlz_get_setting(int setting);
#if defined (__cplusplus) || defined (__cilkplusplus) #if defined (__cplusplus)
} }
#endif #endif

View file

@ -79,7 +79,7 @@ typedef int64_t toku_off_t;
#define static_assert(foo, bar) #define static_assert(foo, bar)
#endif #endif
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus)
# define cast_to_typeof(v) (decltype(v)) # define cast_to_typeof(v) (decltype(v))
#else #else
# define cast_to_typeof(v) (__typeof__(v)) # define cast_to_typeof(v) (__typeof__(v))
@ -102,11 +102,11 @@ typedef int64_t toku_off_t;
#if defined(HAVE_ALLOCA_H) #if defined(HAVE_ALLOCA_H)
# include <alloca.h> # include <alloca.h>
#endif #endif
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus)
# include <type_traits> # include <type_traits>
#endif #endif
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus)
# define cast_to_typeof(v) (decltype(v)) # define cast_to_typeof(v) (decltype(v))
#else #else
# define cast_to_typeof(v) (__typeof__(v)) # define cast_to_typeof(v) (__typeof__(v))
@ -118,7 +118,7 @@ typedef int64_t toku_off_t;
#endif #endif
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus)
// decltype() here gives a reference-to-pointer instead of just a pointer, // decltype() here gives a reference-to-pointer instead of just a pointer,
// just use __typeof__ // just use __typeof__
# define CAST_FROM_VOIDP(name, value) name = static_cast<__typeof__(name)>(value) # define CAST_FROM_VOIDP(name, value) name = static_cast<__typeof__(name)>(value)
@ -137,7 +137,7 @@ typedef int64_t toku_off_t;
#define UU(x) x __attribute__((__unused__)) #define UU(x) x __attribute__((__unused__))
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus)
extern "C" { extern "C" {
#endif #endif
@ -242,7 +242,7 @@ extern void *realloc(void*, size_t) __THROW __attribute__((__deprecat
# endif # endif
#endif #endif
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus)
}; };
#endif #endif