mirror of
https://github.com/MariaDB/server.git
synced 2026-05-15 19:37:16 +02:00
10.0-base merge
This commit is contained in:
commit
ab83952f29
531 changed files with 48359 additions and 7381 deletions
68
storage/cassandra/CMakeLists.txt
Normal file
68
storage/cassandra/CMakeLists.txt
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
# use the first path that has Thrift.h included, if found
FIND_PATH(Thrift_INCLUDE_DIRS Thrift.h PATHS
  $ENV{THRIFT_INCLUDE}       # environment variable to be used optionally
  ${Thrift_INCLUDE_DIR}      # this may be set
  /usr/local/include/thrift  # list of additional directories to look from
  /opt/local/include/thrift
  /usr/include/thrift
  /opt/include/thrift
)

# Verify that thrift linking library is found
FIND_LIBRARY(Thrift_LIBS NAMES thrift PATHS ${Thrift_LIB_PATHS} ${Thrift_LIB})
IF(EXISTS ${Thrift_LIBS})
  GET_FILENAME_COMPONENT(LINK_DIR ${Thrift_LIBS} PATH ABSOLUTE)
ELSE()
  RETURN()
ENDIF()

INCLUDE_DIRECTORIES(AFTER ${Thrift_INCLUDE_DIRS})
SET(CMAKE_REQUIRED_INCLUDES ${Thrift_INCLUDE_DIRS})

# Thrift-generated code needs exceptions and implicit template instantiation.
# NOTE: the flags variable must be quoted — with an empty/unset
# CMAKE_CXX_FLAGS an unquoted expansion leaves STRING(REPLACE) with too few
# arguments and configuration fails.
STRING(REPLACE "-fno-exceptions" "" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
STRING(REPLACE "-fno-implicit-templates" "" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")

# Smoke-test that the found headers are actually usable.
CHECK_CXX_SOURCE_COMPILES(
"
#include <Thrift.h>
#include <boost/shared_ptr.hpp>
int main() {
  boost::shared_ptr<char> p(new char(10));
  return 0;
}
" CASSANDRASE_OK)

IF(CASSANDRASE_OK)
  SET(cassandra_sources
      ha_cassandra.cc
      ha_cassandra.h
      cassandra_se.h
      cassandra_se.cc
      gen-cpp/Cassandra.cpp
      gen-cpp/cassandra_types.h
      gen-cpp/cassandra_types.cpp
      gen-cpp/cassandra_constants.h
      gen-cpp/cassandra_constants.cpp
      gen-cpp/Cassandra.h)

  LINK_DIRECTORIES(${LINK_DIR})

  MYSQL_ADD_PLUGIN(cassandra ${cassandra_sources} STORAGE_ENGINE MODULE_ONLY LINK_LIBRARIES thrift COMPONENT CassandraSE)
  IF (INSTALL_SYSCONFDIR)
    INSTALL(FILES cassandra.cnf DESTINATION ${INSTALL_SYSCONFDIR}/my.cnf.d
            COMPONENT CassandraSE)
  ENDIF(INSTALL_SYSCONFDIR)

  IF(RPM)
    SET(CPACK_COMPONENT_CASSANDRASELIBRARIES_GROUP "CassandraSE" PARENT_SCOPE)
    SET(CPACK_COMPONENTS_ALL ${CPACK_COMPONENTS_ALL} CassandraSE PARENT_SCOPE)
    SET(CPACK_RPM_CassandraSE_PACKAGE_REQUIRES "MariaDB-server" PARENT_SCOPE)

    # workarounds for cmake issues #13248 and #12864:
    SET(CPACK_RPM_CassandraSE_USER_FILELIST ${ignored} "%config(noreplace) /etc/my.cnf.d/*" PARENT_SCOPE)
    SET(CPACK_RPM_CassandraSE_PACKAGE_PROVIDES "cmake_bug_13248" PARENT_SCOPE)
    SET(CPACK_RPM_CassandraSE_PACKAGE_OBSOLETES "cmake_bug_13248" PARENT_SCOPE)
  ENDIF(RPM)

ENDIF(CASSANDRASE_OK)
|
||||
2
storage/cassandra/cassandra.cnf
Normal file
2
storage/cassandra/cassandra.cnf
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
[mariadb]
# Load the Cassandra storage engine plugin at server startup.
plugin-load-add=ha_cassandra.so
|
||||
800
storage/cassandra/cassandra_se.cc
Normal file
800
storage/cassandra/cassandra_se.cc
Normal file
|
|
@ -0,0 +1,800 @@
|
|||
|
||||
// Cassandra includes:
|
||||
#include <inttypes.h>
|
||||
#include <netinet/in.h>
|
||||
#include <sys/time.h>
|
||||
#include <stdio.h>
|
||||
#include <stdarg.h>
|
||||
|
||||
#include "Thrift.h"
|
||||
#include "transport/TSocket.h"
|
||||
#include "transport/TTransport.h"
|
||||
#include "transport/TBufferTransports.h"
|
||||
#include "protocol/TProtocol.h"
|
||||
#include "protocol/TBinaryProtocol.h"
|
||||
#include "gen-cpp/Cassandra.h"
|
||||
// cassandra includes end
|
||||
|
||||
#include "cassandra_se.h"
|
||||
|
||||
/*
  Local mirror of the server's LEX_STRING layout (cassandra_se.h cannot
  include MySQL headers — see the note there).  Must stay binary-compatible
  with struct st_mysql_lex_string in the server.
*/
struct st_mysql_lex_string
{
  char *str;
  size_t length;
};
|
||||
|
||||
using namespace std;
|
||||
using namespace apache::thrift;
|
||||
using namespace apache::thrift::transport;
|
||||
using namespace apache::thrift::protocol;
|
||||
using namespace org::apache::cassandra;
|
||||
|
||||
|
||||
/*
|
||||
Implementation of connection to one Cassandra column family (ie., table)
|
||||
*/
|
||||
/*
  Implementation of connection to one Cassandra column family (ie., table)

  Wraps the Thrift-generated CassandraClient and keeps per-table cursor
  state for the various scan modes (point lookup, range scan, MRR).
*/
class Cassandra_se_impl: public Cassandra_se_interface
{
  CassandraClient *cass; /* Connection to cassandra */

  std::string column_family;
  std::string keyspace;

  /* Thrift consistency levels used for writes resp. reads */
  ConsistencyLevel::type write_consistency;
  ConsistencyLevel::type read_consistency;

  /* How many times to retry an operation before giving up */
  int thrift_call_retries_to_do;


  /* DDL data */
  KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */
  CfDef cf_def; /* Column family we're using (TODO: put in table->share)*/
  std::vector<ColumnDef>::iterator column_ddl_it;

  /* The list that was returned by the last key lookup */
  std::vector<ColumnOrSuperColumn> column_data_vec;
  std::vector<ColumnOrSuperColumn>::iterator column_data_it;

  /* Insert preparation: row key -> (column family -> mutations) */
  typedef std::map<std::string, std::vector<Mutation> > ColumnFamilyToMutation;
  typedef std::map<std::string, ColumnFamilyToMutation> KeyToCfMutationMap;

  KeyToCfMutationMap batch_mutation; /* Prepare operation here */
  int64_t insert_timestamp;          /* timestamp shared by the current row's columns */
  std::vector<Mutation>* insert_list; /* points into batch_mutation for the current row */

  /* Resultset we're reading */
  std::vector<KeySlice> key_slice_vec;
  std::vector<KeySlice>::iterator key_slice_it;

  std::string rowkey; /* key of the record we're returning now */

  SlicePredicate slice_pred;
  SliceRange slice_pred_sr;
  bool get_slices_returned_less; /* last get_range_slices batch was short => EOF */
  bool get_slice_found_rows;
public:
  Cassandra_se_impl() : cass(NULL),
                        write_consistency(ConsistencyLevel::ONE),
                        read_consistency(ConsistencyLevel::ONE),
                        thrift_call_retries_to_do(0) {}
  virtual ~Cassandra_se_impl(){ delete cass; }

  /* Connection and DDL checks */
  bool connect(const char *host, int port, const char *keyspace);
  void set_column_family(const char *cfname) { column_family.assign(cfname); }

  bool setup_ddl_checks();
  void first_ddl_column();
  bool next_ddl_column(char **name, int *name_len, char **value, int *value_len);
  void get_rowkey_type(char **name, char **type);
  size_t get_ddl_size();
  const char* get_default_validator();

  /* Settings */
  void set_consistency_levels(ulong read_cons_level, ulong write_cons_level);

  /* Writes */
  void clear_insert_buffer();
  void start_row_insert(const char *key, int key_len);
  void add_insert_column(const char *name, int name_len,
                         const char *value, int value_len);
  void add_insert_delete_column(const char *name, int name_len);
  void add_row_deletion(const char *key, int key_len,
                        Column_name_enumerator *col_names,
                        LEX_STRING *names, uint nnames);

  bool do_insert();

  /* Reads, point lookups */
  bool get_slice(char *key, size_t key_len, bool *found);
  bool get_next_read_column(char **name, int *name_len,
                            char **value, int *value_len );
  void get_read_rowkey(char **value, int *value_len);

  /* Reads, multi-row scans */
private:
  bool have_rowkey_to_skip;      /* true => skip rowkey_to_skip when scanning */
  std::string rowkey_to_skip;    /* last key of the previous batch (re-fetched) */

  /* parameter stash for the retryable wrapper of get_range_slices() */
  bool get_range_slices_param_last_key_as_start_key;
public:
  bool get_range_slices(bool last_key_as_start_key);
  void finish_reading_range_slices();
  bool get_next_range_slice_row(bool *eof);

  /* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */
  void clear_read_columns();
  void clear_read_all_columns();
  void add_read_column(const char *name);

  /* Reads, MRR scans */
  void new_lookup_keys();
  int add_lookup_key(const char *key, size_t key_len);
  bool multiget_slice();

  bool get_next_multiget_row();

  bool truncate();

  bool remove_row();

private:
  /*
    The retryable_* functions perform one attempt of the corresponding
    public operation; try_operation() wraps them with retry/error handling.
  */
  bool retryable_truncate();
  bool retryable_do_insert();
  bool retryable_remove_row();
  bool retryable_setup_ddl_checks();
  bool retryable_multiget_slice();
  bool retryable_get_range_slices();
  bool retryable_get_slice();

  std::vector<std::string> mrr_keys; /* can we use allocator to put these into MRR buffer? */
  std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result;
  std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it;

  /* Non-inherited utility functions: */
  int64_t get_i64_timestamp();

  typedef bool (Cassandra_se_impl::*retryable_func_t)();
  bool try_operation(retryable_func_t func);
};
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
// Connection and setup
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
/* Factory: hand ha_cassandra an implementation behind the bridge interface. */
Cassandra_se_interface *create_cassandra_se()
{
  Cassandra_se_impl *impl= new Cassandra_se_impl();
  return impl;
}
|
||||
|
||||
|
||||
bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace_arg)
|
||||
{
|
||||
bool res= true;
|
||||
|
||||
keyspace.assign(keyspace_arg);
|
||||
|
||||
try {
|
||||
boost::shared_ptr<TTransport> socket =
|
||||
boost::shared_ptr<TSocket>(new TSocket(host, port));
|
||||
boost::shared_ptr<TTransport> tr =
|
||||
boost::shared_ptr<TFramedTransport>(new TFramedTransport (socket));
|
||||
boost::shared_ptr<TProtocol> p =
|
||||
boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));
|
||||
|
||||
cass= new CassandraClient(p);
|
||||
tr->open();
|
||||
cass->set_keyspace(keyspace_arg);
|
||||
|
||||
res= false; // success
|
||||
}catch(TTransportException te){
|
||||
print_error("%s [%d]", te.what(), te.getType());
|
||||
}catch(InvalidRequestException ire){
|
||||
print_error("%s [%s]", ire.what(), ire.why.c_str());
|
||||
}catch(NotFoundException nfe){
|
||||
print_error("%s", nfe.what());
|
||||
}catch(TException e){
|
||||
print_error("Thrift exception: %s", e.what());
|
||||
}catch (...) {
|
||||
print_error("Unknown exception");
|
||||
}
|
||||
|
||||
if (!res && setup_ddl_checks())
|
||||
res= true;
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
/*
  Set the consistency levels used for subsequent reads and writes.

  The SQL layer passes 0-based enum_cassandra_consistency_level values;
  Thrift's ConsistencyLevel enum is 1-based, hence the "+ 1".

  Bug fix: the original code assigned the converted values back to its own
  parameters, so the write_consistency / read_consistency members were never
  updated and always stayed at ConsistencyLevel::ONE, silently ignoring the
  user's settings.
*/
void Cassandra_se_impl::set_consistency_levels(ulong read_cons_level,
                                               ulong write_cons_level)
{
  write_consistency= (ConsistencyLevel::type)(write_cons_level + 1);
  read_consistency= (ConsistencyLevel::type)(read_cons_level + 1);
}
|
||||
|
||||
|
||||
bool Cassandra_se_impl::retryable_setup_ddl_checks()
|
||||
{
|
||||
try {
|
||||
|
||||
cass->describe_keyspace(ks_def, keyspace);
|
||||
|
||||
} catch (NotFoundException nfe) {
|
||||
print_error("keyspace `%s` not found: %s", keyspace.c_str(), nfe.what());
|
||||
return true;
|
||||
}
|
||||
|
||||
std::vector<CfDef>::iterator it;
|
||||
for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++)
|
||||
{
|
||||
cf_def= *it;
|
||||
if (!cf_def.name.compare(column_family))
|
||||
return false;
|
||||
}
|
||||
|
||||
print_error("Column family %s not found in keyspace %s",
|
||||
column_family.c_str(),
|
||||
keyspace.c_str());
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Cassandra_se_impl::setup_ddl_checks()
|
||||
{
|
||||
return try_operation(&Cassandra_se_impl::retryable_setup_ddl_checks);
|
||||
}
|
||||
|
||||
|
||||
void Cassandra_se_impl::first_ddl_column()
|
||||
{
|
||||
column_ddl_it= cf_def.column_metadata.begin();
|
||||
}
|
||||
|
||||
|
||||
bool Cassandra_se_impl::next_ddl_column(char **name, int *name_len,
|
||||
char **type, int *type_len)
|
||||
{
|
||||
if (column_ddl_it == cf_def.column_metadata.end())
|
||||
return true;
|
||||
|
||||
*name= (char*)(*column_ddl_it).name.c_str();
|
||||
*name_len= (*column_ddl_it).name.length();
|
||||
|
||||
*type= (char*)(*column_ddl_it).validation_class.c_str();
|
||||
*type_len= (*column_ddl_it).validation_class.length();
|
||||
|
||||
column_ddl_it++;
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
void Cassandra_se_impl::get_rowkey_type(char **name, char **type)
|
||||
{
|
||||
if (cf_def.__isset.key_validation_class)
|
||||
*type= (char*)cf_def.key_validation_class.c_str();
|
||||
else
|
||||
*type= NULL;
|
||||
|
||||
if (cf_def.__isset.key_alias)
|
||||
*name= (char*)cf_def.key_alias.c_str();
|
||||
else
|
||||
*name= NULL;
|
||||
}
|
||||
|
||||
/* Number of columns described in the column family's DDL. */
size_t Cassandra_se_impl::get_ddl_size()
{
  const std::vector<ColumnDef> &cols= cf_def.column_metadata;
  return cols.size();
}
|
||||
|
||||
const char* Cassandra_se_impl::get_default_validator()
|
||||
{
|
||||
return cf_def.default_validation_class.c_str();
|
||||
}
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
// Data writes
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
int64_t Cassandra_se_impl::get_i64_timestamp()
|
||||
{
|
||||
struct timeval td;
|
||||
gettimeofday(&td, NULL);
|
||||
int64_t ms = td.tv_sec;
|
||||
ms = ms * 1000;
|
||||
int64_t usec = td.tv_usec;
|
||||
usec = usec / 1000;
|
||||
ms += usec;
|
||||
|
||||
return ms;
|
||||
}
|
||||
|
||||
|
||||
void Cassandra_se_impl::clear_insert_buffer()
|
||||
{
|
||||
batch_mutation.clear();
|
||||
}
|
||||
|
||||
|
||||
void Cassandra_se_impl::start_row_insert(const char *key, int key_len)
|
||||
{
|
||||
std::string key_to_insert;
|
||||
key_to_insert.assign(key, key_len);
|
||||
batch_mutation[key_to_insert]= ColumnFamilyToMutation();
|
||||
ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_insert];
|
||||
|
||||
cf_mut[column_family]= std::vector<Mutation>();
|
||||
insert_list= &cf_mut[column_family];
|
||||
|
||||
insert_timestamp= get_i64_timestamp();
|
||||
}
|
||||
|
||||
|
||||
/*
  Queue a deletion of one row into the current batch.

  @param key/key_len  row key to delete
  @param col_names    enumerator over dynamic column names to delete
  @param names/nnames additional explicit column names (e.g. from the
                      table's static DDL columns)

  The deletion is expressed as a single Mutation whose predicate lists
  every column individually (see the note below on why SliceRange cannot
  be used).  The mutation is only queued here; do_insert() ships it.
*/
void Cassandra_se_impl::add_row_deletion(const char *key, int key_len,
                                         Column_name_enumerator *col_names,
                                         LEX_STRING *names, uint nnames)
{
  std::string key_to_delete;
  key_to_delete.assign(key, key_len);

  /* (Re)create this key's entry in the batch, dropping anything queued. */
  batch_mutation[key_to_delete]= ColumnFamilyToMutation();
  ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_delete];

  cf_mut[column_family]= std::vector<Mutation>();
  std::vector<Mutation> &mutation_list= cf_mut[column_family];

  Mutation mut;
  mut.__isset.deletion= true;
  mut.deletion.__isset.timestamp= true;
  mut.deletion.timestamp= get_i64_timestamp();
  mut.deletion.__isset.predicate= true;

  /*
    Attempting to delete columns with SliceRange causes exception with message
    "Deletion does not yet support SliceRange predicates".

    Delete all columns individually.
  */
  SlicePredicate slice_pred;
  slice_pred.__isset.column_names= true;
  const char *col_name;
  /* First the dynamically-enumerated names... */
  while ((col_name= col_names->get_next_name()))
    slice_pred.column_names.push_back(std::string(col_name));
  /* ...then the explicitly-passed ones. */
  for (uint i= 0; i < nnames; i++)
    slice_pred.column_names.push_back(std::string(names[i].str,
                                                  names[i].length));

  mut.deletion.predicate= slice_pred;

  mutation_list.push_back(mut);
}
|
||||
|
||||
|
||||
void Cassandra_se_impl::add_insert_column(const char *name,
|
||||
int name_len,
|
||||
const char *value,
|
||||
int value_len)
|
||||
{
|
||||
Mutation mut;
|
||||
mut.__isset.column_or_supercolumn= true;
|
||||
mut.column_or_supercolumn.__isset.column= true;
|
||||
|
||||
Column& col=mut.column_or_supercolumn.column;
|
||||
if (name_len)
|
||||
col.name.assign(name, name_len);
|
||||
else
|
||||
col.name.assign(name);
|
||||
col.value.assign(value, value_len);
|
||||
col.timestamp= insert_timestamp;
|
||||
col.__isset.value= true;
|
||||
col.__isset.timestamp= true;
|
||||
insert_list->push_back(mut);
|
||||
}
|
||||
|
||||
void Cassandra_se_impl::add_insert_delete_column(const char *name,
|
||||
int name_len)
|
||||
{
|
||||
Mutation mut;
|
||||
mut.__isset.deletion= true;
|
||||
mut.deletion.__isset.timestamp= true;
|
||||
mut.deletion.timestamp= insert_timestamp;
|
||||
mut.deletion.__isset.predicate= true;
|
||||
|
||||
SlicePredicate slice_pred;
|
||||
slice_pred.__isset.column_names= true;
|
||||
slice_pred.column_names.push_back(std::string(name, name_len));
|
||||
mut.deletion.predicate= slice_pred;
|
||||
|
||||
insert_list->push_back(mut);
|
||||
}
|
||||
|
||||
|
||||
bool Cassandra_se_impl::retryable_do_insert()
|
||||
{
|
||||
cass->batch_mutate(batch_mutation, write_consistency);
|
||||
|
||||
cassandra_counters.row_inserts+= batch_mutation.size();
|
||||
cassandra_counters.row_insert_batches++;
|
||||
|
||||
clear_insert_buffer();
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
bool Cassandra_se_impl::do_insert()
|
||||
{
|
||||
/*
|
||||
zero-size mutations are allowed by Cassandra's batch_mutate but lets not
|
||||
do them (we may attempt to do it if there is a bulk insert that stores
|
||||
exactly @@cassandra_insert_batch_size*n elements.
|
||||
*/
|
||||
if (batch_mutation.empty())
|
||||
return false;
|
||||
|
||||
return try_operation(&Cassandra_se_impl::retryable_do_insert);
|
||||
}
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
// Reading data
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/*
|
||||
Make one key lookup. If the record is found, the result is stored locally and
|
||||
the caller should iterate over it.
|
||||
*/
|
||||
|
||||
bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
|
||||
{
|
||||
bool res;
|
||||
rowkey.assign(key, key_len);
|
||||
|
||||
if (!(res= try_operation(&Cassandra_se_impl::retryable_get_slice)))
|
||||
*found= get_slice_found_rows;
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
bool Cassandra_se_impl::retryable_get_slice()
|
||||
{
|
||||
ColumnParent cparent;
|
||||
cparent.column_family= column_family;
|
||||
|
||||
SlicePredicate slice_pred;
|
||||
SliceRange sr;
|
||||
sr.start = "";
|
||||
sr.finish = "";
|
||||
slice_pred.__set_slice_range(sr);
|
||||
|
||||
cass->get_slice(column_data_vec, rowkey, cparent, slice_pred,
|
||||
read_consistency);
|
||||
|
||||
if (column_data_vec.size() == 0)
|
||||
{
|
||||
/*
|
||||
No columns found. Cassandra doesn't allow records without any column =>
|
||||
this means the seach key doesn't exist
|
||||
*/
|
||||
get_slice_found_rows= false;
|
||||
return false;
|
||||
}
|
||||
get_slice_found_rows= true;
|
||||
|
||||
column_data_it= column_data_vec.begin();
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
  Return the next column of the row currently being read (filled in by
  get_slice / range / multiget readers into column_data_vec).

  Regular columns yield pointers into the Thrift string objects; counter
  columns yield a pointer to the raw int64 counter value and its size.

  @return true when the row's columns are exhausted, false when a column
          was produced.
*/
bool Cassandra_se_impl::get_next_read_column(char **name, int *name_len,
                                             char **value, int *value_len)
{
  bool use_counter=false;
  /* Skip entries that are neither a column nor a counter column. */
  while (1)
  {
    if (column_data_it == column_data_vec.end())
      return true;

    if ((*column_data_it).__isset.column)
      break; /* Ok it's a real column. Should be always the case. */

    if ((*column_data_it).__isset.counter_column)
    {
      use_counter= true;
      break;
    }

    column_data_it++;
  }

  ColumnOrSuperColumn& cs= *column_data_it;
  if (use_counter)
  {
    *name_len= cs.counter_column.name.size();
    *name= (char*)cs.counter_column.name.c_str();
    /* counter value is returned as a pointer to the raw int64 */
    *value= (char*)&cs.counter_column.value;
    *value_len= sizeof(cs.counter_column.value);
  }
  else
  {
    *name_len= cs.column.name.size();
    *name= (char*)cs.column.name.c_str();
    *value= (char*)cs.column.value.c_str();
    *value_len= cs.column.value.length();
  }

  column_data_it++;
  return false;
}
|
||||
|
||||
|
||||
/* Return the rowkey for the record that was read */
|
||||
|
||||
void Cassandra_se_impl::get_read_rowkey(char **value, int *value_len)
|
||||
{
|
||||
*value= (char*)rowkey.c_str();
|
||||
*value_len= rowkey.length();
|
||||
}
|
||||
|
||||
|
||||
bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key)
|
||||
{
|
||||
get_range_slices_param_last_key_as_start_key= last_key_as_start_key;
|
||||
|
||||
return try_operation(&Cassandra_se_impl::retryable_get_range_slices);
|
||||
}
|
||||
|
||||
|
||||
/*
  One attempt of fetching a batch of a range scan into key_slice_vec.

  When continuing a scan (last_key_as_start_key), the batch starts at the
  last row of the previous batch; Cassandra's start_key is inclusive, so
  that row is remembered in rowkey_to_skip and filtered out by
  get_next_range_slice_row().
*/
bool Cassandra_se_impl::retryable_get_range_slices()
{
  bool last_key_as_start_key= get_range_slices_param_last_key_as_start_key;

  ColumnParent cparent;
  cparent.column_family= column_family;

  /* SlicePredicate can be used to limit columns we will retrieve */

  KeyRange key_range;
  key_range.__isset.start_key= true;
  key_range.__isset.end_key= true;

  if (last_key_as_start_key)
  {
    key_range.start_key= rowkey;

    /* start_key is inclusive: remember it so it can be skipped on read */
    have_rowkey_to_skip= true;
    rowkey_to_skip= rowkey;
  }
  else
  {
    have_rowkey_to_skip= false;
    key_range.start_key.assign("", 0);
  }

  key_range.end_key.assign("", 0);  /* empty end => scan to the end */
  key_range.count= read_batch_size;

  cass->get_range_slices(key_slice_vec, cparent, slice_pred, key_range,
                         read_consistency);

  /* A short batch means there is no further batch to fetch (EOF). */
  if (key_slice_vec.size() < (uint)read_batch_size)
    get_slices_returned_less= true;
  else
    get_slices_returned_less= false;

  key_slice_it= key_slice_vec.begin();
  return false;
}
|
||||
|
||||
|
||||
/* Switch to next row. This may produce an error */
|
||||
/*
  Switch to next row. This may produce an error (when fetching the next
  batch from Cassandra fails).

  @param eof [out]  set to true when the scan is exhausted
  @return false on success (check *eof), true on error.
*/
bool Cassandra_se_impl::get_next_range_slice_row(bool *eof)
{
restart:
  if (key_slice_it == key_slice_vec.end())
  {
    /* Previous batch was short => nothing more to fetch. */
    if (get_slices_returned_less)
    {
      *eof= true;
      return false;
    }

    /*
      We have read through all columns in this batch. Try getting the next
      batch.
    */
    if (get_range_slices(true))
      return true;

    if (key_slice_vec.empty())
    {
      *eof= true;
      return false;
    }
  }

  /*
    (1) - skip the last row that we have read in the previous batch.
    (2) - Rows that were deleted show up as rows without any columns. Skip
          them, like CQL does.
  */
  if ((have_rowkey_to_skip && !rowkey_to_skip.compare(key_slice_it->key)) || // (1)
      key_slice_it->columns.size() == 0) // (2)
  {
    key_slice_it++;
    goto restart;
  }

  *eof= false;
  /* Copy the row out of the batch and position the column cursor on it. */
  column_data_vec= key_slice_it->columns;
  rowkey= key_slice_it->key;
  column_data_it= column_data_vec.begin();
  key_slice_it++;
  return false;
}
|
||||
|
||||
|
||||
void Cassandra_se_impl::finish_reading_range_slices()
|
||||
{
|
||||
key_slice_vec.clear();
|
||||
}
|
||||
|
||||
|
||||
void Cassandra_se_impl::clear_read_columns()
|
||||
{
|
||||
slice_pred.column_names.clear();
|
||||
}
|
||||
|
||||
void Cassandra_se_impl::clear_read_all_columns()
|
||||
{
|
||||
slice_pred_sr.start = "";
|
||||
slice_pred_sr.finish = "";
|
||||
slice_pred.__set_slice_range(slice_pred_sr);
|
||||
}
|
||||
|
||||
|
||||
void Cassandra_se_impl::add_read_column(const char *name_arg)
|
||||
{
|
||||
std::string name(name_arg);
|
||||
slice_pred.__isset.column_names= true;
|
||||
slice_pred.column_names.push_back(name);
|
||||
}
|
||||
|
||||
|
||||
bool Cassandra_se_impl::truncate()
|
||||
{
|
||||
return try_operation(&Cassandra_se_impl::retryable_truncate);
|
||||
}
|
||||
|
||||
|
||||
bool Cassandra_se_impl::retryable_truncate()
|
||||
{
|
||||
cass->truncate(column_family);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
bool Cassandra_se_impl::remove_row()
|
||||
{
|
||||
return try_operation(&Cassandra_se_impl::retryable_remove_row);
|
||||
}
|
||||
|
||||
|
||||
bool Cassandra_se_impl::retryable_remove_row()
|
||||
{
|
||||
ColumnPath column_path;
|
||||
column_path.column_family= column_family;
|
||||
cass->remove(rowkey, column_path, get_i64_timestamp(), write_consistency);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
Try calling a function, catching possible Cassandra errors, and re-trying
|
||||
for "transient" errors.
|
||||
*/
|
||||
/*
  Try calling a function, catching possible Cassandra errors, and re-trying
  for "transient" errors (UnavailableException / TimedOutException).

  @param func_to_call  member function performing one attempt; returns true
                       on failure/negative result
  @return false on success, true on failure (error text via print_error()).

  Fixes:
  - Exceptions are caught by const reference (no copy/slicing).
  - With thrift_call_retries_to_do == 0 (the constructor default), the old
    `if (!--n_retries)` decremented 0 to -1 on the first transient error, so
    print_error() was never called and the caller got a failure with a stale
    error buffer.  `<= 0` now reports the error whenever no retries remain.
*/
bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call)
{
  bool res;
  int n_retries= thrift_call_retries_to_do;

  do
  {
    res= true;

    try {

      if ((res= (this->*func_to_call)()))
      {
        /*
          The function call was made successfully (without timeouts, etc),
          but something inside it returned 'true'.
          This is supposedly a failure (or "not found" or other negative
          result). We need to return this to the caller.
        */
        n_retries= 0;
      }

    } catch (const InvalidRequestException &ire) {
      n_retries= 0; /* there is no point in retrying this operation */
      print_error("%s [%s]", ire.what(), ire.why.c_str());
    } catch (const UnavailableException &ue) {
      cassandra_counters.unavailable_exceptions++;
      if (--n_retries <= 0)
        print_error("UnavailableException: %s", ue.what());
    } catch (const TimedOutException &te) {
      cassandra_counters.timeout_exceptions++;
      if (--n_retries <= 0)
        print_error("TimedOutException: %s", te.what());
    } catch (const TException &e) {
      /* todo: we may use retry for certain kinds of Thrift errors */
      n_retries= 0;
      print_error("Thrift exception: %s", e.what());
    } catch (...) {
      n_retries= 0; /* Don't retry */
      print_error("Unknown exception");
    }

  } while (res && n_retries > 0);

  return res;
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
// MRR reads
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void Cassandra_se_impl::new_lookup_keys()
|
||||
{
|
||||
mrr_keys.clear();
|
||||
}
|
||||
|
||||
|
||||
int Cassandra_se_impl::add_lookup_key(const char *key, size_t key_len)
|
||||
{
|
||||
mrr_keys.push_back(std::string(key, key_len));
|
||||
return mrr_keys.size();
|
||||
}
|
||||
|
||||
bool Cassandra_se_impl::multiget_slice()
|
||||
{
|
||||
return try_operation(&Cassandra_se_impl::retryable_multiget_slice);
|
||||
}
|
||||
|
||||
|
||||
bool Cassandra_se_impl::retryable_multiget_slice()
|
||||
{
|
||||
ColumnParent cparent;
|
||||
cparent.column_family= column_family;
|
||||
|
||||
SlicePredicate slice_pred;
|
||||
SliceRange sr;
|
||||
sr.start = "";
|
||||
sr.finish = "";
|
||||
slice_pred.__set_slice_range(sr);
|
||||
|
||||
cassandra_counters.multiget_reads++;
|
||||
cassandra_counters.multiget_keys_scanned += mrr_keys.size();
|
||||
cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred,
|
||||
read_consistency);
|
||||
|
||||
cassandra_counters.multiget_rows_read += mrr_result.size();
|
||||
mrr_result_it= mrr_result.begin();
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
bool Cassandra_se_impl::get_next_multiget_row()
|
||||
{
|
||||
if (mrr_result_it == mrr_result.end())
|
||||
return true; /* EOF */
|
||||
|
||||
column_data_vec= mrr_result_it->second;
|
||||
rowkey= mrr_result_it->first;
|
||||
|
||||
column_data_it= column_data_vec.begin();
|
||||
mrr_result_it++;
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
|
||||
123
storage/cassandra/cassandra_se.h
Normal file
123
storage/cassandra/cassandra_se.h
Normal file
|
|
@ -0,0 +1,123 @@
|
|||
|
||||
/*
|
||||
This file is a "bridge" interface between cassandra+Thrift and MariaDB.
|
||||
|
||||
It is #included by both sides, so it must itself include neither (including
|
||||
both together causes compile errors due to conflicts).
|
||||
*/
|
||||
|
||||
struct st_mysql_lex_string;
|
||||
typedef struct st_mysql_lex_string LEX_STRING;
|
||||
|
||||
/* We need to define this here so that ha_cassandra.cc also has access to it */
|
||||
/* We need to define this here so that ha_cassandra.cc also has access to it */
/*
  0-based copy of Thrift's ConsistencyLevel enum (which is 1-based): the
  storage engine exposes these as a 0-based sysvar enum and adds 1 when
  converting to the Thrift value.  Keep the order in sync with the Thrift
  definition.
*/
typedef enum
{
  ONE = 1-1,
  QUORUM = 2-1,
  LOCAL_QUORUM = 3-1,
  EACH_QUORUM = 4-1,
  ALL = 5-1,
  ANY = 6-1,
  TWO = 7-1,
  THREE = 8-1,
} enum_cassandra_consistency_level;
|
||||
|
||||
|
||||
/*
  Callback interface the SQL side implements to hand the bridge a sequence
  of column names (e.g. for row deletions), one name per call.
*/
class Column_name_enumerator
{
public:
  /* Return the next name, or NULL when exhausted. */
  virtual const char* get_next_name()=0;
  virtual ~Column_name_enumerator(){}
};
|
||||
|
||||
/*
|
||||
Interface to one cassandra column family, i.e. one 'table'
|
||||
*/
|
||||
/*
  Interface to one cassandra column family, i.e. one 'table'

  Implemented by Cassandra_se_impl (cassandra_se.cc); ha_cassandra talks to
  Cassandra exclusively through this interface.  Unless noted otherwise,
  bool-returning operations follow the convention: false = success,
  true = failure (error text available via error_str()).
*/
class Cassandra_se_interface
{
public:
  Cassandra_se_interface() { err_buffer[0]=0; }

  virtual ~Cassandra_se_interface(){};
  /* Init */
  virtual bool connect(const char *host, int port, const char *keyspace)=0;
  virtual void set_column_family(const char *cfname) = 0;

  /* Settings */
  virtual void set_consistency_levels(ulong read_cons_level, ulong write_cons_level)=0;

  /* Check underlying DDL */
  virtual bool setup_ddl_checks()=0;
  virtual void first_ddl_column()=0;
  virtual bool next_ddl_column(char **name, int *name_len, char **value,
                               int *value_len)=0;
  virtual void get_rowkey_type(char **name, char **type)=0;
  virtual size_t get_ddl_size()=0;
  virtual const char* get_default_validator()=0;

  /* Writes */
  virtual void clear_insert_buffer()=0;
  virtual void add_row_deletion(const char *key, int key_len,
                                Column_name_enumerator *col_names,
                                LEX_STRING *names, uint nnames)=0;
  virtual void start_row_insert(const char *key, int key_len)=0;
  virtual void add_insert_delete_column(const char *name, int name_len)= 0;
  virtual void add_insert_column(const char *name, int name_len,
                                 const char *value,
                                 int value_len)=0;
  /* Ship everything batched by the add_*/start_* calls above. */
  virtual bool do_insert()=0;

  /* Reads */
  virtual bool get_slice(char *key, size_t key_len, bool *found)=0 ;
  virtual bool get_next_read_column(char **name, int *name_len,
                                    char **value, int *value_len)=0;
  virtual void get_read_rowkey(char **value, int *value_len)=0;

  /* Reads, multi-row scans */
  /* Rows fetched per get_range_slices() round trip; set by ha_cassandra. */
  int read_batch_size;
  virtual bool get_range_slices(bool last_key_as_start_key)=0;
  virtual void finish_reading_range_slices()=0;
  virtual bool get_next_range_slice_row(bool *eof)=0;

  /* Reads, MRR scans */
  virtual void new_lookup_keys()=0;
  virtual int add_lookup_key(const char *key, size_t key_len)=0;
  virtual bool multiget_slice()=0;
  virtual bool get_next_multiget_row()=0;

  /* read_set setup */
  virtual void clear_read_columns()=0;
  virtual void clear_read_all_columns()=0;
  virtual void add_read_column(const char *name)=0;

  virtual bool truncate()=0;
  virtual bool remove_row()=0;

  /* Passing error messages up to ha_cassandra */
  char err_buffer[512];
  const char *error_str() { return err_buffer; }
  void print_error(const char *format, ...);
};
|
||||
|
||||
|
||||
/* A structure with global counters */
|
||||
/* A structure with global counters */
/* Exposed as server status variables; updated by Cassandra_se_impl. */
class Cassandra_status_vars
{
public:
  ulong row_inserts;          /* rows shipped via batch_mutate */
  ulong row_insert_batches;   /* number of batch_mutate calls */

  ulong multiget_reads;         /* multiget_slice calls */
  ulong multiget_keys_scanned;  /* keys requested by those calls */
  ulong multiget_rows_read;     /* rows actually returned */

  ulong timeout_exceptions;      /* TimedOutException count */
  ulong unavailable_exceptions;  /* UnavailableException count */
};
|
||||
|
||||
|
||||
extern Cassandra_status_vars cassandra_counters;
|
||||
|
||||
|
||||
Cassandra_se_interface *create_cassandra_se();
|
||||
|
||||
12871
storage/cassandra/gen-cpp/Cassandra.cpp
Normal file
12871
storage/cassandra/gen-cpp/Cassandra.cpp
Normal file
File diff suppressed because it is too large
Load diff
5466
storage/cassandra/gen-cpp/Cassandra.h
Normal file
5466
storage/cassandra/gen-cpp/Cassandra.h
Normal file
File diff suppressed because it is too large
Load diff
219
storage/cassandra/gen-cpp/Cassandra_server.skeleton.cpp
Normal file
219
storage/cassandra/gen-cpp/Cassandra_server.skeleton.cpp
Normal file
|
|
@ -0,0 +1,219 @@
|
|||
// This autogenerated skeleton file illustrates how to build a server.
|
||||
// You should copy it to another filename to avoid overwriting it.
|
||||
|
||||
#include "Cassandra.h"
|
||||
#include <protocol/TBinaryProtocol.h>
|
||||
#include <server/TSimpleServer.h>
|
||||
#include <transport/TServerSocket.h>
|
||||
#include <transport/TBufferTransports.h>
|
||||
|
||||
using namespace ::apache::thrift;
|
||||
using namespace ::apache::thrift::protocol;
|
||||
using namespace ::apache::thrift::transport;
|
||||
using namespace ::apache::thrift::server;
|
||||
|
||||
using boost::shared_ptr;
|
||||
|
||||
using namespace ::org::apache::cassandra;
|
||||
|
||||
/*
  Autogenerated Thrift server skeleton: every CassandraIf RPC is stubbed out
  to print its own name and do nothing else.  This file is an example only —
  copy it to another filename before implementing (see the comment at the top
  of the file).  Do not hand-edit generated code.
*/
class CassandraHandler : virtual public CassandraIf {
 public:
  CassandraHandler() {
    // Your initialization goes here
  }

  void login(const AuthenticationRequest& auth_request) {
    // Your implementation goes here
    printf("login\n");
  }

  void set_keyspace(const std::string& keyspace) {
    // Your implementation goes here
    printf("set_keyspace\n");
  }

  void get(ColumnOrSuperColumn& _return, const std::string& key, const ColumnPath& column_path, const ConsistencyLevel::type consistency_level) {
    // Your implementation goes here
    printf("get\n");
  }

  void get_slice(std::vector<ColumnOrSuperColumn> & _return, const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) {
    // Your implementation goes here
    printf("get_slice\n");
  }

  // NOTE(review): declared to return int32_t but the stub has no return
  // statement — undefined behavior if this skeleton is ever compiled and
  // called without being filled in.
  int32_t get_count(const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) {
    // Your implementation goes here
    printf("get_count\n");
  }

  void multiget_slice(std::map<std::string, std::vector<ColumnOrSuperColumn> > & _return, const std::vector<std::string> & keys, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) {
    // Your implementation goes here
    printf("multiget_slice\n");
  }

  void multiget_count(std::map<std::string, int32_t> & _return, const std::vector<std::string> & keys, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) {
    // Your implementation goes here
    printf("multiget_count\n");
  }

  void get_range_slices(std::vector<KeySlice> & _return, const ColumnParent& column_parent, const SlicePredicate& predicate, const KeyRange& range, const ConsistencyLevel::type consistency_level) {
    // Your implementation goes here
    printf("get_range_slices\n");
  }

  void get_paged_slice(std::vector<KeySlice> & _return, const std::string& column_family, const KeyRange& range, const std::string& start_column, const ConsistencyLevel::type consistency_level) {
    // Your implementation goes here
    printf("get_paged_slice\n");
  }

  void get_indexed_slices(std::vector<KeySlice> & _return, const ColumnParent& column_parent, const IndexClause& index_clause, const SlicePredicate& column_predicate, const ConsistencyLevel::type consistency_level) {
    // Your implementation goes here
    printf("get_indexed_slices\n");
  }

  void insert(const std::string& key, const ColumnParent& column_parent, const Column& column, const ConsistencyLevel::type consistency_level) {
    // Your implementation goes here
    printf("insert\n");
  }

  void add(const std::string& key, const ColumnParent& column_parent, const CounterColumn& column, const ConsistencyLevel::type consistency_level) {
    // Your implementation goes here
    printf("add\n");
  }

  void remove(const std::string& key, const ColumnPath& column_path, const int64_t timestamp, const ConsistencyLevel::type consistency_level) {
    // Your implementation goes here
    printf("remove\n");
  }

  void remove_counter(const std::string& key, const ColumnPath& path, const ConsistencyLevel::type consistency_level) {
    // Your implementation goes here
    printf("remove_counter\n");
  }

  void batch_mutate(const std::map<std::string, std::map<std::string, std::vector<Mutation> > > & mutation_map, const ConsistencyLevel::type consistency_level) {
    // Your implementation goes here
    printf("batch_mutate\n");
  }

  void truncate(const std::string& cfname) {
    // Your implementation goes here
    printf("truncate\n");
  }

  void describe_schema_versions(std::map<std::string, std::vector<std::string> > & _return) {
    // Your implementation goes here
    printf("describe_schema_versions\n");
  }

  void describe_keyspaces(std::vector<KsDef> & _return) {
    // Your implementation goes here
    printf("describe_keyspaces\n");
  }

  void describe_cluster_name(std::string& _return) {
    // Your implementation goes here
    printf("describe_cluster_name\n");
  }

  void describe_version(std::string& _return) {
    // Your implementation goes here
    printf("describe_version\n");
  }

  void describe_ring(std::vector<TokenRange> & _return, const std::string& keyspace) {
    // Your implementation goes here
    printf("describe_ring\n");
  }

  void describe_token_map(std::map<std::string, std::string> & _return) {
    // Your implementation goes here
    printf("describe_token_map\n");
  }

  void describe_partitioner(std::string& _return) {
    // Your implementation goes here
    printf("describe_partitioner\n");
  }

  void describe_snitch(std::string& _return) {
    // Your implementation goes here
    printf("describe_snitch\n");
  }

  void describe_keyspace(KsDef& _return, const std::string& keyspace) {
    // Your implementation goes here
    printf("describe_keyspace\n");
  }

  void describe_splits(std::vector<std::string> & _return, const std::string& cfName, const std::string& start_token, const std::string& end_token, const int32_t keys_per_split) {
    // Your implementation goes here
    printf("describe_splits\n");
  }

  void system_add_column_family(std::string& _return, const CfDef& cf_def) {
    // Your implementation goes here
    printf("system_add_column_family\n");
  }

  void system_drop_column_family(std::string& _return, const std::string& column_family) {
    // Your implementation goes here
    printf("system_drop_column_family\n");
  }

  void system_add_keyspace(std::string& _return, const KsDef& ks_def) {
    // Your implementation goes here
    printf("system_add_keyspace\n");
  }

  void system_drop_keyspace(std::string& _return, const std::string& keyspace) {
    // Your implementation goes here
    printf("system_drop_keyspace\n");
  }

  void system_update_keyspace(std::string& _return, const KsDef& ks_def) {
    // Your implementation goes here
    printf("system_update_keyspace\n");
  }

  void system_update_column_family(std::string& _return, const CfDef& cf_def) {
    // Your implementation goes here
    printf("system_update_column_family\n");
  }

  void execute_cql_query(CqlResult& _return, const std::string& query, const Compression::type compression) {
    // Your implementation goes here
    printf("execute_cql_query\n");
  }

  void prepare_cql_query(CqlPreparedResult& _return, const std::string& query, const Compression::type compression) {
    // Your implementation goes here
    printf("prepare_cql_query\n");
  }

  void execute_prepared_cql_query(CqlResult& _return, const int32_t itemId, const std::vector<std::string> & values) {
    // Your implementation goes here
    printf("execute_prepared_cql_query\n");
  }

  void set_cql_version(const std::string& version) {
    // Your implementation goes here
    printf("set_cql_version\n");
  }

};
|
||||
|
||||
// Boots a single-threaded Thrift server (TSimpleServer) serving the skeleton
// handler on port 9090 over buffered transport + binary protocol.
int main(int argc, char **argv) {
  int port = 9090;
  shared_ptr<CassandraHandler> handler(new CassandraHandler());
  shared_ptr<TProcessor> processor(new CassandraProcessor(handler));
  shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
  shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
  shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

  TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
  // serve() blocks and does not return in normal operation.
  server.serve();
  return 0;
}
|
||||
|
||||
18
storage/cassandra/gen-cpp/cassandra_constants.cpp
Normal file
18
storage/cassandra/gen-cpp/cassandra_constants.cpp
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
/**
|
||||
* Autogenerated by Thrift Compiler (0.8.0)
|
||||
*
|
||||
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
||||
* @generated
|
||||
*/
|
||||
#include "cassandra_constants.h"
|
||||
|
||||
namespace org { namespace apache { namespace cassandra {
|
||||
|
||||
/* The single global instance of the Thrift IDL constants. */
const cassandraConstants g_cassandra_constants;

cassandraConstants::cassandraConstants() {
  // Cassandra Thrift API version this code was generated against.
  // Generated code stores it as char* (hence the cast), not std::string.
  cassandra_const_VERSION = (char *)"19.32.0";
}
|
||||
|
||||
}}} // namespace
|
||||
|
||||
26
storage/cassandra/gen-cpp/cassandra_constants.h
Normal file
26
storage/cassandra/gen-cpp/cassandra_constants.h
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
/**
|
||||
* Autogenerated by Thrift Compiler (0.8.0)
|
||||
*
|
||||
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
||||
* @generated
|
||||
*/
|
||||
#ifndef cassandra_CONSTANTS_H
|
||||
#define cassandra_CONSTANTS_H
|
||||
|
||||
#include "cassandra_types.h"
|
||||
|
||||
namespace org { namespace apache { namespace cassandra {
|
||||
|
||||
/* Holder for constants declared in the Cassandra Thrift IDL; one global
   instance (g_cassandra_constants) is defined in cassandra_constants.cpp. */
class cassandraConstants {
 public:
  cassandraConstants();

//  std::string VERSION;
  char* cassandra_const_VERSION;  // API version string, e.g. "19.32.0"
};
|
||||
|
||||
extern const cassandraConstants g_cassandra_constants;
|
||||
|
||||
}}} // namespace
|
||||
|
||||
#endif
|
||||
3512
storage/cassandra/gen-cpp/cassandra_types.cpp
Normal file
3512
storage/cassandra/gen-cpp/cassandra_types.cpp
Normal file
File diff suppressed because it is too large
Load diff
2149
storage/cassandra/gen-cpp/cassandra_types.h
Normal file
2149
storage/cassandra/gen-cpp/cassandra_types.h
Normal file
File diff suppressed because it is too large
Load diff
2619
storage/cassandra/ha_cassandra.cc
Normal file
2619
storage/cassandra/ha_cassandra.cc
Normal file
File diff suppressed because it is too large
Load diff
275
storage/cassandra/ha_cassandra.h
Normal file
275
storage/cassandra/ha_cassandra.h
Normal file
|
|
@ -0,0 +1,275 @@
|
|||
/*
|
||||
Copyright (c) 2012, Monty Program Ab
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation; version 2 of the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
#ifdef USE_PRAGMA_INTERFACE
|
||||
#pragma interface /* gcc class implementation */
|
||||
#endif
|
||||
|
||||
|
||||
#include "my_global.h" /* ulonglong */
|
||||
#include "thr_lock.h" /* THR_LOCK, THR_LOCK_DATA */
|
||||
#include "handler.h" /* handler */
|
||||
#include "my_base.h" /* ha_rows */
|
||||
|
||||
#include "cassandra_se.h"
|
||||
|
||||
/** @brief
  CASSANDRA_SHARE is a structure that will be shared among all open handlers.
  This example implements the minimum of what you will probably need:
  the table name, a count of handlers sharing this object, and the
  table-level lock data.
*/
typedef struct st_cassandra_share {
  char *table_name;
  uint table_name_length,use_count;  // use_count: handlers currently sharing this object
  mysql_mutex_t mutex;               // presumably guards use_count — confirm in ha_cassandra.cc
  THR_LOCK lock;                     // MySQL table-level lock
} CASSANDRA_SHARE;
|
||||
|
||||
class ColumnDataConverter;
|
||||
struct st_dynamic_column_value;
|
||||
typedef struct st_dynamic_column_value DYNAMIC_COLUMN_VALUE;
|
||||
|
||||
struct ha_table_option_struct;
|
||||
|
||||
|
||||
struct st_dynamic_column_value;
|
||||
|
||||
/* Convert a raw Cassandra value into a MariaDB dynamic-column value.
   Returns true on failure (MySQL convention); allocations come from mem_root. */
typedef bool (* CAS2DYN_CONVERTER)(const char *cass_data,
                                   int cass_data_len,
                                   struct st_dynamic_column_value *value,
                                   MEM_ROOT *mem_root);
/* Convert a dynamic-column value back into raw Cassandra data.
   *freemem presumably receives memory the caller must release — confirm
   against the converter implementations in ha_cassandra.cc. */
typedef bool (* DYN2CAS_CONVERTER)(struct st_dynamic_column_value *value,
                                   char **cass_data,
                                   int *cass_data_len,
                                   void *buf, void **freemem);
/* One Cassandra data type: its name plus the two-way conversion functions. */
struct cassandra_type_def
{
  const char *name;
  CAS2DYN_CONVERTER cassandra_to_dynamic;
  DYN2CAS_CONVERTER dynamic_to_cassandra;
};

typedef struct cassandra_type_def CASSANDRA_TYPE_DEF;
|
||||
|
||||
/* Cassandra column data types the engine knows how to convert.
   NOTE: the "cassandtra" typo is part of the public name and is kept
   as-is for compatibility with code that references it. */
enum cassandtra_type_enum {CT_BIGINT, CT_INT, CT_COUNTER, CT_FLOAT, CT_DOUBLE,
  CT_BLOB, CT_ASCII, CT_TEXT, CT_TIMESTAMP, CT_UUID, CT_BOOLEAN, CT_VARINT,
  CT_DECIMAL};

typedef enum cassandtra_type_enum CASSANDRA_TYPE;
|
||||
|
||||
|
||||
|
||||
/** @brief
  Class definition for the storage engine
*/
class ha_cassandra: public handler
{
  friend class Column_name_enumerator_impl;
  THR_LOCK_DATA lock;      ///< MySQL lock
  CASSANDRA_SHARE *share;    ///< Shared lock info

  Cassandra_se_interface *se;  ///< Cassandra session object; owned, deleted in dtor

  /* description of static part of the table definition */
  ColumnDataConverter **field_converters;
  uint n_field_converters;   ///< number of entries in field_converters

  CASSANDRA_TYPE_DEF *default_type_def;
  /* description of dynamic columns part */
  CASSANDRA_TYPE_DEF *special_type_field_converters;
  LEX_STRING *special_type_field_names;
  uint n_special_type_fields;
  DYNAMIC_ARRAY dynamic_values, dynamic_names;
  DYNAMIC_STRING dynamic_rec;

  ColumnDataConverter *rowkey_converter;  ///< converter for the row key column

  bool setup_field_converters(Field **field, uint n_fields);
  void free_field_converters();

  int read_cassandra_columns(bool unpack_pk);
  int check_table_options(struct ha_table_option_struct* options);

  bool doing_insert_batch;     ///< true between start_bulk_insert() and end_bulk_insert()
  ha_rows insert_rows_batched;

  uint dyncol_field;   ///< field number of the dynamic-columns field (valid when dyncol_set)
  bool dyncol_set;     ///< whether the table has a dynamic-columns field

  /* Used to produce 'wrong column %s at row %lu' warnings */
  ha_rows insert_lineno;
  void print_conversion_error(const char *field_name,
                              char *cass_value, int cass_value_len);
  int connect_and_check_options(TABLE *table_arg);
public:
  ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
  ~ha_cassandra()
  {
    free_field_converters();
    delete se;
  }

  /** @brief
    The name that will be used for display purposes.
   */
  const char *table_type() const { return "CASSANDRA"; }

  /** @brief
    The name of the index type that will be used for display.
    Don't implement this method unless you really have indexes.
   */
  const char *index_type(uint inx) { return "HASH"; }

  /** @brief
    The file extensions.
   */
  const char **bas_ext() const;

  /** @brief
    This is a list of flags that indicate what functionality the storage engine
    implements. The current table flags are documented in handler.h
  */
  ulonglong table_flags() const
  {
    return HA_BINLOG_STMT_CAPABLE |
           HA_REC_NOT_IN_SEQ |
           HA_NO_TRANSACTIONS |
           HA_REQUIRE_PRIMARY_KEY |
           HA_PRIMARY_KEY_IN_READ_INDEX |
           HA_PRIMARY_KEY_REQUIRED_FOR_POSITION |
           HA_NO_AUTO_INCREMENT |
           HA_TABLE_SCAN_ON_INDEX;
  }

  /** @brief
    This is a bitmap of flags that indicates how the storage engine
    implements indexes. The current index flags are documented in
    handler.h. If you do not implement indexes, just return zero here.

    @details
    part is the key part to check. First key part is 0.
    If all_parts is set, MySQL wants to know the flags for the combined
    index, up to and including 'part'.
  */
  ulong index_flags(uint inx, uint part, bool all_parts) const
  {
    return 0;
  }

  /** @brief
    unireg.cc will call max_supported_record_length(), max_supported_keys(),
    max_supported_key_parts(), uint max_supported_key_length()
    to make sure that the storage engine can handle the data it is about to
    send. Return *real* limits of your storage engine here; MySQL will do
    min(your_limits, MySQL_limits) automatically.
  */
  uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; }

  /* Support only one Primary Key, for now */
  uint max_supported_keys() const { return 1; }
  uint max_supported_key_parts() const { return 1; }

  /** @brief
    unireg.cc will call this to make sure that the storage engine can handle
    the data it is about to send. Return *real* limits of your storage engine
    here; MySQL will do min(your_limits, MySQL_limits) automatically.

    @details
    There is no need to implement ..._key_... methods if your engine doesn't
    support indexes.
  */
  uint max_supported_key_length() const { return 16*1024; /* just to return something*/ }

  int index_init(uint idx, bool sorted);

  int index_read_map(uchar * buf, const uchar * key,
                     key_part_map keypart_map,
                     enum ha_rkey_function find_flag);

  /** @brief
    Called in test_quick_select to determine if indexes should be used.
  */
  virtual double scan_time() { return (double) (stats.records+stats.deleted) / 20.0+10; }

  /** @brief
    This method will never be called if you do not implement indexes.
  */
  virtual double read_time(uint, uint, ha_rows rows)
  { return (double) rows / 20.0+1; }

  virtual void start_bulk_insert(ha_rows rows, uint flags);
  virtual int end_bulk_insert();

  virtual int reset();


  /* Multi-Range-Read interface (batched key lookups) */
  int multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
                            uint n_ranges, uint mode, HANDLER_BUFFER *buf);
  int multi_range_read_next(range_id_t *range_info);
  ha_rows multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
                                      void *seq_init_param,
                                      uint n_ranges, uint *bufsz,
                                      uint *flags, Cost_estimate *cost);
  ha_rows multi_range_read_info(uint keyno, uint n_ranges, uint keys,
                                uint key_parts, uint *bufsz,
                                uint *flags, Cost_estimate *cost);
  int multi_range_read_explain_info(uint mrr_mode, char *str, size_t size);

private:
  bool source_exhausted;  ///< MRR scan state: no more keys to read
  bool mrr_start_read();
  int check_field_options(Field **fields);
  int read_dyncol(uint *count,
                  DYNAMIC_COLUMN_VALUE **vals, LEX_STRING **names,
                  String *valcol);
  int write_dynamic_row(uint count,
                        DYNAMIC_COLUMN_VALUE *vals,
                        LEX_STRING *names);
  void static free_dynamic_row(DYNAMIC_COLUMN_VALUE **vals,
                               LEX_STRING **names);
  CASSANDRA_TYPE_DEF * get_cassandra_field_def(char *cass_name,
                                               int cass_name_length);
public:
  int open(const char *name, int mode, uint test_if_locked);
  int close(void);

  int write_row(uchar *buf);
  int update_row(const uchar *old_data, uchar *new_data);
  int delete_row(const uchar *buf);

  /** @brief
    Unlike index_init(), rnd_init() can be called two consecutive times
    without rnd_end() in between (it only makes sense if scan=1). In this
    case, the second call should prepare for the new table scan (e.g if
    rnd_init() allocates the cursor, the second call should position the
    cursor to the start of the table; no need to deallocate and allocate
    it again. This is a required method.
  */
  int rnd_init(bool scan);                                      //required
  int rnd_end();
  int rnd_next(uchar *buf);                                     ///< required
  int rnd_pos(uchar *buf, uchar *pos);                          ///< required
  void position(const uchar *record);                           ///< required
  int info(uint);                                               ///< required
  int delete_all_rows(void);
  ha_rows records_in_range(uint inx, key_range *min_key,
                           key_range *max_key);
  int create(const char *name, TABLE *form,
             HA_CREATE_INFO *create_info);                      ///< required
  bool check_if_incompatible_data(HA_CREATE_INFO *info,
                                  uint table_changes);

  THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
                             enum thr_lock_type lock_type);     ///< required
};
|
||||
Loading…
Add table
Add a link
Reference in a new issue