diff --git a/ft/dbufio.cc b/ft/dbufio.cc index 818547d1617..5130c2f1864 100644 --- a/ft/dbufio.cc +++ b/ft/dbufio.cc @@ -66,14 +66,12 @@ static void enq (DBUFIO_FILESET bfs, struct dbufio_file *f) { static void panic (DBUFIO_FILESET bfs, int r) { if (bfs->panic) return; - // may need a cilk fake mutex here to convince the race detector that it's OK. bfs->panic_errno = r; // Don't really care about a race on this variable... Writes to it are atomic, so at least one good panic reason will be stored. bfs->panic = true; return; } static bool paniced (DBUFIO_FILESET bfs) { - // may need a cilk fake mutex here to convince the race detector that it's OK. return bfs->panic; } diff --git a/ft/ft-ops.cc b/ft/ft-ops.cc index b129a9bd807..547735dc77f 100644 --- a/ft/ft-ops.cc +++ b/ft/ft-ops.cc @@ -139,16 +139,6 @@ basement nodes, bulk fetch, and partial fetch: #include -#if defined(HAVE_CILK) -#include -#define cilk_worker_count (__cilkrts_get_nworkers()) -#else -#define cilk_spawn -#define cilk_sync -#define cilk_for for -#define cilk_worker_count 1 -#endif - static const uint32_t this_version = FT_LAYOUT_VERSION; /* Status is intended for display to humans to help understand system behavior. @@ -4617,7 +4607,6 @@ __attribute__((nonnull)) void toku_move_ftnode_messages_to_stale(FT ft, FTNODE node) { invariant(node->height > 0); - // TODO: could be cilkified for (int i = 0; i < node->n_children; ++i) { if (BP_STATE(node, i) != PT_AVAIL) { continue; diff --git a/ft/ft-serialize.cc b/ft/ft-serialize.cc index ca56fce5be2..3b1359c5599 100644 --- a/ft/ft-serialize.cc +++ b/ft/ft-serialize.cc @@ -8,16 +8,6 @@ #include "ft.h" #include "ft-internal.h" -#if defined(HAVE_CILK) -#include -#define cilk_worker_count (__cilkrts_get_nworkers()) -#else -#define cilk_spawn -#define cilk_sync -#define cilk_for for -#define cilk_worker_count 1 -#endif - // not version-sensitive because we only serialize a descriptor using the current layout_version uint32_t toku_serialize_descriptor_size(const DESCRIPTOR desc) { diff --git a/ft/ft_node-serialize.cc b/ft/ft_node-serialize.cc index fcc03f9bb70..e65546fe9ce 100644 --- a/ft/ft_node-serialize.cc +++ b/ft/ft_node-serialize.cc @@ -11,17 +11,6 @@ #include #include -#if defined(HAVE_CILK) -#include -#define cilk_worker_count (__cilkrts_get_nworkers()) -#else -#define cilk_spawn -#define cilk_sync -#define cilk_for for -#define cilk_worker_count 1 -#endif - - static FT_UPGRADE_STATUS_S ft_upgrade_status; #define UPGRADE_STATUS_INIT(k,t,l) { \ @@ -2266,9 +2255,8 @@ deserialize_ftnode_from_rbuf( // for partitions staying compressed, create sub_block setup_ftnode_partitions(node, bfe, true); - // Previously, this code was a for loop with spawns inside and a sync at the end. - // But now the loop is parallelizeable since we don't have a dependency on the work done so far. - cilk_for (int i = 0; i < node->n_children; i++) { + // This loop is parallelizeable, since we don't have a dependency on the work done so far. + for (int i = 0; i < node->n_children; i++) { uint32_t curr_offset = BP_START(*ndd,i); uint32_t curr_size = BP_SIZE(*ndd,i); // the compressed, serialized partitions start at where rb is currently pointing, diff --git a/ft/ftloader.cc b/ft/ftloader.cc index 54ae2260d15..37e2c22aec7 100644 --- a/ft/ftloader.cc +++ b/ft/ftloader.cc @@ -27,20 +27,6 @@ #include "log-internal.h" #include "ft.h" -#if defined(__cilkplusplus) -#error DISABLING CILK ARTS CILK -#endif - -#if defined(HAVE_CILK) -#include -#define cilk_worker_count (__cilkrts_get_nworkers()) -#else -#define cilk_spawn -#define cilk_sync -#define cilk_for for -#define cilk_worker_count 1 -#endif - static size_t (*os_fwrite_fun)(const void *,size_t,size_t,FILE*)=NULL; void ft_loader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) { os_fwrite_fun=fwrite_fun; @@ -426,7 +412,7 @@ static unsigned ft_loader_get_fractal_workers_count(FTLOADER bl) { static void ft_loader_set_fractal_workers_count(FTLOADER bl) { ft_loader_lock(bl); if (bl->fractal_workers == 0) - bl->fractal_workers = cilk_worker_count; + bl->fractal_workers = 1; ft_loader_unlock(bl); } @@ -924,16 +910,11 @@ static int finish_primary_rows_internal (FTLOADER bl) int *MALLOC_N(bl->N, ra); if (ra==NULL) return get_error_errno(); -#if defined(HAVE_CILK) - #pragma cilk grainsize = 1 -#endif - - cilk_for (int i = 0; i < bl->N; i++) { + for (int i = 0; i < bl->N; i++) { //printf("%s:%d extractor finishing index %d with %ld rows\n", __FILE__, __LINE__, i, rows->n_rows); ra[i] = sort_and_write_rows(bl->rows[i], &(bl->fs[i]), bl, i, bl->dbs[i], bl->bt_compare_funs[i]); zero_rowset(&bl->rows[i]); } - // Implicit cilk_sync after that cilk_for loop. // accept any of the error codes (in this case, the last one). int r = 0; @@ -1078,14 +1059,9 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_ro // if primary_rowset is NULL then treat it as empty. { int error_count = 0; - // cilk++ bug int error_codes[bl-N]; int *XMALLOC_N(bl->N, error_codes); - // Do parallelize this loop with cilk_grainsize = 1 so that every iteration will run in parallel. -#if defined(HAVE_CILK) - #pragma cilk grainsize = 1 -#endif - cilk_for (int i = 0; i < bl->N; i++) { + for (int i = 0; i < bl->N; i++) { unsigned int klimit,vlimit; // maximum row sizes. toku_ft_get_maximum_advised_key_value_lengths(&klimit, &vlimit); @@ -1321,7 +1297,6 @@ static int merge_row_arrays (struct row dest[/*an+bn*/], struct row a[/*an*/], i FTLOADER bl, struct rowset *rowset) /* Effect: Given two sorted arrays of rows, a and b, merge them using the comparison function, and write them into dest. - * This function is a cilk function with parallelism, and is suitable for use in a mergesort. * Arguments: * dest write the rows here * a,b the rows being merged @@ -1346,9 +1321,8 @@ static int merge_row_arrays (struct row dest[/*an+bn*/], struct row a[/*an*/], i if (r!=0) return r; // for example if we found a duplicate, called the error_callback, and now we return an error code. } int ra, rb; - ra = cilk_spawn merge_row_arrays(dest, a, a2, b, b2, which_db, dest_db, compare, bl, rowset); - rb = merge_row_arrays(dest+a2+b2, a+a2, an-a2, b+b2, bn-b2, which_db, dest_db, compare, bl, rowset); - cilk_sync; + ra = merge_row_arrays(dest, a, a2, b, b2, which_db, dest_db, compare, bl, rowset); + rb = merge_row_arrays(dest+a2+b2, a+a2, an-a2, b+b2, bn-b2, which_db, dest_db, compare, bl, rowset); if (ra!=0) return ra; else return rb; } @@ -1365,12 +1339,11 @@ int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_d if (n<=1) return 0; // base case is sorted int mid = n/2; int r1, r2; - r1 = cilk_spawn mergesort_row_array (rows, mid, which_db, dest_db, compare, bl, rowset); + r1 = mergesort_row_array (rows, mid, which_db, dest_db, compare, bl, rowset); // Don't spawn this one explicitly r2 = mergesort_row_array (rows+mid, n-mid, which_db, dest_db, compare, bl, rowset); - cilk_sync; if (r1!=0) return r1; if (r2!=0) return r2; @@ -1458,14 +1431,12 @@ static int extend_fileset (FTLOADER bl, struct merge_fileset *fs, FIDX*ffile) } // RFP maybe this should be buried in the ft_loader struct -// This was previously a cilk lock, but now we need it to work for pthreads too. static toku_mutex_t update_progress_lock = TOKU_MUTEX_INITIALIZER; static int update_progress (int N, FTLOADER bl, const char *UU(message)) { - // Need a lock here because of cilk and also the various pthreads. // Must protect the increment and the call to the poll_function. toku_mutex_lock(&update_progress_lock); bl->progress+=N; @@ -2278,7 +2249,7 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, int rr = queue_deq(q, &item, NULL, NULL); if (rr == EOF) break; if (rr != 0) { - ft_loader_set_panic(bl, rr, true); // error after cilk sync + ft_loader_set_panic(bl, rr, true); break; } } @@ -2313,12 +2284,12 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, invariant(maxkey.data != NULL); if ((r = bl_write_dbt(&maxkey, pivots_stream, NULL, bl))) { - ft_loader_set_panic(bl, r, true); // error after cilk sync + ft_loader_set_panic(bl, r, true); if (result == 0) result = r; break; } - cilk_spawn finish_leafnode(&out, lbuf, progress_this_node, bl, target_basementnodesize, target_compression_method); + finish_leafnode(&out, lbuf, progress_this_node, bl, target_basementnodesize, target_compression_method); lbuf = NULL; r = allocate_block(&out, &lblock); @@ -2361,7 +2332,6 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, } } - cilk_sync; if (result == 0) { result = ft_loader_get_error(&bl->error_callback); // if there were any prior errors then exit @@ -3014,7 +2984,7 @@ static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, st result = r; break; } else { - cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); // frees all the data structures that go into making the node. + write_nonleaf_node(bl, out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); // frees all the data structures that go into making the node. n_subtrees_used += n_per_block; } } @@ -3037,7 +3007,7 @@ static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, st if (r) { result = r; } else { - cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_first, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); + write_nonleaf_node(bl, out, blocknum_of_new_node, n_first, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); n_blocks_left -= n_first; n_subtrees_used += n_first; } @@ -3056,14 +3026,13 @@ static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, st if (r) { result = r; } else { - cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); + write_nonleaf_node(bl, out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); n_subtrees_used += n_blocks_left; } } if (result == 0) invariant(n_subtrees_used == sts->n_subtrees); - cilk_sync; if (result == 0) // pick up write_nonleaf_node errors result = ft_loader_get_error(&bl->error_callback); diff --git a/ft/fttypes.h b/ft/fttypes.h index d671f494125..3f547d98217 100644 --- a/ft/fttypes.h +++ b/ft/fttypes.h @@ -71,7 +71,6 @@ typedef struct pair_attr_s { } PAIR_ATTR; static inline PAIR_ATTR make_pair_attr(long size) { -#if 1 || (!defined(__cplusplus) && !defined(__cilkplusplus)) PAIR_ATTR result={ .size = size, .nonleaf_size = 0, @@ -80,9 +79,6 @@ static inline PAIR_ATTR make_pair_attr(long size) { .cache_pressure_size = 0, .is_valid = true }; -#else - PAIR_ATTR result = {size, 0, 0, 0, 0, true}; -#endif return result; } diff --git a/ft/quicklz.h b/ft/quicklz.h index d2f22e66d1f..d3d06aed220 100644 --- a/ft/quicklz.h +++ b/ft/quicklz.h @@ -129,7 +129,7 @@ typedef struct #endif -#if defined (__cplusplus) || defined (__cilkplusplus) +#if defined (__cplusplus) extern "C" { #endif @@ -140,7 +140,7 @@ size_t qlz_compress(const void *source, char *destination, size_t size, qlz_stat size_t qlz_decompress(const char *source, void *destination, qlz_state_decompress *state); int qlz_get_setting(int setting); -#if defined (__cplusplus) || defined (__cilkplusplus) +#if defined (__cplusplus) } #endif diff --git a/toku_include/toku_portability.h b/toku_include/toku_portability.h index 423a4eeee6e..35ed137340e 100644 --- a/toku_include/toku_portability.h +++ b/toku_include/toku_portability.h @@ -79,7 +79,7 @@ typedef int64_t toku_off_t; #define static_assert(foo, bar) #endif -#if defined(__cplusplus) || defined(__cilkplusplus) +#if defined(__cplusplus) # define cast_to_typeof(v) (decltype(v)) #else # define cast_to_typeof(v) (__typeof__(v)) @@ -102,11 +102,11 @@ typedef int64_t toku_off_t; #if defined(HAVE_ALLOCA_H) # include #endif -#if defined(__cplusplus) || defined(__cilkplusplus) +#if defined(__cplusplus) # include #endif -#if defined(__cplusplus) || defined(__cilkplusplus) +#if defined(__cplusplus) # define cast_to_typeof(v) (decltype(v)) #else # define cast_to_typeof(v) (__typeof__(v)) @@ -118,7 +118,7 @@ typedef int64_t toku_off_t; #endif -#if defined(__cplusplus) || defined(__cilkplusplus) +#if defined(__cplusplus) // decltype() here gives a reference-to-pointer instead of just a pointer, // just use __typeof__ # define CAST_FROM_VOIDP(name, value) name = static_cast<__typeof__(name)>(value) @@ -137,7 +137,7 @@ typedef int64_t toku_off_t; #define UU(x) x __attribute__((__unused__)) -#if defined(__cplusplus) || defined(__cilkplusplus) +#if defined(__cplusplus) extern "C" { #endif @@ -242,7 +242,7 @@ extern void *realloc(void*, size_t) __THROW __attribute__((__deprecat # endif #endif -#if defined(__cplusplus) || defined(__cilkplusplus) +#if defined(__cplusplus) }; #endif