Port of cursors to be pushed into 5.0 tree:

- client side part is simple and may be considered stable
- server side part now just joggles with THD state to save execution
  state and has no additional locking wisdom.
  Lot's of it are to be rewritten.


include/mysql.h:
  Cursor patch to push into the main tree, client library part (considered 
  stable):
  - new statement attribute STMT_ATTR_CURSOR_TYPE
  - MYSQL_STMT::flags to store statement cursor type
  - MYSQL_STMT::server_status to store server status (i. e. if the server
  was able to open a cursor for this query).
include/mysql_com.h:
  Cursor patch to push into the main tree, client library part (considered 
  stable):
  - new COMmand, COM_FETCH, to fetch K rows from read-only cursor.
    By design should support scrollable cursors as well.
  - a few new server statuses:
    SERVER_STATUS_CURSOR_EXISTS is sent by server in reply to COM_EXECUTE,
    when cursor was successfully opened for this query
    SERVER_STATUS_LAST_ROW_SENT is sent along with the last row to prevent one
    more round trip just for finding out that all rows were fetched from 
    this cursor (this is server mem savier also).
  - and finally, all possible values of STMT_ATTR_CURSOR_TYPE, 
    while now we support only CURSORT_TYPE_NO_CURSOR and 
    CURSOR_TYPE_READ_ONLY
libmysql/libmysql.c:
  Cursor patch to push into the main tree, client library part (considered 
  stable):
  - simple additions to mysql_stmt_fetch implementation to read data 
    from an opened cursor: we can read up to iteration count rows per
    one request; read rows are buffered in the same way as rows of
    mysql_stmt_store_result.
  - now send stmt->flags to server to let him now if we wish to have 
    a cursor for this statement.
  - support for setting/getting statement cursor type.
libmysqld/examples/Makefile.am:
  Testing cursors was originally implemented in C++. Now when these tests
  go into client_test, it's time to convert it to C++ as well.
libmysqld/lib_sql.cc:
  - cleanup: send_fields flags are now named.
sql/ha_innodb.cc:
  - cleanup: send_fields flags are now named.
sql/mysql_priv.h:
  - cursors support: declaration for server-side handler of COM_FETCH
sql/protocol.cc:
  - cleanup: send_fields flags are now named.
  - we can't anymore assert that field_types[field_pos] is sensible:
    if we have COM_EXCUTE(stmt1), COM_EXECUTE(stmt2), COM_FETCH(stmt1)
    field_types[field_pos] will point to fields of stmt2.
sql/protocol.h:
  - cleanup: send_fields flag_s_ are now named.
sql/protocol_cursor.cc:
  - cleanup: send_fields flags are now named.
sql/repl_failsafe.cc:
  - cleanup: send_fields flags are now named.
sql/slave.cc:
  - cleanup: send_fields flags are now named.
sql/sp.cc:
  - cleanup: send_fields flags are now named.
sql/sp_head.cc:
  - cleanup: send_fields flags are now named.
sql/sql_acl.cc:
  - cleanup: send_fields flags are now named.
sql/sql_class.cc:
  - cleanup: send_fields flags are now named.
sql/sql_class.h:
  - cleanup: send_fields flags are now named.
sql/sql_error.cc:
  - cleanup: send_fields flags are now named.
sql/sql_handler.cc:
  - cleanup: send_fields flags are now named.
sql/sql_help.cc:
  - cleanup: send_fields flags are now named.
sql/sql_parse.cc:
  Server side support for cursors:
  - handle COM_FETCH
  - enforce assumption that whenever we free thd->free_list, 
    we reset it to zero. This way it's much easier to handle free_list
    in prepared statements implementation.
sql/sql_prepare.cc:
  Server side support for cursors:
  - implementation of mysql_stmt_fetch (fetch some rows from open cursor).
  - management of cursors memory is quite tricky now.
  - execute_stmt can't be reused anymore in mysql_stmt_execute and 
    mysql_sql_stmt_execute
sql/sql_repl.cc:
  - cleanup: send_fields flags are now named.
sql/sql_select.cc:
  Server side support for cursors:
  - implementation of Cursor::open, Cursor::fetch (buggy when it comes to
    non-equi joins), cursor cleanups.
  - -4 -3 -0 constants indicating return value of sub_select and end_send are
    to be renamed to something more readable:
    it turned out to be not so simple, so it should come with the other patch.
sql/sql_select.h:
  Server side support for cursors:
  - declaration of Cursor class.
  - JOIN::fetch_limit contains runtime value of rows fetched via cursor.
sql/sql_show.cc:
  - cleanup: send_fields flags are now named.
sql/sql_table.cc:
  - cleanup: send_fields flags are now named.
sql/sql_union.cc:
  - if there was a cursor, don't cleanup unit: we'll need it to fetch
    the rest of the rows.
tests/Makefile.am:
  Now client_test is in C++.
tests/client_test.cc:
  A few elementary tests for cursors.
BitKeeper/etc/ignore:
  Added libmysqld/examples/client_test.cc to the ignore list
This commit is contained in:
unknown 2004-08-03 03:32:21 -07:00
parent 4467bcf26e
commit eaf34dd8e3
31 changed files with 1091 additions and 150 deletions

View file

@ -800,3 +800,4 @@ vio/test-sslclient
vio/test-sslserver
vio/viotest-ssl
libmysqld/sql_view.cc
libmysqld/examples/client_test.cc

View file

@ -580,6 +580,12 @@ typedef struct st_mysql_stmt
int (*read_row_func)(struct st_mysql_stmt *stmt,
unsigned char **row);
unsigned long stmt_id; /* Id for prepared statement */
unsigned long flags; /* i.e. type of cursor to open */
/*
Copied from mysql->server_status after execute/fetch to know
server-side cursor status for this statement.
*/
unsigned int server_status;
unsigned int last_errno; /* error code */
unsigned int param_count; /* inpute parameters count */
unsigned int field_count; /* number of columns in result set */
@ -608,7 +614,12 @@ enum enum_stmt_attr_type
In the new API we do that only by request because it slows down
mysql_stmt_store_result sufficiently.
*/
STMT_ATTR_UPDATE_MAX_LENGTH
STMT_ATTR_UPDATE_MAX_LENGTH,
/*
unsigned long with combination of cursor flags (read only, for update,
etc)
*/
STMT_ATTR_CURSOR_TYPE
};

View file

@ -49,7 +49,7 @@ enum enum_server_command
COM_TIME, COM_DELAYED_INSERT, COM_CHANGE_USER, COM_BINLOG_DUMP,
COM_TABLE_DUMP, COM_CONNECT_OUT, COM_REGISTER_SLAVE,
COM_PREPARE, COM_EXECUTE, COM_LONG_DATA, COM_CLOSE_STMT,
COM_RESET_STMT, COM_SET_OPTION,
COM_RESET_STMT, COM_SET_OPTION, COM_FETCH,
COM_END /* Must be last */
};
@ -132,6 +132,17 @@ enum enum_server_command
#define SERVER_MORE_RESULTS_EXISTS 8 /* Multi query - next query exists */
#define SERVER_QUERY_NO_GOOD_INDEX_USED 16
#define SERVER_QUERY_NO_INDEX_USED 32
/*
The server was able to fulfill client request and open read-only
non-scrollable cursor for the query. This flag comes in server
status with reply to COM_EXECUTE and COM_EXECUTE_DIRECT commands.
*/
#define SERVER_STATUS_CURSOR_EXISTS 64
/*
This flag is sent with last row of read-only cursor, in reply to
COM_FETCH command.
*/
#define SERVER_STATUS_LAST_ROW_SENT 128
#define MYSQL_ERRMSG_SIZE 512
#define NET_READ_TIMEOUT 30 /* Timeout on read */
@ -257,6 +268,16 @@ enum enum_shutdown_level {
KILL_CONNECTION= 255
};
enum enum_cursor_type
{
CURSOR_TYPE_NO_CURSOR= 0,
CURSOR_TYPE_READ_ONLY= 1,
CURSOR_TYPE_FOR_UPDATE= 2,
CURSOR_TYPE_SCROLLABLE= 4
};
/* options for mysql_set_option */
enum enum_mysql_set_option
{

View file

@ -1668,6 +1668,7 @@ myodbc_remove_escape(MYSQL *mysql,char *name)
static int stmt_read_row_unbuffered(MYSQL_STMT *stmt, unsigned char **row);
static int stmt_read_row_buffered(MYSQL_STMT *stmt, unsigned char **row);
static int stmt_read_row_from_cursor(MYSQL_STMT *stmt, unsigned char **row);
static int stmt_read_row_no_data(MYSQL_STMT *stmt, unsigned char **row);
/*
@ -2387,7 +2388,7 @@ static my_bool execute(MYSQL_STMT *stmt, char *packet, ulong length)
mysql->last_used_con= mysql;
int4store(buff, stmt->stmt_id); /* Send stmt id to server */
buff[4]= (char) 0; /* no flags */
buff[4]= (char) stmt->flags;
int4store(buff+5, 1); /* iteration count */
if (cli_advanced_command(mysql, COM_EXECUTE, buff, sizeof(buff),
packet, length, 1) ||
@ -2397,6 +2398,7 @@ static my_bool execute(MYSQL_STMT *stmt, char *packet, ulong length)
DBUG_RETURN(1);
}
stmt->affected_rows= mysql->affected_rows;
stmt->server_status= mysql->server_status;
stmt->insert_id= mysql->insert_id;
DBUG_RETURN(0);
}
@ -2552,6 +2554,59 @@ error:
return rc;
}
/*
Fetch statement row using server side cursor.
SYNOPSIS
stmt_read_row_from_cursor()
RETURN VALUE
0 success
1 error
MYSQL_NO_DATA end of data
*/
static int
stmt_read_row_from_cursor(MYSQL_STMT *stmt, unsigned char **row)
{
if (stmt->data_cursor)
return stmt_read_row_buffered(stmt, row);
if (stmt->server_status & SERVER_STATUS_LAST_ROW_SENT)
stmt->server_status &= ~SERVER_STATUS_LAST_ROW_SENT;
else
{
MYSQL *mysql= stmt->mysql;
NET *net= &mysql->net;
MYSQL_DATA *result= &stmt->result;
char buff[4 /* statement id */ +
4 /* number of rows to fetch */];
free_root(&result->alloc, MYF(MY_KEEP_PREALLOC));
result->data= NULL;
result->rows= 0;
/* Send row request to the server */
int4store(buff, stmt->stmt_id);
int4store(buff + 4, 1); /* number of rows to fetch */
if (cli_advanced_command(mysql, COM_FETCH, buff, sizeof(buff),
NullS, 0, 1))
{
set_stmt_errmsg(stmt, net->last_error, net->last_errno, net->sqlstate);
return 1;
}
stmt->server_status= mysql->server_status;
if (cli_read_binary_rows(stmt))
return 1;
stmt->server_status= mysql->server_status;
stmt->data_cursor= result->data;
return stmt_read_row_buffered(stmt, row);
}
*row= 0;
return MYSQL_NO_DATA;
}
/*
Default read row function to not SIGSEGV in client in
case of wrong sequence of API calls.
@ -2593,6 +2648,9 @@ my_bool STDCALL mysql_stmt_attr_set(MYSQL_STMT *stmt,
case STMT_ATTR_UPDATE_MAX_LENGTH:
stmt->update_max_length= value ? *(const my_bool*) value : 0;
break;
case STMT_ATTR_CURSOR_TYPE:
stmt->flags= value ? *(const unsigned long *) value : 0;
break;
default:
return TRUE;
}
@ -2608,6 +2666,9 @@ my_bool STDCALL mysql_stmt_attr_get(MYSQL_STMT *stmt,
case STMT_ATTR_UPDATE_MAX_LENGTH:
*(unsigned long *) value= stmt->update_max_length;
break;
case STMT_ATTR_CURSOR_TYPE:
*(unsigned long *) value= stmt->flags;
break;
default:
return TRUE;
}
@ -2711,9 +2772,17 @@ int STDCALL mysql_stmt_execute(MYSQL_STMT *stmt)
stmt->state= MYSQL_STMT_EXECUTE_DONE;
if (stmt->field_count)
{
stmt->mysql->unbuffered_fetch_owner= &stmt->unbuffered_fetch_cancelled;
stmt->unbuffered_fetch_cancelled= FALSE;
stmt->read_row_func= stmt_read_row_unbuffered;
if (stmt->server_status & SERVER_STATUS_CURSOR_EXISTS)
{
mysql->status= MYSQL_STATUS_READY;
stmt->read_row_func= stmt_read_row_from_cursor;
}
else
{
stmt->mysql->unbuffered_fetch_owner= &stmt->unbuffered_fetch_cancelled;
stmt->unbuffered_fetch_cancelled= FALSE;
stmt->read_row_func= stmt_read_row_unbuffered;
}
}
DBUG_RETURN(0);
}

View file

@ -27,7 +27,7 @@ mysql_SOURCES = mysql.cc readline.cc completion_hash.cc \
mysql_LDADD = @readline_link@ @TERMCAP_LIB@ $(LDADD)
client_test_LINK = $(CXXLINK)
client_test_SOURCES = client_test.c
client_test_SOURCES = client_test.cc
clean:
rm -f $(client_sources)

View file

@ -568,7 +568,7 @@ err:
C_MODE_END
bool Protocol::send_fields(List<Item> *list, uint flag)
bool Protocol::send_fields(List<Item> *list, uint flags)
{
List_iterator_fast<Item> it(*list);
Item *item;
@ -615,7 +615,7 @@ bool Protocol::send_fields(List<Item> *list, uint flag)
if (INTERNAL_NUM_FIELD(client_field))
client_field->flags|= NUM_FLAG;
if (flag & 2)
if (flags & Protocol::SEND_DEFAULTS)
{
char buff[80];
String tmp(buff, sizeof(buff), default_charset_info), *res;

View file

@ -4891,7 +4891,8 @@ innodb_show_status(
field_list.push_back(new Item_empty_string("Status", flen));
if (protocol->send_fields(&field_list, 1)) {
if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS |
Protocol::SEND_EOF)) {
my_free(str, MYF(0));

View file

@ -694,6 +694,7 @@ int mysql_stmt_prepare(THD *thd, char *packet, uint packet_length,
LEX_STRING *name=NULL);
void mysql_stmt_execute(THD *thd, char *packet, uint packet_length);
void mysql_sql_stmt_execute(THD *thd, LEX_STRING *stmt_name);
void mysql_stmt_fetch(THD *thd, char *packet, uint packet_length);
void mysql_stmt_free(THD *thd, char *packet);
void mysql_stmt_reset(THD *thd, char *packet);
void mysql_stmt_get_longdata(THD *thd, char *pos, ulong packet_length);

View file

@ -487,6 +487,7 @@ void Protocol::init(THD *thd_arg)
flag Bit mask with the following functions:
1 send number of rows
2 send default values
4 don't write eof packet
DESCRIPTION
Sum fields has table name empty and field_name.
@ -497,7 +498,7 @@ void Protocol::init(THD *thd_arg)
*/
#ifndef EMBEDDED_LIBRARY
bool Protocol::send_fields(List<Item> *list, uint flag)
bool Protocol::send_fields(List<Item> *list, uint flags)
{
List_iterator_fast<Item> it(*list);
Item *item;
@ -508,7 +509,7 @@ bool Protocol::send_fields(List<Item> *list, uint flag)
CHARSET_INFO *thd_charset= thd->variables.character_set_results;
DBUG_ENTER("send_fields");
if (flag & 1)
if (flags & SEND_NUM_ROWS)
{ // Packet with number of elements
char *pos=net_store_length(buff, (uint) list->elements);
(void) my_net_write(&thd->net, buff,(uint) (pos-buff));
@ -594,7 +595,7 @@ bool Protocol::send_fields(List<Item> *list, uint flag)
}
}
local_packet->length((uint) (pos - local_packet->ptr()));
if (flag & 2)
if (flags & SEND_DEFAULTS)
item->send(&prot, &tmp); // Send default value
if (prot.write())
break; /* purecov: inspected */
@ -603,7 +604,8 @@ bool Protocol::send_fields(List<Item> *list, uint flag)
#endif
}
my_net_write(&thd->net, eof_buff, 1);
if (flags & SEND_EOF)
my_net_write(&thd->net, eof_buff, 1);
DBUG_RETURN(prepare_for_send(list));
err:
@ -962,12 +964,6 @@ void Protocol_prep::prepare_for_resend()
bool Protocol_prep::store(const char *from, uint length, CHARSET_INFO *fromcs)
{
CHARSET_INFO *tocs= thd->variables.character_set_results;
#ifndef DEBUG_OFF
DBUG_ASSERT(field_types == 0 ||
field_types[field_pos] == MYSQL_TYPE_DECIMAL ||
(field_types[field_pos] >= MYSQL_TYPE_ENUM &&
field_types[field_pos] <= MYSQL_TYPE_GEOMETRY));
#endif
field_pos++;
return store_string_aux(from, length, fromcs, tocs);
}
@ -975,12 +971,6 @@ bool Protocol_prep::store(const char *from, uint length, CHARSET_INFO *fromcs)
bool Protocol_prep::store(const char *from,uint length,
CHARSET_INFO *fromcs, CHARSET_INFO *tocs)
{
#ifndef DEBUG_OFF
DBUG_ASSERT(field_types == 0 ||
field_types[field_pos] == MYSQL_TYPE_DECIMAL ||
(field_types[field_pos] >= MYSQL_TYPE_ENUM &&
field_types[field_pos] <= MYSQL_TYPE_GEOMETRY));
#endif
field_pos++;
return store_string_aux(from, length, fromcs, tocs);
}
@ -998,10 +988,6 @@ bool Protocol_prep::store_null()
bool Protocol_prep::store_tiny(longlong from)
{
#ifndef DEBUG_OFF
DBUG_ASSERT(field_types == 0 ||
field_types[field_pos] == MYSQL_TYPE_TINY);
#endif
char buff[1];
field_pos++;
buff[0]= (uchar) from;
@ -1011,11 +997,6 @@ bool Protocol_prep::store_tiny(longlong from)
bool Protocol_prep::store_short(longlong from)
{
#ifndef DEBUG_OFF
DBUG_ASSERT(field_types == 0 ||
field_types[field_pos] == MYSQL_TYPE_SHORT ||
field_types[field_pos] == MYSQL_TYPE_YEAR);
#endif
field_pos++;
char *to= packet->prep_append(2, PACKET_BUFFER_EXTRA_ALLOC);
if (!to)
@ -1027,11 +1008,6 @@ bool Protocol_prep::store_short(longlong from)
bool Protocol_prep::store_long(longlong from)
{
#ifndef DEBUG_OFF
DBUG_ASSERT(field_types == 0 ||
field_types[field_pos] == MYSQL_TYPE_INT24 ||
field_types[field_pos] == MYSQL_TYPE_LONG);
#endif
field_pos++;
char *to= packet->prep_append(4, PACKET_BUFFER_EXTRA_ALLOC);
if (!to)
@ -1043,10 +1019,6 @@ bool Protocol_prep::store_long(longlong from)
bool Protocol_prep::store_longlong(longlong from, bool unsigned_flag)
{
#ifndef DEBUG_OFF
DBUG_ASSERT(field_types == 0 ||
field_types[field_pos] == MYSQL_TYPE_LONGLONG);
#endif
field_pos++;
char *to= packet->prep_append(8, PACKET_BUFFER_EXTRA_ALLOC);
if (!to)
@ -1058,10 +1030,6 @@ bool Protocol_prep::store_longlong(longlong from, bool unsigned_flag)
bool Protocol_prep::store(float from, uint32 decimals, String *buffer)
{
#ifndef DEBUG_OFF
DBUG_ASSERT(field_types == 0 ||
field_types[field_pos] == MYSQL_TYPE_FLOAT);
#endif
field_pos++;
char *to= packet->prep_append(4, PACKET_BUFFER_EXTRA_ALLOC);
if (!to)
@ -1073,10 +1041,6 @@ bool Protocol_prep::store(float from, uint32 decimals, String *buffer)
bool Protocol_prep::store(double from, uint32 decimals, String *buffer)
{
#ifndef DEBUG_OFF
DBUG_ASSERT(field_types == 0 ||
field_types[field_pos] == MYSQL_TYPE_DOUBLE);
#endif
field_pos++;
char *to= packet->prep_append(8, PACKET_BUFFER_EXTRA_ALLOC);
if (!to)
@ -1100,12 +1064,6 @@ bool Protocol_prep::store(Field *field)
bool Protocol_prep::store(TIME *tm)
{
#ifndef DEBUG_OFF
DBUG_ASSERT(field_types == 0 ||
field_types[field_pos] == MYSQL_TYPE_DATETIME ||
field_types[field_pos] == MYSQL_TYPE_DATE ||
field_types[field_pos] == MYSQL_TYPE_TIMESTAMP);
#endif
char buff[12],*pos;
uint length;
field_pos++;
@ -1140,10 +1098,6 @@ bool Protocol_prep::store_date(TIME *tm)
bool Protocol_prep::store_time(TIME *tm)
{
#ifndef DEBUG_OFF
DBUG_ASSERT(field_types == 0 ||
field_types[field_pos] == MYSQL_TYPE_TIME);
#endif
char buff[13], *pos;
uint length;
field_pos++;

View file

@ -50,7 +50,12 @@ public:
Protocol(THD *thd_arg) { init(thd_arg); }
virtual ~Protocol() {}
void init(THD* thd_arg);
virtual bool send_fields(List<Item> *list, uint flag);
static const uint SEND_NUM_ROWS= 1;
static const uint SEND_DEFAULTS= 2;
static const uint SEND_EOF= 4;
virtual bool send_fields(List<Item> *list, uint flags);
bool send_records_num(List<Item> *list, ulonglong records);
bool store(I_List<i_string> *str_list);
bool store(const char *from, CHARSET_INFO *cs);
@ -163,7 +168,7 @@ public:
prev_record= &data;
return Protocol_simple::prepare_for_send(item_list);
}
bool send_fields(List<Item> *list, uint flag);
bool send_fields(List<Item> *list, uint flags);
bool write();
uint get_field_count() { return field_count; }
};

View file

@ -26,7 +26,7 @@
#include "mysql_priv.h"
#include <mysql.h>
bool Protocol_cursor::send_fields(List<Item> *list, uint flag)
bool Protocol_cursor::send_fields(List<Item> *list, uint flags)
{
List_iterator_fast<Item> it(*list);
Item *item;
@ -67,7 +67,7 @@ bool Protocol_cursor::send_fields(List<Item> *list, uint flag)
if (INTERNAL_NUM_FIELD(client_field))
client_field->flags|= NUM_FLAG;
if (flag & 2)
if (flags & Protocol::SEND_DEFAULTS)
{
char buff[80];
String tmp(buff, sizeof(buff), default_charset_info), *res;

View file

@ -461,7 +461,8 @@ int show_new_master(THD* thd)
field_list.push_back(new Item_empty_string("Log_name", 20));
field_list.push_back(new Item_return_int("Log_pos", 10,
MYSQL_TYPE_LONGLONG));
if (protocol->send_fields(&field_list, 1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(-1);
protocol->prepare_for_resend();
protocol->store(lex_mi->log_file_name, &my_charset_bin);
@ -651,7 +652,8 @@ int show_slave_hosts(THD* thd)
field_list.push_back(new Item_return_int("Master_id", 10,
MYSQL_TYPE_LONG));
if (protocol->send_fields(&field_list, 1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(-1);
pthread_mutex_lock(&LOCK_slave_list);

View file

@ -2335,7 +2335,8 @@ int show_master_info(THD* thd, MASTER_INFO* mi)
field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
MYSQL_TYPE_LONGLONG));
if (protocol->send_fields(&field_list, 1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(-1);
if (mi->host[0])

View file

@ -563,7 +563,8 @@ db_show_routine_status(THD *thd, int type, const char *wild)
}
}
/* Print header */
if (thd->protocol->send_fields(&field_list,1))
if (thd->protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS |
Protocol::SEND_EOF))
{
res= SP_INTERNAL_ERROR;
goto err_case;

View file

@ -898,7 +898,8 @@ sp_head::show_create_procedure(THD *thd)
// 1024 is for not to confuse old clients
field_list.push_back(new Item_empty_string("Create Procedure",
max(buffer.length(), 1024)));
if (protocol->send_fields(&field_list, 1))
if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS |
Protocol::SEND_EOF))
{
res= 1;
goto done;
@ -964,7 +965,8 @@ sp_head::show_create_function(THD *thd)
field_list.push_back(new Item_empty_string("sql_mode", sql_mode_len));
field_list.push_back(new Item_empty_string("Create Function",
max(buffer.length(),1024)));
if (protocol->send_fields(&field_list, 1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
{
res= 1;
goto done;

View file

@ -3163,7 +3163,8 @@ int mysql_show_grants(THD *thd,LEX_USER *lex_user)
strxmov(buff,"Grants for ",lex_user->user.str,"@",
lex_user->host.str,NullS);
field_list.push_back(field);
if (protocol->send_fields(&field_list,1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(-1);
rw_wrlock(&LOCK_grant);

View file

@ -675,7 +675,8 @@ int THD::send_explain_fields(select_result *result)
item->maybe_null=1;
field_list.push_back(new Item_return_int("rows",10, MYSQL_TYPE_LONGLONG));
field_list.push_back(new Item_empty_string("Extra",255));
return (result->send_fields(field_list,1));
return (result->send_fields(field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF));
}
#ifdef SIGNAL_WITH_VIO_CLOSE
@ -722,9 +723,9 @@ sql_exchange::sql_exchange(char *name,bool flag)
escaped= &default_escaped;
}
bool select_send::send_fields(List<Item> &list,uint flag)
bool select_send::send_fields(List<Item> &list, uint flags)
{
return thd->protocol->send_fields(&list,flag);
return thd->protocol->send_fields(&list, flags);
}
/* Send data to client. Returns 0 if ok */
@ -1354,7 +1355,8 @@ Statement::Statement(THD *thd)
allow_sum_func(0),
lex(&main_lex),
query(0),
query_length(0)
query_length(0),
cursor(0)
{
name.str= NULL;
}
@ -1372,7 +1374,8 @@ Statement::Statement()
allow_sum_func(0), /* initialized later */
lex(&main_lex),
query(0), /* these two are set */
query_length(0) /* in alloc_query() */
query_length(0), /* in alloc_query() */
cursor(0)
{
}
@ -1391,6 +1394,7 @@ void Statement::set_statement(Statement *stmt)
lex= stmt->lex;
query= stmt->query;
query_length= stmt->query_length;
cursor= stmt->cursor;
}

View file

@ -487,6 +487,9 @@ public:
void set_item_arena(Item_arena *set);
};
class Cursor;
/*
State of a single command executed against this connection.
One connection can contain a lot of simultaneously running statements,
@ -543,6 +546,7 @@ public:
*/
char *query;
uint32 query_length; // current query length
Cursor *cursor;
public:
/* We build without RTTI, so dynamic_cast can't be used. */
@ -1054,6 +1058,7 @@ public:
{
DBUG_ASSERT(current_arena!=0);
cleanup_items(current_arena->free_list);
/* no need to reset free_list as it won't be used anymore */
free_items(free_list);
close_thread_tables(this); // to close derived tables
free_root(&mem_root, MYF(0));
@ -1108,7 +1113,7 @@ public:
unit= u;
return 0;
}
virtual bool send_fields(List<Item> &list,uint flag)=0;
virtual bool send_fields(List<Item> &list, uint flags)=0;
virtual bool send_data(List<Item> &items)=0;
virtual bool initialize_tables (JOIN *join=0) { return 0; }
virtual void send_error(uint errcode,const char *err);
@ -1120,7 +1125,7 @@ public:
class select_send :public select_result {
public:
select_send() {}
bool send_fields(List<Item> &list,uint flag);
bool send_fields(List<Item> &list, uint flags);
bool send_data(List<Item> &items);
bool send_eof();
};
@ -1138,7 +1143,7 @@ public:
select_to_file(sql_exchange *ex) :exchange(ex), file(-1),row_count(0L)
{ path[0]=0; }
~select_to_file();
bool send_fields(List<Item> &list, uint flag) { return 0; }
bool send_fields(List<Item> &list, uint flags) { return 0; }
void send_error(uint errcode,const char *err);
};
@ -1185,8 +1190,7 @@ class select_insert :public select_result {
}
~select_insert();
int prepare(List<Item> &list, SELECT_LEX_UNIT *u);
bool send_fields(List<Item> &list, uint flag)
{ return 0; }
bool send_fields(List<Item> &list, uint flags) { return 0; }
bool send_data(List<Item> &items);
void send_error(uint errcode,const char *err);
bool send_eof();
@ -1273,8 +1277,7 @@ class select_union :public select_result {
select_union(TABLE *table_par);
~select_union();
int prepare(List<Item> &list, SELECT_LEX_UNIT *u);
bool send_fields(List<Item> &list, uint flag)
{ return 0; }
bool send_fields(List<Item> &list, uint flags) { return 0; }
bool send_data(List<Item> &items);
bool send_eof();
bool flush();
@ -1288,7 +1291,7 @@ protected:
Item_subselect *item;
public:
select_subselect(Item_subselect *item);
bool send_fields(List<Item> &list, uint flag) { return 0; };
bool send_fields(List<Item> &list, uint flags) { return 0; }
bool send_data(List<Item> &items)=0;
bool send_eof() { return 0; };
@ -1457,8 +1460,7 @@ public:
multi_delete(THD *thd, TABLE_LIST *dt, uint num_of_tables);
~multi_delete();
int prepare(List<Item> &list, SELECT_LEX_UNIT *u);
bool send_fields(List<Item> &list,
uint flag) { return 0; }
bool send_fields(List<Item> &list, uint flags) { return 0; }
bool send_data(List<Item> &items);
bool initialize_tables (JOIN *join);
void send_error(uint errcode,const char *err);
@ -1486,7 +1488,7 @@ public:
List<Item> *values, enum_duplicates handle_duplicates);
~multi_update();
int prepare(List<Item> &list, SELECT_LEX_UNIT *u);
bool send_fields(List<Item> &list, uint flag) { return 0; }
bool send_fields(List<Item> &list, uint flags) { return 0; }
bool send_data(List<Item> &items);
bool initialize_tables (JOIN *join);
void send_error(uint errcode,const char *err);
@ -1515,7 +1517,7 @@ public:
select_dumpvar(void) { var_list.empty(); local_vars.empty(); vars.empty(); row_count=0;}
~select_dumpvar() {}
int prepare(List<Item> &list, SELECT_LEX_UNIT *u);
bool send_fields(List<Item> &list, uint flag) {return 0;}
bool send_fields(List<Item> &list, uint flags) { return 0; }
bool send_data(List<Item> &items);
bool send_eof();
};

View file

@ -185,7 +185,8 @@ my_bool mysqld_show_warnings(THD *thd, ulong levels_to_show)
field_list.push_back(new Item_return_int("Code",4, MYSQL_TYPE_LONG));
field_list.push_back(new Item_empty_string("Message",MYSQL_ERRMSG_SIZE));
if (thd->protocol->send_fields(&field_list,1))
if (thd->protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
MYSQL_ERROR *err;

View file

@ -252,7 +252,7 @@ int mysql_ha_read(THD *thd, TABLE_LIST *tables,
insert_fields(thd, tables, tables->db, tables->alias, &it, 0);
select_limit+=offset_limit;
protocol->send_fields(&list,1);
protocol->send_fields(&list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF);
HANDLER_TABLES_HACK(thd);
MYSQL_LOCK *lock=mysql_lock_tables(thd,&tables->table,1);

View file

@ -426,7 +426,8 @@ int send_answer_1(Protocol *protocol, String *s1, String *s2, String *s3)
field_list.push_back(new Item_empty_string("description",1000));
field_list.push_back(new Item_empty_string("example",1000));
if (protocol->send_fields(&field_list,1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
protocol->prepare_for_resend();
@ -468,7 +469,8 @@ int send_header_2(Protocol *protocol, bool for_category)
field_list.push_back(new Item_empty_string("source_category_name",64));
field_list.push_back(new Item_empty_string("name",64));
field_list.push_back(new Item_empty_string("is_it_category",1));
DBUG_RETURN(protocol->send_fields(&field_list,1));
DBUG_RETURN(protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS |
Protocol::SEND_EOF));
}
/*

View file

@ -1452,6 +1452,11 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
mysql_stmt_execute(thd, packet, packet_length);
break;
}
case COM_FETCH:
{
mysql_stmt_fetch(thd, packet, packet_length);
break;
}
case COM_LONG_DATA:
{
mysql_stmt_get_longdata(thd, packet, packet_length);
@ -1545,7 +1550,6 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
send_error(thd,ER_NO_DB_ERROR);
break;
}
thd->free_list=0;
pend= strend(packet);
thd->convert_string(&conv_name, system_charset_info,
packet, (uint) (pend-packet), thd->charset());
@ -1567,6 +1571,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
break;
mysqld_list_fields(thd,&table_list,fields);
free_items(thd->free_list);
thd->free_list=0; /* free_list should never point to garbage */
break;
}
#endif
@ -4443,6 +4448,7 @@ void mysql_parse(THD *thd, char *inBuf, uint length)
}
thd->proc_info="freeing items";
free_items(thd->free_list); /* Free strings used by items */
thd->free_list= 0; /* free_list should never point to garbage */
lex_end(lex);
}
DBUG_VOID_RETURN;
@ -4470,6 +4476,7 @@ bool mysql_test_parse_for_slave(THD *thd, char *inBuf, uint length)
all_tables_not_ok(thd,(TABLE_LIST*) lex->select_lex.table_list.first))
error= 1; /* Ignore question */
free_items(thd->free_list); /* Free strings used by items */
thd->free_list= 0; /* free_list should never point to garbage */
lex_end(lex);
DBUG_RETURN(error);

View file

@ -75,6 +75,8 @@ Long data handling:
#ifdef EMBEDDED_LIBRARY
/* include MYSQL_BIND headers */
#include <mysql.h>
#else
#include <mysql_com.h>
#endif
/******************************************************************************
@ -107,7 +109,7 @@ public:
};
static void execute_stmt(THD *thd, Prepared_statement *stmt,
String *expanded_query, bool set_context);
String *expanded_query);
/******************************************************************************
Implementation
@ -166,7 +168,8 @@ static bool send_prep_stmt(Prepared_statement *stmt, uint columns)
return my_net_write(net, buff, sizeof(buff)) ||
(stmt->param_count &&
stmt->thd->protocol_simple.send_fields((List<Item> *)
&stmt->lex->param_list, 0)) ||
&stmt->lex->param_list,
Protocol::SEND_EOF)) ||
net_flush(net);
return 0;
}
@ -1098,7 +1101,8 @@ static int mysql_test_select(Prepared_statement *stmt,
if (!text_protocol)
{
if (send_prep_stmt(stmt, lex->select_lex.item_list.elements) ||
thd->protocol_simple.send_fields(&lex->select_lex.item_list, 0)
thd->protocol_simple.send_fields(&lex->select_lex.item_list,
Protocol::SEND_EOF)
#ifndef EMBEDDED_LIBRARY
|| net_flush(&thd->net)
#endif
@ -1476,6 +1480,12 @@ static int send_prepare_results(Prepared_statement *stmt, bool text_protocol)
case SQLCOM_SHOW_GRANTS:
case SQLCOM_DROP_TABLE:
case SQLCOM_RENAME_TABLE:
case SQLCOM_ALTER_TABLE:
case SQLCOM_COMMIT:
case SQLCOM_CREATE_INDEX:
case SQLCOM_DROP_INDEX:
case SQLCOM_ROLLBACK:
case SQLCOM_TRUNCATE:
break;
default:
@ -1756,6 +1766,7 @@ static void reset_stmt_params(Prepared_statement *stmt)
void mysql_stmt_execute(THD *thd, char *packet, uint packet_length)
{
ulong stmt_id= uint4korr(packet);
ulong flags= (ulong) ((uchar) packet[4]);
/*
Query text for binary log, or empty string if the query is not put into
binary log.
@ -1782,6 +1793,28 @@ void mysql_stmt_execute(THD *thd, char *packet, uint packet_length)
DBUG_VOID_RETURN;
}
if (flags & (ulong) CURSOR_TYPE_READ_ONLY)
{
if (stmt->lex->result)
{
/*
If lex->result is set in the parser, this is not a SELECT
statement: we can't open a cursor for it.
*/
flags= 0;
}
else
{
if (!stmt->cursor &&
!(stmt->cursor= new (&stmt->mem_root) Cursor()))
{
send_error(thd, ER_OUT_OF_RESOURCES);
DBUG_VOID_RETURN;
}
/* If lex->result is set, mysql_execute_command will use it */
stmt->lex->result= &stmt->cursor->result;
}
}
#ifndef EMBEDDED_LIBRARY
if (stmt->param_count)
{
@ -1800,16 +1833,55 @@ void mysql_stmt_execute(THD *thd, char *packet, uint packet_length)
if (stmt->param_count && stmt->set_params_data(stmt, &expanded_query))
goto set_params_data_err;
#endif
thd->stmt_backup.set_statement(thd);
thd->set_statement(stmt);
thd->current_arena= stmt;
reset_stmt_for_execute(thd, stmt->lex);
/* From now cursors assume that thd->mem_root is clean */
if (expanded_query.length() &&
alloc_query(thd, (char *)expanded_query.ptr(),
expanded_query.length()+1))
{
my_error(ER_OUTOFMEMORY, 0, expanded_query.length());
goto err;
}
thd->protocol= &thd->protocol_prep; // Switch to binary protocol
execute_stmt(thd, stmt, &expanded_query, true);
if (!(specialflag & SPECIAL_NO_PRIOR))
my_pthread_setprio(pthread_self(),QUERY_PRIOR);
mysql_execute_command(thd);
if (!(specialflag & SPECIAL_NO_PRIOR))
my_pthread_setprio(pthread_self(), WAIT_PRIOR);
thd->protocol= &thd->protocol_simple; // Use normal protocol
if (flags & (ulong) CURSOR_TYPE_READ_ONLY)
{
if (stmt->cursor->is_open())
stmt->cursor->init_from_thd(thd);
thd->set_item_arena(&thd->stmt_backup);
}
else
{
thd->lex->unit.cleanup();
cleanup_items(stmt->free_list);
reset_stmt_params(stmt);
close_thread_tables(thd); /* to close derived tables */
/*
Free items that were created during this execution of the PS by
query optimizer.
*/
free_items(thd->free_list);
thd->free_list= 0;
}
thd->set_statement(&thd->stmt_backup);
thd->current_arena= 0;
DBUG_VOID_RETURN;
set_params_data_err:
reset_stmt_params(stmt);
my_error(ER_WRONG_ARGUMENTS, MYF(0), "mysql_stmt_execute");
err:
send_error(thd);
DBUG_VOID_RETURN;
}
@ -1845,7 +1917,6 @@ void mysql_sql_stmt_execute(THD *thd, LEX_STRING *stmt_name)
DBUG_VOID_RETURN;
}
thd->free_list= NULL;
thd->stmt_backup.set_statement(thd);
thd->set_statement(stmt);
if (stmt->set_params_from_vars(stmt,
@ -1856,7 +1927,7 @@ void mysql_sql_stmt_execute(THD *thd, LEX_STRING *stmt_name)
send_error(thd);
}
thd->current_arena= stmt;
execute_stmt(thd, stmt, &expanded_query, false);
execute_stmt(thd, stmt, &expanded_query);
thd->current_arena= 0;
DBUG_VOID_RETURN;
}
@ -1872,20 +1943,13 @@ void mysql_sql_stmt_execute(THD *thd, LEX_STRING *stmt_name)
placeholders replaced with actual values. Otherwise empty
string.
NOTES
Caller must set parameter values and thd::protocol.
thd->free_list is assumed to be garbage.
Caller must set parameter values and thd::protocol.
*/
static void execute_stmt(THD *thd, Prepared_statement *stmt,
String *expanded_query, bool set_context)
String *expanded_query)
{
DBUG_ENTER("execute_stmt");
if (set_context)
{
thd->free_list= NULL;
thd->stmt_backup.set_statement(thd);
thd->set_statement(stmt);
}
reset_stmt_for_execute(thd, stmt->lex);
if (expanded_query->length() &&
@ -1899,21 +1963,76 @@ static void execute_stmt(THD *thd, Prepared_statement *stmt,
if (!(specialflag & SPECIAL_NO_PRIOR))
my_pthread_setprio(pthread_self(),QUERY_PRIOR);
mysql_execute_command(thd);
thd->lex->unit.cleanup();
if (!(specialflag & SPECIAL_NO_PRIOR))
my_pthread_setprio(pthread_self(), WAIT_PRIOR);
thd->lex->unit.cleanup();
cleanup_items(stmt->free_list);
reset_stmt_params(stmt);
close_thread_tables(thd); // to close derived tables
thd->set_statement(&thd->stmt_backup);
/* Free Items that were created during this execution of the PS. */
free_items(thd->free_list);
/*
In the rest of prepared statements code we assume that free_list
never points to garbage: keep this predicate true.
*/
thd->free_list= 0;
DBUG_VOID_RETURN;
}
/*
COM_FETCH handler: fetches requested amount of rows from cursor
SYNOPSIS
*/
void mysql_stmt_fetch(THD *thd, char *packet, uint packet_length)
{
/* assume there is always place for 8-16 bytes */
ulong stmt_id= uint4korr(packet);
ulong num_rows= uint4korr(packet+=4);
Statement *stmt;
int error;
DBUG_ENTER("mysql_stmt_fetch");
if (!(stmt= thd->stmt_map.find(stmt_id)) ||
!stmt->cursor ||
!stmt->cursor->is_open())
{
my_error(ER_UNKNOWN_STMT_HANDLER, MYF(0), stmt_id, "fetch");
send_error(thd);
DBUG_VOID_RETURN;
}
thd->stmt_backup.set_statement(thd);
thd->stmt_backup.set_item_arena(thd);
thd->set_statement(stmt);
stmt->cursor->init_thd(thd);
if (!(specialflag & SPECIAL_NO_PRIOR))
my_pthread_setprio(pthread_self(), QUERY_PRIOR);
thd->protocol= &thd->protocol_prep; // Switch to binary protocol
error= stmt->cursor->fetch(num_rows);
thd->protocol= &thd->protocol_simple; // Use normal protocol
if (!(specialflag & SPECIAL_NO_PRIOR))
my_pthread_setprio(pthread_self(), WAIT_PRIOR);
/* Restore THD state */
stmt->cursor->reset_thd(thd);
thd->set_statement(&thd->stmt_backup);
thd->set_item_arena(&thd->stmt_backup);
if (error && error != -4)
send_error(thd, ER_OUT_OF_RESOURCES);
DBUG_VOID_RETURN;
}
/*
Reset a prepared statement in case there was a recoverable error.
SYNOPSIS
@ -2084,8 +2203,11 @@ void Prepared_statement::setup_set_params()
}
}
Prepared_statement::~Prepared_statement()
{
if (cursor)
cursor->Cursor::~Cursor();
free_items(free_list);
}

View file

@ -1297,7 +1297,8 @@ int show_binlog_events(THD* thd)
Format_description_log_event(3); /* MySQL 4.0 by default */
Log_event::init_show_field_list(&field_list);
if (protocol-> send_fields(&field_list, 1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(-1);
if (mysql_bin_log.is_open())
@ -1426,7 +1427,8 @@ int show_binlog_info(THD* thd)
field_list.push_back(new Item_empty_string("Binlog_Do_DB",255));
field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255));
if (protocol->send_fields(&field_list, 1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(-1);
protocol->prepare_for_resend();
@ -1476,7 +1478,8 @@ int show_binlogs(THD* thd)
}
field_list.push_back(new Item_empty_string("Log_name", 255));
if (protocol->send_fields(&field_list, 1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
mysql_bin_log.lock_index();
index_file=mysql_bin_log.get_index_file();

View file

@ -1105,7 +1105,8 @@ JOIN::exec()
(zero_result_cause?zero_result_cause:"No tables used"));
else
{
result->send_fields(fields_list,1);
result->send_fields(fields_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF);
if (!having || having->val_int())
{
if (do_send_rows && (procedure ? (procedure->send_row(fields_list) ||
@ -1512,12 +1513,45 @@ JOIN::exec()
DBUG_VOID_RETURN;
}
}
/* XXX: When can we have here thd->net.report_error not zero? */
if (thd->net.report_error)
{
error= thd->net.report_error;
DBUG_VOID_RETURN;
}
curr_join->having= curr_join->tmp_having;
thd->proc_info="Sending data";
error= thd->net.report_error ||
do_select(curr_join, curr_fields_list, NULL, procedure);
thd->limit_found_rows= curr_join->send_records;
thd->examined_row_count= curr_join->examined_rows;
curr_join->fields= curr_fields_list;
curr_join->procedure= procedure;
if (unit == &thd->lex->unit &&
(unit->fake_select_lex == 0 || select_lex == unit->fake_select_lex) &&
thd->cursor && tables != const_tables)
{
/*
We are here if this is JOIN::exec for the last select of the main unit
and the client requested to open a cursor.
We check that not all tables are constant because this case is not
handled by do_select() separately, and this case is not implemented
for cursors yet.
*/
DBUG_ASSERT(error == 0);
/*
curr_join is used only for reusable joins - that is,
to perform SELECT for each outer row (like in subselects).
This join is main, so we know for sure that curr_join == join.
*/
DBUG_ASSERT(curr_join == this);
/* Open cursor for the last join sweep */
error= thd->cursor->open(this);
}
else
{
thd->proc_info="Sending data";
error= do_select(curr_join, curr_fields_list, NULL, procedure);
thd->limit_found_rows= curr_join->send_records;
thd->examined_row_count= curr_join->examined_rows;
}
DBUG_VOID_RETURN;
}
@ -1566,6 +1600,306 @@ JOIN::cleanup()
}
/************************* Cursor ******************************************/
void
Cursor::init_from_thd(THD *thd)
{
/*
We need to save and reset thd->mem_root, otherwise it'll be freed
later in mysql_parse.
*/
mem_root= thd->mem_root;
init_sql_alloc(&thd->mem_root,
thd->variables.query_alloc_block_size,
thd->variables.query_prealloc_size);
/*
The same is true for open tables and lock: save tables and zero THD
pointers to prevent table close in close_thread_tables (This is a part
of the temporary solution to make cursors work with minimal changes to
the current source base).
*/
derived_tables= thd->derived_tables;
open_tables= thd->open_tables;
lock= thd->lock;
query_id= thd->query_id;
free_list= thd->free_list;
reset_thd(thd);
/*
XXX: thd->locked_tables is not changed.
What problems can we have with it if cursor is open?
*/
/*
TODO: grab thd->free_list here?
*/
}
void
Cursor::init_thd(THD *thd)
{
thd->mem_root= mem_root;
DBUG_ASSERT(thd->derived_tables == 0);
thd->derived_tables= derived_tables;
DBUG_ASSERT(thd->open_tables == 0);
thd->open_tables= open_tables;
DBUG_ASSERT(thd->lock== 0);
thd->lock= lock;
thd->query_id= query_id;
thd->free_list= free_list;
}
void
Cursor::reset_thd(THD *thd)
{
thd->derived_tables= 0;
thd->open_tables= 0;
thd->lock= 0;
thd->free_list= 0;
}
int
Cursor::open(JOIN *join_arg)
{
join= join_arg;
THD *thd= join->thd;
/* First non-constant table */
JOIN_TAB *join_tab= join->join_tab + join->const_tables;
/*
Send fields description to the client; server_status is sent
in 'EOF' packet, which ends send_fields().
*/
thd->server_status|= SERVER_STATUS_CURSOR_EXISTS;
join->result->send_fields(*join->fields, Protocol::SEND_NUM_ROWS);
::send_eof(thd);
thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS;
/* Prepare JOIN for reading rows. */
Next_select_func end_select= join->sort_and_group || join->procedure &&
join->procedure->flags & PROC_GROUP ?
end_send_group : end_send;
join->join_tab[join->tables-1].next_select= end_select;
join->send_records= 0;
join->fetch_limit= join->unit->offset_limit_cnt;
/* Disable JOIN CACHE as it is not working with cursors yet */
for (JOIN_TAB *tab= join_tab; tab != join->join_tab + join->tables - 1; ++tab)
{
if (tab->next_select == sub_select_cache)
tab->next_select= sub_select;
}
DBUG_ASSERT(join_tab->table->reginfo.not_exists_optimize == 0);
DBUG_ASSERT(join_tab->not_used_in_distinct == 0);
/*
null_row is set only if row not found and it's outer join: should never
happen for the first table in join_tab list
*/
DBUG_ASSERT(join_tab->table->null_row == 0);
return join_tab->read_first_record(join_tab);
}
/*
DESCRIPTION
Fetch next num_rows rows from the cursor and sent them to the client
PRECONDITION:
Cursor is open
RETURN VALUES:
-4 there are more rows, send_eof sent to the client
0 no more rows, send_eof was sent to the client, cursor is closed
other fatal fetch error, cursor is closed (error is not reported)
*/
int
Cursor::fetch(ulong num_rows)
{
THD *thd= join->thd;
JOIN_TAB *join_tab= join->join_tab + join->const_tables;;
COND *on_expr= join_tab->on_expr;
COND *select_cond= join_tab->select_cond;
READ_RECORD *info= &join_tab->read_record;
int error= 0;
join->fetch_limit+= num_rows;
/*
Run while there are new rows in the first table;
For each row, satisfying ON and WHERE clauses (those parts of them which
can be evaluated early), call next_select.
*/
do
{
int no_more_rows;
join->examined_rows++;
if (thd->killed) /* Aborted by user */
{
my_error(ER_SERVER_SHUTDOWN,MYF(0));
return -1;
}
if (on_expr == 0 || on_expr->val_int())
{
if (select_cond == 0 || select_cond->val_int())
{
/*
TODO: call table->unlock_row() to unlock row failed selection,
when this feature will be used.
*/
error= join_tab->next_select(join, join_tab + 1, 0);
DBUG_ASSERT(error <= 0);
if (error)
{
/* real error or LIMIT/FETCH LIMIT worked */
if (error == -4)
{
/*
FETCH LIMIT, read ahead one row, and close cursor
if there is no more rows XXX: to be fixed to support
non-equi-joins!
*/
if ((no_more_rows= info->read_record(info)))
error= no_more_rows > 0 ? -1: 0;
}
break;
}
}
}
/* read next row; break loop if there was an error */
if ((no_more_rows= info->read_record(info)))
{
if (no_more_rows > 0)
error= -1;
else
{
enum { END_OF_RECORDS= 1 };
error= join_tab->next_select(join, join_tab+1, (int) END_OF_RECORDS);
}
break;
}
}
while (thd->net.report_error == 0);
if (thd->net.report_error)
error= -1;
switch (error) {
/* Fetch limit worked, possibly more rows are there */
case -4:
if (thd->transaction.all.innobase_tid)
ha_release_temporary_latches(thd);
thd->server_status|= SERVER_STATUS_CURSOR_EXISTS;
::send_eof(thd);
thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS;
/* save references to memory, allocated during fetch */
mem_root= thd->mem_root;
free_list= thd->free_list;
break;
/* Limit clause worked: this is the same as 'no more rows' */
case -3: /* LIMIT clause worked */
error= 0;
/* fallthrough */
case 0: /* No more rows */
if (thd->transaction.all.innobase_tid)
ha_release_temporary_latches(thd);
close();
thd->server_status|= SERVER_STATUS_LAST_ROW_SENT;
::send_eof(thd);
thd->server_status&= ~SERVER_STATUS_LAST_ROW_SENT;
join= 0;
unit= 0;
free_items(thd->free_list);
thd->free_list= free_list= 0;
/*
Must be last, as some memory might be allocated for free purposes,
like in free_tmp_table() (TODO: fix this issue)
*/
mem_root= thd->mem_root;
free_root(&mem_root, MYF(0));
break;
default:
close();
join= 0;
unit= 0;
free_items(thd->free_list);
thd->free_list= free_list= 0;
/*
Must be last, as some memory might be allocated for free purposes,
like in free_tmp_table() (TODO: fix this issue)
*/
mem_root= thd->mem_root;
free_root(&mem_root, MYF(0));
break;
}
return error;
}
void
Cursor::close()
{
THD *thd= join->thd;
join->join_free(0);
if (unit)
{
/* In case of UNIONs JOIN is freed inside unit->cleanup() */
unit->cleanup();
}
else
{
join->cleanup();
delete join;
}
/* XXX: Another hack: closing tables used in the cursor */
{
DBUG_ASSERT(lock || open_tables || derived_tables);
TABLE *tmp_open_tables= thd->open_tables;
TABLE *tmp_derived_tables= thd->derived_tables;
MYSQL_LOCK *tmp_lock= thd->lock;
thd->open_tables= open_tables;
thd->derived_tables= derived_tables;
thd->lock= lock;
close_thread_tables(thd);
thd->open_tables= tmp_derived_tables;
thd->derived_tables= tmp_derived_tables;
thd->lock= tmp_lock;
}
}
Cursor::~Cursor()
{
if (is_open())
close();
free_items(free_list);
/*
Must be last, as some memory might be allocated for free purposes,
like in free_tmp_table() (TODO: fix this issue)
*/
free_root(&mem_root, MYF(0));
}
/*********************************************************************/
int
mysql_select(THD *thd, Item ***rref_pointer_array,
TABLE_LIST *tables, uint wild_num, List<Item> &fields,
@ -1637,6 +1971,16 @@ mysql_select(THD *thd, Item ***rref_pointer_array,
join->exec();
if (thd->cursor && thd->cursor->is_open())
{
/*
A cursor was opened for the last sweep in exec().
We are here only if this is mysql_select for top-level SELECT_LEX_UNIT
and there were no error.
*/
free_join= 0;
}
if (thd->lex->describe & DESCRIBE_EXTENDED)
{
select_lex->where= join->conds_history;
@ -5310,7 +5654,8 @@ return_zero_rows(JOIN *join, select_result *result,TABLE_LIST *tables,
if (having && having->val_int() == 0)
send_row=0;
}
if (!(result->send_fields(fields,1)))
if (!(result->send_fields(fields,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)))
{
if (send_row)
{
@ -7035,7 +7380,7 @@ do_select(JOIN *join,List<Item> *fields,TABLE *table,Procedure *procedure)
{
int error= 0;
JOIN_TAB *join_tab;
int (*end_select)(JOIN *, struct st_join_table *,bool);
Next_select_func end_select;
DBUG_ENTER("do_select");
join->procedure=procedure;
@ -7043,7 +7388,8 @@ do_select(JOIN *join,List<Item> *fields,TABLE *table,Procedure *procedure)
Tell the client how many fields there are in a row
*/
if (!table)
join->result->send_fields(*fields,1);
join->result->send_fields(*fields,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF);
else
{
VOID(table->file->extra(HA_EXTRA_WRITE_CACHE));
@ -8076,6 +8422,14 @@ end_send(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)),
}
DBUG_RETURN(-3); // Abort nicely
}
else if (join->send_records >= join->fetch_limit)
{
/*
There is a server side cursor and all rows for
this fetch request are sent.
*/
DBUG_RETURN(-4);
}
}
else
{
@ -8150,6 +8504,14 @@ end_send_group(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)),
join->do_send_rows=0;
join->unit->select_limit_cnt = HA_POS_ERROR;
}
else if (join->send_records >= join->fetch_limit)
{
/*
There is a server side cursor and all rows
for this fetch request are sent.
*/
DBUG_RETURN(-4);
}
}
}
else

View file

@ -81,6 +81,10 @@ enum join_type { JT_UNKNOWN,JT_SYSTEM,JT_CONST,JT_EQ_REF,JT_REF,JT_MAYBE_REF,
class JOIN;
typedef int (*Next_select_func)(JOIN *,struct st_join_table *,bool);
typedef int (*Read_record_func)(struct st_join_table *tab);
typedef struct st_join_table {
TABLE *table;
KEYUSE *keyuse; /* pointer to first used key */
@ -95,8 +99,8 @@ typedef struct st_join_table {
st_join_table *first_upper; /* first inner table for embedding outer join */
st_join_table *first_unmatched; /* used for optimization purposes only */
const char *info;
int (*read_first_record)(struct st_join_table *tab);
int (*next_select)(JOIN *,struct st_join_table *,bool);
Read_record_func read_first_record;
Next_select_func next_select;
READ_RECORD read_record;
double worst_seeks;
key_map const_keys; /* Keys with constant part */
@ -149,6 +153,16 @@ class JOIN :public Sql_alloc
bool do_send_rows;
table_map const_table_map,found_const_table_map,outer_join;
ha_rows send_records,found_records,examined_rows,row_limit, select_limit;
/*
Used to fetch no more than given amount of rows per one
fetch operation of server side cursor.
The value is checked in end_send and end_send_group in fashion, similar
to offset_limit_cnt:
- fetch_limit= HA_POS_ERROR if there is no cursor.
- when we open a cursor, we set fetch_limit to 0,
- on each fetch iteration we add num_rows to fetch to fetch_limit
*/
ha_rows fetch_limit;
POSITION positions[MAX_TABLES+1],best_positions[MAX_TABLES+1];
double best_read;
List<Item> *fields;
@ -239,6 +253,7 @@ class JOIN :public Sql_alloc
do_send_rows= 1;
send_records= 0;
found_records= 0;
fetch_limit= HA_POS_ERROR;
examined_rows= 0;
exec_tmp_table1= 0;
exec_tmp_table2= 0;
@ -319,6 +334,44 @@ class JOIN :public Sql_alloc
};
/*
Server-side cursor (now stands only for basic read-only cursor)
See class implementation in sql_select.cc
*/
class Cursor: public Sql_alloc, public Item_arena
{
JOIN *join;
SELECT_LEX_UNIT *unit;
TABLE *open_tables;
MYSQL_LOCK *lock;
TABLE *derived_tables;
/* List of items created during execution */
ulong query_id;
public:
select_send result;
/* Temporary implementation as now we replace THD state by value */
/* Save THD state into cursor */
void init_from_thd(THD *thd);
/* Restore THD from cursor to continue cursor execution */
void init_thd(THD *thd);
/* bzero cursor state in THD */
void reset_thd(THD *thd);
int open(JOIN *join);
int fetch(ulong num_rows);
void reset() { join= 0; }
bool is_open() const { return join != 0; }
void close();
void set_unit(SELECT_LEX_UNIT *unit_arg) { unit= unit_arg; }
Cursor() :join(0), unit(0) {}
~Cursor();
};
typedef struct st_select_check {
uint const_ref,reg_ref;
} SELECT_CHECK;

View file

@ -66,7 +66,8 @@ mysqld_show_dbs(THD *thd,const char *wild)
strxmov(end," (",wild,")",NullS);
field_list.push_back(field);
if (protocol->send_fields(&field_list,1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
if (mysql_find_files(thd,&files,NullS,mysql_data_home,wild,1))
DBUG_RETURN(1);
@ -107,7 +108,8 @@ int mysqld_show_open_tables(THD *thd,const char *wild)
field_list.push_back(new Item_return_int("In_use", 1, MYSQL_TYPE_TINY));
field_list.push_back(new Item_return_int("Name_locked", 4, MYSQL_TYPE_TINY));
if (protocol->send_fields(&field_list,1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
if (!(open_list=list_open_tables(thd,wild)) && thd->is_fatal_error)
@ -160,7 +162,8 @@ int mysqld_show_tables(THD *thd,const char *db,const char *wild)
field_list.push_back(field);
if (show_type)
field_list.push_back(new Item_empty_string("table_type", 10));
if (protocol->send_fields(&field_list,1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
if (mysql_find_files(thd,&files,db,path,wild,0))
DBUG_RETURN(-1);
@ -208,7 +211,8 @@ int mysqld_show_storage_engines(THD *thd)
field_list.push_back(new Item_empty_string("Support",10));
field_list.push_back(new Item_empty_string("Comment",80));
if (protocol->send_fields(&field_list,1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
const char *default_type_name=
@ -282,7 +286,8 @@ int mysqld_show_privileges(THD *thd)
field_list.push_back(new Item_empty_string("Context",15));
field_list.push_back(new Item_empty_string("Comment",NAME_LEN));
if (protocol->send_fields(&field_list,1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
show_privileges_st *privilege= sys_privileges;
@ -357,7 +362,8 @@ int mysqld_show_column_types(THD *thd)
field_list.push_back(new Item_empty_string("Default",NAME_LEN));
field_list.push_back(new Item_empty_string("Comment",NAME_LEN));
if (protocol->send_fields(&field_list,1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
/* TODO: Change the loop to not use 'i' */
@ -530,7 +536,8 @@ int mysqld_extend_show_tables(THD *thd,const char *db,const char *wild)
item->maybe_null=1;
field_list.push_back(item=new Item_empty_string("Comment",80));
item->maybe_null=1;
if (protocol->send_fields(&field_list,1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
if (mysql_find_files(thd,&files,db,path,wild,0))
@ -721,7 +728,7 @@ mysqld_show_fields(THD *thd, TABLE_LIST *table_list,const char *wild,
}
// Send first number of fields and records
if (protocol->send_records_num(&field_list, (ulonglong)file->records) ||
protocol->send_fields(&field_list,0))
protocol->send_fields(&field_list, Protocol::SEND_EOF))
DBUG_RETURN(1);
restore_record(table,default_values); // Get empty record
@ -859,7 +866,8 @@ mysqld_show_create(THD *thd, TABLE_LIST *table_list)
field_list.push_back(new Item_empty_string("Create Table",
max(buffer.length(),1024)));
if (protocol->send_fields(&field_list, 1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
protocol->prepare_for_resend();
buffer.length(0);
@ -943,7 +951,8 @@ int mysqld_show_create_db(THD *thd, char *dbname,
field_list.push_back(new Item_empty_string("Database",NAME_LEN));
field_list.push_back(new Item_empty_string("Create Database",1024));
if (protocol->send_fields(&field_list,1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
protocol->prepare_for_resend();
@ -985,7 +994,8 @@ mysqld_show_logs(THD *thd)
field_list.push_back(new Item_empty_string("Type",10));
field_list.push_back(new Item_empty_string("Status",10));
if (protocol->send_fields(&field_list,1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
#ifdef HAVE_BERKELEY_DB
@ -1034,7 +1044,8 @@ mysqld_show_keys(THD *thd, TABLE_LIST *table_list)
field_list.push_back(new Item_empty_string("Comment",255));
item->maybe_null=1;
if (protocol->send_fields(&field_list,1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
KEY *key_info=table->key_info;
@ -1121,7 +1132,8 @@ mysqld_list_fields(THD *thd, TABLE_LIST *table_list, const char *wild)
field_list.push_back(new Item_field(field));
}
restore_record(table,default_values); // Get empty record
if (thd->protocol->send_fields(&field_list,2))
if (thd->protocol->send_fields(&field_list, Protocol::SEND_DEFAULTS |
Protocol::SEND_EOF))
DBUG_VOID_RETURN;
net_flush(&thd->net);
DBUG_VOID_RETURN;
@ -1615,7 +1627,8 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose)
field->maybe_null=1;
field_list.push_back(field=new Item_empty_string("Info",max_query_length));
field->maybe_null=1;
if (protocol->send_fields(&field_list,1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_VOID_RETURN;
VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list
@ -1751,7 +1764,8 @@ int mysqld_show_collations(THD *thd, const char *wild)
field_list.push_back(new Item_empty_string("Compiled",30));
field_list.push_back(new Item_return_int("Sortlen",3, FIELD_TYPE_SHORT));
if (protocol->send_fields(&field_list, 1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
for ( cs= all_charsets ; cs < all_charsets+255 ; cs++ )
@ -1804,7 +1818,8 @@ int mysqld_show_charsets(THD *thd, const char *wild)
field_list.push_back(new Item_empty_string("Default collation",60));
field_list.push_back(new Item_return_int("Maxlen",3, FIELD_TYPE_SHORT));
if (protocol->send_fields(&field_list, 1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
for ( cs= all_charsets ; cs < all_charsets+255 ; cs++ )
@ -1838,7 +1853,8 @@ int mysqld_show(THD *thd, const char *wild, show_var_st *variables,
field_list.push_back(new Item_empty_string("Variable_name",30));
field_list.push_back(new Item_empty_string("Value",256));
if (protocol->send_fields(&field_list,1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1); /* purecov: inspected */
null_lex_str.str= 0; // For sys_var->value_ptr()
null_lex_str.length= 0;

View file

@ -1738,7 +1738,8 @@ static int mysql_admin_table(THD* thd, TABLE_LIST* tables,
item->maybe_null = 1;
field_list.push_back(item = new Item_empty_string("Msg_text", 255));
item->maybe_null = 1;
if (protocol->send_fields(&field_list, 1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(-1);
mysql_ha_close(thd, tables, /*dont_send_ok*/ 1, /*dont_lock*/ 1);
@ -3449,7 +3450,8 @@ int mysql_checksum_table(THD *thd, TABLE_LIST *tables, HA_CHECK_OPT *check_opt)
item->maybe_null= 1;
field_list.push_back(item=new Item_int("Checksum",(longlong) 1,21));
item->maybe_null= 1;
if (protocol->send_fields(&field_list, 1))
if (protocol->send_fields(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(-1);
for (table= tables; table; table= table->next_local)

View file

@ -31,7 +31,13 @@ int mysql_union(THD *thd, LEX *lex, select_result *result,
int res, res_cln;
if (!(res= unit->prepare(thd, result, SELECT_NO_UNLOCK)))
res= unit->exec();
res_cln= unit->cleanup();
if (res == 0 && thd->cursor && thd->cursor->is_open())
{
thd->cursor->set_unit(unit);
res_cln= 0;
}
else
res_cln= unit->cleanup();
DBUG_RETURN(res?res:res_cln);
}

View file

@ -35,7 +35,7 @@ INCLUDES = -I$(top_srcdir)/include $(openssl_includes)
LIBS = @CLIENT_LIBS@
LDADD = @CLIENT_EXTRA_LDFLAGS@ ../libmysql/libmysqlclient.la
client_test_LDADD= $(LDADD) $(CXXLDFLAGS)
client_test_SOURCES= client_test.c
client_test_SOURCES= client_test.cc
insert_test_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES)
select_test_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES)

View file

@ -693,6 +693,240 @@ static void client_use_result()
}
/*
Accepts arbitrary number of queries and runs them against the database.
Used to fill tables for each test.
*/
void fill_tables(const char **query_list, unsigned query_count)
{
int rc;
for (const char **query= query_list; query < query_list + query_count;
++query)
{
rc= mysql_query(mysql, *query);
if (rc)
{
fprintf(stderr,
"fill_tables failed: query is\n"
"%s,\n"
"error: %s\n", *query, mysql_error(mysql));
exit(1);
}
}
}
/*
All state of fetch from one statement: statement handle, out buffers,
fetch position.
See fetch_n for for the only use case.
*/
struct Stmt_fetch
{
enum { MAX_COLUMN_LENGTH= 255 };
Stmt_fetch() {}
~Stmt_fetch();
void init(unsigned stmt_no_arg, const char *query_arg);
int fetch_row();
const char *query;
unsigned stmt_no;
MYSQL_STMT *stmt;
bool is_open;
MYSQL_BIND *bind_array;
char **out_data;
unsigned long *out_data_length;
unsigned column_count;
unsigned row_count;
};
/*
Create statement handle, prepare it with statement, execute and allocate
fetch buffers.
*/
void Stmt_fetch::init(unsigned stmt_no_arg, const char *query_arg)
{
unsigned long type= CURSOR_TYPE_READ_ONLY;
int rc;
unsigned i;
MYSQL_RES *metadata;
/* Save query and statement number for error messages */
stmt_no= stmt_no_arg;
query= query_arg;
stmt= mysql_stmt_init(mysql);
rc= mysql_stmt_prepare(stmt, query, strlen(query));
if (rc)
{
fprintf(stderr,
"mysql_stmt_prepare of stmt %d failed:\n"
"query: %s\n"
"error: %s\n",
stmt_no, query, mysql_stmt_error(stmt));
exit(1);
}
/*
The attribute is sent to server on execute and asks to open read-only
for result set
*/
mysql_stmt_attr_set(stmt, STMT_ATTR_CURSOR_TYPE, (const void *) &type);
rc= mysql_stmt_execute(stmt);
if (rc)
{
fprintf(stderr,
"mysql_stmt_execute of stmt %d failed:\n"
"query: %s\n"
"error: %s\n",
stmt_no, query, mysql_stmt_error(stmt));
exit(1);
}
/* Find out total number of columns in result set */
metadata= mysql_stmt_result_metadata(stmt);
column_count= mysql_num_fields(metadata);
mysql_free_result(metadata);
/*
Now allocate bind handles and buffers for output data:
calloc memory to reduce number of MYSQL_BIND members we need to
set up.
*/
bind_array= (MYSQL_BIND *) calloc(1, sizeof(MYSQL_BIND) * column_count);
out_data= (char **) calloc(1, sizeof(*out_data) * column_count);
out_data_length= (unsigned long *) calloc(1,
sizeof(*out_data_length) * column_count);
for (i= 0; i < column_count; ++i)
{
out_data[i]= (char *) calloc(1, MAX_COLUMN_LENGTH);
bind_array[i].buffer_type= MYSQL_TYPE_STRING;
bind_array[i].buffer= out_data[i];
bind_array[i].buffer_length= MAX_COLUMN_LENGTH;
bind_array[i].length= out_data_length + i;
}
mysql_stmt_bind_result(stmt, bind_array);
row_count= 0;
is_open= true;
/* Ready for reading rows */
}
/* Fetch and print one row from cursor */
int Stmt_fetch::fetch_row()
{
int rc;
unsigned i;
if ((rc= mysql_stmt_fetch(stmt)) == 0)
{
++row_count;
printf("Stmt %d fetched row %d:\n", stmt_no, row_count);
for (i= 0; i < column_count; ++i)
{
out_data[i][out_data_length[i]]= '\0';
printf("column %d: %s\n", i+1, out_data[i]);
}
}
else
is_open= false;
return rc;
}
Stmt_fetch::~Stmt_fetch()
{
unsigned i;
for (i= 0; i < column_count; ++i)
free(out_data[i]);
free(out_data);
free(bind_array);
mysql_stmt_close(stmt);
}
/* We need these to compile without libstdc++ */
void *operator new[] (size_t sz)
{
return (void *) malloc (sz ? sz : 1);
}
void operator delete[] (void *ptr) throw ()
{
if (ptr)
free(ptr);
}
/*
For given array of queries, open query_count cursors and fetch
from them in simultaneous manner.
In case there was an error in one of the cursors, continue
reading from the rest.
*/
bool fetch_n(const char **query_list, unsigned query_count)
{
unsigned open_statements= query_count;
unsigned i;
int rc, error_count= 0;
Stmt_fetch *stmt_array= new Stmt_fetch[query_count];
Stmt_fetch *stmt;
for (i= 0; i < query_count; ++i)
{
/* Init will exit(1) in case of error */
stmt_array[i].init(i, query_list[i]);
}
while (open_statements)
{
for (stmt= stmt_array; stmt < stmt_array + query_count; ++stmt)
{
if (stmt->is_open && (rc= stmt->fetch_row()))
{
--open_statements;
/*
We try to fetch from the rest of the statements in case of
error
*/
if (rc != MYSQL_NO_DATA)
{
fprintf(stderr,
"Got error reading rows from statement %d,\n"
"query is: %s,\n"
"error message: %s", stmt - stmt_array, stmt->query,
mysql_stmt_error(stmt->stmt));
++error_count;
}
}
}
}
if (error_count)
fprintf(stderr, "Fetch FAILED");
else
{
unsigned total_row_count= 0;
for (stmt= stmt_array; stmt < stmt_array + query_count; ++stmt)
total_row_count+= stmt->row_count;
printf("Success, total rows fetched: %d\n", total_row_count);
}
delete [] stmt_array;
return error_count != 0;
}
/* Separate thread query to test some cases */
static my_bool thread_query(char *query)
@ -10043,7 +10277,7 @@ static void test_view()
int rc, i;
MYSQL_BIND bind[1];
char str_data[50];
long length = 0L;
ulong length = 0L;
long is_null = 0L;
const char *query=
"SELECT COUNT(*) FROM v1 WHERE `SERVERNAME`=?";
@ -10141,7 +10375,7 @@ static void test_view_2where()
int rc, i;
MYSQL_BIND bind[8];
char parms[8][100];
long length[8];
ulong length[8];
const char *query= "SELECT `RELID` ,`REPORT` ,`HANDLE` ,`LOG_GROUP` ,`USERNAME` ,`VARIANT` ,`TYPE` ,`VERSION` ,`ERFDAT` ,`ERFTIME` ,`ERFNAME` ,`AEDAT` ,`AETIME` ,`AENAME` ,`DEPENDVARS` ,`INACTIVE` FROM `V_LTDX` WHERE `MANDT` = ? AND `RELID` = ? AND `REPORT` = ? AND `HANDLE` = ? AND `LOG_GROUP` = ? AND `USERNAME` IN ( ? , ? ) AND `TYPE` = ?";
myheader("test_view_2where");
@ -10189,7 +10423,7 @@ static void test_view_star()
int rc, i;
MYSQL_BIND bind[8];
char parms[8][100];
long length[8];
ulong length[8];
const char *query= "SELECT * FROM vt1 WHERE a IN (?,?)";
myheader("test_view_star");
@ -10338,7 +10572,7 @@ static void test_view_insert_fields()
{
MYSQL_STMT *stmt;
char parm[11][1000];
long l[11];
ulong l[11];
int rc, i;
MYSQL_BIND bind[11];
const char *query= "INSERT INTO `v1` ( `K1C4` ,`K2C4` ,`K3C4` ,`K4N4` ,`F1C4` ,`F2I4` ,`F3N5` ,`F7F8` ,`F6N4` ,`F5C8` ,`F9D8` ) VALUES( ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? )";
@ -10390,6 +10624,61 @@ static void test_view_insert_fields()
}
static void test_basic_cursors()
{
myheader("test_basic_cursors");
const char *basic_tables[]=
{
"DROP TABLE IF EXISTS t1, t2",
"CREATE TABLE t1 "
"(id INTEGER NOT NULL PRIMARY KEY, "
" name VARCHAR(20) NOT NULL)",
"INSERT INTO t1 (id, name) VALUES "
" (2, 'Ja'), (3, 'Ede'), "
" (4, 'Haag'), (5, 'Kabul'), "
" (6, 'Almere'), (7, 'Utrecht'), "
" (8, 'Qandahar'), (9, 'Amsterdam'), "
" (10, 'Amersfoort'), (11, 'Constantine')",
"CREATE TABLE t2 "
"(id INTEGER NOT NULL PRIMARY KEY, "
" name VARCHAR(20) NOT NULL)",
"INSERT INTO t2 (id, name) VALUES "
" (4, 'Guam'), (5, 'Aruba'), "
" (6, 'Angola'), (7, 'Albania'), "
" (8, 'Anguilla'), (9, 'Argentina'), "
" (10, 'Azerbaijan'), (11, 'Afghanistan'), "
" (12, 'Burkina Faso'), (13, 'Faroe Islands')"
};
fill_tables(basic_tables, sizeof(basic_tables)/sizeof(*basic_tables));
const char *queries[]=
{
"SELECT * FROM t1",
"SELECT * FROM t2"
};
fetch_n(queries, sizeof(queries)/sizeof(*queries));
}
static void test_cursors_with_union()
{
myheader("test_cursors_with_union");
const char *queries[]=
{
"SELECT t1.name FROM t1 UNION SELECT t2.name FROM t2",
"SELECT t1.id FROM t1 WHERE t1.id < 5"
};
fetch_n(queries, sizeof(queries)/sizeof(*queries));
}
/*
Read and parse arguments and MySQL options from my.cnf
*/
@ -10694,6 +10983,8 @@ int main(int argc, char **argv)
test_view_insert(); /* inserting in VIEW without field list */
test_left_join_view(); /* left join on VIEW with WHERE condition */
test_view_insert_fields(); /* insert into VIOEW with fields list */
test_basic_cursors();
test_cursors_with_union();
/*
XXX: PLEASE RUN THIS PROGRAM UNDER VALGRIND AND VERIFY THAT YOUR TEST
DOESN'T CONTAIN WARNINGS/ERRORS BEFORE YOU PUSH.