Upgraded to latest handlersocket code. This fixed LP:766870 "Assertion `next_insert_id == 0' failed with handlersocket"

sql/handler.cc:
  Added DBUG_ code
This commit is contained in:
Michael Widenius 2011-06-07 14:19:49 +03:00
commit 8d7f810894
46 changed files with 18522 additions and 230 deletions

View file

@ -23,6 +23,7 @@
#define DBG_FLD(x)
#define DBG_FILTER(x)
#define DBG_REFCNT(x)
#define DBG_KEYLEN(x)
#define DBG_DELETED
/* status variables */
@ -140,10 +141,8 @@ struct dbcontext : public dbcontext_i, private noncopyable {
virtual void close_tables_if();
virtual void table_addref(size_t tbl_id);
virtual void table_release(size_t tbl_id);
virtual void cmd_open_index(dbcallback_i& cb, size_t pst_id, const char *dbn,
const char *tbl, const char *idx, const char *retflds,
const char *filflds);
virtual void cmd_exec_on_index(dbcallback_i& cb, const cmd_exec_args& args);
virtual void cmd_open(dbcallback_i& cb, const cmd_open_args& args);
virtual void cmd_exec(dbcallback_i& cb, const cmd_exec_args& args);
virtual void set_statistics(size_t num_conns, size_t num_active);
private:
int set_thread_message(const char *fmt, ...)
@ -184,11 +183,6 @@ struct dbcontext : public dbcontext_i, private noncopyable {
std::vector<char> info_message_buf;
table_vec_type table_vec;
table_map_type table_map;
#if MYSQL_VERSION_ID >= 50505
MDL_request *mdl_request;
#else
void *mdl_request;
#endif
};
database::database(const config& c)
@ -227,7 +221,7 @@ database_i::create(const config& conf)
dbcontext::dbcontext(volatile database *d, bool for_write)
: dbref(d), for_write_flag(for_write), thd(0), lock(0), lock_failed(false),
user_level_lock_timeout(0), user_level_lock_locked(false),
commit_error(false), mdl_request(0)
commit_error(false)
{
info_message_buf.resize(8192);
user_level_lock_timeout = d->get_conf().get_int("wrlock_timeout", 12);
@ -274,6 +268,8 @@ wait_server_to_start(THD *thd, volatile int& shutdown_flag)
}; // namespace
#define DENA_THR_OFFSETOF(fld) ((char *)(&thd->fld) - (char *)thd)
void
dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag)
{
@ -282,9 +278,17 @@ dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag)
my_thread_init();
thd = new THD;
thd->thread_stack = (char *)stack_bottom;
DBG_THR(const size_t of = (char *)(&thd->thread_stack) - (char *)thd);
DBG_THR(fprintf(stderr, "thread_stack = %p sz=%zu of=%zu\n",
thd->thread_stack, sizeof(THD), of));
DBG_THR(fprintf(stderr,
"thread_stack = %p sizeof(THD)=%zu sizeof(mtx)=%zu "
"O: %zu %zu %zu %zu %zu %zu %zu\n",
thd->thread_stack, sizeof(THD), sizeof(LOCK_thread_count),
DENA_THR_OFFSETOF(mdl_context),
DENA_THR_OFFSETOF(net),
DENA_THR_OFFSETOF(LOCK_thd_data),
DENA_THR_OFFSETOF(mysys_var),
DENA_THR_OFFSETOF(stmt_arena),
DENA_THR_OFFSETOF(limit_found_rows),
DENA_THR_OFFSETOF(locked_tables_list)));
thd->store_globals();
thd->system_thread = static_cast<enum_thread_type>(1<<30UL);
memset(&thd->net, 0, sizeof(thd->net));
@ -317,15 +321,6 @@ dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag)
set_thread_message("hs:listening");
DBG_THR(fprintf(stderr, "HNDSOCK x1 %p\n", thd));
#if MYSQL_VERSION_ID >= 50508
mdl_request = new(thd->mem_root) MDL_request;
mdl_request->init(MDL_key::TABLE, "", "",
for_write_flag ? MDL_SHARED_WRITE : MDL_SHARED_READ, MDL_STATEMENT);
#elif MYSQL_VERSION_ID >= 50505
mdl_request = MDL_request::create(MDL_key::TABLE, "", "",
for_write_flag ? MDL_SHARED_WRITE : MDL_SHARED_READ, thd->mem_root);
#endif
lex_start(thd);
user_lock.reset(new expr_user_lock(thd, user_level_lock_timeout));
@ -389,7 +384,7 @@ dbcontext::lock_tables_if()
}
if (lock == 0) {
const size_t num_max = table_vec.size();
TABLE *tables[num_max ? num_max : 1]; /* GNU */
TABLE **const tables = DENA_ALLOCA_ALLOCATE(TABLE *, num_max + 1);
size_t num_open = 0;
for (size_t i = 0; i < num_max; ++i) {
if (table_vec[i].refcount > 0) {
@ -420,6 +415,7 @@ dbcontext::lock_tables_if()
thd->current_stmt_binlog_row_based = 1;
#endif
}
DENA_ALLOCA_FREE(tables);
}
DBG_LOCK(fprintf(stderr, "HNDSOCK tblnum=%d\n", (int)tblnum));
}
@ -428,14 +424,17 @@ void
dbcontext::unlock_tables_if()
{
if (lock != 0) {
DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK unlock tables\n"));
DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK unlock tables %p %p\n",
thd, thd->lock));
if (for_write_flag) {
for (size_t i = 0; i < table_vec.size(); ++i) {
if (table_vec[i].modified) {
query_cache_invalidate3(thd, table_vec[i].table, 0);
/* invalidate immediately */
query_cache_invalidate3(thd, table_vec[i].table, 1);
table_vec[i].table->file->ha_release_auto_increment();
}
}
}
{
bool suc = true;
#if MYSQL_VERSION_ID >= 50505
suc = (trans_commit_stmt(thd) == 0);
@ -476,9 +475,12 @@ void
dbcontext::close_tables_if()
{
unlock_tables_if();
DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK close tables\n"));
close_thread_tables(thd);
#if MYSQL_VERSION_ID >= 50505
thd->mdl_context.release_transactional_locks();
#endif
if (!table_vec.empty()) {
DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK close tables\n"));
close_thread_tables(thd);
statistic_increment(close_tables_count, &LOCK_status);
table_vec.clear();
table_map.clear();
@ -544,7 +546,6 @@ dbcontext::dump_record(dbcallback_i& cb, TABLE *const table,
Field *const fld = table->field[fn];
if (fld->is_null()) {
/* null */
cb.dbcb_resp_entry(0, 0);
fprintf(stderr, "NULL");
} else {
fld->val_str(&rwpstr, &rwpstr);
@ -621,9 +622,6 @@ dbcontext::modify_record(dbcallback_i& cb, TABLE *const table,
if ((pval < 0 && nval > 0) || (pval > 0 && nval < 0)) {
break; /* don't modify */
}
if ((pval < 0) != (nval < 0)) {
nval = 0; /* crip */
}
}
fld->store(nval, false);
}
@ -649,7 +647,7 @@ dbcontext::cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst,
}
lock_tables_if();
if (lock == 0) {
return cb.dbcb_resp_short(2, "lock_tables");
return cb.dbcb_resp_short(1, "lock_tables");
}
if (pst.get_table_id() >= table_vec.size()) {
return cb.dbcb_resp_short(2, "tblnum");
@ -659,20 +657,30 @@ dbcontext::cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst,
uchar *const buf = table->record[0];
empty_record(table);
memset(buf, 0, table->s->null_bytes); /* clear null flags */
Field **fld = table->field;
size_t i = 0;
for (; *fld && i < fvalslen; ++fld, ++i) {
const prep_stmt::fields_type& rf = pst.get_ret_fields();
const size_t n = rf.size();
for (size_t i = 0; i < n; ++i) {
uint32_t fn = rf[i];
Field *const fld = table->field[fn];
if (fvals[i].begin() == 0) {
(*fld)->set_null();
fld->set_null();
} else {
(*fld)->store(fvals[i].begin(), fvals[i].size(), &my_charset_bin);
fld->store(fvals[i].begin(), fvals[i].size(), &my_charset_bin);
}
}
table->next_number_field = table->found_next_number_field;
/* FIXME: test */
const int r = hnd->ha_write_row(buf);
const ulonglong insert_id = table->file->insert_id_for_cur_row;
table->next_number_field = 0;
table_vec[pst.get_table_id()].modified = true;
return cb.dbcb_resp_short(r != 0 ? 1 : 0, "");
if (r == 0 && table->found_next_number_field != 0) {
return cb.dbcb_resp_short_num64(0, insert_id);
}
if (r != 0) {
return cb.dbcb_resp_short_num(1, r);
}
return cb.dbcb_resp_short(0, "");
}
void
@ -685,6 +693,35 @@ dbcontext::cmd_sql_internal(dbcallback_i& cb, const prep_stmt& pst,
return cb.dbcb_resp_short(2, "notimpl");
}
static size_t
prepare_keybuf(const cmd_exec_args& args, uchar *key_buf, TABLE *table,
KEY& kinfo, size_t invalues_index)
{
size_t kplen_sum = 0;
DBG_KEY(fprintf(stderr, "SLOW\n"));
for (size_t i = 0; i < args.kvalslen; ++i) {
const KEY_PART_INFO & kpt = kinfo.key_part[i];
string_ref kval = args.kvals[i];
if (args.invalues_keypart >= 0 &&
static_cast<size_t>(args.invalues_keypart) == i) {
kval = args.invalues[invalues_index];
}
if (kval.begin() == 0) {
kpt.field->set_null();
} else {
kpt.field->set_notnull();
}
kpt.field->store(kval.begin(), kval.size(), &my_charset_bin);
kplen_sum += kpt.store_length;
DBG_KEYLEN(fprintf(stderr, "l=%u sl=%zu\n", kpt.length,
kpt.store_length));
}
key_copy(key_buf, table->record[0], &kinfo, kplen_sum);
DBG_KEYLEN(fprintf(stderr, "sum=%zu flen=%u\n", kplen_sum,
kinfo.key_length));
return kplen_sum;
}
void
dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
ha_rkey_function find_flag, const cmd_exec_args& args)
@ -714,7 +751,7 @@ dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
}
lock_tables_if();
if (lock == 0) {
return cb.dbcb_resp_short(2, "lock_tables");
return cb.dbcb_resp_short(1, "lock_tables");
}
if (pst.get_table_id() >= table_vec.size()) {
return cb.dbcb_resp_short(2, "tblnum");
@ -728,30 +765,15 @@ dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
if (args.kvalslen > kinfo.key_parts) {
return cb.dbcb_resp_short(2, "kpnum");
}
uchar key_buf[kinfo.key_length]; /* GNU */
size_t kplen_sum = 0;
{
DBG_KEY(fprintf(stderr, "SLOW\n"));
for (size_t i = 0; i < args.kvalslen; ++i) {
const KEY_PART_INFO & kpt = kinfo.key_part[i];
const string_ref& kval = args.kvals[i];
if (kval.begin() == 0) {
kpt.field->set_null();
} else {
kpt.field->set_notnull();
}
kpt.field->store(kval.begin(), kval.size(), &my_charset_bin);
kplen_sum += kpt.length;
}
key_copy(key_buf, table->record[0], &kinfo, kplen_sum);
}
uchar *const key_buf = DENA_ALLOCA_ALLOCATE(uchar, kinfo.key_length);
size_t invalues_idx = 0;
size_t kplen_sum = prepare_keybuf(args, key_buf, table, kinfo, invalues_idx);
/* filters */
uchar *filter_buf = 0;
if (args.filters != 0) {
const size_t filter_buf_len = calc_filter_buf_size(table, pst,
args.filters);
filter_buf = reinterpret_cast<uchar *>(alloca(filter_buf_len));
/* FIXME: TEST */
filter_buf = DENA_ALLOCA_ALLOCATE(uchar, filter_buf_len);
if (!fill_filter_buf(table, pst, args.filters, filter_buf,
filter_buf_len)) {
return cb.dbcb_resp_short(2, "filterblob");
@ -765,9 +787,6 @@ dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
}
hnd->ha_index_or_rnd_end();
hnd->ha_index_init(pst.get_idxnum(), 1);
#if 0
statistic_increment(index_exec_count, &LOCK_status);
#endif
if (need_resp_record) {
cb.dbcb_resp_begin(pst.get_ret_fields().size());
}
@ -775,8 +794,17 @@ dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
uint32_t skip = args.skip;
size_t modified_count = 0;
int r = 0;
for (uint32_t i = 0; i < limit + skip; ++i) {
if (i == 0) {
bool is_first = true;
for (uint32_t cnt = 0; cnt < limit + skip;) {
if (is_first) {
is_first = false;
const key_part_map kpm = (1U << args.kvalslen) - 1;
r = hnd->ha_index_read_map(table->record[0], key_buf, kpm, find_flag);
} else if (args.invalues_keypart >= 0) {
if (++invalues_idx >= args.invalueslen) {
break;
}
kplen_sum = prepare_keybuf(args, key_buf, table, kinfo, invalues_idx);
const key_part_map kpm = (1U << args.kvalslen) - 1;
r = hnd->ha_index_read_map(table->record[0], key_buf, kpm, find_flag);
} else {
@ -814,12 +842,17 @@ dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
} else if (skip > 0) {
--skip;
} else {
/* hit */
if (need_resp_record) {
resp_record(cb, table, pst);
}
if (mod_op != 0) {
r = modify_record(cb, table, pst, args, mod_op, modified_count);
}
++cnt;
}
if (args.invalues_keypart >= 0 && r == HA_ERR_KEY_NOT_FOUND) {
continue;
}
if (r != 0 && r != HA_ERR_RECORD_DELETED) {
break;
@ -833,7 +866,7 @@ dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
/* revert dbcb_resp_begin() and dbcb_resp_entry() */
cb.dbcb_resp_cancel();
}
cb.dbcb_resp_short_num(2, r);
cb.dbcb_resp_short_num(1, r);
} else {
/* succeeded */
if (need_resp_record) {
@ -842,6 +875,8 @@ dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
cb.dbcb_resp_short_num(0, modified_count);
}
}
DENA_ALLOCA_FREE(filter_buf);
DENA_ALLOCA_FREE(key_buf);
}
size_t
@ -856,6 +891,9 @@ dbcontext::calc_filter_buf_size(TABLE *table, const prep_stmt& pst,
const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
filter_buf_len += table->field[fn]->pack_length();
}
++filter_buf_len;
/* Field_medium::cmp() calls uint3korr(), which may read 4 bytes.
Allocate 1 more byte for safety. */
return filter_buf_len;
}
@ -954,11 +992,11 @@ dbcontext::check_filter(dbcallback_i& cb, TABLE *table, const prep_stmt& pst,
}
void
dbcontext::cmd_open_index(dbcallback_i& cb, size_t pst_id, const char *dbn,
const char *tbl, const char *idx, const char *retflds, const char *filflds)
dbcontext::cmd_open(dbcallback_i& cb, const cmd_open_args& arg)
{
unlock_tables_if();
const table_name_type k = std::make_pair(std::string(dbn), std::string(tbl));
const table_name_type k = std::make_pair(std::string(arg.dbn),
std::string(arg.tbl));
const table_map_type::const_iterator iter = table_map.find(k);
uint32_t tblnum = 0;
if (iter != table_map.end()) {
@ -971,23 +1009,24 @@ dbcontext::cmd_open_index(dbcallback_i& cb, size_t pst_id, const char *dbn,
bool refresh = true;
const thr_lock_type lock_type = for_write_flag ? TL_WRITE : TL_READ;
#if MYSQL_VERSION_ID >= 50505
tables.init_one_table(dbn, strlen(dbn), tbl, strlen(tbl), tbl,
lock_type);
tables.mdl_request = mdl_request;
Open_table_context ot_act(thd, MYSQL_OPEN_REOPEN);
tables.init_one_table(arg.dbn, strlen(arg.dbn), arg.tbl, strlen(arg.tbl),
arg.tbl, lock_type);
tables.mdl_request.init(MDL_key::TABLE, arg.dbn, arg.tbl,
for_write_flag ? MDL_SHARED_WRITE : MDL_SHARED_READ, MDL_TRANSACTION);
Open_table_context ot_act(thd, 0);
if (!open_table(thd, &tables, thd->mem_root, &ot_act)) {
table = tables.table;
}
#else
tables.init_one_table(dbn, tbl, lock_type);
tables.init_one_table(arg.dbn, arg.tbl, lock_type);
table = open_table(thd, &tables, thd->mem_root, &refresh,
OPEN_VIEW_NO_PARSE);
#endif
if (table == 0) {
DENA_VERBOSE(10, fprintf(stderr,
DENA_VERBOSE(20, fprintf(stderr,
"HNDSOCK failed to open %p [%s] [%s] [%d]\n",
thd, dbn, tbl, static_cast<int>(refresh)));
return cb.dbcb_resp_short(2, "open_table");
thd, arg.dbn, arg.tbl, static_cast<int>(refresh)));
return cb.dbcb_resp_short(1, "open_table");
}
statistic_increment(open_tables_count, &LOCK_status);
table->reginfo.lock_type = lock_type;
@ -999,15 +1038,16 @@ dbcontext::cmd_open_index(dbcallback_i& cb, size_t pst_id, const char *dbn,
table_map[k] = tblnum;
}
size_t idxnum = static_cast<size_t>(-1);
if (idx[0] >= '0' && idx[0] <= '9') {
if (arg.idx[0] >= '0' && arg.idx[0] <= '9') {
/* numeric */
TABLE *const table = table_vec[tblnum].table;
idxnum = atoi(idx);
idxnum = atoi(arg.idx);
if (idxnum >= table->s->keys) {
return cb.dbcb_resp_short(2, "idxnum");
}
} else {
const char *const idx_name_to_open = idx[0] == '\0' ? "PRIMARY" : idx;
const char *const idx_name_to_open =
arg.idx[0] == '\0' ? "PRIMARY" : arg.idx;
TABLE *const table = table_vec[tblnum].table;
for (uint i = 0; i < table->s->keys; ++i) {
KEY& kinfo = table->key_info[i];
@ -1022,14 +1062,14 @@ dbcontext::cmd_open_index(dbcallback_i& cb, size_t pst_id, const char *dbn,
}
prep_stmt::fields_type rf;
prep_stmt::fields_type ff;
if (!parse_fields(table_vec[tblnum].table, retflds, rf)) {
if (!parse_fields(table_vec[tblnum].table, arg.retflds, rf)) {
return cb.dbcb_resp_short(2, "fld");
}
if (!parse_fields(table_vec[tblnum].table, filflds, ff)) {
if (!parse_fields(table_vec[tblnum].table, arg.filflds, ff)) {
return cb.dbcb_resp_short(2, "fld");
}
prep_stmt p(this, tblnum, idxnum, rf, ff);
cb.dbcb_set_prep_stmt(pst_id, p);
cb.dbcb_set_prep_stmt(arg.pst_id, p);
return cb.dbcb_resp_short(0, "");
}
@ -1070,7 +1110,7 @@ enum db_write_op {
};
void
dbcontext::cmd_exec_on_index(dbcallback_i& cb, const cmd_exec_args& args)
dbcontext::cmd_exec(dbcallback_i& cb, const cmd_exec_args& args)
{
const prep_stmt& p = *args.pst;
if (p.get_table_id() == static_cast<size_t>(-1)) {
@ -1096,7 +1136,7 @@ dbcontext::cmd_exec_on_index(dbcallback_i& cb, const cmd_exec_args& args)
wrop = db_write_op_sql;
break;
default:
return cb.dbcb_resp_short(1, "op");
return cb.dbcb_resp_short(2, "op");
}
} else if (args.op.size() == 2 && args.op.begin()[1] == '=') {
switch (args.op.begin()[0]) {
@ -1107,10 +1147,10 @@ dbcontext::cmd_exec_on_index(dbcallback_i& cb, const cmd_exec_args& args)
find_flag = HA_READ_KEY_OR_PREV;
break;
default:
return cb.dbcb_resp_short(1, "op");
return cb.dbcb_resp_short(2, "op");
}
} else {
return cb.dbcb_resp_short(1, "op");
return cb.dbcb_resp_short(2, "op");
}
if (args.kvalslen <= 0) {
return cb.dbcb_resp_short(2, "klen");
@ -1128,7 +1168,6 @@ dbcontext::cmd_exec_on_index(dbcallback_i& cb, const cmd_exec_args& args)
void
dbcontext::set_statistics(size_t num_conns, size_t num_active)
{
thd_proc_info(thd, &info_message_buf[0]);
if (for_write_flag) {
set_thread_message("handlersocket: mode=wr, %zu conns, %zu active",
num_conns, num_active);
@ -1136,6 +1175,12 @@ dbcontext::set_statistics(size_t num_conns, size_t num_active)
set_thread_message("handlersocket: mode=rd, %zu conns, %zu active",
num_conns, num_active);
}
/*
Don't set message buf if it's already in use. This saves slow call to
thd_proc_info() (if profiling is enabled)
*/
if (thd->proc_info != &info_message_buf[0])
thd_proc_info(thd, &info_message_buf[0]);
}
};

View file

@ -62,6 +62,7 @@ struct dbcallback_i {
virtual const prep_stmt *dbcb_get_prep_stmt(size_t pst_id) const = 0;
virtual void dbcb_resp_short(uint32_t code, const char *msg) = 0;
virtual void dbcb_resp_short_num(uint32_t code, uint32_t value) = 0;
virtual void dbcb_resp_short_num64(uint32_t code, uint64_t value) = 0;
virtual void dbcb_resp_begin(size_t num_flds) = 0;
virtual void dbcb_resp_entry(const char *fld, size_t fldlen) = 0;
virtual void dbcb_resp_end() = 0;
@ -81,6 +82,17 @@ struct record_filter {
record_filter() : filter_type(record_filter_type_skip), ff_offset(0) { }
};
struct cmd_open_args {
size_t pst_id;
const char *dbn;
const char *tbl;
const char *idx;
const char *retflds;
const char *filflds;
cmd_open_args() : pst_id(0), dbn(0), tbl(0), idx(0), retflds(0),
filflds(0) { }
};
struct cmd_exec_args {
const prep_stmt *pst;
string_ref op;
@ -91,8 +103,11 @@ struct cmd_exec_args {
string_ref mod_op;
const string_ref *uvals; /* size must be pst->retfieelds.size() */
const record_filter *filters;
int invalues_keypart;
const string_ref *invalues;
size_t invalueslen;
cmd_exec_args() : pst(0), kvals(0), kvalslen(0), limit(0), skip(0),
uvals(0), filters(0) { }
uvals(0), filters(0), invalues_keypart(-1), invalues(0), invalueslen(0) { }
};
struct dbcontext_i {
@ -108,11 +123,8 @@ struct dbcontext_i {
virtual void close_tables_if() = 0;
virtual void table_addref(size_t tbl_id) = 0; /* TODO: hide */
virtual void table_release(size_t tbl_id) = 0; /* TODO: hide */
virtual void cmd_open_index(dbcallback_i& cb, size_t pst_id, const char *dbn,
const char *tbl, const char *idx, const char *retflds,
const char *filflds) = 0;
virtual void cmd_exec_on_index(dbcallback_i& cb, const cmd_exec_args& args)
= 0;
virtual void cmd_open(dbcallback_i& cb, const cmd_open_args& args) = 0;
virtual void cmd_exec(dbcallback_i& cb, const cmd_exec_args& args) = 0;
virtual void set_statistics(size_t num_conns, size_t num_active) = 0;
};

View file

@ -27,6 +27,7 @@
#define DBG_FD(x)
#define DBG_TR(x)
#define DBG_EP(x)
#define DBG_MULTI(x)
/* TODO */
#if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL)
@ -40,13 +41,15 @@ struct dbconnstate {
string_buffer writebuf;
std::vector<prep_stmt> prep_stmts;
size_t resp_begin_pos;
size_t find_nl_pos;
void reset() {
readbuf.clear();
writebuf.clear();
prep_stmts.clear();
resp_begin_pos = 0;
find_nl_pos = 0;
}
dbconnstate() : resp_begin_pos(0) { }
dbconnstate() : resp_begin_pos(0), find_nl_pos(0) { }
};
struct hstcpsvr_conn;
@ -78,6 +81,7 @@ struct hstcpsvr_conn : public dbcallback_i {
virtual const prep_stmt *dbcb_get_prep_stmt(size_t pst_id) const;
virtual void dbcb_resp_short(uint32_t code, const char *msg);
virtual void dbcb_resp_short_num(uint32_t code, uint32_t value);
virtual void dbcb_resp_short_num64(uint32_t code, uint64_t value);
virtual void dbcb_resp_begin(size_t num_flds);
virtual void dbcb_resp_entry(const char *fld, size_t fldlen);
virtual void dbcb_resp_end();
@ -205,6 +209,15 @@ hstcpsvr_conn::dbcb_resp_short_num(uint32_t code, uint32_t value)
cstate.writebuf.append_literal("\n");
}
void
hstcpsvr_conn::dbcb_resp_short_num64(uint32_t code, uint64_t value)
{
write_ui32(cstate.writebuf, code);
cstate.writebuf.append_literal("\t1\t");
write_ui64(cstate.writebuf, value);
cstate.writebuf.append_literal("\n");
}
void
hstcpsvr_conn::dbcb_resp_begin(size_t num_flds)
{
@ -256,6 +269,7 @@ struct hstcpsvr_worker : public hstcpsvr_worker_i, private noncopyable {
#endif
bool accept_enabled;
int accept_balance;
std::vector<string_ref> invalues_work;
std::vector<record_filter> filters_work;
private:
int run_one_nb();
@ -385,6 +399,7 @@ hstcpsvr_worker::run_one_nb()
vshared.shutdown = 1;
} else if (ch == '/') {
conn.cstate.readbuf.clear();
conn.cstate.find_nl_pos = 0;
conn.cstate.writebuf.clear();
conn.read_finished = true;
conn.write_finished = true;
@ -533,6 +548,7 @@ hstcpsvr_worker::run_one_ep()
vshared.shutdown = 1;
} else if (ch == '/') {
conn->cstate.readbuf.clear();
conn->cstate.find_nl_pos = 0;
conn->cstate.writebuf.clear();
conn->read_finished = true;
conn->write_finished = true;
@ -641,19 +657,24 @@ hstcpsvr_worker::run_one_ep()
void
hstcpsvr_worker::execute_lines(hstcpsvr_conn& conn)
{
DBG_MULTI(int cnt = 0);
dbconnstate& cstate = conn.cstate;
char *buf_end = cstate.readbuf.end();
char *line_begin = cstate.readbuf.begin();
char *find_pos = line_begin + cstate.find_nl_pos;
while (true) {
char *const nl = memchr_char(line_begin, '\n', buf_end - line_begin);
char *const nl = memchr_char(find_pos, '\n', buf_end - find_pos);
if (nl == 0) {
break;
}
char *const lf = (line_begin != nl && nl[-1] == '\r') ? nl - 1 : nl;
DBG_MULTI(cnt++);
execute_line(line_begin, lf, conn);
line_begin = nl + 1;
find_pos = line_begin = nl + 1;
}
cstate.readbuf.erase_front(line_begin - cstate.readbuf.begin());
cstate.find_nl_pos = cstate.readbuf.size();
DBG_MULTI(fprintf(stderr, "cnt=%d\n", cnt));
}
void
@ -721,8 +742,14 @@ hstcpsvr_worker::do_open_index(char *start, char *finish, hstcpsvr_conn& conn)
idxname_end[0] = 0;
retflds_end[0] = 0;
filflds_end[0] = 0;
return dbctx->cmd_open_index(conn, pst_id, dbname_begin, tblname_begin,
idxname_begin, retflds_begin, filflds_begin);
cmd_open_args args;
args.pst_id = pst_id;
args.dbn = dbname_begin;
args.tbl = tblname_begin;
args.idx = idxname_begin;
args.retflds = retflds_begin;
args.filflds = filflds_begin;
return dbctx->cmd_open(conn, args);
}
void
@ -741,7 +768,8 @@ hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
args.op = string_ref(op_begin, op_end);
skip_one(start, finish);
const uint32_t fldnum = read_ui32(start, finish);
string_ref flds[fldnum]; /* GNU */
string_ref *const flds = DENA_ALLOCA_ALLOCATE(string_ref, fldnum);
auto_alloca_free<string_ref> flds_autofree(flds);
args.kvals = flds;
args.kvalslen = fldnum;
for (size_t i = 0; i < fldnum; ++i) {
@ -765,10 +793,39 @@ hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
args.skip = read_ui32(start, finish);
if (start == finish) {
/* simple query */
return dbctx->cmd_exec_on_index(conn, args);
return dbctx->cmd_exec(conn, args);
}
/* has filters or modops */
/* has more options */
skip_one(start, finish);
/* in-clause */
if (start[0] == '@') {
read_token(start, finish); /* '@' */
skip_one(start, finish);
args.invalues_keypart = read_ui32(start, finish);
skip_one(start, finish);
args.invalueslen = read_ui32(start, finish);
if (args.invalueslen <= 0) {
return conn.dbcb_resp_short(2, "invalueslen");
}
if (invalues_work.size() < args.invalueslen) {
invalues_work.resize(args.invalueslen);
}
args.invalues = &invalues_work[0];
for (uint32_t i = 0; i < args.invalueslen; ++i) {
skip_one(start, finish);
char *const invalue_begin = start;
read_token(start, finish);
char *const invalue_end = start;
char *wp = invalue_begin;
unescape_string(wp, invalue_begin, invalue_end);
invalues_work[i] = string_ref(invalue_begin, wp - invalue_begin);
}
skip_one(start, finish);
}
if (start == finish) {
/* no more options */
return dbctx->cmd_exec(conn, args);
}
/* filters */
size_t filters_count = 0;
while (start != finish && (start[0] == 'W' || start[0] == 'F')) {
@ -823,7 +880,7 @@ hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
}
if (start == finish) {
/* no modops */
return dbctx->cmd_exec_on_index(conn, args);
return dbctx->cmd_exec(conn, args);
}
/* has modops */
char *const mod_op_begin = start;
@ -831,7 +888,8 @@ hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
char *const mod_op_end = start;
args.mod_op = string_ref(mod_op_begin, mod_op_end);
const size_t num_uvals = args.pst->get_ret_fields().size();
string_ref uflds[num_uvals]; /* GNU */
string_ref *const uflds = DENA_ALLOCA_ALLOCATE(string_ref, num_uvals);
auto_alloca_free<string_ref> uflds_autofree(uflds);
for (size_t i = 0; i < num_uvals; ++i) {
skip_one(start, finish);
char *const f_begin = start;
@ -848,7 +906,7 @@ hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
}
}
args.uvals = uflds;
return dbctx->cmd_exec_on_index(conn, args);
return dbctx->cmd_exec(conn, args);
}
void
@ -871,7 +929,7 @@ hstcpsvr_worker::do_authorization(char *start, char *finish,
char *wp = key_begin;
unescape_string(wp, key_begin, key_end);
if (authtype_len != 1 || authtype_begin[0] != '1') {
return conn.dbcb_resp_short(2, "authtype");
return conn.dbcb_resp_short(3, "authtype");
}
if (cshared.plain_secret.size() == key_len &&
memcmp(cshared.plain_secret.data(), key_begin, key_len) == 0) {

View file

@ -17,6 +17,7 @@
#define MYSQL_SERVER 1
#include <my_config.h>
#include <mysql_version.h>
#if MYSQL_VERSION_ID >= 50505