mirror of
				https://github.com/MariaDB/server.git
				synced 2025-11-04 04:46:15 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			1202 lines
		
	
	
	
		
			31 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			1202 lines
		
	
	
	
		
			31 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
/* -*- c-basic-offset: 2 -*- */
 | 
						|
/* Copyright(C) 2009-2012 Brazil
 | 
						|
 | 
						|
  This library is free software; you can redistribute it and/or
 | 
						|
  modify it under the terms of the GNU Lesser General Public
 | 
						|
  License version 2.1 as published by the Free Software Foundation.
 | 
						|
 | 
						|
  This library is distributed in the hope that it will be useful,
 | 
						|
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 | 
						|
  Lesser General Public License for more details.
 | 
						|
 | 
						|
  You should have received a copy of the GNU Lesser General Public
 | 
						|
  License along with this library; if not, write to the Free Software
 | 
						|
  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
 | 
						|
*/
 | 
						|
 | 
						|
#include "grn.h"
 | 
						|
 | 
						|
#include <stdio.h>
 | 
						|
#include <string.h>
 | 
						|
#include "grn_ctx_impl.h"
 | 
						|
 | 
						|
#ifdef WIN32
 | 
						|
# include <ws2tcpip.h>
 | 
						|
#else
 | 
						|
# ifdef HAVE_SYS_SOCKET_H
 | 
						|
#  include <sys/socket.h>
 | 
						|
# endif /* HAVE_SYS_SOCKET_H */
 | 
						|
# include <netinet/in.h>
 | 
						|
# include <netinet/tcp.h>
 | 
						|
# ifdef HAVE_SIGNAL_H
 | 
						|
#  include <signal.h>
 | 
						|
# endif /* HAVE_SIGNAL_H */
 | 
						|
# include <sys/uio.h>
 | 
						|
#endif /* WIN32 */
 | 
						|
 | 
						|
#include "grn_ctx.h"
 | 
						|
#include "grn_com.h"
 | 
						|
 | 
						|
#ifndef PF_INET
 | 
						|
#define PF_INET AF_INET
 | 
						|
#endif /* PF_INET */
 | 
						|
 | 
						|
#ifndef SOL_TCP
 | 
						|
#  ifdef IPPROTO_TCP
 | 
						|
#    define SOL_TCP IPPROTO_TCP
 | 
						|
#  else
 | 
						|
#    define SOL_TCP 6
 | 
						|
#  endif /* IPPROTO_TCP */
 | 
						|
#endif /* SOL_TCP */
 | 
						|
 | 
						|
#ifndef USE_MSG_MORE
 | 
						|
#  ifdef MSG_MORE
 | 
						|
#    undef MSG_MORE
 | 
						|
#  endif
 | 
						|
#  define MSG_MORE     0
 | 
						|
#endif /* USE_MSG_MORE */
 | 
						|
 | 
						|
 | 
						|
#ifndef USE_MSG_NOSIGNAL
 | 
						|
#  ifdef MSG_NOSIGNAL
 | 
						|
#    undef MSG_NOSIGNAL
 | 
						|
#  endif
 | 
						|
#  define MSG_NOSIGNAL 0
 | 
						|
#endif /* USE_MSG_NOSIGNAL */
 | 
						|
/******* grn_com_queue ********/
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_com_queue_enque(grn_ctx *ctx, grn_com_queue *q, grn_com_queue_entry *e)
 | 
						|
{
 | 
						|
  CRITICAL_SECTION_ENTER(q->cs);
 | 
						|
  e->next = NULL;
 | 
						|
  *q->tail = e;
 | 
						|
  q->tail = &e->next;
 | 
						|
  CRITICAL_SECTION_LEAVE(q->cs);
 | 
						|
  /*
 | 
						|
  uint8_t i = q->last + 1;
 | 
						|
  e->next = NULL;
 | 
						|
  if (q->first == i || q->next) {
 | 
						|
    CRITICAL_SECTION_ENTER(q->cs);
 | 
						|
    if (q->first == i || q->next) {
 | 
						|
      *q->tail = e;
 | 
						|
      q->tail = &e->next;
 | 
						|
    } else {
 | 
						|
      q->bins[q->last] = e;
 | 
						|
      q->last = i;
 | 
						|
    }
 | 
						|
    CRITICAL_SECTION_LEAVE(q->cs);
 | 
						|
  } else {
 | 
						|
    q->bins[q->last] = e;
 | 
						|
    q->last = i;
 | 
						|
  }
 | 
						|
  */
 | 
						|
  return GRN_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
grn_com_queue_entry *
 | 
						|
grn_com_queue_deque(grn_ctx *ctx, grn_com_queue *q)
 | 
						|
{
 | 
						|
  grn_com_queue_entry *e = NULL;
 | 
						|
 | 
						|
  CRITICAL_SECTION_ENTER(q->cs);
 | 
						|
  if (q->next) {
 | 
						|
    e = q->next;
 | 
						|
    if (!(q->next = e->next)) { q->tail = &q->next; }
 | 
						|
  }
 | 
						|
  CRITICAL_SECTION_LEAVE(q->cs);
 | 
						|
 | 
						|
  /*
 | 
						|
  if (q->first == q->last) {
 | 
						|
    if (q->next) {
 | 
						|
      CRITICAL_SECTION_ENTER(q->cs);
 | 
						|
      e = q->next;
 | 
						|
      if (!(q->next = e->next)) { q->tail = &q->next; }
 | 
						|
      CRITICAL_SECTION_LEAVE(q->cs);
 | 
						|
    }
 | 
						|
  } else {
 | 
						|
    e = q->bins[q->first++];
 | 
						|
  }
 | 
						|
  */
 | 
						|
  return e;
 | 
						|
}
 | 
						|
 | 
						|
/******* grn_msg ********/
 | 
						|
 | 
						|
grn_obj *
 | 
						|
grn_msg_open(grn_ctx *ctx, grn_com *com, grn_com_queue *old)
 | 
						|
{
 | 
						|
  grn_msg *msg = NULL;
 | 
						|
  if (old && (msg = (grn_msg *)grn_com_queue_deque(ctx, old))) {
 | 
						|
    if (msg->ctx != ctx) {
 | 
						|
      ERR(GRN_INVALID_ARGUMENT, "ctx unmatch");
 | 
						|
      return NULL;
 | 
						|
    }
 | 
						|
    GRN_BULK_REWIND(&msg->qe.obj);
 | 
						|
  } else if ((msg = GRN_MALLOCN(grn_msg, 1))) {
 | 
						|
    GRN_OBJ_INIT(&msg->qe.obj, GRN_MSG, 0, GRN_DB_TEXT);
 | 
						|
    msg->qe.obj.header.impl_flags |= GRN_OBJ_ALLOCATED;
 | 
						|
    msg->ctx = ctx;
 | 
						|
  }
 | 
						|
  msg->qe.next = NULL;
 | 
						|
  msg->u.peer = com;
 | 
						|
  msg->old = old;
 | 
						|
  memset(&msg->header, 0, sizeof(grn_com_header));
 | 
						|
  return (grn_obj *)msg;
 | 
						|
}
 | 
						|
 | 
						|
grn_obj *
 | 
						|
grn_msg_open_for_reply(grn_ctx *ctx, grn_obj *query, grn_com_queue *old)
 | 
						|
{
 | 
						|
  grn_msg *req = (grn_msg *)query, *msg = NULL;
 | 
						|
  if (req && (msg = (grn_msg *)grn_msg_open(ctx, req->u.peer, old))) {
 | 
						|
    msg->edge_id = req->edge_id;
 | 
						|
    msg->header.proto = req->header.proto == GRN_COM_PROTO_MBREQ
 | 
						|
      ? GRN_COM_PROTO_MBRES : req->header.proto;
 | 
						|
  }
 | 
						|
  return (grn_obj *)msg;
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_msg_close(grn_ctx *ctx, grn_obj *obj)
 | 
						|
{
 | 
						|
  grn_msg *msg = (grn_msg *)obj;
 | 
						|
  if (ctx == msg->ctx) { return grn_obj_close(ctx, obj); }
 | 
						|
  return grn_com_queue_enque(ctx, msg->old, (grn_com_queue_entry *)msg);
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_msg_set_property(grn_ctx *ctx, grn_obj *obj,
 | 
						|
                     uint16_t status, uint32_t key_size, uint8_t extra_size)
 | 
						|
{
 | 
						|
  grn_com_header *header = &((grn_msg *)obj)->header;
 | 
						|
  header->status = htons(status);
 | 
						|
  header->keylen = htons(key_size);
 | 
						|
  header->level = extra_size;
 | 
						|
  return GRN_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_msg_send(grn_ctx *ctx, grn_obj *msg, int flags)
 | 
						|
{
 | 
						|
  grn_rc rc;
 | 
						|
  grn_msg *m = (grn_msg *)msg;
 | 
						|
  grn_com *peer = m->u.peer;
 | 
						|
  grn_com_header *header = &m->header;
 | 
						|
  if (GRN_COM_QUEUE_EMPTYP(&peer->new_)) {
 | 
						|
    switch (header->proto) {
 | 
						|
    case GRN_COM_PROTO_HTTP :
 | 
						|
      {
 | 
						|
        ssize_t ret;
 | 
						|
        ret = send(peer->fd, GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), MSG_NOSIGNAL);
 | 
						|
        if (ret == -1) { SOERR("send"); }
 | 
						|
        if (ctx->rc != GRN_OPERATION_WOULD_BLOCK) {
 | 
						|
          grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg);
 | 
						|
          return ctx->rc;
 | 
						|
        }
 | 
						|
      }
 | 
						|
      break;
 | 
						|
    case GRN_COM_PROTO_GQTP :
 | 
						|
      {
 | 
						|
        if (flags & GRN_CTX_MORE) { flags |= GRN_CTX_QUIET; }
 | 
						|
        if (ctx->stat == GRN_CTX_QUIT) { flags |= GRN_CTX_QUIT; }
 | 
						|
        header->qtype = (uint8_t) ctx->impl->output.type;
 | 
						|
        header->keylen = 0;
 | 
						|
        header->level = 0;
 | 
						|
        header->flags = flags;
 | 
						|
        header->status = htons((uint16_t)ctx->rc);
 | 
						|
        header->opaque = 0;
 | 
						|
        header->cas = 0;
 | 
						|
        //todo : MSG_DONTWAIT
 | 
						|
        rc = grn_com_send(ctx, peer, header,
 | 
						|
                          GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), 0);
 | 
						|
        if (rc != GRN_OPERATION_WOULD_BLOCK) {
 | 
						|
          grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg);
 | 
						|
          return rc;
 | 
						|
        }
 | 
						|
      }
 | 
						|
      break;
 | 
						|
    case GRN_COM_PROTO_MBREQ :
 | 
						|
      return GRN_FUNCTION_NOT_IMPLEMENTED;
 | 
						|
    case GRN_COM_PROTO_MBRES :
 | 
						|
      rc = grn_com_send(ctx, peer, header,
 | 
						|
                        GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg),
 | 
						|
                        (flags & GRN_CTX_MORE) ? MSG_MORE :0);
 | 
						|
      if (rc != GRN_OPERATION_WOULD_BLOCK) {
 | 
						|
        grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg);
 | 
						|
        return rc;
 | 
						|
      }
 | 
						|
      break;
 | 
						|
    default :
 | 
						|
      return GRN_INVALID_ARGUMENT;
 | 
						|
    }
 | 
						|
  }
 | 
						|
  MUTEX_LOCK(peer->ev->mutex);
 | 
						|
  rc = grn_com_queue_enque(ctx, &peer->new_, (grn_com_queue_entry *)msg);
 | 
						|
  COND_SIGNAL(peer->ev->cond);
 | 
						|
  MUTEX_UNLOCK(peer->ev->mutex);
 | 
						|
  return rc;
 | 
						|
}
 | 
						|
 | 
						|
/******* grn_com ********/
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_com_init(void)
 | 
						|
{
 | 
						|
#ifdef WIN32
 | 
						|
  WSADATA wd;
 | 
						|
  if (WSAStartup(MAKEWORD(2, 0), &wd) != 0) {
 | 
						|
    grn_ctx *ctx = &grn_gctx;
 | 
						|
    SOERR("WSAStartup");
 | 
						|
  }
 | 
						|
#else /* WIN32 */
 | 
						|
#ifndef USE_MSG_NOSIGNAL
 | 
						|
  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
 | 
						|
    grn_ctx *ctx = &grn_gctx;
 | 
						|
    SERR("signal");
 | 
						|
  }
 | 
						|
#endif /* USE_MSG_NOSIGNAL */
 | 
						|
#endif /* WIN32 */
 | 
						|
  return grn_gctx.rc;
 | 
						|
}
 | 
						|
 | 
						|
void
 | 
						|
grn_com_fin(void)
 | 
						|
{
 | 
						|
#ifdef WIN32
 | 
						|
  WSACleanup();
 | 
						|
#endif /* WIN32 */
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_com_event_init(grn_ctx *ctx, grn_com_event *ev, int max_nevents, int data_size)
 | 
						|
{
 | 
						|
  ev->max_nevents = max_nevents;
 | 
						|
  if ((ev->hash = grn_hash_create(ctx, NULL, sizeof(grn_sock), data_size, 0))) {
 | 
						|
    MUTEX_INIT(ev->mutex);
 | 
						|
    COND_INIT(ev->cond);
 | 
						|
    GRN_COM_QUEUE_INIT(&ev->recv_old);
 | 
						|
    ev->msg_handler = NULL;
 | 
						|
    memset(&(ev->curr_edge_id), 0, sizeof(grn_com_addr));
 | 
						|
    ev->acceptor = NULL;
 | 
						|
    ev->opaque = NULL;
 | 
						|
#ifndef USE_SELECT
 | 
						|
# ifdef USE_EPOLL
 | 
						|
    if ((ev->events = GRN_MALLOC(sizeof(struct epoll_event) * max_nevents))) {
 | 
						|
      if ((ev->epfd = epoll_create(max_nevents)) != -1) {
 | 
						|
        goto exit;
 | 
						|
      } else {
 | 
						|
        SERR("epoll_create");
 | 
						|
      }
 | 
						|
      GRN_FREE(ev->events);
 | 
						|
    }
 | 
						|
# else /* USE_EPOLL */
 | 
						|
#  ifdef USE_KQUEUE
 | 
						|
    if ((ev->events = GRN_MALLOC(sizeof(struct kevent) * max_nevents))) {
 | 
						|
      if ((ev->kqfd = kqueue()) != -1) {
 | 
						|
        goto exit;
 | 
						|
      } else {
 | 
						|
        SERR("kqueue");
 | 
						|
      }
 | 
						|
      GRN_FREE(ev->events);
 | 
						|
    }
 | 
						|
#  else /* USE_KQUEUE */
 | 
						|
    if ((ev->events = GRN_MALLOC(sizeof(struct pollfd) * max_nevents))) {
 | 
						|
      goto exit;
 | 
						|
    }
 | 
						|
#  endif /* USE_KQUEUE*/
 | 
						|
# endif /* USE_EPOLL */
 | 
						|
    grn_hash_close(ctx, ev->hash);
 | 
						|
    ev->hash = NULL;
 | 
						|
    ev->events = NULL;
 | 
						|
#else /* USE_SELECT */
 | 
						|
    goto exit;
 | 
						|
#endif /* USE_SELECT */
 | 
						|
  }
 | 
						|
exit :
 | 
						|
  return ctx->rc;
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_com_event_fin(grn_ctx *ctx, grn_com_event *ev)
 | 
						|
{
 | 
						|
  grn_obj *msg;
 | 
						|
  while ((msg = (grn_obj *)grn_com_queue_deque(ctx, &ev->recv_old))) {
 | 
						|
    grn_msg_close(ctx, msg);
 | 
						|
  }
 | 
						|
  if (ev->hash) { grn_hash_close(ctx, ev->hash); }
 | 
						|
#ifndef USE_SELECT
 | 
						|
  if (ev->events) { GRN_FREE(ev->events); }
 | 
						|
# ifdef USE_EPOLL
 | 
						|
  grn_close(ev->epfd);
 | 
						|
# endif /* USE_EPOLL */
 | 
						|
# ifdef USE_KQUEUE
 | 
						|
  grn_close(ev->kqfd);
 | 
						|
# endif /* USE_KQUEUE*/
 | 
						|
#endif /* USE_SELECT */
 | 
						|
  return GRN_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_com_event_add(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com)
 | 
						|
{
 | 
						|
  grn_com *c;
 | 
						|
  /* todo : expand events */
 | 
						|
  if (!ev || *ev->hash->n_entries == (uint32_t) ev->max_nevents) {
 | 
						|
    if (ev) { GRN_LOG(ctx, GRN_LOG_ERROR, "too many connections (%d)", ev->max_nevents); }
 | 
						|
    return GRN_INVALID_ARGUMENT;
 | 
						|
  }
 | 
						|
#ifdef USE_EPOLL
 | 
						|
  {
 | 
						|
    struct epoll_event e;
 | 
						|
    memset(&e, 0, sizeof(struct epoll_event));
 | 
						|
    e.data.fd = (fd);
 | 
						|
    e.events = (uint32_t) events;
 | 
						|
    if (epoll_ctl(ev->epfd, EPOLL_CTL_ADD, (fd), &e) == -1) {
 | 
						|
      SERR("epoll_ctl");
 | 
						|
      return ctx->rc;
 | 
						|
    }
 | 
						|
  }
 | 
						|
#endif /* USE_EPOLL*/
 | 
						|
#ifdef USE_KQUEUE
 | 
						|
  {
 | 
						|
    struct kevent e;
 | 
						|
    /* todo: udata should have fd */
 | 
						|
    EV_SET(&e, (fd), events, EV_ADD, 0, 0, NULL);
 | 
						|
    if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) {
 | 
						|
      SERR("kevent");
 | 
						|
      return ctx->rc;
 | 
						|
    }
 | 
						|
  }
 | 
						|
#endif /* USE_KQUEUE */
 | 
						|
  {
 | 
						|
    if (grn_hash_add(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c, NULL)) {
 | 
						|
      c->ev = ev;
 | 
						|
      c->fd = fd;
 | 
						|
      c->events = events;
 | 
						|
      if (com) { *com = c; }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  return ctx->rc;
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_com_event_mod(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com)
 | 
						|
{
 | 
						|
  grn_com *c;
 | 
						|
  if (!ev) { return GRN_INVALID_ARGUMENT; }
 | 
						|
  if (grn_hash_get(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c)) {
 | 
						|
    if (c->fd != fd) {
 | 
						|
      GRN_LOG(ctx, GRN_LOG_ERROR,
 | 
						|
              "grn_com_event_mod fd unmatch "
 | 
						|
              "%" GRN_FMT_SOCKET " != %" GRN_FMT_SOCKET,
 | 
						|
              c->fd, fd);
 | 
						|
      return GRN_OBJECT_CORRUPT;
 | 
						|
    }
 | 
						|
    if (com) { *com = c; }
 | 
						|
    if (c->events != events) {
 | 
						|
#ifdef USE_EPOLL
 | 
						|
      struct epoll_event e;
 | 
						|
      memset(&e, 0, sizeof(struct epoll_event));
 | 
						|
      e.data.fd = (fd);
 | 
						|
      e.events = (uint32_t) events;
 | 
						|
      if (epoll_ctl(ev->epfd, EPOLL_CTL_MOD, (fd), &e) == -1) {
 | 
						|
        SERR("epoll_ctl");
 | 
						|
        return ctx->rc;
 | 
						|
      }
 | 
						|
#endif /* USE_EPOLL*/
 | 
						|
#ifdef USE_KQUEUE
 | 
						|
      // experimental
 | 
						|
      struct kevent e[2];
 | 
						|
      EV_SET(&e[0], (fd), GRN_COM_POLLIN|GRN_COM_POLLOUT, EV_DELETE, 0, 0, NULL);
 | 
						|
      EV_SET(&e[1], (fd), events, EV_ADD, 0, 0, NULL);
 | 
						|
      if (kevent(ev->kqfd, e, 2, NULL, 0, NULL) == -1) {
 | 
						|
        SERR("kevent");
 | 
						|
        return ctx->rc;
 | 
						|
      }
 | 
						|
#endif /* USE_KQUEUE */
 | 
						|
      c->events = events;
 | 
						|
    }
 | 
						|
    return GRN_SUCCESS;
 | 
						|
  }
 | 
						|
  return GRN_INVALID_ARGUMENT;
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_com_event_del(grn_ctx *ctx, grn_com_event *ev, grn_sock fd)
 | 
						|
{
 | 
						|
  if (!ev) { return GRN_INVALID_ARGUMENT; }
 | 
						|
  {
 | 
						|
    grn_com *c;
 | 
						|
    grn_id id = grn_hash_get(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c);
 | 
						|
    if (id) {
 | 
						|
#ifdef USE_EPOLL
 | 
						|
      if (!c->closed) {
 | 
						|
        struct epoll_event e;
 | 
						|
        memset(&e, 0, sizeof(struct epoll_event));
 | 
						|
        e.data.fd = fd;
 | 
						|
        e.events = c->events;
 | 
						|
        if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, fd, &e) == -1) {
 | 
						|
          SERR("epoll_ctl");
 | 
						|
          return ctx->rc;
 | 
						|
        }
 | 
						|
      }
 | 
						|
#endif /* USE_EPOLL*/
 | 
						|
#ifdef USE_KQUEUE
 | 
						|
      struct kevent e;
 | 
						|
      EV_SET(&e, (fd), c->events, EV_DELETE, 0, 0, NULL);
 | 
						|
      if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) {
 | 
						|
        SERR("kevent");
 | 
						|
        return ctx->rc;
 | 
						|
      }
 | 
						|
#endif /* USE_KQUEUE */
 | 
						|
      return grn_hash_delete_by_id(ctx, ev->hash, id, NULL);
 | 
						|
    } else {
 | 
						|
      GRN_LOG(ctx, GRN_LOG_ERROR,
 | 
						|
              "%04x| fd(%" GRN_FMT_SOCKET ") not found in ev(%p)",
 | 
						|
              grn_getpid(), fd, ev);
 | 
						|
      return GRN_INVALID_ARGUMENT;
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
#define LISTEN_BACKLOG 0x1000
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_com_event_start_accept(grn_ctx *ctx, grn_com_event *ev)
 | 
						|
{
 | 
						|
  grn_com *com = ev->acceptor;
 | 
						|
 | 
						|
  if (com->accepting) {return ctx->rc;}
 | 
						|
 | 
						|
  GRN_API_ENTER;
 | 
						|
  if (!grn_com_event_mod(ctx, ev, com->fd, GRN_COM_POLLIN, NULL)) {
 | 
						|
    if (listen(com->fd, LISTEN_BACKLOG) == 0) {
 | 
						|
      com->accepting = GRN_TRUE;
 | 
						|
    } else {
 | 
						|
      SOERR("listen - start accept");
 | 
						|
    }
 | 
						|
  }
 | 
						|
  GRN_API_RETURN(ctx->rc);
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_com_event_stop_accept(grn_ctx *ctx, grn_com_event *ev)
 | 
						|
{
 | 
						|
  grn_com *com = ev->acceptor;
 | 
						|
 | 
						|
  if (!com->accepting) {return ctx->rc;}
 | 
						|
 | 
						|
  GRN_API_ENTER;
 | 
						|
  if (!grn_com_event_mod(ctx, ev, com->fd, 0, NULL)) {
 | 
						|
    if (listen(com->fd, 0) == 0) {
 | 
						|
      com->accepting = GRN_FALSE;
 | 
						|
    } else {
 | 
						|
      SOERR("listen - disable accept");
 | 
						|
    }
 | 
						|
  }
 | 
						|
  GRN_API_RETURN(ctx->rc);
 | 
						|
}
 | 
						|
 | 
						|
static void
 | 
						|
grn_com_receiver(grn_ctx *ctx, grn_com *com)
 | 
						|
{
 | 
						|
  grn_com_event *ev = com->ev;
 | 
						|
  ERRCLR(ctx);
 | 
						|
  if (ev->acceptor == com) {
 | 
						|
    grn_com *ncs;
 | 
						|
    grn_sock fd = accept(com->fd, NULL, NULL);
 | 
						|
    if (fd == -1) {
 | 
						|
      if (errno == EMFILE) {
 | 
						|
        grn_com_event_stop_accept(ctx, ev);
 | 
						|
      } else {
 | 
						|
        SOERR("accept");
 | 
						|
      }
 | 
						|
      return;
 | 
						|
    }
 | 
						|
    if (grn_com_event_add(ctx, ev, fd, GRN_COM_POLLIN, (grn_com **)&ncs)) {
 | 
						|
      grn_sock_close(fd);
 | 
						|
      return;
 | 
						|
    }
 | 
						|
    ncs->has_sid = 0;
 | 
						|
    ncs->closed = 0;
 | 
						|
    ncs->opaque = NULL;
 | 
						|
    GRN_COM_QUEUE_INIT(&ncs->new_);
 | 
						|
    // GRN_LOG(ctx, GRN_LOG_NOTICE, "accepted (%d)", fd);
 | 
						|
    return;
 | 
						|
  } else {
 | 
						|
    grn_msg *msg = (grn_msg *)grn_msg_open(ctx, com, &ev->recv_old);
 | 
						|
    grn_com_recv(ctx, msg->u.peer, &msg->header, (grn_obj *)msg);
 | 
						|
    if (msg->u.peer /* is_edge_request(msg)*/) {
 | 
						|
      grn_memcpy(&msg->edge_id, &ev->curr_edge_id, sizeof(grn_com_addr));
 | 
						|
      if (!com->has_sid) {
 | 
						|
        com->has_sid = 1;
 | 
						|
        com->sid = ev->curr_edge_id.sid++;
 | 
						|
      }
 | 
						|
      msg->edge_id.sid = com->sid;
 | 
						|
    }
 | 
						|
    msg->acceptor = ev->acceptor;
 | 
						|
    ev->msg_handler(ctx, (grn_obj *)msg);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_com_event_poll(grn_ctx *ctx, grn_com_event *ev, int timeout)
 | 
						|
{
 | 
						|
  int nevents;
 | 
						|
  grn_com *com;
 | 
						|
#ifdef USE_SELECT
 | 
						|
  uint32_t dummy;
 | 
						|
  grn_sock *pfd;
 | 
						|
  int nfds = 0;
 | 
						|
  fd_set rfds;
 | 
						|
  fd_set wfds;
 | 
						|
  struct timeval tv;
 | 
						|
  if (timeout >= 0) {
 | 
						|
    tv.tv_sec = timeout / 1000;
 | 
						|
    tv.tv_usec = (timeout % 1000) * 1000;
 | 
						|
  }
 | 
						|
  FD_ZERO(&rfds);
 | 
						|
  FD_ZERO(&wfds);
 | 
						|
  ctx->errlvl = GRN_OK;
 | 
						|
  ctx->rc = GRN_SUCCESS;
 | 
						|
  {
 | 
						|
    grn_hash_cursor *cursor;
 | 
						|
    cursor = grn_hash_cursor_open(ctx, ev->hash, NULL, 0, NULL, 0, 0, -1, 0);
 | 
						|
    if (cursor) {
 | 
						|
      grn_id id;
 | 
						|
      while ((id = grn_hash_cursor_next(ctx, cursor))) {
 | 
						|
        grn_hash_cursor_get_key_value(ctx,
 | 
						|
                                      cursor,
 | 
						|
                                      (void **)(&pfd),
 | 
						|
                                      &dummy,
 | 
						|
                                      (void **)(&com));
 | 
						|
        if (com->events & GRN_COM_POLLIN) { FD_SET(*pfd, &rfds); }
 | 
						|
        if (com->events & GRN_COM_POLLOUT) { FD_SET(*pfd, &wfds); }
 | 
						|
# ifndef WIN32
 | 
						|
        if (*pfd > nfds) { nfds = *pfd; }
 | 
						|
# endif /* WIN32 */
 | 
						|
      }
 | 
						|
      grn_hash_cursor_close(ctx, cursor);
 | 
						|
    }
 | 
						|
  }
 | 
						|
  nevents = select(nfds + 1, &rfds, &wfds, NULL, (timeout >= 0) ? &tv : NULL);
 | 
						|
  if (nevents < 0) {
 | 
						|
    SOERR("select");
 | 
						|
    if (ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) { ERRCLR(ctx); }
 | 
						|
    return ctx->rc;
 | 
						|
  }
 | 
						|
  if (timeout < 0 && !nevents) { GRN_LOG(ctx, GRN_LOG_NOTICE, "select returns 0 events"); }
 | 
						|
  GRN_HASH_EACH(ctx, ev->hash, eh, &pfd, &dummy, &com, {
 | 
						|
    if (FD_ISSET(*pfd, &rfds)) { grn_com_receiver(ctx, com); }
 | 
						|
  });
 | 
						|
#else /* USE_SELECT */
 | 
						|
# ifdef USE_EPOLL
 | 
						|
  struct epoll_event *ep;
 | 
						|
  ctx->errlvl = GRN_OK;
 | 
						|
  ctx->rc = GRN_SUCCESS;
 | 
						|
  nevents = epoll_wait(ev->epfd, ev->events, ev->max_nevents, timeout);
 | 
						|
  if (nevents < 0) {
 | 
						|
    SERR("epoll_wait");
 | 
						|
  }
 | 
						|
# else /* USE_EPOLL */
 | 
						|
#  ifdef USE_KQUEUE
 | 
						|
  struct kevent *ep;
 | 
						|
  struct timespec tv;
 | 
						|
  if (timeout >= 0) {
 | 
						|
    tv.tv_sec = timeout / 1000;
 | 
						|
    tv.tv_nsec = (timeout % 1000) * 1000;
 | 
						|
  }
 | 
						|
  nevents = kevent(ev->kqfd, NULL, 0, ev->events, ev->max_nevents, &tv);
 | 
						|
  if (nevents < 0) {
 | 
						|
    SERR("kevent");
 | 
						|
  }
 | 
						|
#  else /* USE_KQUEUE */
 | 
						|
  uint32_t dummy;
 | 
						|
  int nfd = 0, *pfd;
 | 
						|
  struct pollfd *ep = ev->events;
 | 
						|
  ctx->errlvl = GRN_OK;
 | 
						|
  ctx->rc = GRN_SUCCESS;
 | 
						|
  GRN_HASH_EACH(ctx, ev->hash, eh, &pfd, &dummy, &com, {
 | 
						|
    ep->fd = *pfd;
 | 
						|
    //    ep->events =(short) com->events;
 | 
						|
    ep->events = POLLIN;
 | 
						|
    ep->revents = 0;
 | 
						|
    ep++;
 | 
						|
    nfd++;
 | 
						|
  });
 | 
						|
  nevents = poll(ev->events, nfd, timeout);
 | 
						|
  if (nevents < 0) {
 | 
						|
    SERR("poll");
 | 
						|
  }
 | 
						|
#  endif /* USE_KQUEUE */
 | 
						|
# endif /* USE_EPOLL */
 | 
						|
  if (ctx->rc != GRN_SUCCESS) {
 | 
						|
    if (ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
 | 
						|
      ERRCLR(ctx);
 | 
						|
    }
 | 
						|
    return ctx->rc;
 | 
						|
  }
 | 
						|
  if (timeout < 0 && !nevents) { GRN_LOG(ctx, GRN_LOG_NOTICE, "poll returns 0 events"); }
 | 
						|
  for (ep = ev->events; nevents; ep++) {
 | 
						|
    int efd;
 | 
						|
# ifdef USE_EPOLL
 | 
						|
    efd = ep->data.fd;
 | 
						|
    nevents--;
 | 
						|
    // todo : com = ep->data.ptr;
 | 
						|
    if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) {
 | 
						|
      struct epoll_event e;
 | 
						|
      GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->hash", efd);
 | 
						|
      memset(&e, 0, sizeof(struct epoll_event));
 | 
						|
      e.data.fd = efd;
 | 
						|
      e.events = ep->events;
 | 
						|
      if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, efd, &e) == -1) { SERR("epoll_ctl"); }
 | 
						|
      if (grn_sock_close(efd) == -1) { SOERR("close"); }
 | 
						|
      continue;
 | 
						|
    }
 | 
						|
    if (ep->events & GRN_COM_POLLIN) { grn_com_receiver(ctx, com); }
 | 
						|
# else /* USE_EPOLL */
 | 
						|
#  ifdef USE_KQUEUE
 | 
						|
    efd = ep->ident;
 | 
						|
    nevents--;
 | 
						|
    // todo : com = ep->udata;
 | 
						|
    if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) {
 | 
						|
      struct kevent e;
 | 
						|
      GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->set", efd);
 | 
						|
      EV_SET(&e, efd, ep->filter, EV_DELETE, 0, 0, NULL);
 | 
						|
      if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) { SERR("kevent"); }
 | 
						|
      if (grn_sock_close(efd) == -1) { SOERR("close"); }
 | 
						|
      continue;
 | 
						|
    }
 | 
						|
    if (ep->filter == GRN_COM_POLLIN) { grn_com_receiver(ctx, com); }
 | 
						|
#  else
 | 
						|
    efd = ep->fd;
 | 
						|
    if (!(ep->events & ep->revents)) { continue; }
 | 
						|
    nevents--;
 | 
						|
    if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) {
 | 
						|
      GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->hash", efd);
 | 
						|
      if (grn_sock_close(efd) == -1) { SOERR("close"); }
 | 
						|
      continue;
 | 
						|
    }
 | 
						|
    if (ep->revents & GRN_COM_POLLIN) { grn_com_receiver(ctx, com); }
 | 
						|
#  endif /* USE_KQUEUE */
 | 
						|
# endif /* USE_EPOLL */
 | 
						|
  }
 | 
						|
#endif /* USE_SELECT */
 | 
						|
  /* todo :
 | 
						|
  while (!(msg = (grn_com_msg *)grn_com_queue_deque(&recv_old))) {
 | 
						|
    grn_msg_close(ctx, msg);
 | 
						|
  }
 | 
						|
  */
 | 
						|
  return GRN_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_com_send_http(grn_ctx *ctx, grn_com *cs, const char *path, uint32_t path_len, int flags)
 | 
						|
{
 | 
						|
  ssize_t ret;
 | 
						|
  grn_obj buf;
 | 
						|
  GRN_TEXT_INIT(&buf, 0);
 | 
						|
  GRN_TEXT_PUTS(ctx, &buf, "GET ");
 | 
						|
  grn_bulk_write(ctx, &buf, path, path_len);
 | 
						|
  GRN_TEXT_PUTS(ctx, &buf, " HTTP/1.0\r\n\r\n");
 | 
						|
  // todo : refine
 | 
						|
  if ((ret = send(cs->fd, GRN_BULK_HEAD(&buf), GRN_BULK_VSIZE(&buf), MSG_NOSIGNAL|flags)) == -1) {
 | 
						|
    SOERR("send");
 | 
						|
  }
 | 
						|
  if (ret != GRN_BULK_VSIZE(&buf)) {
 | 
						|
    GRN_LOG(ctx, GRN_LOG_NOTICE, "send %d != %d", (int)ret, (int)GRN_BULK_VSIZE(&buf));
 | 
						|
  }
 | 
						|
  grn_obj_close(ctx, &buf);
 | 
						|
  return ctx->rc;
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_com_send(grn_ctx *ctx, grn_com *cs,
 | 
						|
             grn_com_header *header, const char *body, uint32_t size, int flags)
 | 
						|
{
 | 
						|
  grn_rc rc = GRN_SUCCESS;
 | 
						|
  size_t whole_size = sizeof(grn_com_header) + size;
 | 
						|
  ssize_t ret;
 | 
						|
  header->size = htonl(size);
 | 
						|
  GRN_LOG(ctx, GRN_LOG_INFO, "send (%d,%x,%d,%02x,%02x,%04x)", size, header->flags, header->proto, header->qtype, header->level, header->status);
 | 
						|
 | 
						|
  if (size) {
 | 
						|
#ifdef WIN32
 | 
						|
    WSABUF wsabufs[2];
 | 
						|
    DWORD n_sent;
 | 
						|
    wsabufs[0].buf = (char *)header;
 | 
						|
    wsabufs[0].len = sizeof(grn_com_header);
 | 
						|
    wsabufs[1].buf = (char *)body;
 | 
						|
    wsabufs[1].len = size;
 | 
						|
    if (WSASend(cs->fd, wsabufs, 2, &n_sent, 0, NULL, NULL) == SOCKET_ERROR) {
 | 
						|
      SOERR("WSASend");
 | 
						|
    }
 | 
						|
    ret = n_sent;
 | 
						|
#else /* WIN32 */
 | 
						|
    struct iovec msg_iov[2];
 | 
						|
    struct msghdr msg;
 | 
						|
    memset(&msg, 0, sizeof(struct msghdr));
 | 
						|
    msg.msg_name = NULL;
 | 
						|
    msg.msg_namelen = 0;
 | 
						|
    msg.msg_iov = msg_iov;
 | 
						|
    msg.msg_iovlen = 2;
 | 
						|
    msg_iov[0].iov_base = (char*) header;
 | 
						|
    msg_iov[0].iov_len = sizeof(grn_com_header);
 | 
						|
    msg_iov[1].iov_base = (char *)body;
 | 
						|
    msg_iov[1].iov_len = size;
 | 
						|
    if ((ret = sendmsg(cs->fd, &msg, MSG_NOSIGNAL|flags)) == -1) {
 | 
						|
      SOERR("sendmsg");
 | 
						|
      rc = ctx->rc;
 | 
						|
    }
 | 
						|
#endif /* WIN32 */
 | 
						|
  } else {
 | 
						|
    if ((ret = send(cs->fd, (const void *)header, whole_size, MSG_NOSIGNAL|flags)) == -1) {
 | 
						|
      SOERR("send");
 | 
						|
      rc = ctx->rc;
 | 
						|
    }
 | 
						|
  }
 | 
						|
  if ((size_t) ret != whole_size) {
 | 
						|
    GRN_LOG(ctx, GRN_LOG_ERROR,
 | 
						|
            "sendmsg(%" GRN_FMT_SOCKET "): %" GRN_FMT_LLD " < %" GRN_FMT_LLU,
 | 
						|
            cs->fd, (long long int)ret, (unsigned long long int)whole_size);
 | 
						|
    rc = ctx->rc;
 | 
						|
  }
 | 
						|
  return rc;
 | 
						|
}
 | 
						|
 | 
						|
#define RETRY_MAX 10
 | 
						|
 | 
						|
static const char *
 | 
						|
scan_delimiter(const char *p, const char *e)
 | 
						|
{
 | 
						|
  while (p + 4 <= e) {
 | 
						|
    if (p[3] == '\n') {
 | 
						|
      if (p[2] == '\r') {
 | 
						|
        if (p[1] == '\n') {
 | 
						|
          if (p[0] == '\r') { return p + 4; } else { p += 2; }
 | 
						|
        } else { p += 2; }
 | 
						|
      } else { p += 4; }
 | 
						|
    } else { p += p[3] == '\r' ? 1 : 4; }
 | 
						|
  }
 | 
						|
  return NULL;
 | 
						|
}
 | 
						|
 | 
						|
#define BUFSIZE 4096
 | 
						|
 | 
						|
static grn_rc
 | 
						|
grn_com_recv_text(grn_ctx *ctx, grn_com *com,
 | 
						|
                  grn_com_header *header, grn_obj *buf, ssize_t ret)
 | 
						|
{
 | 
						|
  const char *p;
 | 
						|
  int retry = 0;
 | 
						|
  grn_bulk_write(ctx, buf, (char *)header, ret);
 | 
						|
  if ((p = scan_delimiter(GRN_BULK_HEAD(buf), GRN_BULK_CURR(buf)))) {
 | 
						|
    header->qtype = *GRN_BULK_HEAD(buf);
 | 
						|
    header->proto = GRN_COM_PROTO_HTTP;
 | 
						|
    header->size = GRN_BULK_VSIZE(buf);
 | 
						|
    goto exit;
 | 
						|
  }
 | 
						|
  for (;;) {
 | 
						|
    if (grn_bulk_reserve(ctx, buf, BUFSIZE)) { return ctx->rc; }
 | 
						|
    if ((ret = recv(com->fd, GRN_BULK_CURR(buf), BUFSIZE, 0)) < 0) {
 | 
						|
      SOERR("recv text");
 | 
						|
      if (ctx->rc == GRN_OPERATION_WOULD_BLOCK ||
 | 
						|
          ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
 | 
						|
        ERRCLR(ctx);
 | 
						|
        continue;
 | 
						|
      }
 | 
						|
      goto exit;
 | 
						|
    }
 | 
						|
    if (ret) {
 | 
						|
      off_t o = GRN_BULK_VSIZE(buf);
 | 
						|
      p = GRN_BULK_CURR(buf);
 | 
						|
      GRN_BULK_INCR_LEN(buf, ret);
 | 
						|
      if (scan_delimiter(p - (o > 3 ? 3 : o), p + ret)) {
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    } else {
 | 
						|
      if (++retry > RETRY_MAX) {
 | 
						|
        // ERR(GRN_RETRY_MAX, "retry max in recv text");
 | 
						|
        goto exit;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  header->qtype = *GRN_BULK_HEAD(buf);
 | 
						|
  header->proto = GRN_COM_PROTO_HTTP;
 | 
						|
  header->size = GRN_BULK_VSIZE(buf);
 | 
						|
exit :
 | 
						|
  if (header->qtype == 'H') {
 | 
						|
    //todo : refine
 | 
						|
    /*
 | 
						|
    GRN_BULK_REWIND(buf);
 | 
						|
    grn_bulk_reserve(ctx, buf, BUFSIZE);
 | 
						|
    if ((ret = recv(com->fd, GRN_BULK_CURR(buf), BUFSIZE, 0)) < 0) {
 | 
						|
      SOERR("recv text body");
 | 
						|
    } else {
 | 
						|
      GRN_BULK_CURR(buf) += ret;
 | 
						|
    }
 | 
						|
    */
 | 
						|
  }
 | 
						|
  return ctx->rc;
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_com_recv(grn_ctx *ctx, grn_com *com, grn_com_header *header, grn_obj *buf)
 | 
						|
{
 | 
						|
  ssize_t ret;
 | 
						|
  int retry = 0;
 | 
						|
  byte *p = (byte *)header;
 | 
						|
  size_t rest = sizeof(grn_com_header);
 | 
						|
  do {
 | 
						|
    if ((ret = recv(com->fd, p, rest, 0)) < 0) {
 | 
						|
      SOERR("recv size");
 | 
						|
      GRN_LOG(ctx, GRN_LOG_ERROR, "recv error (%" GRN_FMT_SOCKET ")", com->fd);
 | 
						|
      if (ctx->rc == GRN_OPERATION_WOULD_BLOCK ||
 | 
						|
          ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
 | 
						|
        ERRCLR(ctx);
 | 
						|
        continue;
 | 
						|
      }
 | 
						|
      goto exit;
 | 
						|
    }
 | 
						|
    if (ret) {
 | 
						|
      if (header->proto < 0x80) {
 | 
						|
        return grn_com_recv_text(ctx, com, header, buf, ret);
 | 
						|
      }
 | 
						|
      rest -= ret, p += ret;
 | 
						|
    } else {
 | 
						|
      if (++retry > RETRY_MAX) {
 | 
						|
        // ERR(GRN_RETRY_MAX, "retry max in recv header (%d)", com->fd);
 | 
						|
        goto exit;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  } while (rest);
 | 
						|
  GRN_LOG(ctx, GRN_LOG_INFO,
 | 
						|
          "recv (%u,%x,%d,%02x,%02x,%04x)",
 | 
						|
          (uint32_t)ntohl(header->size),
 | 
						|
          header->flags,
 | 
						|
          header->proto,
 | 
						|
          header->qtype,
 | 
						|
          header->level,
 | 
						|
          header->status);
 | 
						|
  {
 | 
						|
    uint8_t proto = header->proto;
 | 
						|
    size_t value_size = ntohl(header->size);
 | 
						|
    GRN_BULK_REWIND(buf);
 | 
						|
    switch (proto) {
 | 
						|
    case GRN_COM_PROTO_GQTP :
 | 
						|
    case GRN_COM_PROTO_MBREQ :
 | 
						|
      if (GRN_BULK_WSIZE(buf) < value_size) {
 | 
						|
        if (grn_bulk_resize(ctx, buf, value_size)) {
 | 
						|
          goto exit;
 | 
						|
        }
 | 
						|
      }
 | 
						|
      retry = 0;
 | 
						|
      for (rest = value_size; rest;) {
 | 
						|
        if ((ret = recv(com->fd, GRN_BULK_CURR(buf), rest, MSG_WAITALL)) < 0) {
 | 
						|
          SOERR("recv body");
 | 
						|
          if (ctx->rc == GRN_OPERATION_WOULD_BLOCK ||
 | 
						|
              ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
 | 
						|
            ERRCLR(ctx);
 | 
						|
            continue;
 | 
						|
          }
 | 
						|
          goto exit;
 | 
						|
        }
 | 
						|
        if (ret) {
 | 
						|
          rest -= ret;
 | 
						|
          GRN_BULK_INCR_LEN(buf, ret);
 | 
						|
        } else {
 | 
						|
          if (++retry > RETRY_MAX) {
 | 
						|
            // ERR(GRN_RETRY_MAX, "retry max in recv body");
 | 
						|
            goto exit;
 | 
						|
          }
 | 
						|
        }
 | 
						|
      }
 | 
						|
      break;
 | 
						|
    default :
 | 
						|
      GRN_LOG(ctx, GRN_LOG_ERROR, "illegal header: %d", proto);
 | 
						|
      ctx->rc = GRN_INVALID_FORMAT;
 | 
						|
      goto exit;
 | 
						|
    }
 | 
						|
  }
 | 
						|
exit :
 | 
						|
  return ctx->rc;
 | 
						|
}
 | 
						|
 | 
						|
grn_com *
 | 
						|
grn_com_copen(grn_ctx *ctx, grn_com_event *ev, const char *dest, int port)
 | 
						|
{
 | 
						|
  grn_sock fd = -1;
 | 
						|
  grn_com *cs = NULL;
 | 
						|
 | 
						|
  struct addrinfo hints, *addrinfo_list, *addrinfo_ptr;
 | 
						|
  char port_string[16];
 | 
						|
  int getaddrinfo_result;
 | 
						|
 | 
						|
  memset(&hints, 0, sizeof(hints));
 | 
						|
  hints.ai_family = AF_UNSPEC;
 | 
						|
  hints.ai_socktype = SOCK_STREAM;
 | 
						|
#ifdef AI_NUMERICSERV
 | 
						|
  hints.ai_flags = AI_NUMERICSERV;
 | 
						|
#endif
 | 
						|
  grn_snprintf(port_string, sizeof(port_string), sizeof(port_string),
 | 
						|
               "%d", port);
 | 
						|
 | 
						|
  getaddrinfo_result = getaddrinfo(dest, port_string, &hints, &addrinfo_list);
 | 
						|
  if (getaddrinfo_result != 0) {
 | 
						|
    switch (getaddrinfo_result) {
 | 
						|
#ifdef EAI_MEMORY
 | 
						|
    case EAI_MEMORY:
 | 
						|
      ERR(GRN_NO_MEMORY_AVAILABLE, "getaddrinfo: <%s:%s>: %s",
 | 
						|
          dest, port_string, gai_strerror(getaddrinfo_result));
 | 
						|
      break;
 | 
						|
#endif
 | 
						|
#ifdef EAI_SYSTEM
 | 
						|
    case EAI_SYSTEM:
 | 
						|
      SOERR("getaddrinfo");
 | 
						|
      break;
 | 
						|
#endif
 | 
						|
    default:
 | 
						|
      ERR(GRN_INVALID_ARGUMENT, "getaddrinfo: <%s:%s>: %s",
 | 
						|
          dest, port_string, gai_strerror(getaddrinfo_result));
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    return NULL;
 | 
						|
  }
 | 
						|
 | 
						|
  for (addrinfo_ptr = addrinfo_list; addrinfo_ptr;
 | 
						|
       addrinfo_ptr = addrinfo_ptr->ai_next) {
 | 
						|
    fd = socket(addrinfo_ptr->ai_family, addrinfo_ptr->ai_socktype,
 | 
						|
                addrinfo_ptr->ai_protocol);
 | 
						|
    if (fd == -1) {
 | 
						|
      SOERR("socket");
 | 
						|
      continue;
 | 
						|
    }
 | 
						|
#ifdef TCP_NODELAY
 | 
						|
    {
 | 
						|
      static const int value = 1;
 | 
						|
      if (setsockopt(fd, 6, TCP_NODELAY,
 | 
						|
                     (const char *)&value, sizeof(value)) != 0) {
 | 
						|
        SOERR("setsockopt");
 | 
						|
        grn_sock_close(fd);
 | 
						|
        continue;
 | 
						|
      }
 | 
						|
    }
 | 
						|
#endif
 | 
						|
    if (connect(fd, addrinfo_ptr->ai_addr, addrinfo_ptr->ai_addrlen) != 0) {
 | 
						|
      SOERR("connect");
 | 
						|
      grn_sock_close(fd);
 | 
						|
      continue;
 | 
						|
    }
 | 
						|
 | 
						|
    break;
 | 
						|
  }
 | 
						|
 | 
						|
  freeaddrinfo(addrinfo_list);
 | 
						|
 | 
						|
  if (!addrinfo_ptr) {
 | 
						|
    return NULL;
 | 
						|
  }
 | 
						|
  ctx->errlvl = GRN_OK;
 | 
						|
  ctx->rc = GRN_SUCCESS;
 | 
						|
 | 
						|
  if (ev) {
 | 
						|
    grn_com_event_add(ctx, ev, fd, GRN_COM_POLLIN, &cs);
 | 
						|
  } else {
 | 
						|
    cs = GRN_CALLOC(sizeof(grn_com));
 | 
						|
    if (cs) {
 | 
						|
      cs->fd = fd;
 | 
						|
    }
 | 
						|
  }
 | 
						|
  if (!cs) {
 | 
						|
    grn_sock_close(fd);
 | 
						|
  }
 | 
						|
  return cs;
 | 
						|
}
 | 
						|
 | 
						|
void
 | 
						|
grn_com_close_(grn_ctx *ctx, grn_com *com)
 | 
						|
{
 | 
						|
  grn_sock fd = com->fd;
 | 
						|
  if (shutdown(fd, SHUT_RDWR) == -1) { /* SOERR("shutdown"); */ }
 | 
						|
  if (grn_sock_close(fd) == -1) {
 | 
						|
    SOERR("close");
 | 
						|
  } else {
 | 
						|
    com->closed = 1;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_com_close(grn_ctx *ctx, grn_com *com)
 | 
						|
{
 | 
						|
  grn_sock fd = com->fd;
 | 
						|
  grn_com_event *ev = com->ev;
 | 
						|
  if (ev) {
 | 
						|
    grn_com *acceptor = ev->acceptor;
 | 
						|
    grn_com_event_del(ctx, ev, fd);
 | 
						|
    if (acceptor) { grn_com_event_start_accept(ctx, ev); }
 | 
						|
  }
 | 
						|
  if (!com->closed) { grn_com_close_(ctx, com); }
 | 
						|
  if (!ev) { GRN_FREE(com); }
 | 
						|
  return GRN_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_com_sopen(grn_ctx *ctx, grn_com_event *ev,
 | 
						|
              const char *bind_address, int port, grn_msg_handler *func,
 | 
						|
              struct hostent *he)
 | 
						|
{
 | 
						|
  grn_sock lfd = -1;
 | 
						|
  grn_com *cs = NULL;
 | 
						|
  int getaddrinfo_result;
 | 
						|
  struct addrinfo *bind_address_info = NULL;
 | 
						|
  struct addrinfo hints;
 | 
						|
  char port_string[6]; /* ceil(log10(65535)) + 1 ('\0')*/
 | 
						|
 | 
						|
  GRN_API_ENTER;
 | 
						|
  if (!bind_address) {
 | 
						|
    bind_address = "0.0.0.0";
 | 
						|
  }
 | 
						|
  grn_snprintf(port_string, sizeof(port_string), sizeof(port_string),
 | 
						|
               "%d", port);
 | 
						|
  memset(&hints, 0, sizeof(struct addrinfo));
 | 
						|
  hints.ai_family = PF_UNSPEC;
 | 
						|
  hints.ai_socktype = SOCK_STREAM;
 | 
						|
#ifdef AI_NUMERICSERV
 | 
						|
  hints.ai_flags = AI_NUMERICSERV;
 | 
						|
#endif
 | 
						|
  getaddrinfo_result = getaddrinfo(bind_address, port_string,
 | 
						|
                                   &hints, &bind_address_info);
 | 
						|
  if (getaddrinfo_result != 0) {
 | 
						|
    switch (getaddrinfo_result) {
 | 
						|
#ifdef EAI_MEMORY
 | 
						|
    case EAI_MEMORY:
 | 
						|
      ERR(GRN_NO_MEMORY_AVAILABLE,
 | 
						|
          "getaddrinfo: <%s:%s>: %s",
 | 
						|
          bind_address, port_string, gai_strerror(getaddrinfo_result));
 | 
						|
      break;
 | 
						|
#endif
 | 
						|
#ifdef EAI_SYSTEM
 | 
						|
    case EAI_SYSTEM:
 | 
						|
      SOERR("getaddrinfo");
 | 
						|
      break;
 | 
						|
#endif
 | 
						|
    default:
 | 
						|
      ERR(GRN_INVALID_ARGUMENT,
 | 
						|
          "getaddrinfo: <%s:%s>: %s",
 | 
						|
          bind_address, port_string, gai_strerror(getaddrinfo_result));
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    goto exit;
 | 
						|
  }
 | 
						|
  if ((lfd = socket(bind_address_info->ai_family, SOCK_STREAM, 0)) == -1) {
 | 
						|
    SOERR("socket");
 | 
						|
    goto exit;
 | 
						|
  }
 | 
						|
  grn_memcpy(&ev->curr_edge_id.addr, he->h_addr, he->h_length);
 | 
						|
  ev->curr_edge_id.port = htons(port);
 | 
						|
  ev->curr_edge_id.sid = 0;
 | 
						|
  {
 | 
						|
    int v = 1;
 | 
						|
#ifdef TCP_NODELAY
 | 
						|
    if (setsockopt(lfd, SOL_TCP, TCP_NODELAY, (void *) &v, sizeof(int)) == -1) {
 | 
						|
      SOERR("setsockopt");
 | 
						|
      goto exit;
 | 
						|
    }
 | 
						|
#endif
 | 
						|
    if (setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, (void *) &v, sizeof(int)) == -1) {
 | 
						|
      SOERR("setsockopt");
 | 
						|
      goto exit;
 | 
						|
    }
 | 
						|
  }
 | 
						|
  if (bind(lfd, bind_address_info->ai_addr, bind_address_info->ai_addrlen) < 0) {
 | 
						|
    SOERR("bind");
 | 
						|
    goto exit;
 | 
						|
  }
 | 
						|
  if (listen(lfd, LISTEN_BACKLOG) < 0) {
 | 
						|
    SOERR("listen");
 | 
						|
    goto exit;
 | 
						|
  }
 | 
						|
  if (ev) {
 | 
						|
    if (grn_com_event_add(ctx, ev, lfd, GRN_COM_POLLIN, &cs)) { goto exit; }
 | 
						|
    ev->acceptor = cs;
 | 
						|
    ev->msg_handler = func;
 | 
						|
    cs->has_sid = 0;
 | 
						|
    cs->closed = 0;
 | 
						|
    cs->opaque = NULL;
 | 
						|
    GRN_COM_QUEUE_INIT(&cs->new_);
 | 
						|
  } else {
 | 
						|
    if (!(cs = GRN_MALLOC(sizeof(grn_com)))) { goto exit; }
 | 
						|
    cs->fd = lfd;
 | 
						|
  }
 | 
						|
  cs->accepting = GRN_TRUE;
 | 
						|
exit :
 | 
						|
  if (!cs && lfd != 1) { grn_sock_close(lfd); }
 | 
						|
  if (bind_address_info) { freeaddrinfo(bind_address_info); }
 | 
						|
  GRN_API_RETURN(ctx->rc);
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
grn_hash *grn_edges = NULL;
 | 
						|
void (*grn_dispatcher)(grn_ctx *ctx, grn_edge *edge);
 | 
						|
 | 
						|
void
 | 
						|
grn_edges_init(grn_ctx *ctx, void (*dispatcher)(grn_ctx *ctx, grn_edge *edge))
 | 
						|
{
 | 
						|
  grn_edges = grn_hash_create(ctx, NULL, sizeof(grn_com_addr), sizeof(grn_edge), 0);
 | 
						|
  grn_dispatcher = dispatcher;
 | 
						|
}
 | 
						|
 | 
						|
void
 | 
						|
grn_edges_fin(grn_ctx *ctx)
 | 
						|
{
 | 
						|
  grn_hash_close(ctx, grn_edges);
 | 
						|
}
 | 
						|
 | 
						|
grn_edge *
 | 
						|
grn_edges_add(grn_ctx *ctx, grn_com_addr *addr, int *added)
 | 
						|
{
 | 
						|
  if (grn_io_lock(ctx, grn_edges->io, grn_lock_timeout)) {
 | 
						|
    return NULL;
 | 
						|
  } else {
 | 
						|
    grn_edge *edge;
 | 
						|
    grn_id id = grn_hash_add(ctx, grn_edges, addr, sizeof(grn_com_addr),
 | 
						|
                             (void **)&edge, added);
 | 
						|
    grn_io_unlock(grn_edges->io);
 | 
						|
    if (id) { edge->id = id; }
 | 
						|
    return edge;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void
 | 
						|
grn_edges_delete(grn_ctx *ctx, grn_edge *edge)
 | 
						|
{
 | 
						|
  if (!grn_io_lock(ctx, grn_edges->io, grn_lock_timeout)) {
 | 
						|
    grn_hash_delete_by_id(ctx, grn_edges, edge->id, NULL);
 | 
						|
    grn_io_unlock(grn_edges->io);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
grn_edge *
 | 
						|
grn_edges_add_communicator(grn_ctx *ctx, grn_com_addr *addr)
 | 
						|
{
 | 
						|
  int added;
 | 
						|
  grn_edge *edge = grn_edges_add(ctx, addr, &added);
 | 
						|
  if (added) {
 | 
						|
    grn_ctx_init(&edge->ctx, 0);
 | 
						|
    GRN_COM_QUEUE_INIT(&edge->recv_new);
 | 
						|
    GRN_COM_QUEUE_INIT(&edge->send_old);
 | 
						|
    edge->com = NULL;
 | 
						|
    edge->stat = 0 /*EDGE_IDLE*/;
 | 
						|
    edge->flags = GRN_EDGE_COMMUNICATOR;
 | 
						|
  }
 | 
						|
  return edge;
 | 
						|
}
 | 
						|
 | 
						|
void
 | 
						|
grn_edge_dispatch(grn_ctx *ctx, grn_edge *edge, grn_obj *msg)
 | 
						|
{
 | 
						|
  grn_com_queue_enque(ctx, &edge->recv_new, (grn_com_queue_entry *)msg);
 | 
						|
  grn_dispatcher(ctx, edge);
 | 
						|
}
 |