Cassandra SE: make consistency settings user-settable.

This commit is contained in:
Sergey Petrunya 2012-09-22 23:30:29 +04:00
commit c59faf95ae
5 changed files with 117 additions and 9 deletions

View file

@ -23,6 +23,7 @@ using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
using namespace org::apache::cassandra;
void Cassandra_se_interface::print_error(const char *format, ...)
{
va_list ap;
@ -38,10 +39,13 @@ void Cassandra_se_interface::print_error(const char *format, ...)
class Cassandra_se_impl: public Cassandra_se_interface
{
CassandraClient *cass; /* Connection to cassandra */
ConsistencyLevel::type cur_consistency_level;
std::string column_family;
std::string keyspace;
ConsistencyLevel::type write_consistency;
ConsistencyLevel::type read_consistency;
/* DDL data */
KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */
@ -69,7 +73,9 @@ class Cassandra_se_impl: public Cassandra_se_interface
SlicePredicate slice_pred;
bool get_slices_returned_less;
public:
Cassandra_se_impl() : cass(NULL) {}
Cassandra_se_impl() : cass(NULL),
write_consistency(ConsistencyLevel::ONE),
read_consistency(ConsistencyLevel::ONE) {}
virtual ~Cassandra_se_impl(){ delete cass; }
/* Connection and DDL checks */
@ -81,6 +87,9 @@ public:
bool next_ddl_column(char **name, int *name_len, char **value, int *value_len);
void get_rowkey_type(char **name, char **type);
/* Settings */
void set_consistency_levels(ulong read_cons_level, ulong write_cons_level);
/* Writes */
void clear_insert_buffer();
void start_row_insert(const char *key, int key_len);
@ -166,14 +175,20 @@ bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace
print_error("Unknown exception");
}
cur_consistency_level= ConsistencyLevel::ONE;
if (!res && setup_ddl_checks())
res= true;
return res;
}
void Cassandra_se_impl::set_consistency_levels(ulong read_cons_level,
ulong write_cons_level)
{
write_cons_level= (ConsistencyLevel::type)(write_cons_level + 1);
read_cons_level= (ConsistencyLevel::type)(read_cons_level + 1);
}
bool Cassandra_se_impl::setup_ddl_checks()
{
try {
@ -308,7 +323,7 @@ bool Cassandra_se_impl::do_insert()
try {
cass->batch_mutate(batch_mutation, cur_consistency_level);
cass->batch_mutate(batch_mutation, write_consistency);
cassandra_counters.row_inserts+= batch_mutation.size();
cassandra_counters.row_insert_batches++;
@ -356,7 +371,7 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
try {
cass->get_slice(column_data_vec, rowkey, cparent, slice_pred,
cur_consistency_level);
read_consistency);
if (column_data_vec.size() == 0)
{
@ -471,7 +486,7 @@ bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key)
cass->get_range_slices(key_slice_vec,
cparent, slice_pred, key_range,
cur_consistency_level);
read_consistency);
res= false;
if (key_slice_vec.size() < (uint)read_batch_size)
@ -589,7 +604,7 @@ bool Cassandra_se_impl::remove_row()
try {
cass->remove(rowkey, column_path, get_i64_timestamp(), cur_consistency_level);
cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency);
res= false;
} catch (InvalidRequestException ire) {
@ -643,7 +658,7 @@ bool Cassandra_se_impl::multiget_slice()
cassandra_counters.multiget_keys_scanned += mrr_keys.size();
cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred,
cur_consistency_level);
read_consistency);
cassandra_counters.multiget_rows_read += mrr_result.size();

View file

@ -7,6 +7,19 @@
*/
/* We need to define this here so that ha_cassandra.cc also has access to it */
typedef enum
{
ONE = 1-1,
QUORUM = 2-1,
LOCAL_QUORUM = 3-1,
EACH_QUORUM = 4-1,
ALL = 5-1,
ANY = 6-1,
TWO = 7-1,
THREE = 8-1,
} enum_cassandra_consistency_level;
/*
Interface to one cassandra column family, i.e. one 'table'
*/
@ -19,6 +32,9 @@ public:
/* Init */
virtual bool connect(const char *host, int port, const char *keyspace)=0;
virtual void set_column_family(const char *cfname) = 0;
/* Settings */
virtual void set_consistency_levels(ulong read_cons_level, ulong write_cons_level)=0;
/* Check underlying DDL */
virtual bool setup_ddl_checks()=0;

View file

@ -82,6 +82,35 @@ static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG,
"Number of rows in an rnd_read (full scan) batch",
NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
/* These match values in enum_cassandra_consistency_level */
const char *cassandra_consistency_level[] =
{
"ONE",
"QUORUM",
"LOCAL_QUORUM",
"EACH_QUORUM",
"ALL",
"ANY",
"TWO",
"THREE",
NullS
};
TYPELIB cassandra_consistency_level_typelib= {
array_elements(cassandra_consistency_level) - 1, "",
cassandra_consistency_level, NULL
};
static MYSQL_THDVAR_ENUM(write_consistency, PLUGIN_VAR_RQCMDARG,
"Cassandra consistency level to use for write operations", NULL, NULL,
ONE, &cassandra_consistency_level_typelib);
static MYSQL_THDVAR_ENUM(read_consistency, PLUGIN_VAR_RQCMDARG,
"Cassandra consistency level to use for read operations", NULL, NULL,
ONE, &cassandra_consistency_level_typelib);
mysql_mutex_t cassandra_default_host_lock;
static char* cassandra_default_thrift_host = NULL;
static char cassandra_default_host_buf[256]="";
@ -130,6 +159,8 @@ static struct st_mysql_sys_var* cassandra_system_variables[]= {
MYSQL_SYSVAR(rnd_batch_size),
MYSQL_SYSVAR(default_thrift_host),
MYSQL_SYSVAR(write_consistency),
MYSQL_SYSVAR(read_consistency),
NULL
};
@ -1297,6 +1328,11 @@ int ha_cassandra::reset()
{
doing_insert_batch= false;
insert_lineno= 0;
if (se)
{
se->set_consistency_levels(THDVAR(table->in_use, read_consistency),
THDVAR(table->in_use, write_consistency));
}
return 0;
}