mariadb/plugin/handler_socket/handlersocket/hstcpsvr_worker.cpp
Michael Widenius 4aaa38d26e Enusure that my_global.h is included first
- Added sql/mariadb.h file that should be included first by files in sql
  directory, if sql_plugin.h is not used (sql_plugin.h adds SHOW variables
  that must be done before my_global.h is included)
- Removed a lot of include my_global.h from include files
- Removed include's of some files that my_global.h automatically includes
- Removed duplicated include's of my_sys.h
- Replaced include my_config.h with my_global.h
2017-08-24 01:05:44 +02:00

957 lines
26 KiB
C++

// vim:sw=2:ai
/*
* Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
* See COPYRIGHT.txt for details.
*/
#include <my_global.h>
#include <netinet/in.h>
#include <errno.h>
#include <poll.h>
#include <unistd.h>
#include <stdexcept>
#include <signal.h>
#include <list>
#if __linux__
#include <sys/epoll.h>
#endif
#ifdef HAVE_ALLOCA_H
#include <alloca.h>
#endif
#include "hstcpsvr_worker.hpp"
#include "string_buffer.hpp"
#include "auto_ptrcontainer.hpp"
#include "string_util.hpp"
#include "escape.hpp"
#define DBG_FD(x)
#define DBG_TR(x)
#define DBG_EP(x)
#define DBG_MULTI(x)
/* TODO */
#if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL)
#define MSG_NOSIGNAL 0
#endif
namespace dena {
struct dbconnstate {
string_buffer readbuf;
string_buffer writebuf;
std::vector<prep_stmt> prep_stmts;
size_t resp_begin_pos;
size_t find_nl_pos;
void reset() {
readbuf.clear();
writebuf.clear();
prep_stmts.clear();
resp_begin_pos = 0;
find_nl_pos = 0;
}
dbconnstate() : resp_begin_pos(0), find_nl_pos(0) { }
};
struct hstcpsvr_conn;
typedef auto_ptrcontainer< std::list<hstcpsvr_conn *> > hstcpsvr_conns_type;
struct hstcpsvr_conn : public dbcallback_i {
public:
auto_file fd;
sockaddr_storage addr;
size_socket addr_len;
dbconnstate cstate;
std::string err;
size_t readsize;
bool nonblocking;
bool read_finished;
bool write_finished;
time_t nb_last_io;
hstcpsvr_conns_type::iterator conns_iter;
bool authorized;
public:
bool closed() const;
bool ok_to_close() const;
void reset();
int accept(const hstcpsvr_shared_c& cshared);
bool write_more(bool *more_r = 0);
bool read_more(bool *more_r = 0);
public:
virtual void dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v);
virtual const prep_stmt *dbcb_get_prep_stmt(size_t pst_id) const;
virtual void dbcb_resp_short(uint32_t code, const char *msg);
virtual void dbcb_resp_short_num(uint32_t code, uint32_t value);
virtual void dbcb_resp_short_num64(uint32_t code, uint64_t value);
virtual void dbcb_resp_begin(size_t num_flds);
virtual void dbcb_resp_entry(const char *fld, size_t fldlen);
virtual void dbcb_resp_end();
virtual void dbcb_resp_cancel();
public:
hstcpsvr_conn() : addr_len(sizeof(addr)), readsize(4096),
nonblocking(false), read_finished(false), write_finished(false),
nb_last_io(0), authorized(false) { }
};
bool
hstcpsvr_conn::closed() const
{
return fd.get() < 0;
}
bool
hstcpsvr_conn::ok_to_close() const
{
return write_finished || (read_finished && cstate.writebuf.size() == 0);
}
void
hstcpsvr_conn::reset()
{
addr = sockaddr_storage();
addr_len = sizeof(addr);
cstate.reset();
fd.reset();
read_finished = false;
write_finished = false;
}
int
hstcpsvr_conn::accept(const hstcpsvr_shared_c& cshared)
{
reset();
return socket_accept(cshared.listen_fd.get(), fd, cshared.sockargs, addr,
addr_len, err);
}
bool
hstcpsvr_conn::write_more(bool *more_r)
{
if (write_finished || cstate.writebuf.size() == 0) {
return false;
}
const size_t wlen = cstate.writebuf.size();
ssize_t len = send(fd.get(), cstate.writebuf.begin(), wlen, MSG_NOSIGNAL);
if (len <= 0) {
if (len == 0 || !nonblocking || errno != EWOULDBLOCK) {
cstate.writebuf.clear();
write_finished = true;
}
return false;
}
cstate.writebuf.erase_front(len);
/* FIXME: reallocate memory if too large */
if (more_r) {
*more_r = (static_cast<size_t>(len) == wlen);
}
return true;
}
bool
hstcpsvr_conn::read_more(bool *more_r)
{
if (read_finished) {
return false;
}
const size_t block_size = readsize > 4096 ? readsize : 4096;
char *wp = cstate.readbuf.make_space(block_size);
const ssize_t len = read(fd.get(), wp, block_size);
if (len <= 0) {
if (len == 0 || !nonblocking || errno != EWOULDBLOCK) {
read_finished = true;
}
return false;
}
cstate.readbuf.space_wrote(len);
if (more_r) {
*more_r = (static_cast<size_t>(len) == block_size);
}
return true;
}
void
hstcpsvr_conn::dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v)
{
if (cstate.prep_stmts.size() <= pst_id) {
cstate.prep_stmts.resize(pst_id + 1);
}
cstate.prep_stmts[pst_id] = v;
}
const prep_stmt *
hstcpsvr_conn::dbcb_get_prep_stmt(size_t pst_id) const
{
if (cstate.prep_stmts.size() <= pst_id) {
return 0;
}
return &cstate.prep_stmts[pst_id];
}
void
hstcpsvr_conn::dbcb_resp_short(uint32_t code, const char *msg)
{
write_ui32(cstate.writebuf, code);
const size_t msglen = strlen(msg);
if (msglen != 0) {
cstate.writebuf.append_literal("\t1\t");
cstate.writebuf.append(msg, msg + msglen);
} else {
cstate.writebuf.append_literal("\t1");
}
cstate.writebuf.append_literal("\n");
}
void
hstcpsvr_conn::dbcb_resp_short_num(uint32_t code, uint32_t value)
{
write_ui32(cstate.writebuf, code);
cstate.writebuf.append_literal("\t1\t");
write_ui32(cstate.writebuf, value);
cstate.writebuf.append_literal("\n");
}
void
hstcpsvr_conn::dbcb_resp_short_num64(uint32_t code, uint64_t value)
{
write_ui32(cstate.writebuf, code);
cstate.writebuf.append_literal("\t1\t");
write_ui64(cstate.writebuf, value);
cstate.writebuf.append_literal("\n");
}
void
hstcpsvr_conn::dbcb_resp_begin(size_t num_flds)
{
cstate.resp_begin_pos = cstate.writebuf.size();
cstate.writebuf.append_literal("0\t");
write_ui32(cstate.writebuf, num_flds);
}
void
hstcpsvr_conn::dbcb_resp_entry(const char *fld, size_t fldlen)
{
if (fld != 0) {
cstate.writebuf.append_literal("\t");
escape_string(cstate.writebuf, fld, fld + fldlen);
} else {
static const char t[] = "\t\0";
cstate.writebuf.append(t, t + 2);
}
}
void
hstcpsvr_conn::dbcb_resp_end()
{
cstate.writebuf.append_literal("\n");
cstate.resp_begin_pos = 0;
}
void
hstcpsvr_conn::dbcb_resp_cancel()
{
cstate.writebuf.resize(cstate.resp_begin_pos);
cstate.resp_begin_pos = 0;
}
struct hstcpsvr_worker : public hstcpsvr_worker_i, private noncopyable {
hstcpsvr_worker(const hstcpsvr_worker_arg& arg);
virtual void run();
private:
const hstcpsvr_shared_c& cshared;
volatile hstcpsvr_shared_v& vshared;
long worker_id;
dbcontext_ptr dbctx;
hstcpsvr_conns_type conns; /* conns refs dbctx */
time_t last_check_time;
std::vector<pollfd> pfds;
#ifdef __linux__
std::vector<epoll_event> events_vec;
auto_file epoll_fd;
#endif
bool accept_enabled;
int accept_balance;
std::vector<string_ref> invalues_work;
std::vector<record_filter> filters_work;
private:
int run_one_nb();
int run_one_ep();
void execute_lines(hstcpsvr_conn& conn);
void execute_line(char *start, char *finish, hstcpsvr_conn& conn);
void do_open_index(char *start, char *finish, hstcpsvr_conn& conn);
void do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
char *finish, hstcpsvr_conn& conn);
void do_authorization(char *start, char *finish, hstcpsvr_conn& conn);
};
hstcpsvr_worker::hstcpsvr_worker(const hstcpsvr_worker_arg& arg)
: cshared(*arg.cshared), vshared(*arg.vshared), worker_id(arg.worker_id),
dbctx(cshared.dbptr->create_context(cshared.for_write_flag)),
last_check_time(time(0)), accept_enabled(true), accept_balance(0)
{
#ifdef __linux__
if (cshared.sockargs.use_epoll) {
epoll_fd.reset(epoll_create(10));
if (epoll_fd.get() < 0) {
fatal_abort("epoll_create");
}
epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN;
ev.data.ptr = 0;
if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev)
!= 0) {
fatal_abort("epoll_ctl EPOLL_CTL_ADD");
}
events_vec.resize(10240);
}
#endif
accept_balance = cshared.conf.get_int("accept_balance", 0);
}
namespace {
struct thr_init {
thr_init(const dbcontext_ptr& dc, volatile int& shutdown_flag) : dbctx(dc) {
dbctx->init_thread(this, shutdown_flag);
}
~thr_init() {
dbctx->term_thread();
}
const dbcontext_ptr& dbctx;
};
}; // namespace
void
hstcpsvr_worker::run()
{
thr_init initobj(dbctx, vshared.shutdown);
#ifdef __linux__
if (cshared.sockargs.use_epoll) {
while (!vshared.shutdown && dbctx->check_alive()) {
run_one_ep();
}
} else if (cshared.sockargs.nonblocking) {
while (!vshared.shutdown && dbctx->check_alive()) {
run_one_nb();
}
} else {
/* UNUSED */
fatal_abort("run_one");
}
#else
while (!vshared.shutdown && dbctx->check_alive()) {
run_one_nb();
}
#endif
}
int
hstcpsvr_worker::run_one_nb()
{
size_t nfds = 0;
/* CLIENT SOCKETS */
for (hstcpsvr_conns_type::const_iterator i = conns.begin();
i != conns.end(); ++i) {
if (pfds.size() <= nfds) {
pfds.resize(nfds + 1);
}
pollfd& pfd = pfds[nfds++];
pfd.fd = (*i)->fd.get();
short ev = 0;
if ((*i)->cstate.writebuf.size() != 0) {
ev = POLLOUT;
} else {
ev = POLLIN;
}
pfd.events = pfd.revents = ev;
}
/* LISTENER */
{
const size_t cpt = cshared.nb_conn_per_thread;
const short ev = (cpt > nfds) ? POLLIN : 0;
if (pfds.size() <= nfds) {
pfds.resize(nfds + 1);
}
pollfd& pfd = pfds[nfds++];
pfd.fd = cshared.listen_fd.get();
pfd.events = pfd.revents = ev;
}
/* POLL */
const int npollev = poll(&pfds[0], nfds, 1 * 1000);
dbctx->set_statistics(conns.size(), npollev);
const time_t now = time(0);
size_t j = 0;
const short mask_in = ~POLLOUT;
const short mask_out = POLLOUT | POLLERR | POLLHUP | POLLNVAL;
/* READ */
for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
++i, ++j) {
pollfd& pfd = pfds[j];
if ((pfd.revents & mask_in) == 0) {
continue;
}
hstcpsvr_conn& conn = **i;
if (conn.read_more()) {
if (conn.cstate.readbuf.size() > 0) {
const char ch = conn.cstate.readbuf.begin()[0];
if (ch == 'Q') {
vshared.shutdown = 1;
} else if (ch == '/') {
conn.cstate.readbuf.clear();
conn.cstate.find_nl_pos = 0;
conn.cstate.writebuf.clear();
conn.read_finished = true;
conn.write_finished = true;
}
}
conn.nb_last_io = now;
}
}
/* EXECUTE */
j = 0;
for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
++i, ++j) {
pollfd& pfd = pfds[j];
if ((pfd.revents & mask_in) == 0 || (*i)->cstate.readbuf.size() == 0) {
continue;
}
execute_lines(**i);
}
/* COMMIT */
dbctx->unlock_tables_if();
const bool commit_error = dbctx->get_commit_error();
dbctx->clear_error();
/* WRITE/CLOSE */
j = 0;
for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
++j) {
pollfd& pfd = pfds[j];
hstcpsvr_conn& conn = **i;
hstcpsvr_conns_type::iterator icur = i;
++i;
if (commit_error) {
conn.reset();
continue;
}
if ((pfd.revents & (mask_out | mask_in)) != 0) {
if (conn.write_more()) {
conn.nb_last_io = now;
}
}
if (cshared.sockargs.timeout != 0 &&
conn.nb_last_io + cshared.sockargs.timeout < now) {
conn.reset();
}
if (conn.closed() || conn.ok_to_close()) {
conns.erase_ptr(icur);
}
}
/* ACCEPT */
{
pollfd& pfd = pfds[nfds - 1];
if ((pfd.revents & mask_in) != 0) {
std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn());
c->nonblocking = true;
c->readsize = cshared.readsize;
c->accept(cshared);
if (c->fd.get() >= 0) {
if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) {
fatal_abort("F_SETFL O_NONBLOCK");
}
c->nb_last_io = now;
conns.push_back_ptr(c);
} else {
/* errno == 11 (EAGAIN) is not a fatal error. */
DENA_VERBOSE(100, fprintf(stderr,
"accept failed: errno=%d (not fatal)\n", errno));
}
}
}
DENA_VERBOSE(30, fprintf(stderr, "nb: %p nfds=%zu cns=%zu\n", this, nfds,
conns.size()));
if (conns.empty()) {
dbctx->close_tables_if();
}
dbctx->set_statistics(conns.size(), 0);
return 0;
}
#ifdef __linux__
int
hstcpsvr_worker::run_one_ep()
{
epoll_event *const events = &events_vec[0];
const size_t num_events = events_vec.size();
const time_t now = time(0);
size_t in_count = 0, out_count = 0, accept_count = 0;
int nfds = epoll_wait(epoll_fd.get(), events, num_events, 1000);
/* READ/ACCEPT */
dbctx->set_statistics(conns.size(), nfds);
for (int i = 0; i < nfds; ++i) {
epoll_event& ev = events[i];
if ((ev.events & EPOLLIN) == 0) {
continue;
}
hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
if (conn == 0) {
/* listener */
++accept_count;
DBG_EP(fprintf(stderr, "IN listener\n"));
std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn());
c->nonblocking = true;
c->readsize = cshared.readsize;
c->accept(cshared);
if (c->fd.get() >= 0) {
if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) {
fatal_abort("F_SETFL O_NONBLOCK");
}
epoll_event cev;
memset(&cev, 0, sizeof(cev));
cev.events = EPOLLIN | EPOLLOUT | EPOLLET;
cev.data.ptr = c.get();
c->nb_last_io = now;
const int fd = c->fd.get();
conns.push_back_ptr(c);
conns.back()->conns_iter = --conns.end();
if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, fd, &cev) != 0) {
fatal_abort("epoll_ctl EPOLL_CTL_ADD");
}
} else {
DENA_VERBOSE(100, fprintf(stderr,
"accept failed: errno=%d (not fatal)\n", errno));
}
} else {
/* client connection */
++in_count;
DBG_EP(fprintf(stderr, "IN client\n"));
bool more_data = false;
while (conn->read_more(&more_data)) {
DBG_EP(fprintf(stderr, "IN client read_more\n"));
conn->nb_last_io = now;
if (!more_data) {
break;
}
}
}
}
/* EXECUTE */
for (int i = 0; i < nfds; ++i) {
epoll_event& ev = events[i];
hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
if ((ev.events & EPOLLIN) == 0 || conn == 0 ||
conn->cstate.readbuf.size() == 0) {
continue;
}
const char ch = conn->cstate.readbuf.begin()[0];
if (ch == 'Q') {
vshared.shutdown = 1;
} else if (ch == '/') {
conn->cstate.readbuf.clear();
conn->cstate.find_nl_pos = 0;
conn->cstate.writebuf.clear();
conn->read_finished = true;
conn->write_finished = true;
} else {
execute_lines(*conn);
}
}
/* COMMIT */
dbctx->unlock_tables_if();
const bool commit_error = dbctx->get_commit_error();
dbctx->clear_error();
/* WRITE */
for (int i = 0; i < nfds; ++i) {
epoll_event& ev = events[i];
hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
if (commit_error && conn != 0) {
conn->reset();
continue;
}
if ((ev.events & EPOLLOUT) == 0) {
continue;
}
++out_count;
if (conn == 0) {
/* listener */
DBG_EP(fprintf(stderr, "OUT listener\n"));
} else {
/* client connection */
DBG_EP(fprintf(stderr, "OUT client\n"));
bool more_data = false;
while (conn->write_more(&more_data)) {
DBG_EP(fprintf(stderr, "OUT client write_more\n"));
conn->nb_last_io = now;
if (!more_data) {
break;
}
}
}
}
/* CLOSE */
for (int i = 0; i < nfds; ++i) {
epoll_event& ev = events[i];
hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
if (conn != 0 && conn->ok_to_close()) {
DBG_EP(fprintf(stderr, "CLOSE close\n"));
conns.erase_ptr(conn->conns_iter);
}
}
/* TIMEOUT & cleanup */
if (last_check_time + 10 < now) {
for (hstcpsvr_conns_type::iterator i = conns.begin();
i != conns.end(); ) {
hstcpsvr_conns_type::iterator icur = i;
++i;
if (cshared.sockargs.timeout != 0 &&
(*icur)->nb_last_io + cshared.sockargs.timeout < now) {
conns.erase_ptr((*icur)->conns_iter);
}
}
last_check_time = now;
DENA_VERBOSE(20, fprintf(stderr, "ep: %p nfds=%d cns=%zu\n", this, nfds,
conns.size()));
}
DENA_VERBOSE(30, fprintf(stderr, "%p in=%zu out=%zu ac=%zu, cns=%zu\n",
this, in_count, out_count, accept_count, conns.size()));
if (conns.empty()) {
dbctx->close_tables_if();
}
/* STATISTICS */
const size_t num_conns = conns.size();
dbctx->set_statistics(num_conns, 0);
/* ENABLE/DISABLE ACCEPT */
if (accept_balance != 0) {
cshared.thread_num_conns[worker_id] = num_conns;
size_t total_num_conns = 0;
for (long i = 0; i < cshared.num_threads; ++i) {
total_num_conns += cshared.thread_num_conns[i];
}
bool e_acc = false;
if (num_conns < 10 ||
total_num_conns * 2 > num_conns * cshared.num_threads) {
e_acc = true;
}
epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN;
ev.data.ptr = 0;
if (e_acc == accept_enabled) {
} else if (e_acc) {
if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev)
!= 0) {
fatal_abort("epoll_ctl EPOLL_CTL_ADD");
}
} else {
if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_DEL, cshared.listen_fd.get(), &ev)
!= 0) {
fatal_abort("epoll_ctl EPOLL_CTL_ADD");
}
}
accept_enabled = e_acc;
}
return 0;
}
#endif
void
hstcpsvr_worker::execute_lines(hstcpsvr_conn& conn)
{
DBG_MULTI(int cnt = 0);
dbconnstate& cstate = conn.cstate;
char *buf_end = cstate.readbuf.end();
char *line_begin = cstate.readbuf.begin();
char *find_pos = line_begin + cstate.find_nl_pos;
while (true) {
char *const nl = memchr_char(find_pos, '\n', buf_end - find_pos);
if (nl == 0) {
break;
}
char *const lf = (line_begin != nl && nl[-1] == '\r') ? nl - 1 : nl;
DBG_MULTI(cnt++);
execute_line(line_begin, lf, conn);
find_pos = line_begin = nl + 1;
}
cstate.readbuf.erase_front(line_begin - cstate.readbuf.begin());
cstate.find_nl_pos = cstate.readbuf.size();
DBG_MULTI(fprintf(stderr, "cnt=%d\n", cnt));
}
void
hstcpsvr_worker::execute_line(char *start, char *finish, hstcpsvr_conn& conn)
{
/* safe to modify, safe to dereference 'finish' */
char *const cmd_begin = start;
read_token(start, finish);
char *const cmd_end = start;
skip_one(start, finish);
if (cmd_begin == cmd_end) {
return conn.dbcb_resp_short(2, "cmd");
}
if (cmd_begin + 1 == cmd_end) {
if (cmd_begin[0] == 'P') {
if (cshared.require_auth && !conn.authorized) {
return conn.dbcb_resp_short(3, "unauth");
}
return do_open_index(start, finish, conn);
}
if (cmd_begin[0] == 'A') {
return do_authorization(start, finish, conn);
}
}
if (cmd_begin[0] >= '0' && cmd_begin[0] <= '9') {
if (cshared.require_auth && !conn.authorized) {
return conn.dbcb_resp_short(3, "unauth");
}
return do_exec_on_index(cmd_begin, cmd_end, start, finish, conn);
}
return conn.dbcb_resp_short(2, "cmd");
}
void
hstcpsvr_worker::do_open_index(char *start, char *finish, hstcpsvr_conn& conn)
{
const size_t pst_id = read_ui32(start, finish);
skip_one(start, finish);
/* dbname */
char *const dbname_begin = start;
read_token(start, finish);
char *const dbname_end = start;
skip_one(start, finish);
/* tblname */
char *const tblname_begin = start;
read_token(start, finish);
char *const tblname_end = start;
skip_one(start, finish);
/* idxname */
char *const idxname_begin = start;
read_token(start, finish);
char *const idxname_end = start;
skip_one(start, finish);
/* retfields */
char *const retflds_begin = start;
read_token(start, finish);
char *const retflds_end = start;
skip_one(start, finish);
/* filfields */
char *const filflds_begin = start;
read_token(start, finish);
char *const filflds_end = start;
dbname_end[0] = 0;
tblname_end[0] = 0;
idxname_end[0] = 0;
retflds_end[0] = 0;
filflds_end[0] = 0;
cmd_open_args args;
args.pst_id = pst_id;
args.dbn = dbname_begin;
args.tbl = tblname_begin;
args.idx = idxname_begin;
args.retflds = retflds_begin;
args.filflds = filflds_begin;
return dbctx->cmd_open(conn, args);
}
void
hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
char *finish, hstcpsvr_conn& conn)
{
cmd_exec_args args;
const size_t pst_id = read_ui32(cmd_begin, cmd_end);
if (pst_id >= conn.cstate.prep_stmts.size()) {
return conn.dbcb_resp_short(2, "stmtnum");
}
args.pst = &conn.cstate.prep_stmts[pst_id];
char *const op_begin = start;
read_token(start, finish);
char *const op_end = start;
args.op = string_ref(op_begin, op_end);
skip_one(start, finish);
const uint32_t fldnum = read_ui32(start, finish);
string_ref *const flds = DENA_ALLOCA_ALLOCATE(string_ref, fldnum);
auto_alloca_free<string_ref> flds_autofree(flds);
args.kvals = flds;
args.kvalslen = fldnum;
for (size_t i = 0; i < fldnum; ++i) {
skip_one(start, finish);
char *const f_begin = start;
read_token(start, finish);
char *const f_end = start;
if (is_null_expression(f_begin, f_end)) {
/* null */
flds[i] = string_ref();
} else {
/* non-null */
char *wp = f_begin;
unescape_string(wp, f_begin, f_end);
flds[i] = string_ref(f_begin, wp - f_begin);
}
}
skip_one(start, finish);
args.limit = read_ui32(start, finish);
skip_one(start, finish);
args.skip = read_ui32(start, finish);
if (start == finish) {
/* simple query */
return dbctx->cmd_exec(conn, args);
}
/* has more options */
skip_one(start, finish);
/* in-clause */
if (start[0] == '@') {
read_token(start, finish); /* '@' */
skip_one(start, finish);
args.invalues_keypart = read_ui32(start, finish);
skip_one(start, finish);
args.invalueslen = read_ui32(start, finish);
if (args.invalueslen <= 0) {
return conn.dbcb_resp_short(2, "invalueslen");
}
if (invalues_work.size() < args.invalueslen) {
invalues_work.resize(args.invalueslen);
}
args.invalues = &invalues_work[0];
for (uint32_t i = 0; i < args.invalueslen; ++i) {
skip_one(start, finish);
char *const invalue_begin = start;
read_token(start, finish);
char *const invalue_end = start;
char *wp = invalue_begin;
unescape_string(wp, invalue_begin, invalue_end);
invalues_work[i] = string_ref(invalue_begin, wp - invalue_begin);
}
skip_one(start, finish);
}
if (start == finish) {
/* no more options */
return dbctx->cmd_exec(conn, args);
}
/* filters */
size_t filters_count = 0;
while (start != finish && (start[0] == 'W' || start[0] == 'F')) {
char *const filter_type_begin = start;
read_token(start, finish);
char *const filter_type_end = start;
skip_one(start, finish);
char *const filter_op_begin = start;
read_token(start, finish);
char *const filter_op_end = start;
skip_one(start, finish);
const uint32_t ff_offset = read_ui32(start, finish);
skip_one(start, finish);
char *const filter_val_begin = start;
read_token(start, finish);
char *const filter_val_end = start;
skip_one(start, finish);
if (filters_work.size() <= filters_count) {
filters_work.resize(filters_count + 1);
}
record_filter& fi = filters_work[filters_count];
if (filter_type_end != filter_type_begin + 1) {
return conn.dbcb_resp_short(2, "filtertype");
}
fi.filter_type = (filter_type_begin[0] == 'W')
? record_filter_type_break : record_filter_type_skip;
const uint32_t num_filflds = args.pst->get_filter_fields().size();
if (ff_offset >= num_filflds) {
return conn.dbcb_resp_short(2, "filterfld");
}
fi.op = string_ref(filter_op_begin, filter_op_end);
fi.ff_offset = ff_offset;
if (is_null_expression(filter_val_begin, filter_val_end)) {
/* null */
fi.val = string_ref();
} else {
/* non-null */
char *wp = filter_val_begin;
unescape_string(wp, filter_val_begin, filter_val_end);
fi.val = string_ref(filter_val_begin, wp - filter_val_begin);
}
++filters_count;
}
if (filters_count > 0) {
if (filters_work.size() <= filters_count) {
filters_work.resize(filters_count + 1);
}
filters_work[filters_count].op = string_ref(); /* sentinel */
args.filters = &filters_work[0];
} else {
args.filters = 0;
}
if (start == finish) {
/* no modops */
return dbctx->cmd_exec(conn, args);
}
/* has modops */
char *const mod_op_begin = start;
read_token(start, finish);
char *const mod_op_end = start;
args.mod_op = string_ref(mod_op_begin, mod_op_end);
const size_t num_uvals = args.pst->get_ret_fields().size();
string_ref *const uflds = DENA_ALLOCA_ALLOCATE(string_ref, num_uvals);
auto_alloca_free<string_ref> uflds_autofree(uflds);
for (size_t i = 0; i < num_uvals; ++i) {
skip_one(start, finish);
char *const f_begin = start;
read_token(start, finish);
char *const f_end = start;
if (is_null_expression(f_begin, f_end)) {
/* null */
uflds[i] = string_ref();
} else {
/* non-null */
char *wp = f_begin;
unescape_string(wp, f_begin, f_end);
uflds[i] = string_ref(f_begin, wp - f_begin);
}
}
args.uvals = uflds;
return dbctx->cmd_exec(conn, args);
}
void
hstcpsvr_worker::do_authorization(char *start, char *finish,
hstcpsvr_conn& conn)
{
/* auth type */
char *const authtype_begin = start;
read_token(start, finish);
char *const authtype_end = start;
const size_t authtype_len = authtype_end - authtype_begin;
skip_one(start, finish);
/* key */
char *const key_begin = start;
read_token(start, finish);
char *const key_end = start;
const size_t key_len = key_end - key_begin;
authtype_end[0] = 0;
key_end[0] = 0;
char *wp = key_begin;
unescape_string(wp, key_begin, key_end);
if (authtype_len != 1 || authtype_begin[0] != '1') {
return conn.dbcb_resp_short(3, "authtype");
}
if (cshared.plain_secret.size() == key_len &&
memcmp(cshared.plain_secret.data(), key_begin, key_len) == 0) {
conn.authorized = true;
} else {
conn.authorized = false;
}
if (!conn.authorized) {
return conn.dbcb_resp_short(3, "unauth");
} else {
return conn.dbcb_resp_short(0, "");
}
}
hstcpsvr_worker_ptr
hstcpsvr_worker_i::create(const hstcpsvr_worker_arg& arg)
{
return hstcpsvr_worker_ptr(new hstcpsvr_worker(arg));
}
};