mirror of
https://github.com/MariaDB/server.git
synced 2025-02-20 20:33:15 +01:00
1202 lines
31 KiB
C
1202 lines
31 KiB
C
/* -*- c-basic-offset: 2 -*- */
|
|
/* Copyright(C) 2009-2012 Brazil
|
|
|
|
This library is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU Lesser General Public
|
|
License version 2.1 as published by the Free Software Foundation.
|
|
|
|
This library is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
Lesser General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Lesser General Public
|
|
License along with this library; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
|
|
*/
|
|
|
|
#include "grn.h"
|
|
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
#include "grn_ctx_impl.h"
|
|
|
|
#ifdef WIN32
|
|
# include <ws2tcpip.h>
|
|
#else
|
|
# ifdef HAVE_SYS_SOCKET_H
|
|
# include <sys/socket.h>
|
|
# endif /* HAVE_SYS_SOCKET_H */
|
|
# include <netinet/in.h>
|
|
# include <netinet/tcp.h>
|
|
# ifdef HAVE_SIGNAL_H
|
|
# include <signal.h>
|
|
# endif /* HAVE_SIGNAL_H */
|
|
# include <sys/uio.h>
|
|
#endif /* WIN32 */
|
|
|
|
#include "grn_ctx.h"
|
|
#include "grn_com.h"
|
|
|
|
#ifndef PF_INET
|
|
#define PF_INET AF_INET
|
|
#endif /* PF_INET */
|
|
|
|
#ifndef SOL_TCP
|
|
# ifdef IPPROTO_TCP
|
|
# define SOL_TCP IPPROTO_TCP
|
|
# else
|
|
# define SOL_TCP 6
|
|
# endif /* IPPROTO_TCP */
|
|
#endif /* SOL_TCP */
|
|
|
|
#ifndef USE_MSG_MORE
|
|
# ifdef MSG_MORE
|
|
# undef MSG_MORE
|
|
# endif
|
|
# define MSG_MORE 0
|
|
#endif /* USE_MSG_MORE */
|
|
|
|
|
|
#ifndef USE_MSG_NOSIGNAL
|
|
# ifdef MSG_NOSIGNAL
|
|
# undef MSG_NOSIGNAL
|
|
# endif
|
|
# define MSG_NOSIGNAL 0
|
|
#endif /* USE_MSG_NOSIGNAL */
|
|
/******* grn_com_queue ********/
|
|
|
|
grn_rc
|
|
grn_com_queue_enque(grn_ctx *ctx, grn_com_queue *q, grn_com_queue_entry *e)
|
|
{
|
|
CRITICAL_SECTION_ENTER(q->cs);
|
|
e->next = NULL;
|
|
*q->tail = e;
|
|
q->tail = &e->next;
|
|
CRITICAL_SECTION_LEAVE(q->cs);
|
|
/*
|
|
uint8_t i = q->last + 1;
|
|
e->next = NULL;
|
|
if (q->first == i || q->next) {
|
|
CRITICAL_SECTION_ENTER(q->cs);
|
|
if (q->first == i || q->next) {
|
|
*q->tail = e;
|
|
q->tail = &e->next;
|
|
} else {
|
|
q->bins[q->last] = e;
|
|
q->last = i;
|
|
}
|
|
CRITICAL_SECTION_LEAVE(q->cs);
|
|
} else {
|
|
q->bins[q->last] = e;
|
|
q->last = i;
|
|
}
|
|
*/
|
|
return GRN_SUCCESS;
|
|
}
|
|
|
|
grn_com_queue_entry *
|
|
grn_com_queue_deque(grn_ctx *ctx, grn_com_queue *q)
|
|
{
|
|
grn_com_queue_entry *e = NULL;
|
|
|
|
CRITICAL_SECTION_ENTER(q->cs);
|
|
if (q->next) {
|
|
e = q->next;
|
|
if (!(q->next = e->next)) { q->tail = &q->next; }
|
|
}
|
|
CRITICAL_SECTION_LEAVE(q->cs);
|
|
|
|
/*
|
|
if (q->first == q->last) {
|
|
if (q->next) {
|
|
CRITICAL_SECTION_ENTER(q->cs);
|
|
e = q->next;
|
|
if (!(q->next = e->next)) { q->tail = &q->next; }
|
|
CRITICAL_SECTION_LEAVE(q->cs);
|
|
}
|
|
} else {
|
|
e = q->bins[q->first++];
|
|
}
|
|
*/
|
|
return e;
|
|
}
|
|
|
|
/******* grn_msg ********/
|
|
|
|
grn_obj *
|
|
grn_msg_open(grn_ctx *ctx, grn_com *com, grn_com_queue *old)
|
|
{
|
|
grn_msg *msg = NULL;
|
|
if (old && (msg = (grn_msg *)grn_com_queue_deque(ctx, old))) {
|
|
if (msg->ctx != ctx) {
|
|
ERR(GRN_INVALID_ARGUMENT, "ctx unmatch");
|
|
return NULL;
|
|
}
|
|
GRN_BULK_REWIND(&msg->qe.obj);
|
|
} else if ((msg = GRN_MALLOCN(grn_msg, 1))) {
|
|
GRN_OBJ_INIT(&msg->qe.obj, GRN_MSG, 0, GRN_DB_TEXT);
|
|
msg->qe.obj.header.impl_flags |= GRN_OBJ_ALLOCATED;
|
|
msg->ctx = ctx;
|
|
}
|
|
msg->qe.next = NULL;
|
|
msg->u.peer = com;
|
|
msg->old = old;
|
|
memset(&msg->header, 0, sizeof(grn_com_header));
|
|
return (grn_obj *)msg;
|
|
}
|
|
|
|
grn_obj *
|
|
grn_msg_open_for_reply(grn_ctx *ctx, grn_obj *query, grn_com_queue *old)
|
|
{
|
|
grn_msg *req = (grn_msg *)query, *msg = NULL;
|
|
if (req && (msg = (grn_msg *)grn_msg_open(ctx, req->u.peer, old))) {
|
|
msg->edge_id = req->edge_id;
|
|
msg->header.proto = req->header.proto == GRN_COM_PROTO_MBREQ
|
|
? GRN_COM_PROTO_MBRES : req->header.proto;
|
|
}
|
|
return (grn_obj *)msg;
|
|
}
|
|
|
|
grn_rc
|
|
grn_msg_close(grn_ctx *ctx, grn_obj *obj)
|
|
{
|
|
grn_msg *msg = (grn_msg *)obj;
|
|
if (ctx == msg->ctx) { return grn_obj_close(ctx, obj); }
|
|
return grn_com_queue_enque(ctx, msg->old, (grn_com_queue_entry *)msg);
|
|
}
|
|
|
|
grn_rc
|
|
grn_msg_set_property(grn_ctx *ctx, grn_obj *obj,
|
|
uint16_t status, uint32_t key_size, uint8_t extra_size)
|
|
{
|
|
grn_com_header *header = &((grn_msg *)obj)->header;
|
|
header->status = htons(status);
|
|
header->keylen = htons(key_size);
|
|
header->level = extra_size;
|
|
return GRN_SUCCESS;
|
|
}
|
|
|
|
grn_rc
|
|
grn_msg_send(grn_ctx *ctx, grn_obj *msg, int flags)
|
|
{
|
|
grn_rc rc;
|
|
grn_msg *m = (grn_msg *)msg;
|
|
grn_com *peer = m->u.peer;
|
|
grn_com_header *header = &m->header;
|
|
if (GRN_COM_QUEUE_EMPTYP(&peer->new_)) {
|
|
switch (header->proto) {
|
|
case GRN_COM_PROTO_HTTP :
|
|
{
|
|
ssize_t ret;
|
|
ret = send(peer->fd, GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), MSG_NOSIGNAL);
|
|
if (ret == -1) { SOERR("send"); }
|
|
if (ctx->rc != GRN_OPERATION_WOULD_BLOCK) {
|
|
grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg);
|
|
return ctx->rc;
|
|
}
|
|
}
|
|
break;
|
|
case GRN_COM_PROTO_GQTP :
|
|
{
|
|
if (flags & GRN_CTX_MORE) { flags |= GRN_CTX_QUIET; }
|
|
if (ctx->stat == GRN_CTX_QUIT) { flags |= GRN_CTX_QUIT; }
|
|
header->qtype = (uint8_t) ctx->impl->output.type;
|
|
header->keylen = 0;
|
|
header->level = 0;
|
|
header->flags = flags;
|
|
header->status = htons((uint16_t)ctx->rc);
|
|
header->opaque = 0;
|
|
header->cas = 0;
|
|
//todo : MSG_DONTWAIT
|
|
rc = grn_com_send(ctx, peer, header,
|
|
GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), 0);
|
|
if (rc != GRN_OPERATION_WOULD_BLOCK) {
|
|
grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg);
|
|
return rc;
|
|
}
|
|
}
|
|
break;
|
|
case GRN_COM_PROTO_MBREQ :
|
|
return GRN_FUNCTION_NOT_IMPLEMENTED;
|
|
case GRN_COM_PROTO_MBRES :
|
|
rc = grn_com_send(ctx, peer, header,
|
|
GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg),
|
|
(flags & GRN_CTX_MORE) ? MSG_MORE :0);
|
|
if (rc != GRN_OPERATION_WOULD_BLOCK) {
|
|
grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg);
|
|
return rc;
|
|
}
|
|
break;
|
|
default :
|
|
return GRN_INVALID_ARGUMENT;
|
|
}
|
|
}
|
|
MUTEX_LOCK(peer->ev->mutex);
|
|
rc = grn_com_queue_enque(ctx, &peer->new_, (grn_com_queue_entry *)msg);
|
|
COND_SIGNAL(peer->ev->cond);
|
|
MUTEX_UNLOCK(peer->ev->mutex);
|
|
return rc;
|
|
}
|
|
|
|
/******* grn_com ********/
|
|
|
|
grn_rc
|
|
grn_com_init(void)
|
|
{
|
|
#ifdef WIN32
|
|
WSADATA wd;
|
|
if (WSAStartup(MAKEWORD(2, 0), &wd) != 0) {
|
|
grn_ctx *ctx = &grn_gctx;
|
|
SOERR("WSAStartup");
|
|
}
|
|
#else /* WIN32 */
|
|
#ifndef USE_MSG_NOSIGNAL
|
|
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
|
|
grn_ctx *ctx = &grn_gctx;
|
|
SERR("signal");
|
|
}
|
|
#endif /* USE_MSG_NOSIGNAL */
|
|
#endif /* WIN32 */
|
|
return grn_gctx.rc;
|
|
}
|
|
|
|
void
|
|
grn_com_fin(void)
|
|
{
|
|
#ifdef WIN32
|
|
WSACleanup();
|
|
#endif /* WIN32 */
|
|
}
|
|
|
|
grn_rc
|
|
grn_com_event_init(grn_ctx *ctx, grn_com_event *ev, int max_nevents, int data_size)
|
|
{
|
|
ev->max_nevents = max_nevents;
|
|
if ((ev->hash = grn_hash_create(ctx, NULL, sizeof(grn_sock), data_size, 0))) {
|
|
MUTEX_INIT(ev->mutex);
|
|
COND_INIT(ev->cond);
|
|
GRN_COM_QUEUE_INIT(&ev->recv_old);
|
|
ev->msg_handler = NULL;
|
|
memset(&(ev->curr_edge_id), 0, sizeof(grn_com_addr));
|
|
ev->acceptor = NULL;
|
|
ev->opaque = NULL;
|
|
#ifndef USE_SELECT
|
|
# ifdef USE_EPOLL
|
|
if ((ev->events = GRN_MALLOC(sizeof(struct epoll_event) * max_nevents))) {
|
|
if ((ev->epfd = epoll_create(max_nevents)) != -1) {
|
|
goto exit;
|
|
} else {
|
|
SERR("epoll_create");
|
|
}
|
|
GRN_FREE(ev->events);
|
|
}
|
|
# else /* USE_EPOLL */
|
|
# ifdef USE_KQUEUE
|
|
if ((ev->events = GRN_MALLOC(sizeof(struct kevent) * max_nevents))) {
|
|
if ((ev->kqfd = kqueue()) != -1) {
|
|
goto exit;
|
|
} else {
|
|
SERR("kqueue");
|
|
}
|
|
GRN_FREE(ev->events);
|
|
}
|
|
# else /* USE_KQUEUE */
|
|
if ((ev->events = GRN_MALLOC(sizeof(struct pollfd) * max_nevents))) {
|
|
goto exit;
|
|
}
|
|
# endif /* USE_KQUEUE*/
|
|
# endif /* USE_EPOLL */
|
|
grn_hash_close(ctx, ev->hash);
|
|
ev->hash = NULL;
|
|
ev->events = NULL;
|
|
#else /* USE_SELECT */
|
|
goto exit;
|
|
#endif /* USE_SELECT */
|
|
}
|
|
exit :
|
|
return ctx->rc;
|
|
}
|
|
|
|
grn_rc
|
|
grn_com_event_fin(grn_ctx *ctx, grn_com_event *ev)
|
|
{
|
|
grn_obj *msg;
|
|
while ((msg = (grn_obj *)grn_com_queue_deque(ctx, &ev->recv_old))) {
|
|
grn_msg_close(ctx, msg);
|
|
}
|
|
if (ev->hash) { grn_hash_close(ctx, ev->hash); }
|
|
#ifndef USE_SELECT
|
|
if (ev->events) { GRN_FREE(ev->events); }
|
|
# ifdef USE_EPOLL
|
|
grn_close(ev->epfd);
|
|
# endif /* USE_EPOLL */
|
|
# ifdef USE_KQUEUE
|
|
grn_close(ev->kqfd);
|
|
# endif /* USE_KQUEUE*/
|
|
#endif /* USE_SELECT */
|
|
return GRN_SUCCESS;
|
|
}
|
|
|
|
grn_rc
|
|
grn_com_event_add(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com)
|
|
{
|
|
grn_com *c;
|
|
/* todo : expand events */
|
|
if (!ev || *ev->hash->n_entries == (uint32_t) ev->max_nevents) {
|
|
if (ev) { GRN_LOG(ctx, GRN_LOG_ERROR, "too many connections (%d)", ev->max_nevents); }
|
|
return GRN_INVALID_ARGUMENT;
|
|
}
|
|
#ifdef USE_EPOLL
|
|
{
|
|
struct epoll_event e;
|
|
memset(&e, 0, sizeof(struct epoll_event));
|
|
e.data.fd = (fd);
|
|
e.events = (uint32_t) events;
|
|
if (epoll_ctl(ev->epfd, EPOLL_CTL_ADD, (fd), &e) == -1) {
|
|
SERR("epoll_ctl");
|
|
return ctx->rc;
|
|
}
|
|
}
|
|
#endif /* USE_EPOLL*/
|
|
#ifdef USE_KQUEUE
|
|
{
|
|
struct kevent e;
|
|
/* todo: udata should have fd */
|
|
EV_SET(&e, (fd), events, EV_ADD, 0, 0, NULL);
|
|
if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) {
|
|
SERR("kevent");
|
|
return ctx->rc;
|
|
}
|
|
}
|
|
#endif /* USE_KQUEUE */
|
|
{
|
|
if (grn_hash_add(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c, NULL)) {
|
|
c->ev = ev;
|
|
c->fd = fd;
|
|
c->events = events;
|
|
if (com) { *com = c; }
|
|
}
|
|
}
|
|
return ctx->rc;
|
|
}
|
|
|
|
grn_rc
|
|
grn_com_event_mod(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com)
|
|
{
|
|
grn_com *c;
|
|
if (!ev) { return GRN_INVALID_ARGUMENT; }
|
|
if (grn_hash_get(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c)) {
|
|
if (c->fd != fd) {
|
|
GRN_LOG(ctx, GRN_LOG_ERROR,
|
|
"grn_com_event_mod fd unmatch "
|
|
"%" GRN_FMT_SOCKET " != %" GRN_FMT_SOCKET,
|
|
c->fd, fd);
|
|
return GRN_OBJECT_CORRUPT;
|
|
}
|
|
if (com) { *com = c; }
|
|
if (c->events != events) {
|
|
#ifdef USE_EPOLL
|
|
struct epoll_event e;
|
|
memset(&e, 0, sizeof(struct epoll_event));
|
|
e.data.fd = (fd);
|
|
e.events = (uint32_t) events;
|
|
if (epoll_ctl(ev->epfd, EPOLL_CTL_MOD, (fd), &e) == -1) {
|
|
SERR("epoll_ctl");
|
|
return ctx->rc;
|
|
}
|
|
#endif /* USE_EPOLL*/
|
|
#ifdef USE_KQUEUE
|
|
// experimental
|
|
struct kevent e[2];
|
|
EV_SET(&e[0], (fd), GRN_COM_POLLIN|GRN_COM_POLLOUT, EV_DELETE, 0, 0, NULL);
|
|
EV_SET(&e[1], (fd), events, EV_ADD, 0, 0, NULL);
|
|
if (kevent(ev->kqfd, e, 2, NULL, 0, NULL) == -1) {
|
|
SERR("kevent");
|
|
return ctx->rc;
|
|
}
|
|
#endif /* USE_KQUEUE */
|
|
c->events = events;
|
|
}
|
|
return GRN_SUCCESS;
|
|
}
|
|
return GRN_INVALID_ARGUMENT;
|
|
}
|
|
|
|
grn_rc
|
|
grn_com_event_del(grn_ctx *ctx, grn_com_event *ev, grn_sock fd)
|
|
{
|
|
if (!ev) { return GRN_INVALID_ARGUMENT; }
|
|
{
|
|
grn_com *c;
|
|
grn_id id = grn_hash_get(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c);
|
|
if (id) {
|
|
#ifdef USE_EPOLL
|
|
if (!c->closed) {
|
|
struct epoll_event e;
|
|
memset(&e, 0, sizeof(struct epoll_event));
|
|
e.data.fd = fd;
|
|
e.events = c->events;
|
|
if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, fd, &e) == -1) {
|
|
SERR("epoll_ctl");
|
|
return ctx->rc;
|
|
}
|
|
}
|
|
#endif /* USE_EPOLL*/
|
|
#ifdef USE_KQUEUE
|
|
struct kevent e;
|
|
EV_SET(&e, (fd), c->events, EV_DELETE, 0, 0, NULL);
|
|
if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) {
|
|
SERR("kevent");
|
|
return ctx->rc;
|
|
}
|
|
#endif /* USE_KQUEUE */
|
|
return grn_hash_delete_by_id(ctx, ev->hash, id, NULL);
|
|
} else {
|
|
GRN_LOG(ctx, GRN_LOG_ERROR,
|
|
"%04x| fd(%" GRN_FMT_SOCKET ") not found in ev(%p)",
|
|
grn_getpid(), fd, ev);
|
|
return GRN_INVALID_ARGUMENT;
|
|
}
|
|
}
|
|
}
|
|
|
|
#define LISTEN_BACKLOG 0x1000
|
|
|
|
grn_rc
|
|
grn_com_event_start_accept(grn_ctx *ctx, grn_com_event *ev)
|
|
{
|
|
grn_com *com = ev->acceptor;
|
|
|
|
if (com->accepting) {return ctx->rc;}
|
|
|
|
GRN_API_ENTER;
|
|
if (!grn_com_event_mod(ctx, ev, com->fd, GRN_COM_POLLIN, NULL)) {
|
|
if (listen(com->fd, LISTEN_BACKLOG) == 0) {
|
|
com->accepting = GRN_TRUE;
|
|
} else {
|
|
SOERR("listen - start accept");
|
|
}
|
|
}
|
|
GRN_API_RETURN(ctx->rc);
|
|
}
|
|
|
|
grn_rc
|
|
grn_com_event_stop_accept(grn_ctx *ctx, grn_com_event *ev)
|
|
{
|
|
grn_com *com = ev->acceptor;
|
|
|
|
if (!com->accepting) {return ctx->rc;}
|
|
|
|
GRN_API_ENTER;
|
|
if (!grn_com_event_mod(ctx, ev, com->fd, 0, NULL)) {
|
|
if (listen(com->fd, 0) == 0) {
|
|
com->accepting = GRN_FALSE;
|
|
} else {
|
|
SOERR("listen - disable accept");
|
|
}
|
|
}
|
|
GRN_API_RETURN(ctx->rc);
|
|
}
|
|
|
|
static void
|
|
grn_com_receiver(grn_ctx *ctx, grn_com *com)
|
|
{
|
|
grn_com_event *ev = com->ev;
|
|
ERRCLR(ctx);
|
|
if (ev->acceptor == com) {
|
|
grn_com *ncs;
|
|
grn_sock fd = accept(com->fd, NULL, NULL);
|
|
if (fd == -1) {
|
|
if (errno == EMFILE) {
|
|
grn_com_event_stop_accept(ctx, ev);
|
|
} else {
|
|
SOERR("accept");
|
|
}
|
|
return;
|
|
}
|
|
if (grn_com_event_add(ctx, ev, fd, GRN_COM_POLLIN, (grn_com **)&ncs)) {
|
|
grn_sock_close(fd);
|
|
return;
|
|
}
|
|
ncs->has_sid = 0;
|
|
ncs->closed = 0;
|
|
ncs->opaque = NULL;
|
|
GRN_COM_QUEUE_INIT(&ncs->new_);
|
|
// GRN_LOG(ctx, GRN_LOG_NOTICE, "accepted (%d)", fd);
|
|
return;
|
|
} else {
|
|
grn_msg *msg = (grn_msg *)grn_msg_open(ctx, com, &ev->recv_old);
|
|
grn_com_recv(ctx, msg->u.peer, &msg->header, (grn_obj *)msg);
|
|
if (msg->u.peer /* is_edge_request(msg)*/) {
|
|
grn_memcpy(&msg->edge_id, &ev->curr_edge_id, sizeof(grn_com_addr));
|
|
if (!com->has_sid) {
|
|
com->has_sid = 1;
|
|
com->sid = ev->curr_edge_id.sid++;
|
|
}
|
|
msg->edge_id.sid = com->sid;
|
|
}
|
|
msg->acceptor = ev->acceptor;
|
|
ev->msg_handler(ctx, (grn_obj *)msg);
|
|
}
|
|
}
|
|
|
|
grn_rc
|
|
grn_com_event_poll(grn_ctx *ctx, grn_com_event *ev, int timeout)
|
|
{
|
|
int nevents;
|
|
grn_com *com;
|
|
#ifdef USE_SELECT
|
|
uint32_t dummy;
|
|
grn_sock *pfd;
|
|
int nfds = 0;
|
|
fd_set rfds;
|
|
fd_set wfds;
|
|
struct timeval tv;
|
|
if (timeout >= 0) {
|
|
tv.tv_sec = timeout / 1000;
|
|
tv.tv_usec = (timeout % 1000) * 1000;
|
|
}
|
|
FD_ZERO(&rfds);
|
|
FD_ZERO(&wfds);
|
|
ctx->errlvl = GRN_OK;
|
|
ctx->rc = GRN_SUCCESS;
|
|
{
|
|
grn_hash_cursor *cursor;
|
|
cursor = grn_hash_cursor_open(ctx, ev->hash, NULL, 0, NULL, 0, 0, -1, 0);
|
|
if (cursor) {
|
|
grn_id id;
|
|
while ((id = grn_hash_cursor_next(ctx, cursor))) {
|
|
grn_hash_cursor_get_key_value(ctx,
|
|
cursor,
|
|
(void **)(&pfd),
|
|
&dummy,
|
|
(void **)(&com));
|
|
if (com->events & GRN_COM_POLLIN) { FD_SET(*pfd, &rfds); }
|
|
if (com->events & GRN_COM_POLLOUT) { FD_SET(*pfd, &wfds); }
|
|
# ifndef WIN32
|
|
if (*pfd > nfds) { nfds = *pfd; }
|
|
# endif /* WIN32 */
|
|
}
|
|
grn_hash_cursor_close(ctx, cursor);
|
|
}
|
|
}
|
|
nevents = select(nfds + 1, &rfds, &wfds, NULL, (timeout >= 0) ? &tv : NULL);
|
|
if (nevents < 0) {
|
|
SOERR("select");
|
|
if (ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) { ERRCLR(ctx); }
|
|
return ctx->rc;
|
|
}
|
|
if (timeout < 0 && !nevents) { GRN_LOG(ctx, GRN_LOG_NOTICE, "select returns 0 events"); }
|
|
GRN_HASH_EACH(ctx, ev->hash, eh, &pfd, &dummy, &com, {
|
|
if (FD_ISSET(*pfd, &rfds)) { grn_com_receiver(ctx, com); }
|
|
});
|
|
#else /* USE_SELECT */
|
|
# ifdef USE_EPOLL
|
|
struct epoll_event *ep;
|
|
ctx->errlvl = GRN_OK;
|
|
ctx->rc = GRN_SUCCESS;
|
|
nevents = epoll_wait(ev->epfd, ev->events, ev->max_nevents, timeout);
|
|
if (nevents < 0) {
|
|
SERR("epoll_wait");
|
|
}
|
|
# else /* USE_EPOLL */
|
|
# ifdef USE_KQUEUE
|
|
struct kevent *ep;
|
|
struct timespec tv;
|
|
if (timeout >= 0) {
|
|
tv.tv_sec = timeout / 1000;
|
|
tv.tv_nsec = (timeout % 1000) * 1000;
|
|
}
|
|
nevents = kevent(ev->kqfd, NULL, 0, ev->events, ev->max_nevents, &tv);
|
|
if (nevents < 0) {
|
|
SERR("kevent");
|
|
}
|
|
# else /* USE_KQUEUE */
|
|
uint32_t dummy;
|
|
int nfd = 0, *pfd;
|
|
struct pollfd *ep = ev->events;
|
|
ctx->errlvl = GRN_OK;
|
|
ctx->rc = GRN_SUCCESS;
|
|
GRN_HASH_EACH(ctx, ev->hash, eh, &pfd, &dummy, &com, {
|
|
ep->fd = *pfd;
|
|
// ep->events =(short) com->events;
|
|
ep->events = POLLIN;
|
|
ep->revents = 0;
|
|
ep++;
|
|
nfd++;
|
|
});
|
|
nevents = poll(ev->events, nfd, timeout);
|
|
if (nevents < 0) {
|
|
SERR("poll");
|
|
}
|
|
# endif /* USE_KQUEUE */
|
|
# endif /* USE_EPOLL */
|
|
if (ctx->rc != GRN_SUCCESS) {
|
|
if (ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
|
|
ERRCLR(ctx);
|
|
}
|
|
return ctx->rc;
|
|
}
|
|
if (timeout < 0 && !nevents) { GRN_LOG(ctx, GRN_LOG_NOTICE, "poll returns 0 events"); }
|
|
for (ep = ev->events; nevents; ep++) {
|
|
int efd;
|
|
# ifdef USE_EPOLL
|
|
efd = ep->data.fd;
|
|
nevents--;
|
|
// todo : com = ep->data.ptr;
|
|
if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) {
|
|
struct epoll_event e;
|
|
GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->hash", efd);
|
|
memset(&e, 0, sizeof(struct epoll_event));
|
|
e.data.fd = efd;
|
|
e.events = ep->events;
|
|
if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, efd, &e) == -1) { SERR("epoll_ctl"); }
|
|
if (grn_sock_close(efd) == -1) { SOERR("close"); }
|
|
continue;
|
|
}
|
|
if (ep->events & GRN_COM_POLLIN) { grn_com_receiver(ctx, com); }
|
|
# else /* USE_EPOLL */
|
|
# ifdef USE_KQUEUE
|
|
efd = ep->ident;
|
|
nevents--;
|
|
// todo : com = ep->udata;
|
|
if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) {
|
|
struct kevent e;
|
|
GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->set", efd);
|
|
EV_SET(&e, efd, ep->filter, EV_DELETE, 0, 0, NULL);
|
|
if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) { SERR("kevent"); }
|
|
if (grn_sock_close(efd) == -1) { SOERR("close"); }
|
|
continue;
|
|
}
|
|
if (ep->filter == GRN_COM_POLLIN) { grn_com_receiver(ctx, com); }
|
|
# else
|
|
efd = ep->fd;
|
|
if (!(ep->events & ep->revents)) { continue; }
|
|
nevents--;
|
|
if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) {
|
|
GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->hash", efd);
|
|
if (grn_sock_close(efd) == -1) { SOERR("close"); }
|
|
continue;
|
|
}
|
|
if (ep->revents & GRN_COM_POLLIN) { grn_com_receiver(ctx, com); }
|
|
# endif /* USE_KQUEUE */
|
|
# endif /* USE_EPOLL */
|
|
}
|
|
#endif /* USE_SELECT */
|
|
/* todo :
|
|
while (!(msg = (grn_com_msg *)grn_com_queue_deque(&recv_old))) {
|
|
grn_msg_close(ctx, msg);
|
|
}
|
|
*/
|
|
return GRN_SUCCESS;
|
|
}
|
|
|
|
grn_rc
|
|
grn_com_send_http(grn_ctx *ctx, grn_com *cs, const char *path, uint32_t path_len, int flags)
|
|
{
|
|
ssize_t ret;
|
|
grn_obj buf;
|
|
GRN_TEXT_INIT(&buf, 0);
|
|
GRN_TEXT_PUTS(ctx, &buf, "GET ");
|
|
grn_bulk_write(ctx, &buf, path, path_len);
|
|
GRN_TEXT_PUTS(ctx, &buf, " HTTP/1.0\r\n\r\n");
|
|
// todo : refine
|
|
if ((ret = send(cs->fd, GRN_BULK_HEAD(&buf), GRN_BULK_VSIZE(&buf), MSG_NOSIGNAL|flags)) == -1) {
|
|
SOERR("send");
|
|
}
|
|
if (ret != GRN_BULK_VSIZE(&buf)) {
|
|
GRN_LOG(ctx, GRN_LOG_NOTICE, "send %d != %d", (int)ret, (int)GRN_BULK_VSIZE(&buf));
|
|
}
|
|
grn_obj_close(ctx, &buf);
|
|
return ctx->rc;
|
|
}
|
|
|
|
grn_rc
|
|
grn_com_send(grn_ctx *ctx, grn_com *cs,
|
|
grn_com_header *header, const char *body, uint32_t size, int flags)
|
|
{
|
|
grn_rc rc = GRN_SUCCESS;
|
|
size_t whole_size = sizeof(grn_com_header) + size;
|
|
ssize_t ret;
|
|
header->size = htonl(size);
|
|
GRN_LOG(ctx, GRN_LOG_INFO, "send (%d,%x,%d,%02x,%02x,%04x)", size, header->flags, header->proto, header->qtype, header->level, header->status);
|
|
|
|
if (size) {
|
|
#ifdef WIN32
|
|
WSABUF wsabufs[2];
|
|
DWORD n_sent;
|
|
wsabufs[0].buf = (char *)header;
|
|
wsabufs[0].len = sizeof(grn_com_header);
|
|
wsabufs[1].buf = (char *)body;
|
|
wsabufs[1].len = size;
|
|
if (WSASend(cs->fd, wsabufs, 2, &n_sent, 0, NULL, NULL) == SOCKET_ERROR) {
|
|
SOERR("WSASend");
|
|
}
|
|
ret = n_sent;
|
|
#else /* WIN32 */
|
|
struct iovec msg_iov[2];
|
|
struct msghdr msg;
|
|
memset(&msg, 0, sizeof(struct msghdr));
|
|
msg.msg_name = NULL;
|
|
msg.msg_namelen = 0;
|
|
msg.msg_iov = msg_iov;
|
|
msg.msg_iovlen = 2;
|
|
msg_iov[0].iov_base = (char*) header;
|
|
msg_iov[0].iov_len = sizeof(grn_com_header);
|
|
msg_iov[1].iov_base = (char *)body;
|
|
msg_iov[1].iov_len = size;
|
|
if ((ret = sendmsg(cs->fd, &msg, MSG_NOSIGNAL|flags)) == -1) {
|
|
SOERR("sendmsg");
|
|
rc = ctx->rc;
|
|
}
|
|
#endif /* WIN32 */
|
|
} else {
|
|
if ((ret = send(cs->fd, (const void *)header, whole_size, MSG_NOSIGNAL|flags)) == -1) {
|
|
SOERR("send");
|
|
rc = ctx->rc;
|
|
}
|
|
}
|
|
if ((size_t) ret != whole_size) {
|
|
GRN_LOG(ctx, GRN_LOG_ERROR,
|
|
"sendmsg(%" GRN_FMT_SOCKET "): %" GRN_FMT_LLD " < %" GRN_FMT_LLU,
|
|
cs->fd, (long long int)ret, (unsigned long long int)whole_size);
|
|
rc = ctx->rc;
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
#define RETRY_MAX 10
|
|
|
|
static const char *
|
|
scan_delimiter(const char *p, const char *e)
|
|
{
|
|
while (p + 4 <= e) {
|
|
if (p[3] == '\n') {
|
|
if (p[2] == '\r') {
|
|
if (p[1] == '\n') {
|
|
if (p[0] == '\r') { return p + 4; } else { p += 2; }
|
|
} else { p += 2; }
|
|
} else { p += 4; }
|
|
} else { p += p[3] == '\r' ? 1 : 4; }
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
#define BUFSIZE 4096
|
|
|
|
static grn_rc
|
|
grn_com_recv_text(grn_ctx *ctx, grn_com *com,
|
|
grn_com_header *header, grn_obj *buf, ssize_t ret)
|
|
{
|
|
const char *p;
|
|
int retry = 0;
|
|
grn_bulk_write(ctx, buf, (char *)header, ret);
|
|
if ((p = scan_delimiter(GRN_BULK_HEAD(buf), GRN_BULK_CURR(buf)))) {
|
|
header->qtype = *GRN_BULK_HEAD(buf);
|
|
header->proto = GRN_COM_PROTO_HTTP;
|
|
header->size = GRN_BULK_VSIZE(buf);
|
|
goto exit;
|
|
}
|
|
for (;;) {
|
|
if (grn_bulk_reserve(ctx, buf, BUFSIZE)) { return ctx->rc; }
|
|
if ((ret = recv(com->fd, GRN_BULK_CURR(buf), BUFSIZE, 0)) < 0) {
|
|
SOERR("recv text");
|
|
if (ctx->rc == GRN_OPERATION_WOULD_BLOCK ||
|
|
ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
|
|
ERRCLR(ctx);
|
|
continue;
|
|
}
|
|
goto exit;
|
|
}
|
|
if (ret) {
|
|
off_t o = GRN_BULK_VSIZE(buf);
|
|
p = GRN_BULK_CURR(buf);
|
|
GRN_BULK_INCR_LEN(buf, ret);
|
|
if (scan_delimiter(p - (o > 3 ? 3 : o), p + ret)) {
|
|
break;
|
|
}
|
|
} else {
|
|
if (++retry > RETRY_MAX) {
|
|
// ERR(GRN_RETRY_MAX, "retry max in recv text");
|
|
goto exit;
|
|
}
|
|
}
|
|
}
|
|
header->qtype = *GRN_BULK_HEAD(buf);
|
|
header->proto = GRN_COM_PROTO_HTTP;
|
|
header->size = GRN_BULK_VSIZE(buf);
|
|
exit :
|
|
if (header->qtype == 'H') {
|
|
//todo : refine
|
|
/*
|
|
GRN_BULK_REWIND(buf);
|
|
grn_bulk_reserve(ctx, buf, BUFSIZE);
|
|
if ((ret = recv(com->fd, GRN_BULK_CURR(buf), BUFSIZE, 0)) < 0) {
|
|
SOERR("recv text body");
|
|
} else {
|
|
GRN_BULK_CURR(buf) += ret;
|
|
}
|
|
*/
|
|
}
|
|
return ctx->rc;
|
|
}
|
|
|
|
grn_rc
|
|
grn_com_recv(grn_ctx *ctx, grn_com *com, grn_com_header *header, grn_obj *buf)
|
|
{
|
|
ssize_t ret;
|
|
int retry = 0;
|
|
byte *p = (byte *)header;
|
|
size_t rest = sizeof(grn_com_header);
|
|
do {
|
|
if ((ret = recv(com->fd, p, rest, 0)) < 0) {
|
|
SOERR("recv size");
|
|
GRN_LOG(ctx, GRN_LOG_ERROR, "recv error (%" GRN_FMT_SOCKET ")", com->fd);
|
|
if (ctx->rc == GRN_OPERATION_WOULD_BLOCK ||
|
|
ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
|
|
ERRCLR(ctx);
|
|
continue;
|
|
}
|
|
goto exit;
|
|
}
|
|
if (ret) {
|
|
if (header->proto < 0x80) {
|
|
return grn_com_recv_text(ctx, com, header, buf, ret);
|
|
}
|
|
rest -= ret, p += ret;
|
|
} else {
|
|
if (++retry > RETRY_MAX) {
|
|
// ERR(GRN_RETRY_MAX, "retry max in recv header (%d)", com->fd);
|
|
goto exit;
|
|
}
|
|
}
|
|
} while (rest);
|
|
GRN_LOG(ctx, GRN_LOG_INFO,
|
|
"recv (%u,%x,%d,%02x,%02x,%04x)",
|
|
(uint32_t)ntohl(header->size),
|
|
header->flags,
|
|
header->proto,
|
|
header->qtype,
|
|
header->level,
|
|
header->status);
|
|
{
|
|
uint8_t proto = header->proto;
|
|
size_t value_size = ntohl(header->size);
|
|
GRN_BULK_REWIND(buf);
|
|
switch (proto) {
|
|
case GRN_COM_PROTO_GQTP :
|
|
case GRN_COM_PROTO_MBREQ :
|
|
if (GRN_BULK_WSIZE(buf) < value_size) {
|
|
if (grn_bulk_resize(ctx, buf, value_size)) {
|
|
goto exit;
|
|
}
|
|
}
|
|
retry = 0;
|
|
for (rest = value_size; rest;) {
|
|
if ((ret = recv(com->fd, GRN_BULK_CURR(buf), rest, MSG_WAITALL)) < 0) {
|
|
SOERR("recv body");
|
|
if (ctx->rc == GRN_OPERATION_WOULD_BLOCK ||
|
|
ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
|
|
ERRCLR(ctx);
|
|
continue;
|
|
}
|
|
goto exit;
|
|
}
|
|
if (ret) {
|
|
rest -= ret;
|
|
GRN_BULK_INCR_LEN(buf, ret);
|
|
} else {
|
|
if (++retry > RETRY_MAX) {
|
|
// ERR(GRN_RETRY_MAX, "retry max in recv body");
|
|
goto exit;
|
|
}
|
|
}
|
|
}
|
|
break;
|
|
default :
|
|
GRN_LOG(ctx, GRN_LOG_ERROR, "illegal header: %d", proto);
|
|
ctx->rc = GRN_INVALID_FORMAT;
|
|
goto exit;
|
|
}
|
|
}
|
|
exit :
|
|
return ctx->rc;
|
|
}
|
|
|
|
grn_com *
|
|
grn_com_copen(grn_ctx *ctx, grn_com_event *ev, const char *dest, int port)
|
|
{
|
|
grn_sock fd = -1;
|
|
grn_com *cs = NULL;
|
|
|
|
struct addrinfo hints, *addrinfo_list, *addrinfo_ptr;
|
|
char port_string[16];
|
|
int getaddrinfo_result;
|
|
|
|
memset(&hints, 0, sizeof(hints));
|
|
hints.ai_family = AF_UNSPEC;
|
|
hints.ai_socktype = SOCK_STREAM;
|
|
#ifdef AI_NUMERICSERV
|
|
hints.ai_flags = AI_NUMERICSERV;
|
|
#endif
|
|
grn_snprintf(port_string, sizeof(port_string), sizeof(port_string),
|
|
"%d", port);
|
|
|
|
getaddrinfo_result = getaddrinfo(dest, port_string, &hints, &addrinfo_list);
|
|
if (getaddrinfo_result != 0) {
|
|
switch (getaddrinfo_result) {
|
|
#ifdef EAI_MEMORY
|
|
case EAI_MEMORY:
|
|
ERR(GRN_NO_MEMORY_AVAILABLE, "getaddrinfo: <%s:%s>: %s",
|
|
dest, port_string, gai_strerror(getaddrinfo_result));
|
|
break;
|
|
#endif
|
|
#ifdef EAI_SYSTEM
|
|
case EAI_SYSTEM:
|
|
SOERR("getaddrinfo");
|
|
break;
|
|
#endif
|
|
default:
|
|
ERR(GRN_INVALID_ARGUMENT, "getaddrinfo: <%s:%s>: %s",
|
|
dest, port_string, gai_strerror(getaddrinfo_result));
|
|
break;
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
for (addrinfo_ptr = addrinfo_list; addrinfo_ptr;
|
|
addrinfo_ptr = addrinfo_ptr->ai_next) {
|
|
fd = socket(addrinfo_ptr->ai_family, addrinfo_ptr->ai_socktype,
|
|
addrinfo_ptr->ai_protocol);
|
|
if (fd == -1) {
|
|
SOERR("socket");
|
|
continue;
|
|
}
|
|
#ifdef TCP_NODELAY
|
|
{
|
|
static const int value = 1;
|
|
if (setsockopt(fd, 6, TCP_NODELAY,
|
|
(const char *)&value, sizeof(value)) != 0) {
|
|
SOERR("setsockopt");
|
|
grn_sock_close(fd);
|
|
continue;
|
|
}
|
|
}
|
|
#endif
|
|
if (connect(fd, addrinfo_ptr->ai_addr, addrinfo_ptr->ai_addrlen) != 0) {
|
|
SOERR("connect");
|
|
grn_sock_close(fd);
|
|
continue;
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
freeaddrinfo(addrinfo_list);
|
|
|
|
if (!addrinfo_ptr) {
|
|
return NULL;
|
|
}
|
|
ctx->errlvl = GRN_OK;
|
|
ctx->rc = GRN_SUCCESS;
|
|
|
|
if (ev) {
|
|
grn_com_event_add(ctx, ev, fd, GRN_COM_POLLIN, &cs);
|
|
} else {
|
|
cs = GRN_CALLOC(sizeof(grn_com));
|
|
if (cs) {
|
|
cs->fd = fd;
|
|
}
|
|
}
|
|
if (!cs) {
|
|
grn_sock_close(fd);
|
|
}
|
|
return cs;
|
|
}
|
|
|
|
void
|
|
grn_com_close_(grn_ctx *ctx, grn_com *com)
|
|
{
|
|
grn_sock fd = com->fd;
|
|
if (shutdown(fd, SHUT_RDWR) == -1) { /* SOERR("shutdown"); */ }
|
|
if (grn_sock_close(fd) == -1) {
|
|
SOERR("close");
|
|
} else {
|
|
com->closed = 1;
|
|
}
|
|
}
|
|
|
|
grn_rc
|
|
grn_com_close(grn_ctx *ctx, grn_com *com)
|
|
{
|
|
grn_sock fd = com->fd;
|
|
grn_com_event *ev = com->ev;
|
|
if (ev) {
|
|
grn_com *acceptor = ev->acceptor;
|
|
grn_com_event_del(ctx, ev, fd);
|
|
if (acceptor) { grn_com_event_start_accept(ctx, ev); }
|
|
}
|
|
if (!com->closed) { grn_com_close_(ctx, com); }
|
|
if (!ev) { GRN_FREE(com); }
|
|
return GRN_SUCCESS;
|
|
}
|
|
|
|
grn_rc
|
|
grn_com_sopen(grn_ctx *ctx, grn_com_event *ev,
|
|
const char *bind_address, int port, grn_msg_handler *func,
|
|
struct hostent *he)
|
|
{
|
|
grn_sock lfd = -1;
|
|
grn_com *cs = NULL;
|
|
int getaddrinfo_result;
|
|
struct addrinfo *bind_address_info = NULL;
|
|
struct addrinfo hints;
|
|
char port_string[6]; /* ceil(log10(65535)) + 1 ('\0')*/
|
|
|
|
GRN_API_ENTER;
|
|
if (!bind_address) {
|
|
bind_address = "0.0.0.0";
|
|
}
|
|
grn_snprintf(port_string, sizeof(port_string), sizeof(port_string),
|
|
"%d", port);
|
|
memset(&hints, 0, sizeof(struct addrinfo));
|
|
hints.ai_family = PF_UNSPEC;
|
|
hints.ai_socktype = SOCK_STREAM;
|
|
#ifdef AI_NUMERICSERV
|
|
hints.ai_flags = AI_NUMERICSERV;
|
|
#endif
|
|
getaddrinfo_result = getaddrinfo(bind_address, port_string,
|
|
&hints, &bind_address_info);
|
|
if (getaddrinfo_result != 0) {
|
|
switch (getaddrinfo_result) {
|
|
#ifdef EAI_MEMORY
|
|
case EAI_MEMORY:
|
|
ERR(GRN_NO_MEMORY_AVAILABLE,
|
|
"getaddrinfo: <%s:%s>: %s",
|
|
bind_address, port_string, gai_strerror(getaddrinfo_result));
|
|
break;
|
|
#endif
|
|
#ifdef EAI_SYSTEM
|
|
case EAI_SYSTEM:
|
|
SOERR("getaddrinfo");
|
|
break;
|
|
#endif
|
|
default:
|
|
ERR(GRN_INVALID_ARGUMENT,
|
|
"getaddrinfo: <%s:%s>: %s",
|
|
bind_address, port_string, gai_strerror(getaddrinfo_result));
|
|
break;
|
|
}
|
|
goto exit;
|
|
}
|
|
if ((lfd = socket(bind_address_info->ai_family, SOCK_STREAM, 0)) == -1) {
|
|
SOERR("socket");
|
|
goto exit;
|
|
}
|
|
grn_memcpy(&ev->curr_edge_id.addr, he->h_addr, he->h_length);
|
|
ev->curr_edge_id.port = htons(port);
|
|
ev->curr_edge_id.sid = 0;
|
|
{
|
|
int v = 1;
|
|
#ifdef TCP_NODELAY
|
|
if (setsockopt(lfd, SOL_TCP, TCP_NODELAY, (void *) &v, sizeof(int)) == -1) {
|
|
SOERR("setsockopt");
|
|
goto exit;
|
|
}
|
|
#endif
|
|
if (setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, (void *) &v, sizeof(int)) == -1) {
|
|
SOERR("setsockopt");
|
|
goto exit;
|
|
}
|
|
}
|
|
if (bind(lfd, bind_address_info->ai_addr, bind_address_info->ai_addrlen) < 0) {
|
|
SOERR("bind");
|
|
goto exit;
|
|
}
|
|
if (listen(lfd, LISTEN_BACKLOG) < 0) {
|
|
SOERR("listen");
|
|
goto exit;
|
|
}
|
|
if (ev) {
|
|
if (grn_com_event_add(ctx, ev, lfd, GRN_COM_POLLIN, &cs)) { goto exit; }
|
|
ev->acceptor = cs;
|
|
ev->msg_handler = func;
|
|
cs->has_sid = 0;
|
|
cs->closed = 0;
|
|
cs->opaque = NULL;
|
|
GRN_COM_QUEUE_INIT(&cs->new_);
|
|
} else {
|
|
if (!(cs = GRN_MALLOC(sizeof(grn_com)))) { goto exit; }
|
|
cs->fd = lfd;
|
|
}
|
|
cs->accepting = GRN_TRUE;
|
|
exit :
|
|
if (!cs && lfd != 1) { grn_sock_close(lfd); }
|
|
if (bind_address_info) { freeaddrinfo(bind_address_info); }
|
|
GRN_API_RETURN(ctx->rc);
|
|
}
|
|
|
|
|
|
grn_hash *grn_edges = NULL;
|
|
void (*grn_dispatcher)(grn_ctx *ctx, grn_edge *edge);
|
|
|
|
void
|
|
grn_edges_init(grn_ctx *ctx, void (*dispatcher)(grn_ctx *ctx, grn_edge *edge))
|
|
{
|
|
grn_edges = grn_hash_create(ctx, NULL, sizeof(grn_com_addr), sizeof(grn_edge), 0);
|
|
grn_dispatcher = dispatcher;
|
|
}
|
|
|
|
void
|
|
grn_edges_fin(grn_ctx *ctx)
|
|
{
|
|
grn_hash_close(ctx, grn_edges);
|
|
}
|
|
|
|
grn_edge *
|
|
grn_edges_add(grn_ctx *ctx, grn_com_addr *addr, int *added)
|
|
{
|
|
if (grn_io_lock(ctx, grn_edges->io, grn_lock_timeout)) {
|
|
return NULL;
|
|
} else {
|
|
grn_edge *edge;
|
|
grn_id id = grn_hash_add(ctx, grn_edges, addr, sizeof(grn_com_addr),
|
|
(void **)&edge, added);
|
|
grn_io_unlock(grn_edges->io);
|
|
if (id) { edge->id = id; }
|
|
return edge;
|
|
}
|
|
}
|
|
|
|
void
|
|
grn_edges_delete(grn_ctx *ctx, grn_edge *edge)
|
|
{
|
|
if (!grn_io_lock(ctx, grn_edges->io, grn_lock_timeout)) {
|
|
grn_hash_delete_by_id(ctx, grn_edges, edge->id, NULL);
|
|
grn_io_unlock(grn_edges->io);
|
|
}
|
|
}
|
|
|
|
grn_edge *
|
|
grn_edges_add_communicator(grn_ctx *ctx, grn_com_addr *addr)
|
|
{
|
|
int added;
|
|
grn_edge *edge = grn_edges_add(ctx, addr, &added);
|
|
if (added) {
|
|
grn_ctx_init(&edge->ctx, 0);
|
|
GRN_COM_QUEUE_INIT(&edge->recv_new);
|
|
GRN_COM_QUEUE_INIT(&edge->send_old);
|
|
edge->com = NULL;
|
|
edge->stat = 0 /*EDGE_IDLE*/;
|
|
edge->flags = GRN_EDGE_COMMUNICATOR;
|
|
}
|
|
return edge;
|
|
}
|
|
|
|
void
|
|
grn_edge_dispatch(grn_ctx *ctx, grn_edge *edge, grn_obj *msg)
|
|
{
|
|
grn_com_queue_enque(ctx, &edge->recv_new, (grn_com_queue_entry *)msg);
|
|
grn_dispatcher(ctx, edge);
|
|
}
|