mariadb/storage/spider/spd_conn.cc
Jacob Mathew afe5a51c2d MDEV-12900: spider tests failed in buildbot with valgrind
The failures with valgrind occur as a result of Spider sometimes using the
wrong transaction for operations in background threads that send requests to
the data nodes.  The use of the wrong transaction caused the networking to the
data nodes to use the wrong thread in some cases.  Valgrind eventually
detects this when such a thread is destroyed before it is used to disconnect
from the data node by that wrong transaction when it is freed.

I have fixed the problem by correcting the transaction used in each of these
cases.

Author:
  Jacob Mathew.

Reviewer:
  Kentoku Shiba.

Merged:
  Commit 4d576d9 on branch bb-10.3-MDEV-12900
2018-05-21 18:16:03 -07:00

4152 lines
132 KiB
C++

/* Copyright (C) 2008-2015 Kentoku Shiba
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
#define MYSQL_SERVER 1
#include "mysql_version.h"
#if MYSQL_VERSION_ID < 50500
#include "mysql_priv.h"
#include <mysql/plugin.h>
#else
#include "sql_priv.h"
#include "probes_mysql.h"
#include "sql_class.h"
#include "sql_partition.h"
#include "tztime.h"
#endif
#include "spd_param.h"
#include "spd_db_include.h"
#include "spd_include.h"
#include "ha_spider.h"
#include "spd_db_conn.h"
#include "spd_trx.h"
#include "spd_conn.h"
#include "spd_table.h"
#include "spd_direct_sql.h"
#include "spd_ping_table.h"
#include "spd_malloc.h"
extern handlerton *spider_hton_ptr;
extern SPIDER_DBTON spider_dbton[SPIDER_DBTON_SIZE];
pthread_mutex_t spider_conn_id_mutex;
ulonglong spider_conn_id = 1;
#ifndef WITHOUT_SPIDER_BG_SEARCH
extern pthread_attr_t spider_pt_attr;
#ifdef HAVE_PSI_INTERFACE
extern PSI_mutex_key spd_key_mutex_mta_conn;
#ifndef WITHOUT_SPIDER_BG_SEARCH
extern PSI_mutex_key spd_key_mutex_bg_conn_chain;
extern PSI_mutex_key spd_key_mutex_bg_conn_sync;
extern PSI_mutex_key spd_key_mutex_bg_conn;
extern PSI_mutex_key spd_key_mutex_bg_job_stack;
extern PSI_mutex_key spd_key_mutex_bg_mon;
extern PSI_cond_key spd_key_cond_bg_conn_sync;
extern PSI_cond_key spd_key_cond_bg_conn;
extern PSI_cond_key spd_key_cond_bg_sts;
extern PSI_cond_key spd_key_cond_bg_sts_sync;
extern PSI_cond_key spd_key_cond_bg_crd;
extern PSI_cond_key spd_key_cond_bg_crd_sync;
extern PSI_cond_key spd_key_cond_bg_mon;
extern PSI_cond_key spd_key_cond_bg_mon_sleep;
extern PSI_thread_key spd_key_thd_bg;
extern PSI_thread_key spd_key_thd_bg_sts;
extern PSI_thread_key spd_key_thd_bg_crd;
extern PSI_thread_key spd_key_thd_bg_mon;
#endif
#endif
#endif
HASH spider_open_connections;
uint spider_open_connections_id;
const char *spider_open_connections_func_name;
const char *spider_open_connections_file_name;
ulong spider_open_connections_line_no;
pthread_mutex_t spider_conn_mutex;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
HASH spider_hs_r_conn_hash;
uint spider_hs_r_conn_hash_id;
const char *spider_hs_r_conn_hash_func_name;
const char *spider_hs_r_conn_hash_file_name;
ulong spider_hs_r_conn_hash_line_no;
pthread_mutex_t spider_hs_r_conn_mutex;
HASH spider_hs_w_conn_hash;
uint spider_hs_w_conn_hash_id;
const char *spider_hs_w_conn_hash_func_name;
const char *spider_hs_w_conn_hash_file_name;
ulong spider_hs_w_conn_hash_line_no;
pthread_mutex_t spider_hs_w_conn_mutex;
#endif
/* for spider_open_connections and trx_conn_hash */
uchar *spider_conn_get_key(
SPIDER_CONN *conn,
size_t *length,
my_bool not_used __attribute__ ((unused))
) {
DBUG_ENTER("spider_conn_get_key");
*length = conn->conn_key_length;
DBUG_PRINT("info",("spider conn_kind=%u", conn->conn_kind));
#ifndef DBUG_OFF
spider_print_keys(conn->conn_key, conn->conn_key_length);
#endif
DBUG_RETURN((uchar*) conn->conn_key);
}
int spider_reset_conn_setted_parameter(
SPIDER_CONN *conn,
THD *thd
) {
DBUG_ENTER("spider_reset_conn_setted_parameter");
conn->autocommit = spider_param_remote_autocommit();
conn->sql_log_off = spider_param_remote_sql_log_off();
if (thd && spider_param_remote_time_zone())
{
int tz_length = strlen(spider_param_remote_time_zone());
String tz_str(spider_param_remote_time_zone(), tz_length,
&my_charset_latin1);
conn->time_zone = my_tz_find(thd, &tz_str);
} else
conn->time_zone = NULL;
conn->trx_isolation = spider_param_remote_trx_isolation();
DBUG_PRINT("info",("spider conn->trx_isolation=%d", conn->trx_isolation));
if (spider_param_remote_access_charset())
{
if (!(conn->access_charset =
get_charset_by_csname(spider_param_remote_access_charset(),
MY_CS_PRIMARY, MYF(MY_WME))))
DBUG_RETURN(ER_UNKNOWN_CHARACTER_SET);
} else
conn->access_charset = NULL;
char *default_database = spider_param_remote_default_database();
if (default_database)
{
uint default_database_length = strlen(default_database);
if (conn->default_database.reserve(default_database_length + 1))
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
conn->default_database.q_append(default_database,
default_database_length + 1);
conn->default_database.length(default_database_length);
} else
conn->default_database.length(0);
DBUG_RETURN(0);
}
int spider_free_conn_alloc(
SPIDER_CONN *conn
) {
DBUG_ENTER("spider_free_conn_alloc");
spider_db_disconnect(conn);
#ifndef WITHOUT_SPIDER_BG_SEARCH
spider_free_conn_thread(conn);
#endif
if (conn->db_conn)
{
delete conn->db_conn;
conn->db_conn = NULL;
}
DBUG_ASSERT(!conn->mta_conn_mutex_file_pos.file_name);
pthread_mutex_destroy(&conn->mta_conn_mutex);
conn->default_database.free();
DBUG_RETURN(0);
}
void spider_free_conn_from_trx(
SPIDER_TRX *trx,
SPIDER_CONN *conn,
bool another,
bool trx_free,
int *roop_count
) {
ha_spider *spider;
DBUG_ENTER("spider_free_conn_from_trx");
spider_conn_clear_queue(conn);
conn->use_for_active_standby = FALSE;
conn->error_mode = 1;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
if (conn->conn_kind == SPIDER_CONN_KIND_MYSQL)
{
#endif
if (
trx_free ||
(
(
conn->server_lost ||
spider_param_conn_recycle_mode(trx->thd) != 2
) &&
!conn->opened_handlers
)
) {
conn->thd = NULL;
if (another)
{
ha_spider *next_spider;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
my_hash_delete_with_hash_value(&trx->trx_another_conn_hash,
conn->conn_key_hash_value, (uchar*) conn);
#else
my_hash_delete(&trx->trx_another_conn_hash, (uchar*) conn);
#endif
spider = (ha_spider*) conn->another_ha_first;
while (spider)
{
next_spider = spider->next;
spider_free_tmp_dbton_handler(spider);
spider_free_tmp_dbton_share(spider->share);
spider_free_tmp_share_alloc(spider->share);
spider_free(spider_current_trx, spider->share, MYF(0));
delete spider;
spider = next_spider;
}
conn->another_ha_first = NULL;
conn->another_ha_last = NULL;
} else {
#ifdef HASH_UPDATE_WITH_HASH_VALUE
my_hash_delete_with_hash_value(&trx->trx_conn_hash,
conn->conn_key_hash_value, (uchar*) conn);
#else
my_hash_delete(&trx->trx_conn_hash, (uchar*) conn);
#endif
}
if (
!trx_free &&
!conn->server_lost &&
!conn->queued_connect &&
spider_param_conn_recycle_mode(trx->thd) == 1
) {
/* conn_recycle_mode == 1 */
*conn->conn_key = '0';
conn->casual_read_base_conn = NULL;
if (
conn->quick_target &&
spider_db_free_result((ha_spider *) conn->quick_target, FALSE)
) {
spider_free_conn(conn);
} else {
pthread_mutex_lock(&spider_conn_mutex);
uint old_elements = spider_open_connections.array.max_element;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
if (my_hash_insert_with_hash_value(&spider_open_connections,
conn->conn_key_hash_value, (uchar*) conn))
#else
if (my_hash_insert(&spider_open_connections, (uchar*) conn))
#endif
{
pthread_mutex_unlock(&spider_conn_mutex);
spider_free_conn(conn);
} else {
if (spider_open_connections.array.max_element > old_elements)
{
spider_alloc_calc_mem(spider_current_trx,
spider_open_connections,
(spider_open_connections.array.max_element - old_elements) *
spider_open_connections.array.size_of_element);
}
pthread_mutex_unlock(&spider_conn_mutex);
}
}
} else {
/* conn_recycle_mode == 0 */
spider_free_conn(conn);
}
} else if (roop_count)
(*roop_count)++;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
} else if (conn->conn_kind == SPIDER_CONN_KIND_HS_READ)
{
spider_db_hs_request_buf_reset(conn);
if (
trx_free ||
(
(
conn->server_lost ||
spider_param_hs_r_conn_recycle_mode(trx->thd) != 2
) &&
!conn->opened_handlers
)
) {
conn->thd = NULL;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
my_hash_delete_with_hash_value(&trx->trx_hs_r_conn_hash,
conn->conn_key_hash_value, (uchar*) conn);
#else
my_hash_delete(&trx->trx_hs_r_conn_hash, (uchar*) conn);
#endif
DBUG_ASSERT(conn->opened_handlers ==
conn->db_conn->get_opened_handler_count());
if (conn->db_conn->get_opened_handler_count())
{
conn->db_conn->reset_opened_handler();
}
if (
!trx_free &&
!conn->server_lost &&
!conn->queued_connect &&
spider_param_hs_r_conn_recycle_mode(trx->thd) == 1
) {
/* conn_recycle_mode == 1 */
*conn->conn_key = '0';
pthread_mutex_lock(&spider_hs_r_conn_mutex);
uint old_elements = spider_hs_r_conn_hash.array.max_element;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
if (my_hash_insert_with_hash_value(&spider_hs_r_conn_hash,
conn->conn_key_hash_value, (uchar*) conn))
#else
if (my_hash_insert(&spider_hs_r_conn_hash, (uchar*) conn))
#endif
{
pthread_mutex_unlock(&spider_hs_r_conn_mutex);
spider_free_conn(conn);
} else {
if (spider_hs_r_conn_hash.array.max_element > old_elements)
{
spider_alloc_calc_mem(spider_current_trx,
spider_hs_r_conn_hash,
(spider_hs_r_conn_hash.array.max_element - old_elements) *
spider_hs_r_conn_hash.array.size_of_element);
}
pthread_mutex_unlock(&spider_hs_r_conn_mutex);
}
} else {
/* conn_recycle_mode == 0 */
spider_free_conn(conn);
}
} else if (roop_count)
(*roop_count)++;
} else {
spider_db_hs_request_buf_reset(conn);
if (
trx_free ||
(
(
conn->server_lost ||
spider_param_hs_w_conn_recycle_mode(trx->thd) != 2
) &&
!conn->opened_handlers
)
) {
conn->thd = NULL;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
my_hash_delete_with_hash_value(&trx->trx_hs_w_conn_hash,
conn->conn_key_hash_value, (uchar*) conn);
#else
my_hash_delete(&trx->trx_hs_w_conn_hash, (uchar*) conn);
#endif
DBUG_ASSERT(conn->opened_handlers ==
conn->db_conn->get_opened_handler_count());
if (conn->db_conn->get_opened_handler_count())
{
conn->db_conn->reset_opened_handler();
}
if (
!trx_free &&
!conn->server_lost &&
!conn->queued_connect &&
spider_param_hs_w_conn_recycle_mode(trx->thd) == 1
) {
/* conn_recycle_mode == 1 */
*conn->conn_key = '0';
pthread_mutex_lock(&spider_hs_w_conn_mutex);
uint old_elements = spider_hs_w_conn_hash.array.max_element;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
if (my_hash_insert_with_hash_value(&spider_hs_w_conn_hash,
conn->conn_key_hash_value, (uchar*) conn))
#else
if (my_hash_insert(&spider_hs_w_conn_hash, (uchar*) conn))
#endif
{
pthread_mutex_unlock(&spider_hs_w_conn_mutex);
spider_free_conn(conn);
} else {
if (spider_hs_w_conn_hash.array.max_element > old_elements)
{
spider_alloc_calc_mem(spider_current_trx,
spider_hs_w_conn_hash,
(spider_hs_w_conn_hash.array.max_element - old_elements) *
spider_hs_w_conn_hash.array.size_of_element);
}
pthread_mutex_unlock(&spider_hs_w_conn_mutex);
}
} else {
/* conn_recycle_mode == 0 */
spider_free_conn(conn);
}
} else if (roop_count)
(*roop_count)++;
}
#endif
DBUG_VOID_RETURN;
}
SPIDER_CONN *spider_create_conn(
SPIDER_SHARE *share,
ha_spider *spider,
int link_idx,
int base_link_idx,
uint conn_kind,
int *error_num
) {
int *need_mon;
SPIDER_CONN *conn;
char *tmp_name, *tmp_host, *tmp_username, *tmp_password, *tmp_socket;
char *tmp_wrapper, *tmp_ssl_ca, *tmp_ssl_capath, *tmp_ssl_cert;
char *tmp_ssl_cipher, *tmp_ssl_key, *tmp_default_file, *tmp_default_group;
DBUG_ENTER("spider_create_conn");
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
if (conn_kind == SPIDER_CONN_KIND_MYSQL)
{
#endif
if (!(conn = (SPIDER_CONN *)
spider_bulk_malloc(spider_current_trx, 18, MYF(MY_WME | MY_ZEROFILL),
&conn, sizeof(*conn),
&tmp_name, share->conn_keys_lengths[link_idx] + 1,
&tmp_host, share->tgt_hosts_lengths[link_idx] + 1,
&tmp_username,
share->tgt_usernames_lengths[link_idx] + 1,
&tmp_password,
share->tgt_passwords_lengths[link_idx] + 1,
&tmp_socket, share->tgt_sockets_lengths[link_idx] + 1,
&tmp_wrapper,
share->tgt_wrappers_lengths[link_idx] + 1,
&tmp_ssl_ca, share->tgt_ssl_cas_lengths[link_idx] + 1,
&tmp_ssl_capath,
share->tgt_ssl_capaths_lengths[link_idx] + 1,
&tmp_ssl_cert,
share->tgt_ssl_certs_lengths[link_idx] + 1,
&tmp_ssl_cipher,
share->tgt_ssl_ciphers_lengths[link_idx] + 1,
&tmp_ssl_key,
share->tgt_ssl_keys_lengths[link_idx] + 1,
&tmp_default_file,
share->tgt_default_files_lengths[link_idx] + 1,
&tmp_default_group,
share->tgt_default_groups_lengths[link_idx] + 1,
&need_mon, sizeof(int),
NullS))
) {
*error_num = HA_ERR_OUT_OF_MEM;
goto error_alloc_conn;
}
conn->default_database.init_calc_mem(75);
conn->conn_key_length = share->conn_keys_lengths[link_idx];
conn->conn_key = tmp_name;
memcpy(conn->conn_key, share->conn_keys[link_idx],
share->conn_keys_lengths[link_idx]);
#ifdef SPIDER_HAS_HASH_VALUE_TYPE
conn->conn_key_hash_value = share->conn_keys_hash_value[link_idx];
#endif
conn->tgt_host_length = share->tgt_hosts_lengths[link_idx];
conn->tgt_host = tmp_host;
memcpy(conn->tgt_host, share->tgt_hosts[link_idx],
share->tgt_hosts_lengths[link_idx]);
conn->tgt_username_length = share->tgt_usernames_lengths[link_idx];
conn->tgt_username = tmp_username;
memcpy(conn->tgt_username, share->tgt_usernames[link_idx],
share->tgt_usernames_lengths[link_idx]);
conn->tgt_password_length = share->tgt_passwords_lengths[link_idx];
conn->tgt_password = tmp_password;
memcpy(conn->tgt_password, share->tgt_passwords[link_idx],
share->tgt_passwords_lengths[link_idx]);
conn->tgt_socket_length = share->tgt_sockets_lengths[link_idx];
conn->tgt_socket = tmp_socket;
memcpy(conn->tgt_socket, share->tgt_sockets[link_idx],
share->tgt_sockets_lengths[link_idx]);
conn->tgt_wrapper_length = share->tgt_wrappers_lengths[link_idx];
conn->tgt_wrapper = tmp_wrapper;
memcpy(conn->tgt_wrapper, share->tgt_wrappers[link_idx],
share->tgt_wrappers_lengths[link_idx]);
conn->tgt_ssl_ca_length = share->tgt_ssl_cas_lengths[link_idx];
if (conn->tgt_ssl_ca_length)
{
conn->tgt_ssl_ca = tmp_ssl_ca;
memcpy(conn->tgt_ssl_ca, share->tgt_ssl_cas[link_idx],
share->tgt_ssl_cas_lengths[link_idx]);
} else
conn->tgt_ssl_ca = NULL;
conn->tgt_ssl_capath_length = share->tgt_ssl_capaths_lengths[link_idx];
if (conn->tgt_ssl_capath_length)
{
conn->tgt_ssl_capath = tmp_ssl_capath;
memcpy(conn->tgt_ssl_capath, share->tgt_ssl_capaths[link_idx],
share->tgt_ssl_capaths_lengths[link_idx]);
} else
conn->tgt_ssl_capath = NULL;
conn->tgt_ssl_cert_length = share->tgt_ssl_certs_lengths[link_idx];
if (conn->tgt_ssl_cert_length)
{
conn->tgt_ssl_cert = tmp_ssl_cert;
memcpy(conn->tgt_ssl_cert, share->tgt_ssl_certs[link_idx],
share->tgt_ssl_certs_lengths[link_idx]);
} else
conn->tgt_ssl_cert = NULL;
conn->tgt_ssl_cipher_length = share->tgt_ssl_ciphers_lengths[link_idx];
if (conn->tgt_ssl_cipher_length)
{
conn->tgt_ssl_cipher = tmp_ssl_cipher;
memcpy(conn->tgt_ssl_cipher, share->tgt_ssl_ciphers[link_idx],
share->tgt_ssl_ciphers_lengths[link_idx]);
} else
conn->tgt_ssl_cipher = NULL;
conn->tgt_ssl_key_length = share->tgt_ssl_keys_lengths[link_idx];
if (conn->tgt_ssl_key_length)
{
conn->tgt_ssl_key = tmp_ssl_key;
memcpy(conn->tgt_ssl_key, share->tgt_ssl_keys[link_idx],
share->tgt_ssl_keys_lengths[link_idx]);
} else
conn->tgt_ssl_key = NULL;
conn->tgt_default_file_length = share->tgt_default_files_lengths[link_idx];
if (conn->tgt_default_file_length)
{
conn->tgt_default_file = tmp_default_file;
memcpy(conn->tgt_default_file, share->tgt_default_files[link_idx],
share->tgt_default_files_lengths[link_idx]);
} else
conn->tgt_default_file = NULL;
conn->tgt_default_group_length =
share->tgt_default_groups_lengths[link_idx];
if (conn->tgt_default_group_length)
{
conn->tgt_default_group = tmp_default_group;
memcpy(conn->tgt_default_group, share->tgt_default_groups[link_idx],
share->tgt_default_groups_lengths[link_idx]);
} else
conn->tgt_default_group = NULL;
conn->tgt_port = share->tgt_ports[link_idx];
conn->tgt_ssl_vsc = share->tgt_ssl_vscs[link_idx];
conn->dbton_id = share->sql_dbton_ids[link_idx];
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
} else if (conn_kind == SPIDER_CONN_KIND_HS_READ) {
if (!(conn = (SPIDER_CONN *)
spider_bulk_malloc(spider_current_trx, 19, MYF(MY_WME | MY_ZEROFILL),
&conn, sizeof(*conn),
&tmp_name, share->hs_read_conn_keys_lengths[link_idx] + 1,
&tmp_host, share->tgt_hosts_lengths[link_idx] + 1,
&tmp_socket, share->hs_read_socks_lengths[link_idx] + 1,
&tmp_wrapper,
share->tgt_wrappers_lengths[link_idx] + 1,
&need_mon, sizeof(int),
NullS))
) {
*error_num = HA_ERR_OUT_OF_MEM;
goto error_alloc_conn;
}
conn->default_database.init_calc_mem(76);
conn->conn_key_length = share->hs_read_conn_keys_lengths[link_idx];
conn->conn_key = tmp_name;
memcpy(conn->conn_key, share->hs_read_conn_keys[link_idx],
share->hs_read_conn_keys_lengths[link_idx]);
#ifdef SPIDER_HAS_HASH_VALUE_TYPE
conn->conn_key_hash_value = share->hs_read_conn_keys_hash_value[link_idx];
#endif
conn->tgt_host_length = share->tgt_hosts_lengths[link_idx];
conn->tgt_host = tmp_host;
memcpy(conn->tgt_host, share->tgt_hosts[link_idx],
share->tgt_hosts_lengths[link_idx]);
conn->hs_sock_length = share->hs_read_socks_lengths[link_idx];
if (conn->hs_sock_length)
{
conn->hs_sock = tmp_socket;
memcpy(conn->hs_sock, share->hs_read_socks[link_idx],
share->hs_read_socks_lengths[link_idx]);
} else
conn->hs_sock = NULL;
conn->tgt_wrapper_length = share->tgt_wrappers_lengths[link_idx];
conn->tgt_wrapper = tmp_wrapper;
memcpy(conn->tgt_wrapper, share->tgt_wrappers[link_idx],
share->tgt_wrappers_lengths[link_idx]);
conn->hs_port = share->hs_read_ports[link_idx];
conn->dbton_id = share->hs_dbton_ids[link_idx];
} else {
if (!(conn = (SPIDER_CONN *)
spider_bulk_malloc(spider_current_trx, 20, MYF(MY_WME | MY_ZEROFILL),
&conn, sizeof(*conn),
&tmp_name, share->hs_write_conn_keys_lengths[link_idx] + 1,
&tmp_host, share->tgt_hosts_lengths[link_idx] + 1,
&tmp_socket, share->hs_write_socks_lengths[link_idx] + 1,
&tmp_wrapper,
share->tgt_wrappers_lengths[link_idx] + 1,
&need_mon, sizeof(int),
NullS))
) {
*error_num = HA_ERR_OUT_OF_MEM;
goto error_alloc_conn;
}
conn->default_database.init_calc_mem(77);
conn->conn_key_length = share->hs_write_conn_keys_lengths[link_idx];
conn->conn_key = tmp_name;
memcpy(conn->conn_key, share->hs_write_conn_keys[link_idx],
share->hs_write_conn_keys_lengths[link_idx]);
#ifdef SPIDER_HAS_HASH_VALUE_TYPE
conn->conn_key_hash_value = share->hs_write_conn_keys_hash_value[link_idx];
#endif
conn->tgt_host_length = share->tgt_hosts_lengths[link_idx];
conn->tgt_host = tmp_host;
memcpy(conn->tgt_host, share->tgt_hosts[link_idx],
share->tgt_hosts_lengths[link_idx]);
conn->hs_sock_length = share->hs_write_socks_lengths[link_idx];
if (conn->hs_sock_length)
{
conn->hs_sock = tmp_socket;
memcpy(conn->hs_sock, share->hs_write_socks[link_idx],
share->hs_write_socks_lengths[link_idx]);
} else
conn->hs_sock = NULL;
conn->tgt_wrapper_length = share->tgt_wrappers_lengths[link_idx];
conn->tgt_wrapper = tmp_wrapper;
memcpy(conn->tgt_wrapper, share->tgt_wrappers[link_idx],
share->tgt_wrappers_lengths[link_idx]);
conn->hs_port = share->hs_write_ports[link_idx];
conn->dbton_id = share->hs_dbton_ids[link_idx];
}
#endif
if (!(conn->db_conn = spider_dbton[conn->dbton_id].create_db_conn(conn)))
{
*error_num = HA_ERR_OUT_OF_MEM;
goto error_db_conn_create;
}
if ((*error_num = conn->db_conn->init()))
{
goto error_db_conn_init;
}
conn->join_trx = 0;
conn->thd = NULL;
conn->table_lock = 0;
conn->semi_trx_isolation = -2;
conn->semi_trx_isolation_chk = FALSE;
conn->semi_trx_chk = FALSE;
conn->link_idx = base_link_idx;
conn->conn_kind = conn_kind;
conn->conn_need_mon = need_mon;
if (spider)
conn->need_mon = &spider->need_mons[base_link_idx];
else
conn->need_mon = need_mon;
#if MYSQL_VERSION_ID < 50500
if (pthread_mutex_init(&conn->mta_conn_mutex, MY_MUTEX_INIT_FAST))
#else
if (mysql_mutex_init(spd_key_mutex_mta_conn, &conn->mta_conn_mutex,
MY_MUTEX_INIT_FAST))
#endif
{
*error_num = HA_ERR_OUT_OF_MEM;
goto error_mta_conn_mutex_init;
}
spider_conn_queue_connect(share, conn, link_idx);
conn->ping_time = (time_t) time((time_t*) 0);
conn->connect_error_time = conn->ping_time;
pthread_mutex_lock(&spider_conn_id_mutex);
conn->conn_id = spider_conn_id;
++spider_conn_id;
pthread_mutex_unlock(&spider_conn_id_mutex);
DBUG_RETURN(conn);
/*
error_init_lock_table_hash:
DBUG_ASSERT(!conn->mta_conn_mutex_file_pos.file_name);
pthread_mutex_destroy(&conn->mta_conn_mutex);
*/
error_mta_conn_mutex_init:
error_db_conn_init:
delete conn->db_conn;
error_db_conn_create:
spider_free(spider_current_trx, conn, MYF(0));
error_alloc_conn:
DBUG_RETURN(NULL);
}
SPIDER_CONN *spider_get_conn(
SPIDER_SHARE *share,
int link_idx,
char *conn_key,
SPIDER_TRX *trx,
ha_spider *spider,
bool another,
bool thd_chg,
uint conn_kind,
int *error_num
) {
SPIDER_CONN *conn = NULL;
int base_link_idx = link_idx;
DBUG_ENTER("spider_get_conn");
DBUG_PRINT("info",("spider conn_kind=%u", conn_kind));
if (spider)
link_idx = spider->conn_link_idx[base_link_idx];
DBUG_PRINT("info",("spider link_idx=%u", link_idx));
DBUG_PRINT("info",("spider base_link_idx=%u", base_link_idx));
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
if (conn_kind == SPIDER_CONN_KIND_MYSQL)
{
#endif
#ifndef DBUG_OFF
spider_print_keys(conn_key, share->conn_keys_lengths[link_idx]);
#endif
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
} else if (conn_kind == SPIDER_CONN_KIND_HS_READ)
{
conn_key = share->hs_read_conn_keys[link_idx];
#ifndef DBUG_OFF
spider_print_keys(conn_key, share->hs_read_conn_keys_lengths[link_idx]);
#endif
} else {
conn_key = share->hs_write_conn_keys[link_idx];
#ifndef DBUG_OFF
spider_print_keys(conn_key, share->hs_write_conn_keys_lengths[link_idx]);
#endif
}
#endif
#ifdef SPIDER_HAS_HASH_VALUE_TYPE
if (
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
(conn_kind == SPIDER_CONN_KIND_MYSQL &&
(
#endif
(another &&
!(conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
&trx->trx_another_conn_hash,
share->conn_keys_hash_value[link_idx],
(uchar*) conn_key, share->conn_keys_lengths[link_idx]))) ||
(!another &&
!(conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
&trx->trx_conn_hash,
share->conn_keys_hash_value[link_idx],
(uchar*) conn_key, share->conn_keys_lengths[link_idx])))
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
)
) ||
(conn_kind == SPIDER_CONN_KIND_HS_READ &&
!(conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
&trx->trx_hs_r_conn_hash,
share->hs_read_conn_keys_hash_value[link_idx],
(uchar*) conn_key, share->hs_read_conn_keys_lengths[link_idx]))
) ||
(conn_kind == SPIDER_CONN_KIND_HS_WRITE &&
!(conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
&trx->trx_hs_w_conn_hash,
share->hs_write_conn_keys_hash_value[link_idx],
(uchar*) conn_key, share->hs_write_conn_keys_lengths[link_idx]))
)
#endif
)
#else
if (
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
(conn_kind == SPIDER_CONN_KIND_MYSQL &&
(
#endif
(another &&
!(conn = (SPIDER_CONN*) my_hash_search(&trx->trx_another_conn_hash,
(uchar*) conn_key, share->conn_keys_lengths[link_idx]))) ||
(!another &&
!(conn = (SPIDER_CONN*) my_hash_search(&trx->trx_conn_hash,
(uchar*) conn_key, share->conn_keys_lengths[link_idx])))
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
)
) ||
(conn_kind == SPIDER_CONN_KIND_HS_READ &&
!(conn = (SPIDER_CONN*) my_hash_search(&trx->trx_hs_r_conn_hash,
(uchar*) conn_key, share->hs_read_conn_keys_lengths[link_idx]))
) ||
(conn_kind == SPIDER_CONN_KIND_HS_WRITE &&
!(conn = (SPIDER_CONN*) my_hash_search(&trx->trx_hs_w_conn_hash,
(uchar*) conn_key, share->hs_write_conn_keys_lengths[link_idx]))
)
#endif
)
#endif
{
if (
!trx->thd ||
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
(conn_kind == SPIDER_CONN_KIND_MYSQL &&
#endif
(
(spider_param_conn_recycle_mode(trx->thd) & 1) ||
spider_param_conn_recycle_strict(trx->thd)
)
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
) ||
(conn_kind == SPIDER_CONN_KIND_HS_READ &&
(
(spider_param_hs_r_conn_recycle_mode(trx->thd) & 1) ||
spider_param_hs_r_conn_recycle_strict(trx->thd)
)
) ||
(conn_kind == SPIDER_CONN_KIND_HS_WRITE &&
(
(spider_param_hs_w_conn_recycle_mode(trx->thd) & 1) ||
spider_param_hs_w_conn_recycle_strict(trx->thd)
)
)
#endif
) {
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
if (conn_kind == SPIDER_CONN_KIND_MYSQL)
{
#endif
pthread_mutex_lock(&spider_conn_mutex);
#ifdef SPIDER_HAS_HASH_VALUE_TYPE
if (!(conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
&spider_open_connections, share->conn_keys_hash_value[link_idx],
(uchar*) share->conn_keys[link_idx],
share->conn_keys_lengths[link_idx])))
#else
if (!(conn = (SPIDER_CONN*) my_hash_search(&spider_open_connections,
(uchar*) share->conn_keys[link_idx],
share->conn_keys_lengths[link_idx])))
#endif
{
pthread_mutex_unlock(&spider_conn_mutex);
DBUG_PRINT("info",("spider create new conn"));
if (!(conn = spider_create_conn(share, spider, link_idx,
base_link_idx, conn_kind, error_num)))
goto error;
*conn->conn_key = *conn_key;
if (spider)
{
spider->conns[base_link_idx] = conn;
if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
conn->use_for_active_standby = TRUE;
}
} else {
#ifdef HASH_UPDATE_WITH_HASH_VALUE
my_hash_delete_with_hash_value(&spider_open_connections,
conn->conn_key_hash_value, (uchar*) conn);
#else
my_hash_delete(&spider_open_connections, (uchar*) conn);
#endif
pthread_mutex_unlock(&spider_conn_mutex);
DBUG_PRINT("info",("spider get global conn"));
if (spider)
{
spider->conns[base_link_idx] = conn;
if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
conn->use_for_active_standby = TRUE;
}
}
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
} else if (conn_kind == SPIDER_CONN_KIND_HS_READ)
{
pthread_mutex_lock(&spider_hs_r_conn_mutex);
#ifdef SPIDER_HAS_HASH_VALUE_TYPE
if (!(conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
&spider_hs_r_conn_hash,
share->hs_read_conn_keys_hash_value[link_idx],
(uchar*) share->hs_read_conn_keys[link_idx],
share->hs_read_conn_keys_lengths[link_idx])))
#else
if (!(conn = (SPIDER_CONN*) my_hash_search(&spider_hs_r_conn_hash,
(uchar*) share->hs_read_conn_keys[link_idx],
share->hs_read_conn_keys_lengths[link_idx])))
#endif
{
pthread_mutex_unlock(&spider_hs_r_conn_mutex);
DBUG_PRINT("info",("spider create new hs r conn"));
if (!(conn = spider_create_conn(share, spider, link_idx,
base_link_idx, conn_kind, error_num)))
goto error;
*conn->conn_key = *conn_key;
if (spider)
{
spider->hs_r_conns[base_link_idx] = conn;
if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
conn->use_for_active_standby = TRUE;
}
} else {
#ifdef HASH_UPDATE_WITH_HASH_VALUE
my_hash_delete_with_hash_value(&spider_hs_r_conn_hash,
conn->conn_key_hash_value, (uchar*) conn);
#else
my_hash_delete(&spider_hs_r_conn_hash, (uchar*) conn);
#endif
pthread_mutex_unlock(&spider_hs_r_conn_mutex);
DBUG_PRINT("info",("spider get global hs r conn"));
if (spider)
{
spider->hs_r_conns[base_link_idx] = conn;
if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
conn->use_for_active_standby = TRUE;
}
}
} else {
pthread_mutex_lock(&spider_hs_w_conn_mutex);
#ifdef SPIDER_HAS_HASH_VALUE_TYPE
if (!(conn = (SPIDER_CONN*) my_hash_search_using_hash_value(
&spider_hs_w_conn_hash,
share->hs_write_conn_keys_hash_value[link_idx],
(uchar*) share->hs_write_conn_keys[link_idx],
share->hs_write_conn_keys_lengths[link_idx])))
#else
if (!(conn = (SPIDER_CONN*) my_hash_search(&spider_hs_w_conn_hash,
(uchar*) share->hs_write_conn_keys[link_idx],
share->hs_write_conn_keys_lengths[link_idx])))
#endif
{
pthread_mutex_unlock(&spider_hs_w_conn_mutex);
DBUG_PRINT("info",("spider create new hs w conn"));
if (!(conn = spider_create_conn(share, spider, link_idx,
base_link_idx, conn_kind, error_num)))
goto error;
*conn->conn_key = *conn_key;
if (spider)
{
spider->hs_w_conns[base_link_idx] = conn;
if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
conn->use_for_active_standby = TRUE;
}
} else {
#ifdef HASH_UPDATE_WITH_HASH_VALUE
my_hash_delete_with_hash_value(&spider_hs_w_conn_hash,
conn->conn_key_hash_value, (uchar*) conn);
#else
my_hash_delete(&spider_hs_w_conn_hash, (uchar*) conn);
#endif
pthread_mutex_unlock(&spider_hs_w_conn_mutex);
DBUG_PRINT("info",("spider get global hs w conn"));
if (spider)
{
spider->hs_w_conns[base_link_idx] = conn;
if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
conn->use_for_active_standby = TRUE;
}
}
}
#endif
} else {
DBUG_PRINT("info",("spider create new conn"));
/* conn_recycle_strict = 0 and conn_recycle_mode = 0 or 2 */
if (!(conn = spider_create_conn(share, spider, link_idx, base_link_idx,
conn_kind, error_num)))
goto error;
*conn->conn_key = *conn_key;
if (spider)
{
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
if (conn_kind == SPIDER_CONN_KIND_MYSQL)
{
#endif
spider->conns[base_link_idx] = conn;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
} else if (conn_kind == SPIDER_CONN_KIND_HS_READ)
{
spider->hs_r_conns[base_link_idx] = conn;
} else {
spider->hs_w_conns[base_link_idx] = conn;
}
#endif
if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
conn->use_for_active_standby = TRUE;
}
}
conn->thd = trx->thd;
conn->priority = share->priority;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
if (conn_kind == SPIDER_CONN_KIND_MYSQL)
{
#endif
if (another)
{
uint old_elements = trx->trx_another_conn_hash.array.max_element;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
if (my_hash_insert_with_hash_value(&trx->trx_another_conn_hash,
share->conn_keys_hash_value[link_idx],
(uchar*) conn))
#else
if (my_hash_insert(&trx->trx_another_conn_hash, (uchar*) conn))
#endif
{
spider_free_conn(conn);
*error_num = HA_ERR_OUT_OF_MEM;
goto error;
}
if (trx->trx_another_conn_hash.array.max_element > old_elements)
{
spider_alloc_calc_mem(spider_current_trx,
trx->trx_another_conn_hash,
(trx->trx_another_conn_hash.array.max_element - old_elements) *
trx->trx_another_conn_hash.array.size_of_element);
}
} else {
uint old_elements = trx->trx_conn_hash.array.max_element;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
if (my_hash_insert_with_hash_value(&trx->trx_conn_hash,
share->conn_keys_hash_value[link_idx],
(uchar*) conn))
#else
if (my_hash_insert(&trx->trx_conn_hash, (uchar*) conn))
#endif
{
spider_free_conn(conn);
*error_num = HA_ERR_OUT_OF_MEM;
goto error;
}
if (trx->trx_conn_hash.array.max_element > old_elements)
{
spider_alloc_calc_mem(spider_current_trx,
trx->trx_conn_hash,
(trx->trx_conn_hash.array.max_element - old_elements) *
trx->trx_conn_hash.array.size_of_element);
}
}
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
} else if (conn_kind == SPIDER_CONN_KIND_HS_READ)
{
uint old_elements = trx->trx_hs_r_conn_hash.array.max_element;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
if (my_hash_insert_with_hash_value(&trx->trx_hs_r_conn_hash,
share->hs_read_conn_keys_hash_value[link_idx],
(uchar*) conn))
#else
if (my_hash_insert(&trx->trx_hs_r_conn_hash, (uchar*) conn))
#endif
{
spider_free_conn(conn);
*error_num = HA_ERR_OUT_OF_MEM;
goto error;
}
if (trx->trx_hs_r_conn_hash.array.max_element > old_elements)
{
spider_alloc_calc_mem(spider_current_trx,
trx->trx_hs_r_conn_hash,
(trx->trx_hs_r_conn_hash.array.max_element - old_elements) *
trx->trx_hs_r_conn_hash.array.size_of_element);
}
} else {
uint old_elements = trx->trx_hs_w_conn_hash.array.max_element;
#ifdef HASH_UPDATE_WITH_HASH_VALUE
if (my_hash_insert_with_hash_value(&trx->trx_hs_w_conn_hash,
share->hs_write_conn_keys_hash_value[link_idx],
(uchar*) conn))
#else
if (my_hash_insert(&trx->trx_hs_w_conn_hash, (uchar*) conn))
#endif
{
spider_free_conn(conn);
*error_num = HA_ERR_OUT_OF_MEM;
goto error;
}
if (trx->trx_hs_w_conn_hash.array.max_element > old_elements)
{
spider_alloc_calc_mem(spider_current_trx,
trx->trx_hs_w_conn_hash,
(trx->trx_hs_w_conn_hash.array.max_element - old_elements) *
trx->trx_hs_w_conn_hash.array.size_of_element);
}
}
#endif
} else if (spider)
{
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
if (conn_kind == SPIDER_CONN_KIND_MYSQL)
{
#endif
spider->conns[base_link_idx] = conn;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
} else if (conn_kind == SPIDER_CONN_KIND_HS_READ)
{
spider->hs_r_conns[base_link_idx] = conn;
} else {
spider->hs_w_conns[base_link_idx] = conn;
}
#endif
if (spider_bit_is_set(spider->conn_can_fo, base_link_idx))
conn->use_for_active_standby = TRUE;
}
conn->link_idx = base_link_idx;
if (conn->queued_connect)
spider_conn_queue_connect_rewrite(share, conn, link_idx);
if (conn->queued_ping)
{
if (spider)
spider_conn_queue_ping_rewrite(spider, conn, base_link_idx);
else
conn->queued_ping = FALSE;
}
DBUG_PRINT("info",("spider conn=%p", conn));
DBUG_RETURN(conn);
error:
DBUG_RETURN(NULL);
}
int spider_free_conn(
SPIDER_CONN *conn
) {
DBUG_ENTER("spider_free_conn");
DBUG_PRINT("info", ("spider conn=%p", conn));
spider_free_conn_alloc(conn);
spider_free(spider_current_trx, conn, MYF(0));
DBUG_RETURN(0);
}
int spider_check_and_get_casual_read_conn(
THD *thd,
ha_spider *spider,
int link_idx
) {
int error_num;
DBUG_ENTER("spider_check_and_get_casual_read_conn");
if (spider->result_list.casual_read[link_idx])
{
SPIDER_CONN *conn = spider->conns[link_idx];
if (conn->casual_read_query_id != thd->query_id)
{
conn->casual_read_query_id = thd->query_id;
conn->casual_read_current_id = 2;
}
if (spider->result_list.casual_read[link_idx] == 1)
{
spider->result_list.casual_read[link_idx] = conn->casual_read_current_id;
++conn->casual_read_current_id;
if (conn->casual_read_current_id > 63)
{
conn->casual_read_current_id = 2;
}
}
char first_byte_bak = *spider->conn_keys[link_idx];
*spider->conn_keys[link_idx] =
'0' + spider->result_list.casual_read[link_idx];
if (
!(spider->conns[link_idx] =
spider_get_conn(spider->share, link_idx,
spider->conn_keys[link_idx], spider->trx,
spider, FALSE, TRUE, SPIDER_CONN_KIND_MYSQL,
&error_num))
) {
*spider->conn_keys[link_idx] = first_byte_bak;
DBUG_RETURN(error_num);
}
*spider->conn_keys[link_idx] = first_byte_bak;
spider->conns[link_idx]->casual_read_base_conn = conn;
conn = spider->conns[link_idx];
spider_check_and_set_autocommit(thd, conn, NULL);
}
DBUG_RETURN(0);
}
int spider_check_and_init_casual_read(
THD *thd,
ha_spider *spider,
int link_idx
) {
int error_num;
SPIDER_RESULT_LIST *result_list = &spider->result_list;
SPIDER_SHARE *share = spider->share;
DBUG_ENTER("spider_check_and_init_casual_read");
if (
spider_param_sync_autocommit(thd) &&
(!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
(
result_list->direct_order_limit
#ifdef HANDLER_HAS_DIRECT_AGGREGATE
|| result_list->direct_aggregate
#endif
)
) {
if (!result_list->casual_read[link_idx])
{
result_list->casual_read[link_idx] =
spider_param_casual_read(thd, share->casual_read);
}
if ((error_num = spider_check_and_get_casual_read_conn(thd, spider,
link_idx)))
{
DBUG_RETURN(error_num);
}
SPIDER_CONN *conn = spider->conns[link_idx];
if (
conn->casual_read_base_conn &&
(error_num = spider_create_conn_thread(conn))
) {
DBUG_RETURN(error_num);
}
}
DBUG_RETURN(0);
}
void spider_conn_queue_connect(
SPIDER_SHARE *share,
SPIDER_CONN *conn,
int link_idx
) {
DBUG_ENTER("spider_conn_queue_connect");
DBUG_PRINT("info", ("spider conn=%p", conn));
conn->queued_connect = TRUE;
/*
conn->queued_connect_share = share;
conn->queued_connect_link_idx = link_idx;
*/
DBUG_VOID_RETURN;
}
void spider_conn_queue_connect_rewrite(
SPIDER_SHARE *share,
SPIDER_CONN *conn,
int link_idx
) {
DBUG_ENTER("spider_conn_queue_connect_rewrite");
DBUG_PRINT("info", ("spider conn=%p", conn));
conn->queued_connect_share = share;
conn->queued_connect_link_idx = link_idx;
DBUG_VOID_RETURN;
}
void spider_conn_queue_ping(
ha_spider *spider,
SPIDER_CONN *conn,
int link_idx
) {
DBUG_ENTER("spider_conn_queue_ping");
DBUG_PRINT("info", ("spider conn=%p", conn));
conn->queued_ping = TRUE;
conn->queued_ping_spider = spider;
conn->queued_ping_link_idx = link_idx;
DBUG_VOID_RETURN;
}
void spider_conn_queue_ping_rewrite(
ha_spider *spider,
SPIDER_CONN *conn,
int link_idx
) {
DBUG_ENTER("spider_conn_queue_ping_rewrite");
DBUG_PRINT("info", ("spider conn=%p", conn));
conn->queued_ping_spider = spider;
conn->queued_ping_link_idx = link_idx;
DBUG_VOID_RETURN;
}
void spider_conn_queue_trx_isolation(
SPIDER_CONN *conn,
int trx_isolation
) {
DBUG_ENTER("spider_conn_queue_trx_isolation");
DBUG_PRINT("info", ("spider conn=%p", conn));
conn->queued_trx_isolation = TRUE;
conn->queued_trx_isolation_val = trx_isolation;
DBUG_VOID_RETURN;
}
void spider_conn_queue_semi_trx_isolation(
SPIDER_CONN *conn,
int trx_isolation
) {
DBUG_ENTER("spider_conn_queue_semi_trx_isolation");
DBUG_PRINT("info", ("spider conn=%p", conn));
conn->queued_semi_trx_isolation = TRUE;
conn->queued_semi_trx_isolation_val = trx_isolation;
DBUG_VOID_RETURN;
}
void spider_conn_queue_autocommit(
SPIDER_CONN *conn,
bool autocommit
) {
DBUG_ENTER("spider_conn_queue_autocommit");
DBUG_PRINT("info", ("spider conn=%p", conn));
conn->queued_autocommit = TRUE;
conn->queued_autocommit_val = autocommit;
DBUG_VOID_RETURN;
}
void spider_conn_queue_sql_log_off(
SPIDER_CONN *conn,
bool sql_log_off
) {
DBUG_ENTER("spider_conn_queue_sql_log_off");
DBUG_PRINT("info", ("spider conn=%p", conn));
conn->queued_sql_log_off = TRUE;
conn->queued_sql_log_off_val = sql_log_off;
DBUG_VOID_RETURN;
}
void spider_conn_queue_time_zone(
SPIDER_CONN *conn,
Time_zone *time_zone
) {
DBUG_ENTER("spider_conn_queue_time_zone");
DBUG_PRINT("info", ("spider conn=%p", conn));
conn->queued_time_zone = TRUE;
conn->queued_time_zone_val = time_zone;
DBUG_VOID_RETURN;
}
void spider_conn_queue_start_transaction(
SPIDER_CONN *conn
) {
DBUG_ENTER("spider_conn_queue_start_transaction");
DBUG_PRINT("info", ("spider conn=%p", conn));
DBUG_ASSERT(!conn->trx_start);
conn->queued_trx_start = TRUE;
conn->trx_start = TRUE;
DBUG_VOID_RETURN;
}
void spider_conn_queue_xa_start(
SPIDER_CONN *conn,
XID *xid
) {
DBUG_ENTER("spider_conn_queue_xa_start");
DBUG_PRINT("info", ("spider conn=%p", conn));
conn->queued_xa_start = TRUE;
conn->queued_xa_start_xid = xid;
DBUG_VOID_RETURN;
}
void spider_conn_clear_queue(
SPIDER_CONN *conn
) {
DBUG_ENTER("spider_conn_clear_queue");
DBUG_PRINT("info", ("spider conn=%p", conn));
/*
conn->queued_connect = FALSE;
conn->queued_ping = FALSE;
*/
conn->queued_trx_isolation = FALSE;
conn->queued_semi_trx_isolation = FALSE;
conn->queued_autocommit = FALSE;
conn->queued_sql_log_off = FALSE;
conn->queued_time_zone = FALSE;
conn->queued_trx_start = FALSE;
conn->queued_xa_start = FALSE;
DBUG_VOID_RETURN;
}
void spider_conn_clear_queue_at_commit(
SPIDER_CONN *conn
) {
DBUG_ENTER("spider_conn_clear_queue_at_commit");
DBUG_PRINT("info", ("spider conn=%p", conn));
if (conn->queued_trx_start)
{
conn->queued_trx_start = FALSE;
conn->trx_start = FALSE;
}
conn->queued_xa_start = FALSE;
DBUG_VOID_RETURN;
}
void spider_conn_set_timeout(
SPIDER_CONN *conn,
uint net_read_timeout,
uint net_write_timeout
) {
DBUG_ENTER("spider_conn_set_timeout");
DBUG_PRINT("info", ("spider conn=%p", conn));
if (net_read_timeout != conn->net_read_timeout)
{
DBUG_PRINT("info",("spider net_read_timeout set from %u to %u",
conn->net_read_timeout, net_read_timeout));
conn->queued_net_timeout = TRUE;
conn->net_read_timeout = net_read_timeout;
}
if (net_write_timeout != conn->net_write_timeout)
{
DBUG_PRINT("info",("spider net_write_timeout set from %u to %u",
conn->net_write_timeout, net_write_timeout));
conn->queued_net_timeout = TRUE;
conn->net_write_timeout = net_write_timeout;
}
DBUG_VOID_RETURN;
}
void spider_conn_set_timeout_from_share(
SPIDER_CONN *conn,
int link_idx,
THD *thd,
SPIDER_SHARE *share
) {
DBUG_ENTER("spider_conn_set_timeout_from_share");
spider_conn_set_timeout(
conn,
spider_param_net_read_timeout(thd, share->net_read_timeouts[link_idx]),
spider_param_net_write_timeout(thd, share->net_write_timeouts[link_idx])
);
DBUG_VOID_RETURN;
}
void spider_conn_set_timeout_from_direct_sql(
SPIDER_CONN *conn,
THD *thd,
SPIDER_DIRECT_SQL *direct_sql
) {
DBUG_ENTER("spider_conn_set_timeout_from_direct_sql");
spider_conn_set_timeout(
conn,
spider_param_net_read_timeout(thd, direct_sql->net_read_timeout),
spider_param_net_write_timeout(thd, direct_sql->net_write_timeout)
);
DBUG_VOID_RETURN;
}
void spider_tree_insert(
SPIDER_CONN *top,
SPIDER_CONN *conn
) {
SPIDER_CONN *current = top;
longlong priority = conn->priority;
DBUG_ENTER("spider_tree_insert");
while (TRUE)
{
if (priority < current->priority)
{
if (current->c_small == NULL)
{
conn->p_small = NULL;
conn->p_big = current;
conn->c_small = NULL;
conn->c_big = NULL;
current->c_small = conn;
break;
} else
current = current->c_small;
} else {
if (current->c_big == NULL)
{
conn->p_small = current;
conn->p_big = NULL;
conn->c_small = NULL;
conn->c_big = NULL;
current->c_big = conn;
break;
} else
current = current->c_big;
}
}
DBUG_VOID_RETURN;
}
SPIDER_CONN *spider_tree_first(
SPIDER_CONN *top
) {
SPIDER_CONN *current = top;
DBUG_ENTER("spider_tree_first");
while (current)
{
if (current->c_small == NULL)
break;
else
current = current->c_small;
}
DBUG_RETURN(current);
}
SPIDER_CONN *spider_tree_last(
SPIDER_CONN *top
) {
SPIDER_CONN *current = top;
DBUG_ENTER("spider_tree_last");
while (TRUE)
{
if (current->c_big == NULL)
break;
else
current = current->c_big;
}
DBUG_RETURN(current);
}
SPIDER_CONN *spider_tree_next(
SPIDER_CONN *current
) {
DBUG_ENTER("spider_tree_next");
if (current->c_big)
DBUG_RETURN(spider_tree_first(current->c_big));
while (TRUE)
{
if (current->p_big)
DBUG_RETURN(current->p_big);
if (!current->p_small)
DBUG_RETURN(NULL);
current = current->p_small;
}
}
SPIDER_CONN *spider_tree_delete(
SPIDER_CONN *conn,
SPIDER_CONN *top
) {
DBUG_ENTER("spider_tree_delete");
if (conn->p_small)
{
if (conn->c_small)
{
conn->c_small->p_big = NULL;
conn->c_small->p_small = conn->p_small;
conn->p_small->c_big = conn->c_small;
if (conn->c_big)
{
SPIDER_CONN *last = spider_tree_last(conn->c_small);
conn->c_big->p_small = last;
last->c_big = conn->c_big;
}
} else if (conn->c_big)
{
conn->c_big->p_small = conn->p_small;
conn->p_small->c_big = conn->c_big;
} else
conn->p_small->c_big = NULL;
} else if (conn->p_big)
{
if (conn->c_small)
{
conn->c_small->p_big = conn->p_big;
conn->p_big->c_small = conn->c_small;
if (conn->c_big)
{
SPIDER_CONN *last = spider_tree_last(conn->c_small);
conn->c_big->p_small = last;
last->c_big = conn->c_big;
}
} else if (conn->c_big)
{
conn->c_big->p_big = conn->p_big;
conn->c_big->p_small = NULL;
conn->p_big->c_small = conn->c_big;
} else
conn->p_big->c_small = NULL;
} else {
if (conn->c_small)
{
conn->c_small->p_big = NULL;
conn->c_small->p_small = NULL;
if (conn->c_big)
{
SPIDER_CONN *last = spider_tree_last(conn->c_small);
conn->c_big->p_small = last;
last->c_big = conn->c_big;
}
DBUG_RETURN(conn->c_small);
} else if (conn->c_big)
{
conn->c_big->p_small = NULL;
DBUG_RETURN(conn->c_big);
}
DBUG_RETURN(NULL);
}
DBUG_RETURN(top);
}
#ifndef WITHOUT_SPIDER_BG_SEARCH
int spider_set_conn_bg_param(
ha_spider *spider
) {
int error_num, roop_count, bgs_mode;
SPIDER_SHARE *share = spider->share;
SPIDER_RESULT_LIST *result_list = &spider->result_list;
THD *thd = spider->trx->thd;
DBUG_ENTER("spider_set_conn_bg_param");
bgs_mode =
spider_param_bgs_mode(thd, share->bgs_mode);
if (bgs_mode == 0)
result_list->bgs_phase = 0;
else if (
bgs_mode <= 2 &&
(result_list->lock_type == F_WRLCK || spider->lock_mode == 2)
)
result_list->bgs_phase = 0;
else if (bgs_mode <= 1 && spider->lock_mode == 1)
result_list->bgs_phase = 0;
else {
result_list->bgs_phase = 1;
result_list->bgs_split_read = spider_bg_split_read_param(spider);
if (spider->use_pre_call)
{
DBUG_PRINT("info",("spider use_pre_call=TRUE"));
result_list->bgs_first_read = result_list->bgs_split_read;
result_list->bgs_second_read = result_list->bgs_split_read;
} else {
DBUG_PRINT("info",("spider use_pre_call=FALSE"));
result_list->bgs_first_read =
spider_param_bgs_first_read(thd, share->bgs_first_read);
result_list->bgs_second_read =
spider_param_bgs_second_read(thd, share->bgs_second_read);
}
DBUG_PRINT("info",("spider bgs_split_read=%lld",
result_list->bgs_split_read));
DBUG_PRINT("info",("spider bgs_first_read=%lld", share->bgs_first_read));
DBUG_PRINT("info",("spider bgs_second_read=%lld", share->bgs_second_read));
result_list->split_read =
result_list->bgs_first_read > 0 ?
result_list->bgs_first_read :
result_list->bgs_split_read;
}
if (result_list->bgs_phase > 0)
{
for (
roop_count = spider_conn_link_idx_next(share->link_statuses,
spider->conn_link_idx, -1, share->link_count,
spider->lock_mode ?
SPIDER_LINK_STATUS_RECOVERY : SPIDER_LINK_STATUS_OK);
roop_count < (int) share->link_count;
roop_count = spider_conn_link_idx_next(share->link_statuses,
spider->conn_link_idx, roop_count, share->link_count,
spider->lock_mode ?
SPIDER_LINK_STATUS_RECOVERY : SPIDER_LINK_STATUS_OK)
) {
if ((error_num = spider_create_conn_thread(spider->conns[roop_count])))
DBUG_RETURN(error_num);
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
if ((error_num = spider_create_conn_thread(
spider->hs_r_conns[roop_count])))
DBUG_RETURN(error_num);
if ((error_num = spider_create_conn_thread(
spider->hs_w_conns[roop_count])))
DBUG_RETURN(error_num);
#endif
}
}
DBUG_RETURN(0);
}
int spider_create_conn_thread(
SPIDER_CONN *conn
) {
int error_num;
DBUG_ENTER("spider_create_conn_thread");
if (conn && !conn->bg_init)
{
#if MYSQL_VERSION_ID < 50500
if (pthread_mutex_init(&conn->bg_conn_chain_mutex, MY_MUTEX_INIT_FAST))
#else
if (mysql_mutex_init(spd_key_mutex_bg_conn_chain,
&conn->bg_conn_chain_mutex, MY_MUTEX_INIT_FAST))
#endif
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_chain_mutex_init;
}
conn->bg_conn_chain_mutex_ptr = NULL;
#if MYSQL_VERSION_ID < 50500
if (pthread_mutex_init(&conn->bg_conn_sync_mutex, MY_MUTEX_INIT_FAST))
#else
if (mysql_mutex_init(spd_key_mutex_bg_conn_sync,
&conn->bg_conn_sync_mutex, MY_MUTEX_INIT_FAST))
#endif
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_sync_mutex_init;
}
#if MYSQL_VERSION_ID < 50500
if (pthread_mutex_init(&conn->bg_conn_mutex, MY_MUTEX_INIT_FAST))
#else
if (mysql_mutex_init(spd_key_mutex_bg_conn, &conn->bg_conn_mutex,
MY_MUTEX_INIT_FAST))
#endif
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_mutex_init;
}
#if MYSQL_VERSION_ID < 50500
if (pthread_mutex_init(&conn->bg_job_stack_mutex, MY_MUTEX_INIT_FAST))
#else
if (mysql_mutex_init(spd_key_mutex_bg_job_stack, &conn->bg_job_stack_mutex,
MY_MUTEX_INIT_FAST))
#endif
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_job_stack_mutex_init;
}
if (SPD_INIT_DYNAMIC_ARRAY2(&conn->bg_job_stack, sizeof(void *), NULL, 16,
16, MYF(MY_WME)))
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_job_stack_init;
}
spider_alloc_calc_mem_init(conn->bg_job_stack, 163);
spider_alloc_calc_mem(spider_current_trx,
conn->bg_job_stack,
conn->bg_job_stack.max_element *
conn->bg_job_stack.size_of_element);
conn->bg_job_stack_cur_pos = 0;
#if MYSQL_VERSION_ID < 50500
if (pthread_cond_init(&conn->bg_conn_sync_cond, NULL))
#else
if (mysql_cond_init(spd_key_cond_bg_conn_sync,
&conn->bg_conn_sync_cond, NULL))
#endif
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_sync_cond_init;
}
#if MYSQL_VERSION_ID < 50500
if (pthread_cond_init(&conn->bg_conn_cond, NULL))
#else
if (mysql_cond_init(spd_key_cond_bg_conn,
&conn->bg_conn_cond, NULL))
#endif
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_cond_init;
}
pthread_mutex_lock(&conn->bg_conn_mutex);
#if MYSQL_VERSION_ID < 50500
if (pthread_create(&conn->bg_thread, &spider_pt_attr,
spider_bg_conn_action, (void *) conn)
)
#else
if (mysql_thread_create(spd_key_thd_bg, &conn->bg_thread,
&spider_pt_attr, spider_bg_conn_action, (void *) conn)
)
#endif
{
pthread_mutex_unlock(&conn->bg_conn_mutex);
error_num = HA_ERR_OUT_OF_MEM;
goto error_thread_create;
}
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
pthread_mutex_unlock(&conn->bg_conn_mutex);
pthread_cond_wait(&conn->bg_conn_sync_cond, &conn->bg_conn_sync_mutex);
pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
if (!conn->bg_init)
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_thread_create;
}
}
DBUG_RETURN(0);
error_thread_create:
pthread_cond_destroy(&conn->bg_conn_cond);
error_cond_init:
pthread_cond_destroy(&conn->bg_conn_sync_cond);
error_sync_cond_init:
spider_free_mem_calc(spider_current_trx,
conn->bg_job_stack_id,
conn->bg_job_stack.max_element *
conn->bg_job_stack.size_of_element);
delete_dynamic(&conn->bg_job_stack);
error_job_stack_init:
pthread_mutex_destroy(&conn->bg_job_stack_mutex);
error_job_stack_mutex_init:
pthread_mutex_destroy(&conn->bg_conn_mutex);
error_mutex_init:
pthread_mutex_destroy(&conn->bg_conn_sync_mutex);
error_sync_mutex_init:
pthread_mutex_destroy(&conn->bg_conn_chain_mutex);
error_chain_mutex_init:
DBUG_RETURN(error_num);
}
void spider_free_conn_thread(
SPIDER_CONN *conn
) {
DBUG_ENTER("spider_free_conn_thread");
if (conn->bg_init)
{
spider_bg_conn_break(conn, NULL);
pthread_mutex_lock(&conn->bg_conn_mutex);
conn->bg_kill = TRUE;
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
pthread_cond_signal(&conn->bg_conn_cond);
pthread_mutex_unlock(&conn->bg_conn_mutex);
pthread_cond_wait(&conn->bg_conn_sync_cond, &conn->bg_conn_sync_mutex);
pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
pthread_join(conn->bg_thread, NULL);
pthread_cond_destroy(&conn->bg_conn_cond);
pthread_cond_destroy(&conn->bg_conn_sync_cond);
spider_free_mem_calc(spider_current_trx,
conn->bg_job_stack_id,
conn->bg_job_stack.max_element *
conn->bg_job_stack.size_of_element);
delete_dynamic(&conn->bg_job_stack);
pthread_mutex_destroy(&conn->bg_job_stack_mutex);
pthread_mutex_destroy(&conn->bg_conn_mutex);
pthread_mutex_destroy(&conn->bg_conn_sync_mutex);
pthread_mutex_destroy(&conn->bg_conn_chain_mutex);
conn->bg_kill = FALSE;
conn->bg_init = FALSE;
}
DBUG_VOID_RETURN;
}
void spider_bg_conn_wait(
SPIDER_CONN *conn
) {
DBUG_ENTER("spider_bg_conn_wait");
if (conn->bg_init)
{
pthread_mutex_lock(&conn->bg_conn_mutex);
pthread_mutex_unlock(&conn->bg_conn_mutex);
}
DBUG_VOID_RETURN;
}
void spider_bg_all_conn_wait(
ha_spider *spider
) {
int roop_count;
SPIDER_CONN *conn;
SPIDER_SHARE *share = spider->share;
SPIDER_RESULT_LIST *result_list = &spider->result_list;
DBUG_ENTER("spider_bg_all_conn_wait");
for (
roop_count = spider_conn_link_idx_next(share->link_statuses,
spider->conn_link_idx, -1, share->link_count,
SPIDER_LINK_STATUS_RECOVERY);
roop_count < (int) share->link_count;
roop_count = spider_conn_link_idx_next(share->link_statuses,
spider->conn_link_idx, roop_count, share->link_count,
SPIDER_LINK_STATUS_RECOVERY)
) {
conn = spider->conns[roop_count];
#ifndef WITHOUT_SPIDER_BG_SEARCH
if (conn && result_list->bgs_working)
spider_bg_conn_wait(conn);
#endif
}
DBUG_VOID_RETURN;
}
int spider_bg_all_conn_pre_next(
ha_spider *spider,
int link_idx
) {
#ifndef WITHOUT_SPIDER_BG_SEARCH
int roop_start, roop_end, roop_count, lock_mode, link_ok, error_num;
SPIDER_RESULT_LIST *result_list = &spider->result_list;
SPIDER_SHARE *share = spider->share;
#endif
DBUG_ENTER("spider_bg_all_conn_pre_next");
#ifndef WITHOUT_SPIDER_BG_SEARCH
if (result_list->bgs_phase > 0)
{
lock_mode = spider_conn_lock_mode(spider);
if (lock_mode)
{
/* "for update" or "lock in share mode" */
link_ok = spider_conn_link_idx_next(share->link_statuses,
spider->conn_link_idx, -1, share->link_count,
SPIDER_LINK_STATUS_OK);
roop_start = spider_conn_link_idx_next(share->link_statuses,
spider->conn_link_idx, -1, share->link_count,
SPIDER_LINK_STATUS_RECOVERY);
roop_end = spider->share->link_count;
} else {
link_ok = link_idx;
roop_start = link_idx;
roop_end = link_idx + 1;
}
for (roop_count = roop_start; roop_count < roop_end;
roop_count = spider_conn_link_idx_next(share->link_statuses,
spider->conn_link_idx, roop_count, share->link_count,
SPIDER_LINK_STATUS_RECOVERY)
) {
if ((error_num = spider_bg_conn_search(spider, roop_count, roop_start,
TRUE, TRUE, (roop_count != link_ok))))
DBUG_RETURN(error_num);
}
}
#endif
DBUG_RETURN(0);
}
void spider_bg_conn_break(
SPIDER_CONN *conn,
ha_spider *spider
) {
DBUG_ENTER("spider_bg_conn_break");
if (
conn->bg_init &&
conn->bg_thd != current_thd &&
(
!spider ||
(
spider->result_list.bgs_working &&
conn->bg_target == spider
)
)
) {
conn->bg_break = TRUE;
pthread_mutex_lock(&conn->bg_conn_mutex);
pthread_mutex_unlock(&conn->bg_conn_mutex);
conn->bg_break = FALSE;
}
DBUG_VOID_RETURN;
}
void spider_bg_all_conn_break(
ha_spider *spider
) {
int roop_count;
SPIDER_CONN *conn;
SPIDER_SHARE *share = spider->share;
SPIDER_RESULT_LIST *result_list = &spider->result_list;
DBUG_ENTER("spider_bg_all_conn_break");
for (
roop_count = spider_conn_link_idx_next(share->link_statuses,
spider->conn_link_idx, -1, share->link_count,
SPIDER_LINK_STATUS_RECOVERY);
roop_count < (int) share->link_count;
roop_count = spider_conn_link_idx_next(share->link_statuses,
spider->conn_link_idx, roop_count, share->link_count,
SPIDER_LINK_STATUS_RECOVERY)
) {
conn = spider->conns[roop_count];
#ifndef WITHOUT_SPIDER_BG_SEARCH
if (conn && result_list->bgs_working)
spider_bg_conn_break(conn, spider);
#endif
if (spider->quick_targets[roop_count])
{
DBUG_ASSERT(spider->quick_targets[roop_count] == conn->quick_target);
DBUG_PRINT("info", ("spider conn[%p]->quick_target=NULL", conn));
conn->quick_target = NULL;
spider->quick_targets[roop_count] = NULL;
}
}
DBUG_VOID_RETURN;
}
bool spider_bg_conn_get_job(
SPIDER_CONN *conn
) {
DBUG_ENTER("spider_bg_conn_get_job");
pthread_mutex_lock(&conn->bg_job_stack_mutex);
if (conn->bg_job_stack_cur_pos >= conn->bg_job_stack.elements)
{
DBUG_PRINT("info",("spider bg all jobs are completed"));
conn->bg_get_job_stack_off = FALSE;
pthread_mutex_unlock(&conn->bg_job_stack_mutex);
DBUG_RETURN(FALSE);
}
DBUG_PRINT("info",("spider bg get job %u",
conn->bg_job_stack_cur_pos));
conn->bg_target = ((void **) (conn->bg_job_stack.buffer +
conn->bg_job_stack.size_of_element * conn->bg_job_stack_cur_pos))[0];
conn->bg_job_stack_cur_pos++;
if (conn->bg_job_stack_cur_pos == conn->bg_job_stack.elements)
{
DBUG_PRINT("info",("spider bg shift job stack"));
conn->bg_job_stack_cur_pos = 0;
conn->bg_job_stack.elements = 0;
}
pthread_mutex_unlock(&conn->bg_job_stack_mutex);
DBUG_RETURN(TRUE);
}
int spider_bg_conn_search(
ha_spider *spider,
int link_idx,
int first_link_idx,
bool first,
bool pre_next,
bool discard_result
) {
int error_num;
SPIDER_CONN *conn, *first_conn = NULL;
SPIDER_RESULT_LIST *result_list = &spider->result_list;
bool with_lock = FALSE;
DBUG_ENTER("spider_bg_conn_search");
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
if (spider->conn_kind[link_idx] == SPIDER_CONN_KIND_MYSQL)
{
#endif
conn = spider->conns[link_idx];
with_lock = (spider_conn_lock_mode(spider) != SPIDER_LOCK_MODE_NO_LOCK);
first_conn = spider->conns[first_link_idx];
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
} else if (spider->conn_kind[link_idx] == SPIDER_CONN_KIND_HS_READ)
conn = spider->hs_r_conns[link_idx];
else
conn = spider->hs_w_conns[link_idx];
#endif
if (first)
{
if (spider->use_pre_call)
{
DBUG_PRINT("info",("spider skip bg first search"));
} else {
DBUG_PRINT("info",("spider bg first search"));
pthread_mutex_lock(&conn->bg_conn_mutex);
result_list->bgs_working = TRUE;
conn->bg_search = TRUE;
conn->bg_caller_wait = TRUE;
conn->bg_target = spider;
conn->link_idx = link_idx;
conn->bg_discard_result = discard_result;
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
pthread_cond_signal(&conn->bg_conn_cond);
pthread_mutex_unlock(&conn->bg_conn_mutex);
pthread_cond_wait(&conn->bg_conn_sync_cond, &conn->bg_conn_sync_mutex);
pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
conn->bg_caller_wait = FALSE;
if (result_list->bgs_error)
{
if (result_list->bgs_error_with_message)
my_message(result_list->bgs_error,
result_list->bgs_error_msg, MYF(0));
DBUG_RETURN(result_list->bgs_error);
}
}
if (!result_list->finish_flg)
{
pthread_mutex_lock(&conn->bg_conn_mutex);
if (!result_list->finish_flg)
{
DBUG_PRINT("info",("spider bg second search"));
if (!spider->use_pre_call || pre_next)
{
if (result_list->bgs_error)
{
pthread_mutex_unlock(&conn->bg_conn_mutex);
DBUG_PRINT("info",("spider bg error"));
if (result_list->bgs_error == HA_ERR_END_OF_FILE)
{
DBUG_PRINT("info",("spider bg current->finish_flg=%s",
result_list->current ?
(result_list->current->finish_flg ? "TRUE" : "FALSE") : "NULL"));
DBUG_RETURN(0);
}
if (result_list->bgs_error_with_message)
my_message(result_list->bgs_error,
result_list->bgs_error_msg, MYF(0));
DBUG_RETURN(result_list->bgs_error);
}
if (
result_list->quick_mode == 0 ||
!result_list->bgs_current->result
) {
result_list->split_read =
result_list->bgs_second_read > 0 ?
result_list->bgs_second_read :
result_list->bgs_split_read;
result_list->limit_num =
result_list->internal_limit - result_list->record_num >=
result_list->split_read ?
result_list->split_read :
result_list->internal_limit - result_list->record_num;
if (spider->sql_kinds & SPIDER_SQL_KIND_SQL)
{
if ((error_num = spider->reappend_limit_sql_part(
result_list->internal_offset + result_list->record_num,
result_list->limit_num,
SPIDER_SQL_TYPE_SELECT_SQL)))
{
pthread_mutex_unlock(&conn->bg_conn_mutex);
DBUG_RETURN(error_num);
}
if (
!result_list->use_union &&
(error_num = spider->append_select_lock_sql_part(
SPIDER_SQL_TYPE_SELECT_SQL))
) {
pthread_mutex_unlock(&conn->bg_conn_mutex);
DBUG_RETURN(error_num);
}
}
if (spider->sql_kinds & SPIDER_SQL_KIND_HANDLER)
{
spider_db_append_handler_next(spider);
if ((error_num = spider->reappend_limit_sql_part(
0, result_list->limit_num,
SPIDER_SQL_TYPE_HANDLER)))
{
pthread_mutex_unlock(&conn->bg_conn_mutex);
DBUG_RETURN(error_num);
}
}
}
result_list->bgs_phase = 2;
}
result_list->bgs_working = TRUE;
conn->bg_search = TRUE;
if (with_lock)
conn->bg_conn_chain_mutex_ptr = &first_conn->bg_conn_chain_mutex;
conn->bg_caller_sync_wait = TRUE;
conn->bg_target = spider;
conn->link_idx = link_idx;
conn->bg_discard_result = discard_result;
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
pthread_cond_signal(&conn->bg_conn_cond);
pthread_mutex_unlock(&conn->bg_conn_mutex);
pthread_cond_wait(&conn->bg_conn_sync_cond, &conn->bg_conn_sync_mutex);
pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
conn->bg_caller_sync_wait = FALSE;
} else {
pthread_mutex_unlock(&conn->bg_conn_mutex);
DBUG_PRINT("info",("spider bg current->finish_flg=%s",
result_list->current ?
(result_list->current->finish_flg ? "TRUE" : "FALSE") : "NULL"));
}
} else {
DBUG_PRINT("info",("spider bg current->finish_flg=%s",
result_list->current ?
(result_list->current->finish_flg ? "TRUE" : "FALSE") : "NULL"));
}
} else {
DBUG_PRINT("info",("spider bg search"));
if (result_list->current->finish_flg)
{
DBUG_PRINT("info",("spider bg end of file"));
result_list->table->status = STATUS_NOT_FOUND;
DBUG_RETURN(HA_ERR_END_OF_FILE);
}
if (result_list->bgs_working)
{
/* wait */
DBUG_PRINT("info",("spider bg working wait"));
pthread_mutex_lock(&conn->bg_conn_mutex);
pthread_mutex_unlock(&conn->bg_conn_mutex);
}
if (result_list->bgs_error)
{
DBUG_PRINT("info",("spider bg error"));
if (result_list->bgs_error == HA_ERR_END_OF_FILE)
{
result_list->current = result_list->current->next;
result_list->current_row_num = 0;
result_list->table->status = STATUS_NOT_FOUND;
}
if (result_list->bgs_error_with_message)
my_message(result_list->bgs_error,
result_list->bgs_error_msg, MYF(0));
DBUG_RETURN(result_list->bgs_error);
}
result_list->current = result_list->current->next;
result_list->current_row_num = 0;
if (result_list->current == result_list->bgs_current)
{
DBUG_PRINT("info",("spider bg next search"));
if (!result_list->current->finish_flg)
{
pthread_mutex_lock(&conn->bg_conn_mutex);
result_list->bgs_phase = 3;
if (
result_list->quick_mode == 0 ||
!result_list->bgs_current->result
) {
result_list->split_read = result_list->bgs_split_read;
result_list->limit_num =
result_list->internal_limit - result_list->record_num >=
result_list->split_read ?
result_list->split_read :
result_list->internal_limit - result_list->record_num;
if (spider->sql_kinds & SPIDER_SQL_KIND_SQL)
{
if ((error_num = spider->reappend_limit_sql_part(
result_list->internal_offset + result_list->record_num,
result_list->limit_num,
SPIDER_SQL_TYPE_SELECT_SQL)))
{
pthread_mutex_unlock(&conn->bg_conn_mutex);
DBUG_RETURN(error_num);
}
if (
!result_list->use_union &&
(error_num = spider->append_select_lock_sql_part(
SPIDER_SQL_TYPE_SELECT_SQL))
) {
pthread_mutex_unlock(&conn->bg_conn_mutex);
DBUG_RETURN(error_num);
}
}
if (spider->sql_kinds & SPIDER_SQL_KIND_HANDLER)
{
spider_db_append_handler_next(spider);
if ((error_num = spider->reappend_limit_sql_part(
0, result_list->limit_num,
SPIDER_SQL_TYPE_HANDLER)))
{
pthread_mutex_unlock(&conn->bg_conn_mutex);
DBUG_RETURN(error_num);
}
}
}
conn->bg_target = spider;
conn->link_idx = link_idx;
conn->bg_discard_result = discard_result;
result_list->bgs_working = TRUE;
conn->bg_search = TRUE;
if (with_lock)
conn->bg_conn_chain_mutex_ptr = &first_conn->bg_conn_chain_mutex;
conn->bg_caller_sync_wait = TRUE;
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
pthread_cond_signal(&conn->bg_conn_cond);
pthread_mutex_unlock(&conn->bg_conn_mutex);
pthread_cond_wait(&conn->bg_conn_sync_cond, &conn->bg_conn_sync_mutex);
pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
conn->bg_caller_sync_wait = FALSE;
}
}
}
DBUG_RETURN(0);
}
void spider_bg_conn_simple_action(
SPIDER_CONN *conn,
uint simple_action,
bool caller_wait,
void *target,
uint link_idx,
int *error_num
) {
DBUG_ENTER("spider_bg_conn_simple_action");
pthread_mutex_lock(&conn->bg_conn_mutex);
conn->bg_target = target;
conn->link_idx = link_idx;
conn->bg_simple_action = simple_action;
conn->bg_error_num = error_num;
if (caller_wait)
{
conn->bg_caller_wait = TRUE;
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
} else {
conn->bg_caller_sync_wait = TRUE;
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
}
pthread_cond_signal(&conn->bg_conn_cond);
pthread_mutex_unlock(&conn->bg_conn_mutex);
if (caller_wait)
{
pthread_cond_wait(&conn->bg_conn_sync_cond, &conn->bg_conn_sync_mutex);
pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
conn->bg_caller_wait = FALSE;
} else {
pthread_cond_wait(&conn->bg_conn_sync_cond, &conn->bg_conn_sync_mutex);
pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
conn->bg_caller_sync_wait = FALSE;
}
DBUG_VOID_RETURN;
}
void *spider_bg_conn_action(
void *arg
) {
int error_num;
SPIDER_CONN *conn = (SPIDER_CONN*) arg;
SPIDER_TRX *trx;
ha_spider *spider;
SPIDER_RESULT_LIST *result_list;
THD *thd;
my_thread_init();
DBUG_ENTER("spider_bg_conn_action");
/* init start */
if (!(thd = new THD(next_thread_id())))
{
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
pthread_cond_signal(&conn->bg_conn_sync_cond);
pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
my_thread_end();
DBUG_RETURN(NULL);
}
#ifdef HAVE_PSI_INTERFACE
mysql_thread_set_psi_id(thd->thread_id);
#endif
thd->thread_stack = (char*) &thd;
thd->store_globals();
if (!(trx = spider_get_trx(thd, FALSE, &error_num)))
{
delete thd;
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
pthread_cond_signal(&conn->bg_conn_sync_cond);
pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
DBUG_RETURN(NULL);
}
/* lex_start(thd); */
conn->bg_thd = thd;
pthread_mutex_lock(&conn->bg_conn_mutex);
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
pthread_cond_signal(&conn->bg_conn_sync_cond);
conn->bg_init = TRUE;
pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
/* init end */
while (TRUE)
{
if (conn->bg_conn_chain_mutex_ptr)
{
pthread_mutex_unlock(conn->bg_conn_chain_mutex_ptr);
conn->bg_conn_chain_mutex_ptr = NULL;
}
thd->clear_error();
pthread_cond_wait(&conn->bg_conn_cond, &conn->bg_conn_mutex);
DBUG_PRINT("info",("spider bg roop start"));
#ifndef DBUG_OFF
DBUG_PRINT("info",("spider conn->thd=%p", conn->thd));
if (conn->thd)
{
DBUG_PRINT("info",("spider query_id=%lld", conn->thd->query_id));
}
#endif
if (conn->bg_caller_sync_wait)
{
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
if (conn->bg_direct_sql)
conn->bg_get_job_stack_off = TRUE;
pthread_cond_signal(&conn->bg_conn_sync_cond);
pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
if (conn->bg_conn_chain_mutex_ptr)
{
pthread_mutex_lock(conn->bg_conn_chain_mutex_ptr);
if ((&conn->bg_conn_chain_mutex) != conn->bg_conn_chain_mutex_ptr)
{
pthread_mutex_unlock(conn->bg_conn_chain_mutex_ptr);
conn->bg_conn_chain_mutex_ptr = NULL;
}
}
}
if (conn->bg_kill)
{
DBUG_PRINT("info",("spider bg kill start"));
if (conn->bg_conn_chain_mutex_ptr)
{
pthread_mutex_unlock(conn->bg_conn_chain_mutex_ptr);
conn->bg_conn_chain_mutex_ptr = NULL;
}
spider_free_trx(trx, TRUE);
/* lex_end(thd->lex); */
delete thd;
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
pthread_cond_signal(&conn->bg_conn_sync_cond);
pthread_mutex_unlock(&conn->bg_conn_mutex);
pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
DBUG_RETURN(NULL);
}
if (conn->bg_get_job_stack)
{
conn->bg_get_job_stack = FALSE;
if (!spider_bg_conn_get_job(conn))
{
conn->bg_direct_sql = FALSE;
}
}
if (conn->bg_search)
{
SPIDER_SHARE *share;
spider_db_handler *dbton_handler;
DBUG_PRINT("info",("spider bg search start"));
spider = (ha_spider*) conn->bg_target;
share = spider->share;
dbton_handler = spider->dbton_handler[conn->dbton_id];
result_list = &spider->result_list;
result_list->bgs_error = 0;
result_list->bgs_error_with_message = FALSE;
if (
result_list->quick_mode == 0 ||
result_list->bgs_phase == 1 ||
!result_list->bgs_current->result
) {
ulong sql_type;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
if (conn->conn_kind == SPIDER_CONN_KIND_MYSQL)
{
#endif
if (spider->sql_kind[conn->link_idx] == SPIDER_SQL_KIND_SQL)
{
sql_type = SPIDER_SQL_TYPE_SELECT_SQL | SPIDER_SQL_TYPE_TMP_SQL;
} else {
sql_type = SPIDER_SQL_TYPE_HANDLER;
}
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
} else {
sql_type = SPIDER_SQL_TYPE_SELECT_HS;
}
#endif
if (dbton_handler->need_lock_before_set_sql_for_exec(sql_type))
{
pthread_mutex_lock(&conn->mta_conn_mutex);
SPIDER_SET_FILE_POS(&conn->mta_conn_mutex_file_pos);
}
if ((error_num = dbton_handler->set_sql_for_exec(sql_type,
conn->link_idx)))
{
result_list->bgs_error = error_num;
if ((result_list->bgs_error_with_message = thd->is_error()))
strmov(result_list->bgs_error_msg, spider_stmt_da_message(thd));
}
if (!dbton_handler->need_lock_before_set_sql_for_exec(sql_type))
{
pthread_mutex_lock(&conn->mta_conn_mutex);
SPIDER_SET_FILE_POS(&conn->mta_conn_mutex_file_pos);
}
sql_type &= ~SPIDER_SQL_TYPE_TMP_SQL;
DBUG_PRINT("info",("spider sql_type=%lu", sql_type));
#ifdef HA_CAN_BULK_ACCESS
if (spider->is_bulk_access_clone)
{
spider->connection_ids[conn->link_idx] = conn->connection_id;
spider_trx_add_bulk_access_conn(spider->trx, conn);
}
#endif
if (!result_list->bgs_error)
{
conn->need_mon = &spider->need_mons[conn->link_idx];
conn->mta_conn_mutex_lock_already = TRUE;
conn->mta_conn_mutex_unlock_later = TRUE;
#ifdef HA_CAN_BULK_ACCESS
if (!spider->is_bulk_access_clone)
{
#endif
if (!(result_list->bgs_error =
spider_db_set_names(spider, conn, conn->link_idx)))
{
if (
result_list->tmp_table_join && spider->bka_mode != 2 &&
spider_bit_is_set(result_list->tmp_table_join_first,
conn->link_idx)
) {
spider_clear_bit(result_list->tmp_table_join_first,
conn->link_idx);
spider_set_bit(result_list->tmp_table_created,
conn->link_idx);
result_list->tmp_tables_created = TRUE;
spider_conn_set_timeout_from_share(conn, conn->link_idx,
spider->trx->thd, share);
if (dbton_handler->execute_sql(
SPIDER_SQL_TYPE_TMP_SQL,
conn,
-1,
&spider->need_mons[conn->link_idx])
) {
result_list->bgs_error = spider_db_errorno(conn);
if ((result_list->bgs_error_with_message = thd->is_error()))
strmov(result_list->bgs_error_msg,
spider_stmt_da_message(thd));
} else
spider_db_discard_multiple_result(spider, conn->link_idx,
conn);
}
if (!result_list->bgs_error)
{
spider_conn_set_timeout_from_share(conn, conn->link_idx,
spider->trx->thd, share);
if (dbton_handler->execute_sql(
sql_type,
conn,
result_list->quick_mode,
&spider->need_mons[conn->link_idx])
) {
result_list->bgs_error = spider_db_errorno(conn);
if ((result_list->bgs_error_with_message = thd->is_error()))
strmov(result_list->bgs_error_msg,
spider_stmt_da_message(thd));
} else {
spider->connection_ids[conn->link_idx] = conn->connection_id;
if (!conn->bg_discard_result)
{
if (!(result_list->bgs_error =
spider_db_store_result(spider, conn->link_idx,
result_list->table)))
spider->result_link_idx = conn->link_idx;
else {
if ((result_list->bgs_error_with_message =
thd->is_error()))
strmov(result_list->bgs_error_msg,
spider_stmt_da_message(thd));
}
} else {
result_list->bgs_error = 0;
spider_db_discard_result(spider, conn->link_idx, conn);
}
}
}
} else {
if ((result_list->bgs_error_with_message = thd->is_error()))
strmov(result_list->bgs_error_msg,
spider_stmt_da_message(thd));
}
#ifdef HA_CAN_BULK_ACCESS
}
#endif
conn->mta_conn_mutex_lock_already = FALSE;
conn->mta_conn_mutex_unlock_later = FALSE;
SPIDER_CLEAR_FILE_POS(&conn->mta_conn_mutex_file_pos);
pthread_mutex_unlock(&conn->mta_conn_mutex);
} else {
SPIDER_CLEAR_FILE_POS(&conn->mta_conn_mutex_file_pos);
pthread_mutex_unlock(&conn->mta_conn_mutex);
}
} else {
spider->connection_ids[conn->link_idx] = conn->connection_id;
conn->mta_conn_mutex_unlock_later = TRUE;
result_list->bgs_error =
spider_db_store_result(spider, conn->link_idx, result_list->table);
if ((result_list->bgs_error_with_message = thd->is_error()))
strmov(result_list->bgs_error_msg, spider_stmt_da_message(thd));
conn->mta_conn_mutex_unlock_later = FALSE;
}
conn->bg_search = FALSE;
result_list->bgs_working = FALSE;
if (conn->bg_caller_wait)
{
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
pthread_cond_signal(&conn->bg_conn_sync_cond);
pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
}
continue;
}
if (conn->bg_direct_sql)
{
bool is_error = FALSE;
DBUG_PRINT("info",("spider bg direct sql start"));
do {
SPIDER_DIRECT_SQL *direct_sql = (SPIDER_DIRECT_SQL *) conn->bg_target;
if (
(error_num = spider_db_udf_direct_sql(direct_sql))
) {
if (thd->is_error())
{
if (
direct_sql->error_rw_mode &&
spider_db_conn_is_network_error(error_num)
) {
thd->clear_error();
} else {
SPIDER_BG_DIRECT_SQL *bg_direct_sql =
(SPIDER_BG_DIRECT_SQL *) direct_sql->parent;
pthread_mutex_lock(direct_sql->bg_mutex);
bg_direct_sql->bg_error = spider_stmt_da_sql_errno(thd);
strmov((char *) bg_direct_sql->bg_error_msg,
spider_stmt_da_message(thd));
pthread_mutex_unlock(direct_sql->bg_mutex);
is_error = TRUE;
}
}
}
if (direct_sql->modified_non_trans_table)
{
SPIDER_BG_DIRECT_SQL *bg_direct_sql =
(SPIDER_BG_DIRECT_SQL *) direct_sql->parent;
pthread_mutex_lock(direct_sql->bg_mutex);
bg_direct_sql->modified_non_trans_table = TRUE;
pthread_mutex_unlock(direct_sql->bg_mutex);
}
spider_udf_free_direct_sql_alloc(direct_sql, TRUE);
} while (!is_error && spider_bg_conn_get_job(conn));
if (is_error)
{
while (spider_bg_conn_get_job(conn))
spider_udf_free_direct_sql_alloc(
(SPIDER_DIRECT_SQL *) conn->bg_target, TRUE);
}
conn->bg_direct_sql = FALSE;
continue;
}
if (conn->bg_exec_sql)
{
DBUG_PRINT("info",("spider bg exec sql start"));
spider = (ha_spider*) conn->bg_target;
*conn->bg_error_num = spider_db_query_with_set_names(
conn->bg_sql_type,
spider,
conn,
conn->link_idx
);
conn->bg_exec_sql = FALSE;
continue;
}
if (conn->bg_simple_action)
{
switch (conn->bg_simple_action)
{
case SPIDER_BG_SIMPLE_CONNECT:
conn->db_conn->bg_connect();
break;
case SPIDER_BG_SIMPLE_DISCONNECT:
conn->db_conn->bg_disconnect();
break;
case SPIDER_BG_SIMPLE_RECORDS:
DBUG_PRINT("info",("spider bg simple records"));
spider = (ha_spider*) conn->bg_target;
*conn->bg_error_num =
spider->dbton_handler[conn->dbton_id]->
show_records(conn->link_idx);
break;
default:
break;
}
conn->bg_simple_action = SPIDER_BG_SIMPLE_NO_ACTION;
if (conn->bg_caller_wait)
{
pthread_mutex_lock(&conn->bg_conn_sync_mutex);
pthread_cond_signal(&conn->bg_conn_sync_cond);
pthread_mutex_unlock(&conn->bg_conn_sync_mutex);
}
continue;
}
if (conn->bg_break)
{
DBUG_PRINT("info",("spider bg break start"));
spider = (ha_spider*) conn->bg_target;
result_list = &spider->result_list;
result_list->bgs_working = FALSE;
continue;
}
}
}
int spider_create_sts_thread(
SPIDER_SHARE *share
) {
int error_num;
DBUG_ENTER("spider_create_sts_thread");
if (!share->bg_sts_init)
{
#if MYSQL_VERSION_ID < 50500
if (pthread_cond_init(&share->bg_sts_cond, NULL))
#else
if (mysql_cond_init(spd_key_cond_bg_sts,
&share->bg_sts_cond, NULL))
#endif
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_cond_init;
}
#if MYSQL_VERSION_ID < 50500
if (pthread_cond_init(&share->bg_sts_sync_cond, NULL))
#else
if (mysql_cond_init(spd_key_cond_bg_sts_sync,
&share->bg_sts_sync_cond, NULL))
#endif
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_sync_cond_init;
}
#if MYSQL_VERSION_ID < 50500
if (pthread_create(&share->bg_sts_thread, &spider_pt_attr,
spider_bg_sts_action, (void *) share)
)
#else
if (mysql_thread_create(spd_key_thd_bg_sts, &share->bg_sts_thread,
&spider_pt_attr, spider_bg_sts_action, (void *) share)
)
#endif
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_thread_create;
}
share->bg_sts_init = TRUE;
}
DBUG_RETURN(0);
error_thread_create:
pthread_cond_destroy(&share->bg_sts_sync_cond);
error_sync_cond_init:
pthread_cond_destroy(&share->bg_sts_cond);
error_cond_init:
DBUG_RETURN(error_num);
}
void spider_free_sts_thread(
SPIDER_SHARE *share
) {
DBUG_ENTER("spider_free_sts_thread");
if (share->bg_sts_init)
{
pthread_mutex_lock(&share->sts_mutex);
share->bg_sts_kill = TRUE;
pthread_cond_signal(&share->bg_sts_cond);
pthread_cond_wait(&share->bg_sts_sync_cond, &share->sts_mutex);
pthread_mutex_unlock(&share->sts_mutex);
pthread_join(share->bg_sts_thread, NULL);
pthread_cond_destroy(&share->bg_sts_sync_cond);
pthread_cond_destroy(&share->bg_sts_cond);
share->bg_sts_thd_wait = FALSE;
share->bg_sts_kill = FALSE;
share->bg_sts_init = FALSE;
}
DBUG_VOID_RETURN;
}
void *spider_bg_sts_action(
void *arg
) {
SPIDER_SHARE *share = (SPIDER_SHARE*) arg;
SPIDER_TRX *trx;
int error_num = 0, roop_count;
ha_spider spider;
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
int *need_mons;
SPIDER_CONN **conns;
uint *conn_link_idx;
uchar *conn_can_fo;
char **conn_keys;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
char **hs_r_conn_keys;
char **hs_w_conn_keys;
#endif
spider_db_handler **dbton_hdl;
#else
int need_mons[share->link_count];
SPIDER_CONN *conns[share->link_count];
uint conn_link_idx[share->link_count];
uchar conn_can_fo[share->link_bitmap_size];
char *conn_keys[share->link_count];
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
char *hs_r_conn_keys[share->link_count];
char *hs_w_conn_keys[share->link_count];
#endif
spider_db_handler *dbton_hdl[SPIDER_DBTON_SIZE];
#endif
THD *thd;
my_thread_init();
DBUG_ENTER("spider_bg_sts_action");
/* init start */
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
if (!(need_mons = (int *)
spider_bulk_malloc(spider_current_trx, 21, MYF(MY_WME),
&need_mons, sizeof(int) * share->link_count,
&conns, sizeof(SPIDER_CONN *) * share->link_count,
&conn_link_idx, sizeof(uint) * share->link_count,
&conn_can_fo, sizeof(uchar) * share->link_bitmap_size,
&conn_keys, sizeof(char *) * share->link_count,
&hs_r_conn_keys, sizeof(char *) * share->link_count,
&hs_w_conn_keys, sizeof(char *) * share->link_count,
&dbton_hdl, sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE,
NullS))
)
#else
if (!(need_mons = (int *)
spider_bulk_malloc(spider_current_trx, 21, MYF(MY_WME),
&need_mons, sizeof(int) * share->link_count,
&conns, sizeof(SPIDER_CONN *) * share->link_count,
&conn_link_idx, sizeof(uint) * share->link_count,
&conn_can_fo, sizeof(uchar) * share->link_bitmap_size,
&conn_keys, sizeof(char *) * share->link_count,
&dbton_hdl, sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE,
NullS))
)
#endif
{
pthread_mutex_lock(&share->sts_mutex);
share->bg_sts_thd_wait = FALSE;
share->bg_sts_kill = FALSE;
share->bg_sts_init = FALSE;
pthread_mutex_unlock(&share->sts_mutex);
my_thread_end();
DBUG_RETURN(NULL);
}
#endif
pthread_mutex_lock(&share->sts_mutex);
if (!(thd = new THD(next_thread_id())))
{
share->bg_sts_thd_wait = FALSE;
share->bg_sts_kill = FALSE;
share->bg_sts_init = FALSE;
pthread_mutex_unlock(&share->sts_mutex);
my_thread_end();
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
spider_free(NULL, need_mons, MYF(MY_WME));
#endif
DBUG_RETURN(NULL);
}
#ifdef HAVE_PSI_INTERFACE
mysql_thread_set_psi_id(thd->thread_id);
#endif
thd->thread_stack = (char*) &thd;
thd->store_globals();
if (!(trx = spider_get_trx(thd, FALSE, &error_num)))
{
delete thd;
share->bg_sts_thd_wait = FALSE;
share->bg_sts_kill = FALSE;
share->bg_sts_init = FALSE;
pthread_mutex_unlock(&share->sts_mutex);
#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
spider_free(NULL, need_mons, MYF(MY_WME));
#endif
DBUG_RETURN(NULL);
}
share->bg_sts_thd = thd;
spider.trx = trx;
spider.share = share;
spider.conns = conns;
spider.conn_link_idx = conn_link_idx;
spider.conn_can_fo = conn_can_fo;
spider.need_mons = need_mons;
spider.conn_keys_first_ptr = share->conn_keys[0];
spider.conn_keys = conn_keys;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
spider.hs_r_conn_keys = hs_r_conn_keys;
spider.hs_w_conn_keys = hs_w_conn_keys;
#endif
spider.dbton_handler = dbton_hdl;
memset(conns, 0, sizeof(SPIDER_CONN *) * share->link_count);
memset(need_mons, 0, sizeof(int) * share->link_count);
memset(dbton_hdl, 0, sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE);
spider_trx_set_link_idx_for_all(&spider);
spider.search_link_idx = spider_conn_first_link_idx(thd,
share->link_statuses, share->access_balances, spider.conn_link_idx,
share->link_count, SPIDER_LINK_STATUS_OK);
for (roop_count = 0; roop_count < SPIDER_DBTON_SIZE; roop_count++)
{
if (
spider_bit_is_set(share->dbton_bitmap, roop_count) &&
spider_dbton[roop_count].create_db_handler
) {
if ((dbton_hdl[roop_count] = spider_dbton[roop_count].create_db_handler(
&spider, share->dbton_share[roop_count])))
break;
if (dbton_hdl[roop_count]->init())
break;
}
}
if (roop_count == SPIDER_DBTON_SIZE)
{
DBUG_PRINT("info",("spider handler init error"));
for (roop_count = SPIDER_DBTON_SIZE - 1; roop_count >= 0; --roop_count)
{
if (
spider_bit_is_set(share->dbton_bitmap, roop_count) &&
dbton_hdl[roop_count]
) {
delete dbton_hdl[roop_count];
dbton_hdl[roop_count] = NULL;
}
}
spider_free_trx(trx, TRUE);
delete thd;
share->bg_sts_thd_wait = FALSE;
share->bg_sts_kill = FALSE;
share->bg_sts_init = FALSE;
pthread_mutex_unlock(&share->sts_mutex);
#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
spider_free(NULL, need_mons, MYF(MY_WME));
#endif
DBUG_RETURN(NULL);
}
/* init end */
while (TRUE)
{
DBUG_PRINT("info",("spider bg sts roop start"));
if (share->bg_sts_kill)
{
DBUG_PRINT("info",("spider bg sts kill start"));
for (roop_count = SPIDER_DBTON_SIZE - 1; roop_count >= 0; --roop_count)
{
if (
spider_bit_is_set(share->dbton_bitmap, roop_count) &&
dbton_hdl[roop_count]
) {
delete dbton_hdl[roop_count];
dbton_hdl[roop_count] = NULL;
}
}
spider_free_trx(trx, TRUE);
delete thd;
pthread_cond_signal(&share->bg_sts_sync_cond);
pthread_mutex_unlock(&share->sts_mutex);
#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
spider_free(NULL, need_mons, MYF(MY_WME));
#endif
DBUG_RETURN(NULL);
}
if (spider.search_link_idx == -1)
{
spider_trx_set_link_idx_for_all(&spider);
/*
spider.search_link_idx = spider_conn_next_link_idx(
thd, share->link_statuses, share->access_balances,
spider.conn_link_idx, spider.search_link_idx, share->link_count,
SPIDER_LINK_STATUS_OK);
*/
spider.search_link_idx = spider_conn_first_link_idx(thd,
share->link_statuses, share->access_balances, spider.conn_link_idx,
share->link_count, SPIDER_LINK_STATUS_OK);
}
if (spider.search_link_idx >= 0)
{
if (difftime(share->bg_sts_try_time, share->sts_get_time) >=
share->bg_sts_interval)
{
if (!conns[spider.search_link_idx])
{
spider_get_conn(share, spider.search_link_idx,
share->conn_keys[spider.search_link_idx],
trx, &spider, FALSE, FALSE, SPIDER_CONN_KIND_MYSQL,
&error_num);
conns[spider.search_link_idx]->error_mode = 0;
/*
if (
error_num &&
share->monitoring_kind[spider.search_link_idx] &&
need_mons[spider.search_link_idx]
) {
lex_start(thd);
error_num = spider_ping_table_mon_from_table(
trx,
thd,
share,
(uint32) share->monitoring_sid[spider.search_link_idx],
share->table_name,
share->table_name_length,
spider.conn_link_idx[spider.search_link_idx],
NULL,
0,
share->monitoring_kind[spider.search_link_idx],
share->monitoring_limit[spider.search_link_idx],
share->monitoring_flag[spider.search_link_idx],
TRUE
);
lex_end(thd->lex);
}
*/
spider.search_link_idx = -1;
}
if (spider.search_link_idx != -1 && conns[spider.search_link_idx])
{
#ifdef WITH_PARTITION_STORAGE_ENGINE
if (spider_get_sts(share, spider.search_link_idx,
share->bg_sts_try_time, &spider,
share->bg_sts_interval, share->bg_sts_mode,
share->bg_sts_sync,
2, HA_STATUS_CONST | HA_STATUS_VARIABLE))
#else
if (spider_get_sts(share, spider.search_link_idx,
share->bg_sts_try_time, &spider,
share->bg_sts_interval, share->bg_sts_mode,
2, HA_STATUS_CONST | HA_STATUS_VARIABLE))
#endif
{
/*
if (
share->monitoring_kind[spider.search_link_idx] &&
need_mons[spider.search_link_idx]
) {
lex_start(thd);
error_num = spider_ping_table_mon_from_table(
trx,
thd,
share,
(uint32) share->monitoring_sid[spider.search_link_idx],
share->table_name,
share->table_name_length,
spider.conn_link_idx[spider.search_link_idx],
NULL,
0,
share->monitoring_kind[spider.search_link_idx],
share->monitoring_limit[spider.search_link_idx],
share->monitoring_flag[spider.search_link_idx],
TRUE
);
lex_end(thd->lex);
}
*/
spider.search_link_idx = -1;
}
}
}
}
memset(need_mons, 0, sizeof(int) * share->link_count);
share->bg_sts_thd_wait = TRUE;
pthread_cond_wait(&share->bg_sts_cond, &share->sts_mutex);
}
}
int spider_create_crd_thread(
SPIDER_SHARE *share
) {
int error_num;
DBUG_ENTER("spider_create_crd_thread");
if (!share->bg_crd_init)
{
#if MYSQL_VERSION_ID < 50500
if (pthread_cond_init(&share->bg_crd_cond, NULL))
#else
if (mysql_cond_init(spd_key_cond_bg_crd,
&share->bg_crd_cond, NULL))
#endif
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_cond_init;
}
#if MYSQL_VERSION_ID < 50500
if (pthread_cond_init(&share->bg_crd_sync_cond, NULL))
#else
if (mysql_cond_init(spd_key_cond_bg_crd_sync,
&share->bg_crd_sync_cond, NULL))
#endif
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_sync_cond_init;
}
#if MYSQL_VERSION_ID < 50500
if (pthread_create(&share->bg_crd_thread, &spider_pt_attr,
spider_bg_crd_action, (void *) share)
)
#else
if (mysql_thread_create(spd_key_thd_bg_crd, &share->bg_crd_thread,
&spider_pt_attr, spider_bg_crd_action, (void *) share)
)
#endif
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_thread_create;
}
share->bg_crd_init = TRUE;
}
DBUG_RETURN(0);
error_thread_create:
pthread_cond_destroy(&share->bg_crd_sync_cond);
error_sync_cond_init:
pthread_cond_destroy(&share->bg_crd_cond);
error_cond_init:
DBUG_RETURN(error_num);
}
void spider_free_crd_thread(
SPIDER_SHARE *share
) {
DBUG_ENTER("spider_free_crd_thread");
if (share->bg_crd_init)
{
pthread_mutex_lock(&share->crd_mutex);
share->bg_crd_kill = TRUE;
pthread_cond_signal(&share->bg_crd_cond);
pthread_cond_wait(&share->bg_crd_sync_cond, &share->crd_mutex);
pthread_mutex_unlock(&share->crd_mutex);
pthread_join(share->bg_crd_thread, NULL);
pthread_cond_destroy(&share->bg_crd_sync_cond);
pthread_cond_destroy(&share->bg_crd_cond);
share->bg_crd_thd_wait = FALSE;
share->bg_crd_kill = FALSE;
share->bg_crd_init = FALSE;
}
DBUG_VOID_RETURN;
}
void *spider_bg_crd_action(
void *arg
) {
SPIDER_SHARE *share = (SPIDER_SHARE*) arg;
SPIDER_TRX *trx;
int error_num = 0, roop_count;
ha_spider spider;
TABLE table;
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
int *need_mons;
SPIDER_CONN **conns;
uint *conn_link_idx;
uchar *conn_can_fo;
char **conn_keys;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
char **hs_r_conn_keys;
char **hs_w_conn_keys;
#endif
spider_db_handler **dbton_hdl;
#else
int need_mons[share->link_count];
SPIDER_CONN *conns[share->link_count];
uint conn_link_idx[share->link_count];
uchar conn_can_fo[share->link_bitmap_size];
char *conn_keys[share->link_count];
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
char *hs_r_conn_keys[share->link_count];
char *hs_w_conn_keys[share->link_count];
#endif
spider_db_handler *dbton_hdl[SPIDER_DBTON_SIZE];
#endif
THD *thd;
my_thread_init();
DBUG_ENTER("spider_bg_crd_action");
/* init start */
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
if (!(need_mons = (int *)
spider_bulk_malloc(spider_current_trx, 22, MYF(MY_WME),
&need_mons, sizeof(int) * share->link_count,
&conns, sizeof(SPIDER_CONN *) * share->link_count,
&conn_link_idx, sizeof(uint) * share->link_count,
&conn_can_fo, sizeof(uchar) * share->link_bitmap_size,
&conn_keys, sizeof(char *) * share->link_count,
&hs_r_conn_keys, sizeof(char *) * share->link_count,
&hs_w_conn_keys, sizeof(char *) * share->link_count,
&dbton_hdl, sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE,
NullS))
)
#else
if (!(need_mons = (int *)
spider_bulk_malloc(spider_current_trx, 22, MYF(MY_WME),
&need_mons, sizeof(int) * share->link_count,
&conns, sizeof(SPIDER_CONN *) * share->link_count,
&conn_link_idx, sizeof(uint) * share->link_count,
&conn_can_fo, sizeof(uchar) * share->link_bitmap_size,
&conn_keys, sizeof(char *) * share->link_count,
&dbton_hdl, sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE,
NullS))
)
#endif
{
pthread_mutex_lock(&share->crd_mutex);
share->bg_crd_thd_wait = FALSE;
share->bg_crd_kill = FALSE;
share->bg_crd_init = FALSE;
pthread_mutex_unlock(&share->crd_mutex);
my_thread_end();
DBUG_RETURN(NULL);
}
#endif
pthread_mutex_lock(&share->crd_mutex);
if (!(thd = new THD(next_thread_id())))
{
share->bg_crd_thd_wait = FALSE;
share->bg_crd_kill = FALSE;
share->bg_crd_init = FALSE;
pthread_mutex_unlock(&share->crd_mutex);
my_thread_end();
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
spider_free(NULL, need_mons, MYF(MY_WME));
#endif
DBUG_RETURN(NULL);
}
#ifdef HAVE_PSI_INTERFACE
mysql_thread_set_psi_id(thd->thread_id);
#endif
thd->thread_stack = (char*) &thd;
thd->store_globals();
if (!(trx = spider_get_trx(thd, FALSE, &error_num)))
{
delete thd;
share->bg_crd_thd_wait = FALSE;
share->bg_crd_kill = FALSE;
share->bg_crd_init = FALSE;
pthread_mutex_unlock(&share->crd_mutex);
#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
spider_free(NULL, need_mons, MYF(MY_WME));
#endif
DBUG_RETURN(NULL);
}
share->bg_crd_thd = thd;
table.s = share->table_share;
table.field = share->table_share->field;
table.key_info = share->table_share->key_info;
spider.trx = trx;
spider.change_table_ptr(&table, share->table_share);
spider.share = share;
spider.conns = conns;
spider.conn_link_idx = conn_link_idx;
spider.conn_can_fo = conn_can_fo;
spider.need_mons = need_mons;
spider.conn_keys_first_ptr = share->conn_keys[0];
spider.conn_keys = conn_keys;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
spider.hs_r_conn_keys = hs_r_conn_keys;
spider.hs_w_conn_keys = hs_w_conn_keys;
#endif
spider.dbton_handler = dbton_hdl;
memset(conns, 0, sizeof(SPIDER_CONN *) * share->link_count);
memset(need_mons, 0, sizeof(int) * share->link_count);
memset(dbton_hdl, 0, sizeof(spider_db_handler *) * SPIDER_DBTON_SIZE);
spider_trx_set_link_idx_for_all(&spider);
spider.search_link_idx = spider_conn_first_link_idx(thd,
share->link_statuses, share->access_balances, spider.conn_link_idx,
share->link_count, SPIDER_LINK_STATUS_OK);
for (roop_count = 0; roop_count < SPIDER_DBTON_SIZE; roop_count++)
{
if (
spider_bit_is_set(share->dbton_bitmap, roop_count) &&
spider_dbton[roop_count].create_db_handler
) {
if ((dbton_hdl[roop_count] = spider_dbton[roop_count].create_db_handler(
&spider, share->dbton_share[roop_count])))
break;
if (dbton_hdl[roop_count]->init())
break;
}
}
if (roop_count == SPIDER_DBTON_SIZE)
{
DBUG_PRINT("info",("spider handler init error"));
for (roop_count = SPIDER_DBTON_SIZE - 1; roop_count >= 0; --roop_count)
{
if (
spider_bit_is_set(share->dbton_bitmap, roop_count) &&
dbton_hdl[roop_count]
) {
delete dbton_hdl[roop_count];
dbton_hdl[roop_count] = NULL;
}
}
spider_free_trx(trx, TRUE);
delete thd;
share->bg_crd_thd_wait = FALSE;
share->bg_crd_kill = FALSE;
share->bg_crd_init = FALSE;
pthread_mutex_unlock(&share->crd_mutex);
#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
spider_free(NULL, need_mons, MYF(MY_WME));
#endif
DBUG_RETURN(NULL);
}
/* init end */
while (TRUE)
{
DBUG_PRINT("info",("spider bg crd roop start"));
if (share->bg_crd_kill)
{
DBUG_PRINT("info",("spider bg crd kill start"));
for (roop_count = SPIDER_DBTON_SIZE - 1; roop_count >= 0; --roop_count)
{
if (
spider_bit_is_set(share->dbton_bitmap, roop_count) &&
dbton_hdl[roop_count]
) {
delete dbton_hdl[roop_count];
dbton_hdl[roop_count] = NULL;
}
}
spider_free_trx(trx, TRUE);
delete thd;
pthread_cond_signal(&share->bg_crd_sync_cond);
pthread_mutex_unlock(&share->crd_mutex);
#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
spider_free(NULL, need_mons, MYF(MY_WME));
#endif
DBUG_RETURN(NULL);
}
if (spider.search_link_idx == -1)
{
spider_trx_set_link_idx_for_all(&spider);
/*
spider.search_link_idx = spider_conn_next_link_idx(
thd, share->link_statuses, share->access_balances,
spider.conn_link_idx, spider.search_link_idx, share->link_count,
SPIDER_LINK_STATUS_OK);
*/
spider.search_link_idx = spider_conn_first_link_idx(thd,
share->link_statuses, share->access_balances, spider.conn_link_idx,
share->link_count, SPIDER_LINK_STATUS_OK);
}
if (spider.search_link_idx >= 0)
{
if (difftime(share->bg_crd_try_time, share->crd_get_time) >=
share->bg_crd_interval)
{
if (!conns[spider.search_link_idx])
{
spider_get_conn(share, spider.search_link_idx,
share->conn_keys[spider.search_link_idx],
trx, &spider, FALSE, FALSE, SPIDER_CONN_KIND_MYSQL,
&error_num);
conns[spider.search_link_idx]->error_mode = 0;
/*
if (
error_num &&
share->monitoring_kind[spider.search_link_idx] &&
need_mons[spider.search_link_idx]
) {
lex_start(thd);
error_num = spider_ping_table_mon_from_table(
trx,
thd,
share,
(uint32) share->monitoring_sid[spider.search_link_idx],
share->table_name,
share->table_name_length,
spider.conn_link_idx[spider.search_link_idx],
NULL,
0,
share->monitoring_kind[spider.search_link_idx],
share->monitoring_limit[spider.search_link_idx],
share->monitoring_flag[spider.search_link_idx],
TRUE
);
lex_end(thd->lex);
}
*/
spider.search_link_idx = -1;
}
if (spider.search_link_idx != -1 && conns[spider.search_link_idx])
{
#ifdef WITH_PARTITION_STORAGE_ENGINE
if (spider_get_crd(share, spider.search_link_idx,
share->bg_crd_try_time, &spider, &table,
share->bg_crd_interval, share->bg_crd_mode,
share->bg_crd_sync,
2))
#else
if (spider_get_crd(share, spider.search_link_idx,
share->bg_crd_try_time, &spider, &table,
share->bg_crd_interval, share->bg_crd_mode,
2))
#endif
{
/*
if (
share->monitoring_kind[spider.search_link_idx] &&
need_mons[spider.search_link_idx]
) {
lex_start(thd);
error_num = spider_ping_table_mon_from_table(
trx,
thd,
share,
(uint32) share->monitoring_sid[spider.search_link_idx],
share->table_name,
share->table_name_length,
spider.conn_link_idx[spider.search_link_idx],
NULL,
0,
share->monitoring_kind[spider.search_link_idx],
share->monitoring_limit[spider.search_link_idx],
share->monitoring_flag[spider.search_link_idx],
TRUE
);
lex_end(thd->lex);
}
*/
spider.search_link_idx = -1;
}
}
}
}
memset(need_mons, 0, sizeof(int) * share->link_count);
share->bg_crd_thd_wait = TRUE;
pthread_cond_wait(&share->bg_crd_cond, &share->crd_mutex);
}
}
int spider_create_mon_threads(
SPIDER_TRX *trx,
SPIDER_SHARE *share
) {
bool create_bg_mons = FALSE;
int error_num, roop_count, roop_count2;
SPIDER_LINK_PACK link_pack;
SPIDER_TABLE_MON_LIST *table_mon_list;
DBUG_ENTER("spider_create_mon_threads");
if (!share->bg_mon_init)
{
for (roop_count = 0; roop_count < (int) share->all_link_count;
roop_count++)
{
if (share->monitoring_bg_kind[roop_count])
{
create_bg_mons = TRUE;
break;
}
}
if (create_bg_mons)
{
char link_idx_str[SPIDER_SQL_INT_LEN];
int link_idx_str_length;
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
spider_string conv_name_str(share->table_name_length +
SPIDER_SQL_INT_LEN + 1);
conv_name_str.set_charset(system_charset_info);
#else
char buf[share->table_name_length + SPIDER_SQL_INT_LEN + 1];
spider_string conv_name_str(buf, share->table_name_length +
SPIDER_SQL_INT_LEN + 1, system_charset_info);
#endif
conv_name_str.init_calc_mem(105);
conv_name_str.length(0);
conv_name_str.q_append(share->table_name, share->table_name_length);
for (roop_count = 0; roop_count < (int) share->all_link_count;
roop_count++)
{
if (share->monitoring_bg_kind[roop_count])
{
conv_name_str.length(share->table_name_length);
link_idx_str_length = my_sprintf(link_idx_str, (link_idx_str,
"%010d", roop_count));
conv_name_str.q_append(link_idx_str, link_idx_str_length + 1);
conv_name_str.length(conv_name_str.length() - 1);
if (!(table_mon_list = spider_get_ping_table_mon_list(trx, trx->thd,
&conv_name_str, share->table_name_length, roop_count,
(uint32) share->monitoring_sid[roop_count], FALSE, &error_num)))
goto error_get_ping_table_mon_list;
spider_free_ping_table_mon_list(table_mon_list);
}
}
if (!(share->bg_mon_thds = (THD **)
spider_bulk_malloc(spider_current_trx, 23, MYF(MY_WME | MY_ZEROFILL),
&share->bg_mon_thds, sizeof(THD *) * share->all_link_count,
&share->bg_mon_threads, sizeof(pthread_t) * share->all_link_count,
&share->bg_mon_mutexes, sizeof(pthread_mutex_t) *
share->all_link_count,
&share->bg_mon_conds, sizeof(pthread_cond_t) * share->all_link_count,
&share->bg_mon_sleep_conds,
sizeof(pthread_cond_t) * share->all_link_count,
NullS))
) {
error_num = HA_ERR_OUT_OF_MEM;
goto error_alloc_base;
}
for (roop_count = 0; roop_count < (int) share->all_link_count;
roop_count++)
{
if (
share->monitoring_bg_kind[roop_count] &&
#if MYSQL_VERSION_ID < 50500
pthread_mutex_init(&share->bg_mon_mutexes[roop_count],
MY_MUTEX_INIT_FAST)
#else
mysql_mutex_init(spd_key_mutex_bg_mon,
&share->bg_mon_mutexes[roop_count], MY_MUTEX_INIT_FAST)
#endif
) {
error_num = HA_ERR_OUT_OF_MEM;
goto error_mutex_init;
}
}
for (roop_count = 0; roop_count < (int) share->all_link_count;
roop_count++)
{
if (
share->monitoring_bg_kind[roop_count] &&
#if MYSQL_VERSION_ID < 50500
pthread_cond_init(&share->bg_mon_conds[roop_count], NULL)
#else
mysql_cond_init(spd_key_cond_bg_mon,
&share->bg_mon_conds[roop_count], NULL)
#endif
) {
error_num = HA_ERR_OUT_OF_MEM;
goto error_cond_init;
}
}
for (roop_count = 0; roop_count < (int) share->all_link_count;
roop_count++)
{
if (
share->monitoring_bg_kind[roop_count] &&
#if MYSQL_VERSION_ID < 50500
pthread_cond_init(&share->bg_mon_sleep_conds[roop_count], NULL)
#else
mysql_cond_init(spd_key_cond_bg_mon_sleep,
&share->bg_mon_sleep_conds[roop_count], NULL)
#endif
) {
error_num = HA_ERR_OUT_OF_MEM;
goto error_sleep_cond_init;
}
}
link_pack.share = share;
for (roop_count = 0; roop_count < (int) share->all_link_count;
roop_count++)
{
if (share->monitoring_bg_kind[roop_count])
{
link_pack.link_idx = roop_count;
pthread_mutex_lock(&share->bg_mon_mutexes[roop_count]);
#if MYSQL_VERSION_ID < 50500
if (pthread_create(&share->bg_mon_threads[roop_count],
&spider_pt_attr, spider_bg_mon_action, (void *) &link_pack)
)
#else
if (mysql_thread_create(spd_key_thd_bg_mon,
&share->bg_mon_threads[roop_count], &spider_pt_attr,
spider_bg_mon_action, (void *) &link_pack)
)
#endif
{
error_num = HA_ERR_OUT_OF_MEM;
goto error_thread_create;
}
pthread_cond_wait(&share->bg_mon_conds[roop_count],
&share->bg_mon_mutexes[roop_count]);
pthread_mutex_unlock(&share->bg_mon_mutexes[roop_count]);
}
}
share->bg_mon_init = TRUE;
}
}
DBUG_RETURN(0);
error_thread_create:
roop_count2 = roop_count;
for (roop_count--; roop_count >= 0; roop_count--)
{
if (share->monitoring_bg_kind[roop_count])
pthread_mutex_lock(&share->bg_mon_mutexes[roop_count]);
}
share->bg_mon_kill = TRUE;
for (roop_count = roop_count2 - 1; roop_count >= 0; roop_count--)
{
if (share->monitoring_bg_kind[roop_count])
{
pthread_cond_wait(&share->bg_mon_conds[roop_count],
&share->bg_mon_mutexes[roop_count]);
pthread_mutex_unlock(&share->bg_mon_mutexes[roop_count]);
}
}
share->bg_mon_kill = FALSE;
roop_count = share->all_link_count;
error_sleep_cond_init:
for (roop_count--; roop_count >= 0; roop_count--)
{
if (share->monitoring_bg_kind[roop_count])
pthread_cond_destroy(&share->bg_mon_sleep_conds[roop_count]);
}
roop_count = share->all_link_count;
error_cond_init:
for (roop_count--; roop_count >= 0; roop_count--)
{
if (share->monitoring_bg_kind[roop_count])
pthread_cond_destroy(&share->bg_mon_conds[roop_count]);
}
roop_count = share->all_link_count;
error_mutex_init:
for (roop_count--; roop_count >= 0; roop_count--)
{
if (share->monitoring_bg_kind[roop_count])
pthread_mutex_destroy(&share->bg_mon_mutexes[roop_count]);
}
spider_free(spider_current_trx, share->bg_mon_thds, MYF(0));
error_alloc_base:
error_get_ping_table_mon_list:
DBUG_RETURN(error_num);
}
void spider_free_mon_threads(
SPIDER_SHARE *share
) {
int roop_count;
DBUG_ENTER("spider_free_mon_threads");
if (share->bg_mon_init)
{
for (roop_count = 0; roop_count < (int) share->all_link_count;
roop_count++)
{
if (
share->monitoring_bg_kind[roop_count] &&
share->bg_mon_thds[roop_count]
) {
share->bg_mon_thds[roop_count]->killed = SPIDER_THD_KILL_CONNECTION;
}
}
for (roop_count = 0; roop_count < (int) share->all_link_count;
roop_count++)
{
if (share->monitoring_bg_kind[roop_count])
pthread_mutex_lock(&share->bg_mon_mutexes[roop_count]);
}
share->bg_mon_kill = TRUE;
for (roop_count = 0; roop_count < (int) share->all_link_count;
roop_count++)
{
if (share->monitoring_bg_kind[roop_count])
{
pthread_cond_signal(&share->bg_mon_sleep_conds[roop_count]);
pthread_cond_wait(&share->bg_mon_conds[roop_count],
&share->bg_mon_mutexes[roop_count]);
pthread_mutex_unlock(&share->bg_mon_mutexes[roop_count]);
pthread_join(share->bg_mon_threads[roop_count], NULL);
pthread_cond_destroy(&share->bg_mon_conds[roop_count]);
pthread_cond_destroy(&share->bg_mon_sleep_conds[roop_count]);
pthread_mutex_destroy(&share->bg_mon_mutexes[roop_count]);
}
}
spider_free(spider_current_trx, share->bg_mon_thds, MYF(0));
share->bg_mon_kill = FALSE;
share->bg_mon_init = FALSE;
}
DBUG_VOID_RETURN;
}
void *spider_bg_mon_action(
void *arg
) {
SPIDER_LINK_PACK *link_pack = (SPIDER_LINK_PACK*) arg;
SPIDER_SHARE *share = link_pack->share;
SPIDER_TRX *trx;
int error_num, link_idx = link_pack->link_idx;
THD *thd;
my_thread_init();
DBUG_ENTER("spider_bg_mon_action");
/* init start */
pthread_mutex_lock(&share->bg_mon_mutexes[link_idx]);
if (!(thd = new THD(next_thread_id())))
{
share->bg_mon_kill = FALSE;
share->bg_mon_init = FALSE;
pthread_cond_signal(&share->bg_mon_conds[link_idx]);
pthread_mutex_unlock(&share->bg_mon_mutexes[link_idx]);
my_thread_end();
DBUG_RETURN(NULL);
}
#ifdef HAVE_PSI_INTERFACE
mysql_thread_set_psi_id(thd->thread_id);
#endif
thd->thread_stack = (char*) &thd;
thd->store_globals();
if (!(trx = spider_get_trx(thd, FALSE, &error_num)))
{
delete thd;
share->bg_mon_kill = FALSE;
share->bg_mon_init = FALSE;
pthread_cond_signal(&share->bg_mon_conds[link_idx]);
pthread_mutex_unlock(&share->bg_mon_mutexes[link_idx]);
#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
DBUG_RETURN(NULL);
}
share->bg_mon_thds[link_idx] = thd;
pthread_cond_signal(&share->bg_mon_conds[link_idx]);
/*
pthread_mutex_unlock(&share->bg_mon_mutexes[link_idx]);
*/
/* init end */
while (TRUE)
{
DBUG_PRINT("info",("spider bg mon sleep %lld",
share->monitoring_bg_interval[link_idx]));
if (!share->bg_mon_kill)
{
struct timespec abstime;
set_timespec_nsec(abstime,
share->monitoring_bg_interval[link_idx] * 1000);
pthread_cond_timedwait(&share->bg_mon_sleep_conds[link_idx],
&share->bg_mon_mutexes[link_idx], &abstime);
/*
my_sleep((ulong) share->monitoring_bg_interval[link_idx]);
*/
}
DBUG_PRINT("info",("spider bg mon roop start"));
if (share->bg_mon_kill)
{
DBUG_PRINT("info",("spider bg mon kill start"));
/*
pthread_mutex_lock(&share->bg_mon_mutexes[link_idx]);
*/
pthread_cond_signal(&share->bg_mon_conds[link_idx]);
pthread_mutex_unlock(&share->bg_mon_mutexes[link_idx]);
spider_free_trx(trx, TRUE);
delete thd;
#if !defined(MYSQL_DYNAMIC_PLUGIN) || !defined(_WIN32)
my_pthread_setspecific_ptr(THR_THD, NULL);
#endif
my_thread_end();
DBUG_RETURN(NULL);
}
if (share->monitoring_bg_kind[link_idx])
{
lex_start(thd);
error_num = spider_ping_table_mon_from_table(
trx,
thd,
share,
(uint32) share->monitoring_sid[link_idx],
share->table_name,
share->table_name_length,
link_idx,
NULL,
0,
share->monitoring_bg_kind[link_idx],
share->monitoring_limit[link_idx],
share->monitoring_bg_flag[link_idx],
TRUE
);
lex_end(thd->lex);
}
}
}
#endif
int spider_conn_first_link_idx(
THD *thd,
long *link_statuses,
long *access_balances,
uint *conn_link_idx,
int link_count,
int link_status
) {
int roop_count, active_links = 0;
longlong balance_total = 0, balance_val;
double rand_val;
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
int *link_idxs, link_idx;
long *balances;
#else
int link_idxs[link_count];
long balances[link_count];
#endif
DBUG_ENTER("spider_conn_first_link_idx");
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
if (!(link_idxs = (int *)
spider_bulk_malloc(spider_current_trx, 24, MYF(MY_WME),
&link_idxs, sizeof(int) * link_count,
&balances, sizeof(long) * link_count,
NullS))
) {
DBUG_PRINT("info",("spider out of memory"));
DBUG_RETURN(-1);
}
#endif
for (roop_count = 0; roop_count < link_count; roop_count++)
{
DBUG_ASSERT((conn_link_idx[roop_count] - roop_count) % link_count == 0);
if (link_statuses[conn_link_idx[roop_count]] <= link_status)
{
link_idxs[active_links] = roop_count;
balances[active_links] = access_balances[roop_count];
balance_total += access_balances[roop_count];
active_links++;
}
}
if (active_links == 0)
{
DBUG_PRINT("info",("spider all links are failed"));
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
spider_free(spider_current_trx, link_idxs, MYF(MY_WME));
#endif
DBUG_RETURN(-1);
}
#if defined(MARIADB_BASE_VERSION) && MYSQL_VERSION_ID >= 100002
DBUG_PRINT("info",("spider server_id=%lu", thd->variables.server_id));
#else
DBUG_PRINT("info",("spider server_id=%u", thd->server_id));
#endif
DBUG_PRINT("info",("spider thread_id=%lu", thd_get_thread_id(thd)));
#if defined(MARIADB_BASE_VERSION) && MYSQL_VERSION_ID >= 100002
rand_val = spider_rand(thd->variables.server_id + thd_get_thread_id(thd));
#else
rand_val = spider_rand(thd->server_id + thd_get_thread_id(thd));
#endif
DBUG_PRINT("info",("spider rand_val=%f", rand_val));
balance_val = (longlong) (rand_val * balance_total);
DBUG_PRINT("info",("spider balance_val=%lld", balance_val));
for (roop_count = 0; roop_count < active_links - 1; roop_count++)
{
DBUG_PRINT("info",("spider balances[%d]=%ld",
roop_count, balances[roop_count]));
if (balance_val < balances[roop_count])
break;
balance_val -= balances[roop_count];
}
DBUG_PRINT("info",("spider first link_idx=%d", link_idxs[roop_count]));
#if defined(_MSC_VER) || defined(__SUNPRO_CC)
link_idx = link_idxs[roop_count];
spider_free(spider_current_trx, link_idxs, MYF(MY_WME));
DBUG_RETURN(link_idx);
#else
DBUG_RETURN(link_idxs[roop_count]);
#endif
}
int spider_conn_next_link_idx(
THD *thd,
long *link_statuses,
long *access_balances,
uint *conn_link_idx,
int link_idx,
int link_count,
int link_status
) {
int tmp_link_idx;
DBUG_ENTER("spider_conn_next_link_idx");
DBUG_ASSERT((conn_link_idx[link_idx] - link_idx) % link_count == 0);
tmp_link_idx = spider_conn_first_link_idx(thd, link_statuses,
access_balances, conn_link_idx, link_count, link_status);
if (
tmp_link_idx >= 0 &&
tmp_link_idx == link_idx
) {
do {
tmp_link_idx++;
if (tmp_link_idx >= link_count)
tmp_link_idx = 0;
if (tmp_link_idx == link_idx)
break;
} while (link_statuses[conn_link_idx[tmp_link_idx]] > link_status);
DBUG_PRINT("info",("spider next link_idx=%d", tmp_link_idx));
DBUG_RETURN(tmp_link_idx);
}
DBUG_PRINT("info",("spider next link_idx=%d", tmp_link_idx));
DBUG_RETURN(tmp_link_idx);
}
int spider_conn_link_idx_next(
long *link_statuses,
uint *conn_link_idx,
int link_idx,
int link_count,
int link_status
) {
DBUG_ENTER("spider_conn_link_idx_next");
do {
link_idx++;
if (link_idx >= link_count)
break;
DBUG_ASSERT((conn_link_idx[link_idx] - link_idx) % link_count == 0);
} while (link_statuses[conn_link_idx[link_idx]] > link_status);
DBUG_PRINT("info",("spider link_idx=%d", link_idx));
DBUG_RETURN(link_idx);
}
int spider_conn_lock_mode(
ha_spider *spider
) {
SPIDER_RESULT_LIST *result_list = &spider->result_list;
DBUG_ENTER("spider_conn_lock_mode");
if (result_list->lock_type == F_WRLCK || spider->lock_mode == 2)
DBUG_RETURN(SPIDER_LOCK_MODE_EXCLUSIVE);
else if (spider->lock_mode == 1)
DBUG_RETURN(SPIDER_LOCK_MODE_SHARED);
DBUG_RETURN(SPIDER_LOCK_MODE_NO_LOCK);
}
bool spider_conn_check_recovery_link(
SPIDER_SHARE *share
) {
int roop_count;
DBUG_ENTER("spider_check_recovery_link");
for (roop_count = 0; roop_count < (int) share->link_count; roop_count++)
{
if (share->link_statuses[roop_count] == SPIDER_LINK_STATUS_RECOVERY)
DBUG_RETURN(TRUE);
}
DBUG_RETURN(FALSE);
}
bool spider_conn_use_handler(
ha_spider *spider,
int lock_mode,
int link_idx
) {
THD *thd = spider->trx->thd;
int use_handler = spider_param_use_handler(thd,
spider->share->use_handlers[link_idx]);
DBUG_ENTER("spider_conn_use_handler");
DBUG_PRINT("info",("spider use_handler=%d", use_handler));
DBUG_PRINT("info",("spider spider->conn_kind[link_idx]=%u",
spider->conn_kind[link_idx]));
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
if (spider->conn_kind[link_idx] != SPIDER_CONN_KIND_MYSQL)
{
DBUG_PRINT("info",("spider TRUE by HS"));
spider->sql_kinds |= SPIDER_SQL_KIND_HS;
spider->sql_kind[link_idx] = SPIDER_SQL_KIND_HS;
#ifdef HANDLER_HAS_DIRECT_UPDATE_ROWS
if (
spider->do_direct_update &&
spider_bit_is_set(spider->do_hs_direct_update, link_idx)
) {
DBUG_PRINT("info",("spider using HS direct_update"));
spider->direct_update_kinds |= SPIDER_SQL_KIND_HS;
}
#endif
DBUG_RETURN(TRUE);
}
#endif
#ifdef HANDLER_HAS_DIRECT_UPDATE_ROWS
if (spider->do_direct_update)
{
spider->sql_kinds |= SPIDER_SQL_KIND_SQL;
spider->sql_kind[link_idx] = SPIDER_SQL_KIND_SQL;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
if (spider_bit_is_set(spider->do_hs_direct_update, link_idx))
{
spider->direct_update_kinds |= SPIDER_SQL_KIND_HS;
DBUG_PRINT("info",("spider TRUE by using HS direct_update"));
DBUG_RETURN(TRUE);
} else {
#endif
spider->direct_update_kinds |= SPIDER_SQL_KIND_SQL;
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
}
if (spider->conn_kind[link_idx] == SPIDER_CONN_KIND_MYSQL)
{
#endif
DBUG_PRINT("info",("spider FALSE by using direct_update"));
DBUG_RETURN(FALSE);
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
} else {
DBUG_PRINT("info",("spider TRUE by using BOTH"));
DBUG_RETURN(TRUE);
}
#endif
}
#endif
if (spider->use_spatial_index)
{
DBUG_PRINT("info",("spider FALSE by use_spatial_index"));
spider->sql_kinds |= SPIDER_SQL_KIND_SQL;
spider->sql_kind[link_idx] = SPIDER_SQL_KIND_SQL;
DBUG_RETURN(FALSE);
}
uint dbton_id;
spider_db_handler *dbton_hdl;
dbton_id = spider->share->sql_dbton_ids[spider->conn_link_idx[link_idx]];
dbton_hdl = spider->dbton_handler[dbton_id];
if (!dbton_hdl->support_use_handler(use_handler))
{
DBUG_PRINT("info",("spider FALSE by dbton"));
spider->sql_kinds |= SPIDER_SQL_KIND_SQL;
spider->sql_kind[link_idx] = SPIDER_SQL_KIND_SQL;
DBUG_RETURN(FALSE);
}
if (
spider->sql_command == SQLCOM_HA_READ &&
(
!(use_handler & 2) ||
(
spider_param_sync_trx_isolation(thd) &&
thd_tx_isolation(thd) == ISO_SERIALIZABLE
)
)
) {
DBUG_PRINT("info",("spider TRUE by HA"));
spider->sql_kinds |= SPIDER_SQL_KIND_HANDLER;
spider->sql_kind[link_idx] = SPIDER_SQL_KIND_HANDLER;
DBUG_RETURN(TRUE);
}
if (
spider->sql_command != SQLCOM_HA_READ &&
lock_mode == SPIDER_LOCK_MODE_NO_LOCK &&
spider_param_sync_trx_isolation(thd) &&
thd_tx_isolation(thd) != ISO_SERIALIZABLE &&
(use_handler & 1)
) {
DBUG_PRINT("info",("spider TRUE by PARAM"));
spider->sql_kinds |= SPIDER_SQL_KIND_HANDLER;
spider->sql_kind[link_idx] = SPIDER_SQL_KIND_HANDLER;
DBUG_RETURN(TRUE);
}
spider->sql_kinds |= SPIDER_SQL_KIND_SQL;
spider->sql_kind[link_idx] = SPIDER_SQL_KIND_SQL;
DBUG_RETURN(FALSE);
}
bool spider_conn_need_open_handler(
ha_spider *spider,
uint idx,
int link_idx
) {
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
SPIDER_CONN *conn;
#endif
DBUG_ENTER("spider_conn_need_open_handler");
DBUG_PRINT("info",("spider spider=%p", spider));
if (spider->handler_opened(link_idx, spider->conn_kind[link_idx]))
{
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
#ifdef HANDLER_HAS_DIRECT_UPDATE_ROWS
if (
spider->do_direct_update &&
spider_bit_is_set(spider->do_hs_direct_update, link_idx)
) {
conn = spider->hs_w_conns[link_idx];
if (
!conn->server_lost &&
conn->hs_pre_age == spider->hs_w_conn_ages[link_idx]
) {
DBUG_PRINT("info",("spider hs_write is already opened"));
DBUG_RETURN(FALSE);
}
} else
#endif
if (spider->conn_kind[link_idx] == SPIDER_CONN_KIND_MYSQL)
{
#endif
DBUG_PRINT("info",("spider HA already opened"));
DBUG_RETURN(FALSE);
#if defined(HS_HAS_SQLCOM) && defined(HAVE_HANDLERSOCKET)
} else if (spider->conn_kind[link_idx] == SPIDER_CONN_KIND_HS_READ)
{
DBUG_PRINT("info",("spider r_handler_index[%d]=%d",
link_idx, spider->r_handler_index[link_idx]));
DBUG_PRINT("info",("spider idx=%d", idx));
DBUG_PRINT("info",("spider hs_pushed_ret_fields_num=%zu",
spider->hs_pushed_ret_fields_num));
DBUG_PRINT("info",("spider hs_r_ret_fields_num[%d]=%lu",
link_idx, spider->hs_r_ret_fields_num[link_idx]));
DBUG_PRINT("info",("spider hs_r_ret_fields[%d]=%p",
link_idx, spider->hs_r_ret_fields[link_idx]));
#ifndef DBUG_OFF
if (
spider->hs_pushed_ret_fields_num < MAX_FIELDS &&
spider->hs_r_ret_fields[link_idx] &&
spider->hs_pushed_ret_fields_num ==
spider->hs_r_ret_fields_num[link_idx]
) {
int roop_count;
for (roop_count = 0; roop_count < (int) spider->hs_pushed_ret_fields_num;
++roop_count)
{
DBUG_PRINT("info",("spider hs_pushed_ret_fields[%d]=%u",
roop_count, spider->hs_pushed_ret_fields[roop_count]));
DBUG_PRINT("info",("spider hs_r_ret_fields[%d][%d]=%u",
link_idx, roop_count,
spider->hs_r_ret_fields[link_idx][roop_count]));
}
}
#endif
if (
spider->r_handler_index[link_idx] == idx
#ifdef HANDLER_HAS_DIRECT_UPDATE_ROWS
&& (
(
spider->hs_pushed_ret_fields_num == MAX_FIELDS &&
spider->hs_r_ret_fields_num[link_idx] == MAX_FIELDS
) ||
(
spider->hs_pushed_ret_fields_num < MAX_FIELDS &&
spider->hs_r_ret_fields[link_idx] &&
spider->hs_pushed_ret_fields_num ==
spider->hs_r_ret_fields_num[link_idx] &&
!memcmp(spider->hs_pushed_ret_fields,
spider->hs_r_ret_fields[link_idx],
sizeof(uint32) * spider->hs_pushed_ret_fields_num)
)
)
#endif
) {
SPIDER_CONN *conn = spider->hs_r_conns[link_idx];
DBUG_PRINT("info",("spider conn=%p", conn));
DBUG_PRINT("info",("spider conn->conn_id=%llu", conn->conn_id));
DBUG_PRINT("info",("spider conn->connection_id=%llu",
conn->connection_id));
DBUG_PRINT("info",("spider conn->server_lost=%s",
conn->server_lost ? "TRUE" : "FALSE"));
DBUG_PRINT("info",("spider conn->hs_pre_age=%llu", conn->hs_pre_age));
DBUG_PRINT("info",("spider hs_w_conn_ages[%d]=%llu",
link_idx, spider->hs_w_conn_ages[link_idx]));
if (
!conn->server_lost &&
conn->hs_pre_age == spider->hs_r_conn_ages[link_idx]
) {
DBUG_PRINT("info",("spider hs_r same idx"));
DBUG_RETURN(FALSE);
}
}
} else if (spider->conn_kind[link_idx] == SPIDER_CONN_KIND_HS_WRITE)
{
DBUG_PRINT("info",("spider w_handler_index[%d]=%d",
link_idx, spider->w_handler_index[link_idx]));
DBUG_PRINT("info",("spider idx=%d", idx));
DBUG_PRINT("info",("spider hs_pushed_ret_fields_num=%zu",
spider->hs_pushed_ret_fields_num));
DBUG_PRINT("info",("spider hs_w_ret_fields_num[%d]=%lu",
link_idx, spider->hs_w_ret_fields_num[link_idx]));
DBUG_PRINT("info",("spider hs_w_ret_fields[%d]=%p",
link_idx, spider->hs_w_ret_fields[link_idx]));
#ifndef DBUG_OFF
if (
spider->hs_pushed_ret_fields_num < MAX_FIELDS &&
spider->hs_w_ret_fields[link_idx] &&
spider->hs_pushed_ret_fields_num ==
spider->hs_w_ret_fields_num[link_idx]
) {
int roop_count;
for (roop_count = 0; roop_count < (int) spider->hs_pushed_ret_fields_num;
++roop_count)
{
DBUG_PRINT("info",("spider hs_pushed_ret_fields[%d]=%u",
roop_count, spider->hs_pushed_ret_fields[roop_count]));
DBUG_PRINT("info",("spider hs_w_ret_fields[%d][%d]=%u",
link_idx, roop_count,
spider->hs_w_ret_fields[link_idx][roop_count]));
}
}
#endif
if (
spider->w_handler_index[link_idx] == idx
#ifdef HANDLER_HAS_DIRECT_UPDATE_ROWS
&& (
(
spider->hs_pushed_ret_fields_num == MAX_FIELDS &&
spider->hs_w_ret_fields_num[link_idx] == MAX_FIELDS
) ||
(
spider->hs_pushed_ret_fields_num < MAX_FIELDS &&
spider->hs_w_ret_fields[link_idx] &&
spider->hs_pushed_ret_fields_num ==
spider->hs_w_ret_fields_num[link_idx] &&
!memcmp(spider->hs_pushed_ret_fields,
spider->hs_w_ret_fields[link_idx],
sizeof(uint32) * spider->hs_pushed_ret_fields_num)
)
)
#endif
) {
SPIDER_CONN *conn = spider->hs_w_conns[link_idx];
DBUG_PRINT("info",("spider conn=%p", conn));
DBUG_PRINT("info",("spider conn->conn_id=%llu", conn->conn_id));
DBUG_PRINT("info",("spider conn->connection_id=%llu",
conn->connection_id));
DBUG_PRINT("info",("spider conn->server_lost=%s",
conn->server_lost ? "TRUE" : "FALSE"));
DBUG_PRINT("info",("spider conn->hs_pre_age=%llu", conn->hs_pre_age));
DBUG_PRINT("info",("spider hs_w_conn_ages[%d]=%llu",
link_idx, spider->hs_w_conn_ages[link_idx]));
if (
!conn->server_lost &&
conn->hs_pre_age == spider->hs_w_conn_ages[link_idx]
) {
DBUG_PRINT("info",("spider hs_w same idx"));
DBUG_RETURN(FALSE);
}
}
}
#endif
}
DBUG_RETURN(TRUE);
}