// Cassandra includes: #include #include #include #include #include #include "Thrift.h" #include "transport/TSocket.h" #include "transport/TTransport.h" #include "transport/TBufferTransports.h" #include "protocol/TProtocol.h" #include "protocol/TBinaryProtocol.h" #include "gen-cpp/Cassandra.h" // cassandra includes end #include "cassandra_se.h" struct st_mysql_lex_string { char *str; size_t length; }; using namespace std; using namespace apache::thrift; using namespace apache::thrift::transport; using namespace apache::thrift::protocol; using namespace org::apache::cassandra; /* Implementation of connection to one Cassandra column family (ie., table) */ class Cassandra_se_impl: public Cassandra_se_interface { CassandraClient *cass; /* Connection to cassandra */ std::string column_family; std::string keyspace; ConsistencyLevel::type write_consistency; ConsistencyLevel::type read_consistency; /* How many times to retry an operation before giving up */ int thrift_call_retries_to_do; /* DDL data */ KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */ CfDef cf_def; /* Column family we're using (TODO: put in table->share)*/ std::vector::iterator column_ddl_it; /* The list that was returned by the last key lookup */ std::vector column_data_vec; std::vector::iterator column_data_it; /* Insert preparation */ typedef std::map > ColumnFamilyToMutation; typedef std::map KeyToCfMutationMap; KeyToCfMutationMap batch_mutation; /* Prepare operation here */ int64_t insert_timestamp; std::vector* insert_list; /* Resultset we're reading */ std::vector key_slice_vec; std::vector::iterator key_slice_it; std::string rowkey; /* key of the record we're returning now */ SlicePredicate slice_pred; SliceRange slice_pred_sr; bool get_slices_returned_less; bool get_slice_found_rows; public: Cassandra_se_impl() : cass(NULL), write_consistency(ConsistencyLevel::ONE), read_consistency(ConsistencyLevel::ONE), thrift_call_retries_to_do(0) {} virtual ~Cassandra_se_impl(){ delete cass; } /* Connection and DDL checks */ bool connect(const char *host, int port, const char *keyspace); void set_column_family(const char *cfname) { column_family.assign(cfname); } bool setup_ddl_checks(); void first_ddl_column(); bool next_ddl_column(char **name, int *name_len, char **value, int *value_len); void get_rowkey_type(char **name, char **type); size_t get_ddl_size(); const char* get_default_validator(); /* Settings */ void set_consistency_levels(ulong read_cons_level, ulong write_cons_level); /* Writes */ void clear_insert_buffer(); void start_row_insert(const char *key, int key_len); void add_insert_column(const char *name, int name_len, const char *value, int value_len); void add_insert_delete_column(const char *name, int name_len); void add_row_deletion(const char *key, int key_len, Column_name_enumerator *col_names, LEX_STRING *names, uint nnames); bool do_insert(); /* Reads, point lookups */ bool get_slice(char *key, size_t key_len, bool *found); bool get_next_read_column(char **name, int *name_len, char **value, int *value_len ); void get_read_rowkey(char **value, int *value_len); /* Reads, multi-row scans */ private: bool have_rowkey_to_skip; std::string rowkey_to_skip; bool get_range_slices_param_last_key_as_start_key; public: bool get_range_slices(bool last_key_as_start_key); void finish_reading_range_slices(); bool get_next_range_slice_row(bool *eof); /* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */ void clear_read_columns(); void clear_read_all_columns(); void add_read_column(const char *name); /* Reads, MRR scans */ void new_lookup_keys(); int add_lookup_key(const char *key, size_t key_len); bool multiget_slice(); bool get_next_multiget_row(); bool truncate(); bool remove_row(); private: bool retryable_truncate(); bool retryable_do_insert(); bool retryable_remove_row(); bool retryable_setup_ddl_checks(); bool retryable_multiget_slice(); bool retryable_get_range_slices(); bool retryable_get_slice(); std::vector mrr_keys; /* can we use allocator to put these into MRR buffer? */ std::map > mrr_result; std::map >::iterator mrr_result_it; /* Non-inherited utility functions: */ int64_t get_i64_timestamp(); typedef bool (Cassandra_se_impl::*retryable_func_t)(); bool try_operation(retryable_func_t func); }; ///////////////////////////////////////////////////////////////////////////// // Connection and setup ///////////////////////////////////////////////////////////////////////////// Cassandra_se_interface *create_cassandra_se() { return new Cassandra_se_impl; } bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace_arg) { bool res= true; keyspace.assign(keyspace_arg); try { boost::shared_ptr socket = boost::shared_ptr(new TSocket(host, port)); boost::shared_ptr tr = boost::shared_ptr(new TFramedTransport (socket)); boost::shared_ptr p = boost::shared_ptr(new TBinaryProtocol(tr)); cass= new CassandraClient(p); tr->open(); cass->set_keyspace(keyspace_arg); res= false; // success }catch(TTransportException te){ print_error("%s [%d]", te.what(), te.getType()); }catch(InvalidRequestException ire){ print_error("%s [%s]", ire.what(), ire.why.c_str()); }catch(NotFoundException nfe){ print_error("%s", nfe.what()); }catch(TException e){ print_error("Thrift exception: %s", e.what()); }catch (...) { print_error("Unknown exception"); } if (!res && setup_ddl_checks()) res= true; return res; } void Cassandra_se_impl::set_consistency_levels(ulong read_cons_level, ulong write_cons_level) { write_cons_level= (ConsistencyLevel::type)(write_cons_level + 1); read_cons_level= (ConsistencyLevel::type)(read_cons_level + 1); } bool Cassandra_se_impl::retryable_setup_ddl_checks() { try { cass->describe_keyspace(ks_def, keyspace); } catch (NotFoundException nfe) { print_error("keyspace `%s` not found: %s", keyspace.c_str(), nfe.what()); return true; } std::vector::iterator it; for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++) { cf_def= *it; if (!cf_def.name.compare(column_family)) return false; } print_error("Column family %s not found in keyspace %s", column_family.c_str(), keyspace.c_str()); return true; } bool Cassandra_se_impl::setup_ddl_checks() { return try_operation(&Cassandra_se_impl::retryable_setup_ddl_checks); } void Cassandra_se_impl::first_ddl_column() { column_ddl_it= cf_def.column_metadata.begin(); } bool Cassandra_se_impl::next_ddl_column(char **name, int *name_len, char **type, int *type_len) { if (column_ddl_it == cf_def.column_metadata.end()) return true; *name= (char*)(*column_ddl_it).name.c_str(); *name_len= (*column_ddl_it).name.length(); *type= (char*)(*column_ddl_it).validation_class.c_str(); *type_len= (*column_ddl_it).validation_class.length(); column_ddl_it++; return false; } void Cassandra_se_impl::get_rowkey_type(char **name, char **type) { if (cf_def.__isset.key_validation_class) *type= (char*)cf_def.key_validation_class.c_str(); else *type= NULL; if (cf_def.__isset.key_alias) *name= (char*)cf_def.key_alias.c_str(); else *name= NULL; } size_t Cassandra_se_impl::get_ddl_size() { return cf_def.column_metadata.size(); } const char* Cassandra_se_impl::get_default_validator() { return cf_def.default_validation_class.c_str(); } ///////////////////////////////////////////////////////////////////////////// // Data writes ///////////////////////////////////////////////////////////////////////////// int64_t Cassandra_se_impl::get_i64_timestamp() { struct timeval td; gettimeofday(&td, NULL); int64_t ms = td.tv_sec; ms = ms * 1000; int64_t usec = td.tv_usec; usec = usec / 1000; ms += usec; return ms; } void Cassandra_se_impl::clear_insert_buffer() { batch_mutation.clear(); } void Cassandra_se_impl::start_row_insert(const char *key, int key_len) { std::string key_to_insert; key_to_insert.assign(key, key_len); batch_mutation[key_to_insert]= ColumnFamilyToMutation(); ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_insert]; cf_mut[column_family]= std::vector(); insert_list= &cf_mut[column_family]; insert_timestamp= get_i64_timestamp(); } void Cassandra_se_impl::add_row_deletion(const char *key, int key_len, Column_name_enumerator *col_names, LEX_STRING *names, uint nnames) { std::string key_to_delete; key_to_delete.assign(key, key_len); batch_mutation[key_to_delete]= ColumnFamilyToMutation(); ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_delete]; cf_mut[column_family]= std::vector(); std::vector &mutation_list= cf_mut[column_family]; Mutation mut; mut.__isset.deletion= true; mut.deletion.__isset.timestamp= true; mut.deletion.timestamp= get_i64_timestamp(); mut.deletion.__isset.predicate= true; /* Attempting to delete columns with SliceRange causes exception with message "Deletion does not yet support SliceRange predicates". Delete all columns individually. */ SlicePredicate slice_pred; slice_pred.__isset.column_names= true; const char *col_name; while ((col_name= col_names->get_next_name())) slice_pred.column_names.push_back(std::string(col_name)); for (uint i= 0; i < nnames; i++) slice_pred.column_names.push_back(std::string(names[i].str, names[i].length)); mut.deletion.predicate= slice_pred; mutation_list.push_back(mut); } void Cassandra_se_impl::add_insert_column(const char *name, int name_len, const char *value, int value_len) { Mutation mut; mut.__isset.column_or_supercolumn= true; mut.column_or_supercolumn.__isset.column= true; Column& col=mut.column_or_supercolumn.column; if (name_len) col.name.assign(name, name_len); else col.name.assign(name); col.value.assign(value, value_len); col.timestamp= insert_timestamp; col.__isset.value= true; col.__isset.timestamp= true; insert_list->push_back(mut); } void Cassandra_se_impl::add_insert_delete_column(const char *name, int name_len) { Mutation mut; mut.__isset.deletion= true; mut.deletion.__isset.timestamp= true; mut.deletion.timestamp= insert_timestamp; mut.deletion.__isset.predicate= true; SlicePredicate slice_pred; slice_pred.__isset.column_names= true; slice_pred.column_names.push_back(std::string(name, name_len)); mut.deletion.predicate= slice_pred; insert_list->push_back(mut); } bool Cassandra_se_impl::retryable_do_insert() { cass->batch_mutate(batch_mutation, write_consistency); cassandra_counters.row_inserts+= batch_mutation.size(); cassandra_counters.row_insert_batches++; clear_insert_buffer(); return 0; } bool Cassandra_se_impl::do_insert() { /* zero-size mutations are allowed by Cassandra's batch_mutate but lets not do them (we may attempt to do it if there is a bulk insert that stores exactly @@cassandra_insert_batch_size*n elements. */ if (batch_mutation.empty()) return false; return try_operation(&Cassandra_se_impl::retryable_do_insert); } ///////////////////////////////////////////////////////////////////////////// // Reading data ///////////////////////////////////////////////////////////////////////////// /* Make one key lookup. If the record is found, the result is stored locally and the caller should iterate over it. */ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found) { bool res; rowkey.assign(key, key_len); if (!(res= try_operation(&Cassandra_se_impl::retryable_get_slice))) *found= get_slice_found_rows; return res; } bool Cassandra_se_impl::retryable_get_slice() { ColumnParent cparent; cparent.column_family= column_family; SlicePredicate slice_pred; SliceRange sr; sr.start = ""; sr.finish = ""; slice_pred.__set_slice_range(sr); cass->get_slice(column_data_vec, rowkey, cparent, slice_pred, read_consistency); if (column_data_vec.size() == 0) { /* No columns found. Cassandra doesn't allow records without any column => this means the seach key doesn't exist */ get_slice_found_rows= false; return false; } get_slice_found_rows= true; column_data_it= column_data_vec.begin(); return false; } bool Cassandra_se_impl::get_next_read_column(char **name, int *name_len, char **value, int *value_len) { bool use_counter=false; while (1) { if (column_data_it == column_data_vec.end()) return true; if ((*column_data_it).__isset.column) break; /* Ok it's a real column. Should be always the case. */ if ((*column_data_it).__isset.counter_column) { use_counter= true; break; } column_data_it++; } ColumnOrSuperColumn& cs= *column_data_it; if (use_counter) { *name_len= cs.counter_column.name.size(); *name= (char*)cs.counter_column.name.c_str(); *value= (char*)&cs.counter_column.value; *value_len= sizeof(cs.counter_column.value); } else { *name_len= cs.column.name.size(); *name= (char*)cs.column.name.c_str(); *value= (char*)cs.column.value.c_str(); *value_len= cs.column.value.length(); } column_data_it++; return false; } /* Return the rowkey for the record that was read */ void Cassandra_se_impl::get_read_rowkey(char **value, int *value_len) { *value= (char*)rowkey.c_str(); *value_len= rowkey.length(); } bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key) { get_range_slices_param_last_key_as_start_key= last_key_as_start_key; return try_operation(&Cassandra_se_impl::retryable_get_range_slices); } bool Cassandra_se_impl::retryable_get_range_slices() { bool last_key_as_start_key= get_range_slices_param_last_key_as_start_key; ColumnParent cparent; cparent.column_family= column_family; /* SlicePredicate can be used to limit columns we will retrieve */ KeyRange key_range; key_range.__isset.start_key= true; key_range.__isset.end_key= true; if (last_key_as_start_key) { key_range.start_key= rowkey; have_rowkey_to_skip= true; rowkey_to_skip= rowkey; } else { have_rowkey_to_skip= false; key_range.start_key.assign("", 0); } key_range.end_key.assign("", 0); key_range.count= read_batch_size; cass->get_range_slices(key_slice_vec, cparent, slice_pred, key_range, read_consistency); if (key_slice_vec.size() < (uint)read_batch_size) get_slices_returned_less= true; else get_slices_returned_less= false; key_slice_it= key_slice_vec.begin(); return false; } /* Switch to next row. This may produce an error */ bool Cassandra_se_impl::get_next_range_slice_row(bool *eof) { restart: if (key_slice_it == key_slice_vec.end()) { if (get_slices_returned_less) { *eof= true; return false; } /* We have read through all columns in this batch. Try getting the next batch. */ if (get_range_slices(true)) return true; if (key_slice_vec.empty()) { *eof= true; return false; } } /* (1) - skip the last row that we have read in the previous batch. (2) - Rows that were deleted show up as rows without any columns. Skip them, like CQL does. */ if ((have_rowkey_to_skip && !rowkey_to_skip.compare(key_slice_it->key)) || // (1) key_slice_it->columns.size() == 0) // (2) { key_slice_it++; goto restart; } *eof= false; column_data_vec= key_slice_it->columns; rowkey= key_slice_it->key; column_data_it= column_data_vec.begin(); key_slice_it++; return false; } void Cassandra_se_impl::finish_reading_range_slices() { key_slice_vec.clear(); } void Cassandra_se_impl::clear_read_columns() { slice_pred.column_names.clear(); } void Cassandra_se_impl::clear_read_all_columns() { slice_pred_sr.start = ""; slice_pred_sr.finish = ""; slice_pred.__set_slice_range(slice_pred_sr); } void Cassandra_se_impl::add_read_column(const char *name_arg) { std::string name(name_arg); slice_pred.__isset.column_names= true; slice_pred.column_names.push_back(name); } bool Cassandra_se_impl::truncate() { return try_operation(&Cassandra_se_impl::retryable_truncate); } bool Cassandra_se_impl::retryable_truncate() { cass->truncate(column_family); return 0; } bool Cassandra_se_impl::remove_row() { return try_operation(&Cassandra_se_impl::retryable_remove_row); } bool Cassandra_se_impl::retryable_remove_row() { ColumnPath column_path; column_path.column_family= column_family; cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency); return 0; } /* Try calling a function, catching possible Cassandra errors, and re-trying for "transient" errors. */ bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call) { bool res; int n_retries= thrift_call_retries_to_do; do { res= true; try { if ((res= (this->*func_to_call)())) { /* The function call was made successfully (without timeouts, etc), but something inside it returned 'true'. This is supposedly a failure (or "not found" or other negative result). We need to return this to the caller. */ n_retries= 0; } } catch (InvalidRequestException ire) { n_retries= 0; /* there is no point in retrying this operation */ print_error("%s [%s]", ire.what(), ire.why.c_str()); } catch (UnavailableException ue) { cassandra_counters.unavailable_exceptions++; if (!--n_retries) print_error("UnavailableException: %s", ue.what()); } catch (TimedOutException te) { cassandra_counters.timeout_exceptions++; if (!--n_retries) print_error("TimedOutException: %s", te.what()); }catch(TException e){ /* todo: we may use retry for certain kinds of Thrift errors */ n_retries= 0; print_error("Thrift exception: %s", e.what()); } catch (...) { n_retries= 0; /* Don't retry */ print_error("Unknown exception"); } } while (res && n_retries > 0); return res; } ///////////////////////////////////////////////////////////////////////////// // MRR reads ///////////////////////////////////////////////////////////////////////////// void Cassandra_se_impl::new_lookup_keys() { mrr_keys.clear(); } int Cassandra_se_impl::add_lookup_key(const char *key, size_t key_len) { mrr_keys.push_back(std::string(key, key_len)); return mrr_keys.size(); } bool Cassandra_se_impl::multiget_slice() { return try_operation(&Cassandra_se_impl::retryable_multiget_slice); } bool Cassandra_se_impl::retryable_multiget_slice() { ColumnParent cparent; cparent.column_family= column_family; SlicePredicate slice_pred; SliceRange sr; sr.start = ""; sr.finish = ""; slice_pred.__set_slice_range(sr); cassandra_counters.multiget_reads++; cassandra_counters.multiget_keys_scanned += mrr_keys.size(); cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred, read_consistency); cassandra_counters.multiget_rows_read += mrr_result.size(); mrr_result_it= mrr_result.begin(); return false; } bool Cassandra_se_impl::get_next_multiget_row() { if (mrr_result_it == mrr_result.end()) return true; /* EOF */ column_data_vec= mrr_result_it->second; rowkey= mrr_result_it->first; column_data_it= column_data_vec.begin(); mrr_result_it++; return false; }