MDEV-17096 Pushdown of simple derived tables to storage engines

MDEV-17631 select_handler for a full query pushdown

Interfaces + Proof of Concept for federatedx with test cases.

The interfaces have been developed for integration of ColumnStore engine.
This commit is contained in:
Igor Babaev 2018-10-09 02:36:09 -07:00
parent 171fbbb968
commit 16327fc2e7
21 changed files with 1279 additions and 20 deletions

View file

@ -77,7 +77,8 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc
../sql/debug_sync.cc ../sql/opt_table_elimination.cc ../sql/debug_sync.cc ../sql/opt_table_elimination.cc
../sql/sql_prepare.cc ../sql/sql_rename.cc ../sql/sql_repl.cc ../sql/sql_prepare.cc ../sql/sql_rename.cc ../sql/sql_repl.cc
../sql/sql_select.cc ../sql/sql_servers.cc ../sql/sql_select.cc ../sql/sql_servers.cc
../sql/group_by_handler.cc ../sql/group_by_handler.cc ../sql/derived_handler.cc
../sql/select_handler.cc
../sql/sql_show.cc ../sql/sql_state.c ../sql/sql_show.cc ../sql/sql_state.c
../sql/sql_statistics.cc ../sql/sql_string.cc ../sql/sql_statistics.cc ../sql/sql_string.cc
../sql/sql_tablespace.cc ../sql/sql_table.cc ../sql/sql_test.cc ../sql/sql_tablespace.cc ../sql/sql_table.cc ../sql/sql_test.cc

View file

@ -0,0 +1,202 @@
connect master,127.0.0.1,root,,test,$MASTER_MYPORT,;
connect slave,127.0.0.1,root,,test,$SLAVE_MYPORT,;
connection master;
CREATE DATABASE federated;
connection slave;
CREATE DATABASE federated;
connection slave;
DROP TABLE IF EXISTS federated.t1;
Warnings:
Note 1051 Unknown table 'federated.t1'
CREATE TABLE federated.t1 (
id int(20) NOT NULL,
name varchar(16) NOT NULL default ''
)
DEFAULT CHARSET=latin1;
INSERT INTO federated.t1 VALUES
(3,'xxx'), (7,'yyy'), (4,'xxx'), (1,'zzz'), (5,'yyy');
DROP TABLE IF EXISTS federated.t2;
Warnings:
Note 1051 Unknown table 'federated.t2'
CREATE TABLE federated.t2 (
name varchar(16) NOT NULL default ''
)
DEFAULT CHARSET=latin1;
INSERT INTO federated.t2 VALUES
('yyy'), ('www'), ('yyy'), ('xxx'), ('www'), ('yyy'), ('www');
connection master;
DROP TABLE IF EXISTS federated.t1;
Warnings:
Note 1051 Unknown table 'federated.t1'
CREATE TABLE federated.t1 (
id int(20) NOT NULL,
name varchar(16) NOT NULL default ''
)
ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1';
DROP TABLE IF EXISTS federated.t2;
Warnings:
Note 1051 Unknown table 'federated.t2'
CREATE TABLE federated.t2 (
name varchar(16) NOT NULL default ''
)
ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='mysql://root@127.0.0.1:SLAVE_PORT/federated/t2';
SELECT * FROM federated.t1;
id name
3 xxx
7 yyy
4 xxx
1 zzz
5 yyy
SELECT id FROM federated.t1 WHERE id < 5;
id
3
4
1
SELECT count(*), name FROM federated.t1 WHERE id < 5 GROUP BY name;
count(*) name
2 xxx
1 zzz
SELECT * FROM federated.t1, federated.t2
WHERE federated.t1.name = federated.t2.name;
id name name
7 yyy yyy
5 yyy yyy
7 yyy yyy
5 yyy yyy
3 xxx xxx
4 xxx xxx
7 yyy yyy
5 yyy yyy
SELECT * FROM federated.t1 LEFT JOIN federated.t2
ON federated.t1.name = federated.t2.name
WHERE federated.t1.id > 1;
id name name
7 yyy yyy
5 yyy yyy
7 yyy yyy
5 yyy yyy
3 xxx xxx
4 xxx xxx
7 yyy yyy
5 yyy yyy
SELECT * FROM federated.t1
WHERE id IN (SELECT count(*) FROM federated.t2 GROUP BY name);
id name
3 xxx
1 zzz
EXPLAIN
SELECT id FROM federated.t1 WHERE id < 5;
id select_type table type possible_keys key key_len ref rows Extra
1 PUSHED SELECT NULL NULL NULL NULL NULL NULL NULL NULL
EXPLAIN EXTENDED
SELECT id FROM federated.t1 WHERE id < 5;
id select_type table type possible_keys key key_len ref rows filtered Extra
1 PUSHED SELECT NULL NULL NULL NULL NULL NULL NULL NULL NULL
Warnings:
Note 1003 select `federated`.`t1`.`id` AS `id` from `federated`.`t1` where `federated`.`t1`.`id` < 5
EXPLAIN FORMAT=JSON
SELECT id FROM federated.t1 WHERE id < 5;
EXPLAIN
{
"query_block": {
"select_id": 1,
"table": {
"message": "Pushed select"
}
}
}
ANALYZE
SELECT id FROM federated.t1 WHERE id < 5;
id select_type table type possible_keys key key_len ref rows r_rows filtered r_filtered Extra
1 PUSHED SELECT NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL
ANALYZE FORMAT=JSON
SELECT id FROM federated.t1 WHERE id < 5;
ANALYZE
{
"query_block": {
"select_id": 1,
"table": {
"message": "Pushed select"
}
}
}
CREATE TABLE federated.t3 (
name varchar(16) NOT NULL default ''
)
DEFAULT CHARSET=latin1;
INSERT INTO federated.t3 VALUES
('yyy'), ('www'), ('yyy'), ('xxx'), ('www'), ('yyy'), ('www');
SELECT *
FROM federated.t3, (SELECT * FROM federated.t1 WHERE id > 3) t
WHERE federated.t3.name=t.name;
name id name
yyy 5 yyy
yyy 7 yyy
yyy 5 yyy
yyy 7 yyy
xxx 4 xxx
yyy 5 yyy
yyy 7 yyy
EXPLAIN
SELECT *
FROM federated.t3, (SELECT * FROM federated.t1 WHERE id > 3) t
WHERE federated.t3.name=t.name;
id select_type table type possible_keys key key_len ref rows Extra
1 PRIMARY t3 ALL NULL NULL NULL NULL 7
1 PRIMARY <derived2> ref key0 key0 18 federated.t3.name 2
2 PUSHED DERIVED NULL NULL NULL NULL NULL NULL NULL NULL
EXPLAIN FORMAT=JSON
SELECT *
FROM federated.t3, (SELECT * FROM federated.t1 WHERE id > 3) t
WHERE federated.t3.name=t.name;
EXPLAIN
{
"query_block": {
"select_id": 1,
"table": {
"table_name": "t3",
"access_type": "ALL",
"rows": 7,
"filtered": 100
},
"table": {
"table_name": "<derived2>",
"access_type": "ref",
"possible_keys": ["key0"],
"key": "key0",
"key_length": "18",
"used_key_parts": ["name"],
"ref": ["federated.t3.name"],
"rows": 2,
"filtered": 100,
"materialized": {
"query_block": {
"select_id": 2,
"table": {
"message": "Pushed derived"
}
}
}
}
}
}
ANALYZE
SELECT *
FROM federated.t3, (SELECT * FROM federated.t1 WHERE id > 3) t
WHERE federated.t3.name=t.name;
id select_type table type possible_keys key key_len ref rows r_rows filtered r_filtered Extra
1 PRIMARY t3 ALL NULL NULL NULL NULL 7 7.00 100.00 100.00
1 PRIMARY <derived2> ref key0 key0 18 federated.t3.name 2 0.00 100.00 100.00
2 PUSHED DERIVED NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL
DROP TABLE federated.t1, federated.t2;
connection slave;
DROP TABLE federated.t1, federated.t2;
connection default;
connection master;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
connection slave;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;

View file

@ -0,0 +1,116 @@
--source have_federatedx.inc
--source include/federated.inc
connection slave;
DROP TABLE IF EXISTS federated.t1;
CREATE TABLE federated.t1 (
id int(20) NOT NULL,
name varchar(16) NOT NULL default ''
)
DEFAULT CHARSET=latin1;
INSERT INTO federated.t1 VALUES
(3,'xxx'), (7,'yyy'), (4,'xxx'), (1,'zzz'), (5,'yyy');
DROP TABLE IF EXISTS federated.t2;
CREATE TABLE federated.t2 (
name varchar(16) NOT NULL default ''
)
DEFAULT CHARSET=latin1;
INSERT INTO federated.t2 VALUES
('yyy'), ('www'), ('yyy'), ('xxx'), ('www'), ('yyy'), ('www');
connection master;
DROP TABLE IF EXISTS federated.t1;
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval
CREATE TABLE federated.t1 (
id int(20) NOT NULL,
name varchar(16) NOT NULL default ''
)
ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1';
DROP TABLE IF EXISTS federated.t2;
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval
CREATE TABLE federated.t2 (
name varchar(16) NOT NULL default ''
)
ENGINE="FEDERATED" DEFAULT CHARSET=latin1
CONNECTION='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t2';
SELECT * FROM federated.t1;
SELECT id FROM federated.t1 WHERE id < 5;
SELECT count(*), name FROM federated.t1 WHERE id < 5 GROUP BY name;
SELECT * FROM federated.t1, federated.t2
WHERE federated.t1.name = federated.t2.name;
SELECT * FROM federated.t1 LEFT JOIN federated.t2
ON federated.t1.name = federated.t2.name
WHERE federated.t1.id > 1;
SELECT * FROM federated.t1
WHERE id IN (SELECT count(*) FROM federated.t2 GROUP BY name);
EXPLAIN
SELECT id FROM federated.t1 WHERE id < 5;
EXPLAIN EXTENDED
SELECT id FROM federated.t1 WHERE id < 5;
EXPLAIN FORMAT=JSON
SELECT id FROM federated.t1 WHERE id < 5;
ANALYZE
SELECT id FROM federated.t1 WHERE id < 5;
ANALYZE FORMAT=JSON
SELECT id FROM federated.t1 WHERE id < 5;
CREATE TABLE federated.t3 (
name varchar(16) NOT NULL default ''
)
DEFAULT CHARSET=latin1;
INSERT INTO federated.t3 VALUES
('yyy'), ('www'), ('yyy'), ('xxx'), ('www'), ('yyy'), ('www');
SELECT *
FROM federated.t3, (SELECT * FROM federated.t1 WHERE id > 3) t
WHERE federated.t3.name=t.name;
EXPLAIN
SELECT *
FROM federated.t3, (SELECT * FROM federated.t1 WHERE id > 3) t
WHERE federated.t3.name=t.name;
EXPLAIN FORMAT=JSON
SELECT *
FROM federated.t3, (SELECT * FROM federated.t1 WHERE id > 3) t
WHERE federated.t3.name=t.name;
ANALYZE
SELECT *
FROM federated.t3, (SELECT * FROM federated.t1 WHERE id > 3) t
WHERE federated.t3.name=t.name;
DROP TABLE federated.t1, federated.t2;
connection slave;
DROP TABLE federated.t1, federated.t2;
connection default;
source include/federated_cleanup.inc;

View file

@ -96,7 +96,7 @@ SET (SQL_SOURCE
sql_partition.cc sql_plugin.cc sql_prepare.cc sql_rename.cc sql_partition.cc sql_plugin.cc sql_prepare.cc sql_rename.cc
debug_sync.cc debug_sync.cc
sql_repl.cc sql_select.cc sql_show.cc sql_state.c sql_repl.cc sql_select.cc sql_show.cc sql_state.c
group_by_handler.cc group_by_handler.cc derived_handler.cc select_handler.cc
sql_statistics.cc sql_string.cc lex_string.h sql_statistics.cc sql_string.cc lex_string.h
sql_table.cc sql_test.cc sql_trigger.cc sql_udf.cc sql_union.cc sql_table.cc sql_test.cc sql_trigger.cc sql_udf.cc sql_union.cc
sql_update.cc sql_view.cc strfunc.cc table.cc thr_malloc.cc sql_update.cc sql_view.cc strfunc.cc table.cc thr_malloc.cc

84
sql/derived_handler.cc Normal file
View file

@ -0,0 +1,84 @@
#include "mariadb.h"
#include "sql_priv.h"
#include "sql_select.h"
#include "derived_handler.h"
void derived_handler::set_derived(TABLE_LIST *tbl)
{
derived= tbl;
table= tbl->table;
unit= tbl->derived;
select= unit->first_select();
tmp_table_param= select->next_select() ?
((select_unit *)(unit->result))->get_tmp_table_param() :
&select->join->tmp_table_param;
}
Pushdown_derived::Pushdown_derived(TABLE_LIST *tbl, derived_handler *h)
: derived(tbl), handler(h)
{
is_analyze= handler->thd->lex->analyze_stmt;
}
Pushdown_derived::~Pushdown_derived()
{
delete handler;
}
int Pushdown_derived::execute()
{
int err;
THD *thd= handler->thd;
TABLE *table= handler->table;
TMP_TABLE_PARAM *tmp_table_param= handler->tmp_table_param;
DBUG_ENTER("Pushdown_query::execute");
if ((err= handler->init_scan()))
goto error;
if (is_analyze)
{
handler->end_scan();
DBUG_RETURN(0);
}
while (!(err= handler->next_row()))
{
if (unlikely(thd->check_killed()))
{
handler->end_scan();
DBUG_RETURN(-1);
}
if ((err= table->file->ha_write_tmp_row(table->record[0])))
{
bool is_duplicate;
if (likely(!table->file->is_fatal_error(err, HA_CHECK_DUP)))
continue; // Distinct elimination
if (create_internal_tmp_table_from_heap(thd, table,
tmp_table_param->start_recinfo,
&tmp_table_param->recinfo,
err, 1, &is_duplicate))
DBUG_RETURN(1);
if (is_duplicate)
continue;
}
}
if (err != 0 && err != HA_ERR_END_OF_FILE)
goto error;
if ((err= handler->end_scan()))
goto error_2;
DBUG_RETURN(0);
error:
handler->end_scan();
error_2:
handler->print_error(err, MYF(0));
DBUG_RETURN(-1); // Error not sent to client
}

61
sql/derived_handler.h Normal file
View file

@ -0,0 +1,61 @@
#ifndef DERIVED_HANDLER_INCLUDED
#define DERIVED_HANDLER_INCLUDED
#include "mariadb.h"
#include "sql_priv.h"
class TMP_TABLE_PARAM;
typedef class st_select_lex_unit SELECT_LEX_UNIT;
class derived_handler
{
public:
THD *thd;
handlerton *ht;
TABLE_LIST *derived;
/*
Temporary table where all results should be stored in record[0]
The table has a field for every item from the select list of
the specification of derived.
*/
TABLE *table;
TMP_TABLE_PARAM *tmp_table_param;
SELECT_LEX_UNIT *unit;
SELECT_LEX *select;
derived_handler(THD *thd_arg, handlerton *ht_arg)
: thd(thd_arg), ht(ht_arg), derived(0),table(0), tmp_table_param(0),
unit(0), select(0) {}
virtual ~derived_handler() {}
/*
Functions to scan data. All these returns 0 if ok, error code in case
of error
*/
/* Initialize the process of producing rows of the derived table */
virtual int init_scan()= 0;
/*
Put the next produced row of the derived in table->record[0] and return 0.
Return HA_ERR_END_OF_FILE if there are no more rows, return other error
number in case of fatal error.
*/
virtual int next_row()= 0;
/* End prodicing rows */
virtual int end_scan()=0;
/* Report errors */
virtual void print_error(int error, myf errflag)=0;
void set_derived(TABLE_LIST *tbl);
};
#endif /* DERIVED_HANDLER_INCLUDED */

View file

@ -1183,6 +1183,8 @@ struct handler_iterator {
class handler; class handler;
class group_by_handler; class group_by_handler;
class derived_handler;
class select_handler;
struct Query; struct Query;
typedef class st_select_lex SELECT_LEX; typedef class st_select_lex SELECT_LEX;
typedef struct st_order ORDER; typedef struct st_order ORDER;
@ -1502,6 +1504,21 @@ struct handlerton
*/ */
group_by_handler *(*create_group_by)(THD *thd, Query *query); group_by_handler *(*create_group_by)(THD *thd, Query *query);
/*
Create and return a derived_handler if the storage engine can execute
the derived table 'derived', otherwise return NULL.
In a general case 'derived' may contain tables not from the engine.
If the engine cannot handle or does not want to handle such pushed derived
the function create_group_by has to return NULL.
*/
derived_handler *(*create_derived)(THD *thd, TABLE_LIST *derived);
/*
Create and return a select_handler if the storage engine can execute
the select statement 'select, otherwise return NULL
*/
select_handler *(*create_select) (THD *thd, SELECT_LEX *select);
/********************************************************************* /*********************************************************************
Table discovery API. Table discovery API.
It allows the server to "discover" tables that exist in the storage It allows the server to "discover" tables that exist in the storage

145
sql/select_handler.cc Normal file
View file

@ -0,0 +1,145 @@
#include "mariadb.h"
#include "sql_priv.h"
#include "sql_select.h"
#include "select_handler.h"
Pushdown_select::Pushdown_select(SELECT_LEX *sel, select_handler *h)
: select(sel), handler(h)
{
is_analyze= handler->thd->lex->analyze_stmt;
}
Pushdown_select::~Pushdown_select()
{
delete handler;
select->select_h= NULL;
}
bool Pushdown_select::init()
{
List<Item> types;
TMP_TABLE_PARAM tmp_table_param;
THD *thd= handler->thd;
DBUG_ENTER("Pushdown_select::init");
if (select->master_unit()->join_union_item_types(thd, types, 1))
DBUG_RETURN(true);
tmp_table_param.init();
tmp_table_param.field_count= types.elements;
handler->table= create_tmp_table(thd, &tmp_table_param, types,
(ORDER *) 0, false, 0,
TMP_TABLE_ALL_COLUMNS, 1,
&empty_clex_str, true, false);
if (!handler->table)
DBUG_RETURN(true);
if (handler->table->fill_item_list(&result_columns))
DBUG_RETURN(true);
DBUG_RETURN(false);
}
bool Pushdown_select::send_result_set_metadata()
{
THD *thd= handler->thd;
Protocol *protocol= thd->protocol;
DBUG_ENTER("Pushdown_select::send_result_set_metadata");
#ifdef WITH_WSREP
if (WSREP(thd) && thd->wsrep_retry_query)
{
WSREP_DEBUG("skipping select metadata");
DBUG_RETURN(false);
}
#endif /* WITH_WSREP */
if (protocol->send_result_set_metadata(&result_columns,
Protocol::SEND_NUM_ROWS |
Protocol::SEND_EOF))
DBUG_RETURN(true);
DBUG_RETURN(false);
}
bool Pushdown_select::send_data()
{
THD *thd= handler->thd;
Protocol *protocol= thd->protocol;
DBUG_ENTER("Pushdown_select::send_data");
if (thd->killed == ABORT_QUERY)
DBUG_RETURN(false);
protocol->prepare_for_resend();
if (protocol->send_result_set_row(&result_columns))
{
protocol->remove_last_row();
DBUG_RETURN(true);
}
thd->inc_sent_row_count(1);
if (thd->vio_ok())
DBUG_RETURN(protocol->write());
DBUG_RETURN(false);
}
bool Pushdown_select::send_eof()
{
THD *thd= handler->thd;
DBUG_ENTER("Pushdown_select::send_eof");
/*
Don't send EOF if we're in error condition (which implies we've already
sent or are sending an error)
*/
if (thd->is_error())
DBUG_RETURN(true);
::my_eof(thd);
DBUG_RETURN(false);
}
int Pushdown_select::execute()
{
int err;
THD *thd= handler->thd;
DBUG_ENTER("Pushdown_select::execute");
if ((err= handler->init_scan()))
goto error;
if (is_analyze)
{
handler->end_scan();
DBUG_RETURN(0);
}
if (send_result_set_metadata())
DBUG_RETURN(-1);
while (!(err= handler->next_row()))
{
if (thd->check_killed() || send_data())
{
handler->end_scan();
DBUG_RETURN(-1);
}
}
if (err != 0 && err != HA_ERR_END_OF_FILE)
goto error;
if ((err= handler->end_scan()))
goto error_2;
if (send_eof())
DBUG_RETURN(-1);
DBUG_RETURN(0);
error:
handler->end_scan();
error_2:
handler->print_error(err, MYF(0));
DBUG_RETURN(-1); // Error not sent to client
}

48
sql/select_handler.h Normal file
View file

@ -0,0 +1,48 @@
#ifndef SELECT_HANDLER_INCLUDED
#define SELECT_HANDLER_INCLUDED
#include "mariadb.h"
#include "sql_priv.h"
class select_handler
{
public:
THD *thd;
handlerton *ht;
SELECT_LEX *select;
/*
Temporary table where all results should be stored in record[0]
The table has a field for every item from the select_lex::item_list.
*/
TABLE *table;
select_handler(THD *thd_arg, handlerton *ht_arg)
: thd(thd_arg), ht(ht_arg), table(0) {}
virtual ~select_handler() {}
/*
Functions to scan the select result set.
All these returns 0 if ok, error code in case of error.
*/
/* Initialize the process of producing rows of result set */
virtual int init_scan() = 0;
/*
Put the next produced row of the result set in table->record[0]
and return 0. Return HA_ERR_END_OF_FILE if there are no more rows,
return other error number in case of fatal error.
*/
virtual int next_row() = 0;
/* Finish scanning */
virtual int end_scan() = 0;
/* Report errors */
virtual void print_error(int error, myf errflag) = 0;
};
#endif /* SELECT_HANDLER_INCLUDED */

View file

@ -27,6 +27,7 @@
#include "unireg.h" #include "unireg.h"
#include "sql_derived.h" #include "sql_derived.h"
#include "sql_select.h" #include "sql_select.h"
#include "derived_handler.h"
#include "sql_base.h" #include "sql_base.h"
#include "sql_view.h" // check_duplicate_names #include "sql_view.h" // check_duplicate_names
#include "sql_acl.h" // SELECT_ACL #include "sql_acl.h" // SELECT_ACL
@ -384,9 +385,16 @@ bool mysql_derived_merge(THD *thd, LEX *lex, TABLE_LIST *derived)
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
if (thd->lex->sql_command == SQLCOM_UPDATE_MULTI || if ((derived->dt_handler= derived->find_derived_handler(thd)))
thd->lex->sql_command == SQLCOM_DELETE_MULTI) {
thd->save_prep_leaf_list= TRUE; derived->change_refs_to_fields();
derived->set_materialized_derived();
DBUG_RETURN(FALSE);
}
if (thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI)
thd->save_prep_leaf_list= TRUE;
arena= thd->activate_stmt_arena_if_needed(&backup); // For easier test arena= thd->activate_stmt_arena_if_needed(&backup); // For easier test
@ -904,6 +912,19 @@ bool mysql_derived_optimize(THD *thd, LEX *lex, TABLE_LIST *derived)
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
if (derived->is_materialized_derived() && !derived->dt_handler)
derived->dt_handler= derived->find_derived_handler(thd);
if (derived->dt_handler)
{
if (!(derived->pushdown_derived=
new (thd->mem_root) Pushdown_derived(derived, derived->dt_handler)))
{
delete derived->dt_handler;
derived->dt_handler= NULL;
DBUG_RETURN(1);
}
}
lex->current_select= first_select; lex->current_select= first_select;
if (unit->is_unit_op()) if (unit->is_unit_op())
@ -1108,6 +1129,17 @@ bool mysql_derived_fill(THD *thd, LEX *lex, TABLE_LIST *derived)
select_unit *derived_result= derived->derived_result; select_unit *derived_result= derived->derived_result;
SELECT_LEX *save_current_select= lex->current_select; SELECT_LEX *save_current_select= lex->current_select;
if (derived->pushdown_derived)
{
int res;
if (unit->executed)
DBUG_RETURN(FALSE);
res= derived->pushdown_derived->execute();
unit->executed= true;
delete derived->pushdown_derived;
DBUG_RETURN(res);
}
if (unit->executed && !derived_is_recursive && if (unit->executed && !derived_is_recursive &&
(unit->uncacheable & UNCACHEABLE_DEPENDENT)) (unit->uncacheable & UNCACHEABLE_DEPENDENT))
{ {
@ -1404,3 +1436,47 @@ bool pushdown_cond_for_derived(THD *thd, Item *cond, TABLE_LIST *derived)
thd->lex->current_select= save_curr_select; thd->lex->current_select= save_curr_select;
DBUG_RETURN(false); DBUG_RETURN(false);
} }
derived_handler *TABLE_LIST::find_derived_handler(THD *thd)
{
if (!derived || is_recursive_with_table())
return 0;
for (SELECT_LEX *sl= derived->first_select(); sl; sl= sl->next_select())
{
if (!(sl->join))
continue;
for (TABLE_LIST *tbl= sl->join->tables_list; tbl; tbl= tbl->next_local)
{
if (!tbl->table)
continue;
handlerton *ht= tbl->table->file->partition_ht();
if (!ht->create_derived)
continue;
derived_handler *dh= ht->create_derived(thd, this);
if (dh)
{
dh->set_derived(this);
return dh;
}
}
}
return 0;
}
TABLE_LIST *TABLE_LIST::get_first_table()
{
for (SELECT_LEX *sl= derived->first_select(); sl; sl= sl->next_select())
{
if (!(sl->join))
continue;
for (TABLE_LIST *tbl= sl->join->tables_list; tbl; tbl= tbl->next_local)
{
if (!tbl->table)
continue;
return tbl;
}
}
return 0;
}

View file

@ -34,6 +34,9 @@ const char *unit_operation_text[4]=
"UNIT RESULT","UNION RESULT","INTERSECT RESULT","EXCEPT RESULT" "UNIT RESULT","UNION RESULT","INTERSECT RESULT","EXCEPT RESULT"
}; };
const char *pushed_derived_text= "PUSHED DERIVED";
const char *pushed_select_text= "PUSHED SELECT";
static void write_item(Json_writer *writer, Item *item); static void write_item(Json_writer *writer, Item *item);
static void append_item_to_str(String *out, Item *item); static void append_item_to_str(String *out, Item *item);
@ -334,6 +337,9 @@ int print_explain_row(select_result_sink *result,
List<Item> item_list; List<Item> item_list;
Item *item; Item *item;
if (!select_type[0])
return 0;
item_list.push_back(new (mem_root) Item_int(thd, (int32) select_number), item_list.push_back(new (mem_root) Item_int(thd, (int32) select_number),
mem_root); mem_root);
item_list.push_back(new (mem_root) Item_string_sys(thd, select_type), item_list.push_back(new (mem_root) Item_string_sys(thd, select_type),
@ -746,7 +752,15 @@ int Explain_select::print_explain(Explain_query *query,
THD *thd= output->thd; THD *thd= output->thd;
MEM_ROOT *mem_root= thd->mem_root; MEM_ROOT *mem_root= thd->mem_root;
if (message) if (select_type == pushed_derived_text || select_type == pushed_select_text)
{
print_explain_message_line(output, explain_flags, is_analyze,
select_id /*select number*/,
select_type,
NULL, /* rows */
NULL);
}
else if (message)
{ {
List<Item> item_list; List<Item> item_list;
Item *item_null= new (mem_root) Item_null(thd); Item *item_null= new (mem_root) Item_null(thd);
@ -869,14 +883,20 @@ void Explain_select::print_explain_json(Explain_query *query,
bool started_cache= print_explain_json_cache(writer, is_analyze); bool started_cache= print_explain_json_cache(writer, is_analyze);
if (message) if (message ||
select_type == pushed_derived_text ||
select_type == pushed_select_text)
{ {
writer->add_member("query_block").start_object(); writer->add_member("query_block").start_object();
writer->add_member("select_id").add_ll(select_id); writer->add_member("select_id").add_ll(select_id);
add_linkage(writer); add_linkage(writer);
writer->add_member("table").start_object(); writer->add_member("table").start_object();
writer->add_member("message").add_str(message); writer->add_member("message").add_str(select_type == pushed_derived_text ?
"Pushed derived" :
select_type == pushed_select_text ?
"Pushed select" :
message);
writer->end_object(); writer->end_object();
print_explain_json_for_children(query, writer, is_analyze); print_explain_json_for_children(query, writer, is_analyze);
@ -1205,7 +1225,7 @@ int Explain_table_access::print_explain(select_result_sink *output, uint8 explai
{ {
THD *thd= output->thd; THD *thd= output->thd;
MEM_ROOT *mem_root= thd->mem_root; MEM_ROOT *mem_root= thd->mem_root;
List<Item> item_list; List<Item> item_list;
Item *item_null= new (mem_root) Item_null(thd); Item *item_null= new (mem_root) Item_null(thd);

View file

@ -328,6 +328,8 @@ public:
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
extern const char *unit_operation_text[4]; extern const char *unit_operation_text[4];
extern const char *pushed_derived_text;
extern const char *pushed_select_text;
/* /*
Explain structure for a UNION. Explain structure for a UNION.

View file

@ -2354,6 +2354,7 @@ void st_select_lex::init_query()
tvc= 0; tvc= 0;
in_tvc= false; in_tvc= false;
versioned_tables= 0; versioned_tables= 0;
pushdown_select= 0;
} }
void st_select_lex::init_select() void st_select_lex::init_select()
@ -4650,7 +4651,10 @@ void st_select_lex::set_explain_type(bool on_the_fly)
if (master_unit()->thd->lex->first_select_lex() == this) if (master_unit()->thd->lex->first_select_lex() == this)
{ {
type= is_primary ? "PRIMARY" : "SIMPLE"; if (pushdown_select)
type= pushed_select_text;
else
type= is_primary ? "PRIMARY" : "SIMPLE";
} }
else else
{ {
@ -4659,7 +4663,11 @@ void st_select_lex::set_explain_type(bool on_the_fly)
/* If we're a direct child of a UNION, we're the first sibling there */ /* If we're a direct child of a UNION, we're the first sibling there */
if (linkage == DERIVED_TABLE_TYPE) if (linkage == DERIVED_TABLE_TYPE)
{ {
if (is_uncacheable & UNCACHEABLE_DEPENDENT) bool is_pushed_master_unit= master_unit()->derived &&
master_unit()->derived->pushdown_derived;
if (is_pushed_master_unit)
type= pushed_derived_text;
else if (is_uncacheable & UNCACHEABLE_DEPENDENT)
type= "LATERAL DERIVED"; type= "LATERAL DERIVED";
else else
type= "DERIVED"; type= "DERIVED";
@ -9458,3 +9466,4 @@ bool SELECT_LEX::make_unique_derived_name(THD *thd, LEX_CSTRING *alias)
alias->str= thd->strmake(buff, alias->length); alias->str= thd->strmake(buff, alias->length);
return !alias->str; return !alias->str;
} }

View file

@ -225,6 +225,8 @@ class Item_window_func;
struct sql_digest_state; struct sql_digest_state;
class With_clause; class With_clause;
class my_var; class my_var;
class select_handler;
class Pushdown_select;
#define ALLOC_ROOT_SET 1024 #define ALLOC_ROOT_SET 1024
@ -812,11 +814,12 @@ protected:
bool prepare_join(THD *thd, SELECT_LEX *sl, select_result *result, bool prepare_join(THD *thd, SELECT_LEX *sl, select_result *result,
ulong additional_options, ulong additional_options,
bool is_union_select); bool is_union_select);
bool join_union_item_types(THD *thd, List<Item> &types, uint count);
bool join_union_type_handlers(THD *thd, bool join_union_type_handlers(THD *thd,
class Type_holder *holders, uint count); class Type_holder *holders, uint count);
bool join_union_type_attributes(THD *thd, bool join_union_type_attributes(THD *thd,
class Type_holder *holders, uint count); class Type_holder *holders, uint count);
public:
bool join_union_item_types(THD *thd, List<Item> &types, uint count);
public: public:
// Ensures that at least all members used during cleanup() are initialized. // Ensures that at least all members used during cleanup() are initialized.
st_select_lex_unit() st_select_lex_unit()
@ -1240,6 +1243,9 @@ public:
table_value_constr *tvc; table_value_constr *tvc;
bool in_tvc; bool in_tvc;
select_handler *select_h;
Pushdown_select *pushdown_select;
/** System Versioning */ /** System Versioning */
public: public:
uint versioned_tables; uint versioned_tables;
@ -1471,6 +1477,8 @@ public:
Item_transformer transformer, Item_transformer transformer,
uchar *arg); uchar *arg);
select_handler *find_select_handler(THD *thd);
private: private:
bool m_non_agg_field_used; bool m_non_agg_field_used;
bool m_agg_func_used; bool m_agg_func_used;

View file

@ -64,6 +64,7 @@
#include "sys_vars_shared.h" #include "sys_vars_shared.h"
#include "sp_head.h" #include "sp_head.h"
#include "sp_rcontext.h" #include "sp_rcontext.h"
#include "select_handler.h"
/* /*
A key part number that means we're using a fulltext scan. A key part number that means we're using a fulltext scan.
@ -1437,7 +1438,13 @@ int JOIN::optimize()
{ {
int res= 0; int res= 0;
join_optimization_state init_state= optimization_state; join_optimization_state init_state= optimization_state;
if (optimization_state == JOIN::OPTIMIZATION_PHASE_1_DONE) if (select_lex->pushdown_select)
{
if (!(select_options & SELECT_DESCRIBE))
res= select_lex->pushdown_select->init();
with_two_phase_optimization= false;
}
else if (optimization_state == JOIN::OPTIMIZATION_PHASE_1_DONE)
res= optimize_stage2(); res= optimize_stage2();
else else
{ {
@ -3934,7 +3941,6 @@ void JOIN::exec_inner()
if (select_options & SELECT_DESCRIBE) if (select_options & SELECT_DESCRIBE)
select_describe(this, FALSE, FALSE, FALSE, select_describe(this, FALSE, FALSE, FALSE,
(zero_result_cause?zero_result_cause:"No tables used")); (zero_result_cause?zero_result_cause:"No tables used"));
else else
{ {
if (result->send_result_set_metadata(*columns_list, if (result->send_result_set_metadata(*columns_list,
@ -4033,7 +4039,8 @@ void JOIN::exec_inner()
not the case. not the case.
*/ */
if (exec_const_order_group_cond.elements && if (exec_const_order_group_cond.elements &&
!(select_options & SELECT_DESCRIBE)) !(select_options & SELECT_DESCRIBE) &&
!select_lex->pushdown_select)
{ {
List_iterator_fast<Item> const_item_it(exec_const_order_group_cond); List_iterator_fast<Item> const_item_it(exec_const_order_group_cond);
Item *cur_const_item; Item *cur_const_item;
@ -4060,6 +4067,11 @@ void JOIN::exec_inner()
!table_count ? "No tables used" : NullS); !table_count ? "No tables used" : NullS);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
else if (select_lex->pushdown_select)
{
error= select_lex->pushdown_select->execute();
DBUG_VOID_RETURN;
}
else else
{ {
/* it's a const select, materialize it. */ /* it's a const select, materialize it. */
@ -4271,6 +4283,19 @@ mysql_select(THD *thd,
} }
} }
select_lex->select_h= select_lex->find_select_handler(thd);
if (select_lex->select_h)
{
if (!(select_lex->pushdown_select=
new (thd->mem_root) Pushdown_select(select_lex,
select_lex->select_h)))
{
delete select_lex->select_h;
select_lex->select_h= NULL;
DBUG_RETURN(TRUE);
}
}
if ((err= join->optimize())) if ((err= join->optimize()))
{ {
goto err; // 1 goto err; // 1
@ -4292,8 +4317,15 @@ mysql_select(THD *thd,
select_lex->where= join->conds_history; select_lex->where= join->conds_history;
select_lex->having= join->having_history; select_lex->having= join->having_history;
} }
err: err:
if (select_lex->pushdown_select)
{
delete select_lex->pushdown_select;
select_lex->pushdown_select= NULL;
}
if (free_join) if (free_join)
{ {
THD_STAGE_INFO(thd, stage_end); THD_STAGE_INFO(thd, stage_end);
@ -25681,6 +25713,7 @@ bool mysql_explain_union(THD *thd, SELECT_LEX_UNIT *unit, select_result *result)
DBUG_ENTER("mysql_explain_union"); DBUG_ENTER("mysql_explain_union");
bool res= 0; bool res= 0;
SELECT_LEX *first= unit->first_select(); SELECT_LEX *first= unit->first_select();
bool is_pushed_union= unit->derived && unit->derived->pushdown_derived;
for (SELECT_LEX *sl= first; sl; sl= sl->next_select()) for (SELECT_LEX *sl= first; sl; sl= sl->next_select())
{ {
@ -25698,9 +25731,12 @@ bool mysql_explain_union(THD *thd, SELECT_LEX_UNIT *unit, select_result *result)
} }
if (!(res= unit->prepare(unit->derived, result, if (!(res= unit->prepare(unit->derived, result,
SELECT_NO_UNLOCK | SELECT_DESCRIBE))) SELECT_NO_UNLOCK | SELECT_DESCRIBE)))
res= unit->exec(); {
if (!is_pushed_union)
res= unit->exec();
}
} }
else else
{ {
thd->lex->current_select= first; thd->lex->current_select= first;
unit->set_limit(unit->global_parameters()); unit->set_limit(unit->global_parameters());
@ -25716,6 +25752,13 @@ bool mysql_explain_union(THD *thd, SELECT_LEX_UNIT *unit, select_result *result)
first->options | thd->variables.option_bits | SELECT_DESCRIBE, first->options | thd->variables.option_bits | SELECT_DESCRIBE,
result, unit, first); result, unit, first);
} }
if (unit->derived && unit->derived->pushdown_derived)
{
delete unit->derived->pushdown_derived;
unit->derived->pushdown_derived= NULL;
}
DBUG_RETURN(res || thd->is_error()); DBUG_RETURN(res || thd->is_error());
} }
@ -27367,6 +27410,26 @@ Item *remove_pushed_top_conjuncts(THD *thd, Item *cond)
return cond; return cond;
} }
select_handler *SELECT_LEX::find_select_handler(THD *thd)
{
if (next_select())
return 0;
if (master_unit()->outer_select())
return 0;
for (TABLE_LIST *tbl= join->tables_list; tbl; tbl= tbl->next_local)
{
if (!tbl->table)
continue;
handlerton *ht= tbl->table->file->partition_ht();
if (!ht->create_select)
continue;
select_handler *sh= ht->create_select(thd, this);
return sh;
}
return 0;
}
/** /**
@} (end of group Query_Optimizer) @} (end of group Query_Optimizer)
*/ */

View file

@ -2442,9 +2442,53 @@ public:
~Pushdown_query() { delete handler; } ~Pushdown_query() { delete handler; }
/* Function that calls the above scan functions */ /* Function that calls the above scan functions */
int execute(JOIN *join); int execute(JOIN *);
}; };
class derived_handler;
class Pushdown_derived: public Sql_alloc
{
private:
bool is_analyze;
public:
TABLE_LIST *derived;
derived_handler *handler;
Pushdown_derived(TABLE_LIST *tbl, derived_handler *h);
~Pushdown_derived();
int execute();
};
class select_handler;
class Pushdown_select: public Sql_alloc
{
private:
bool is_analyze;
List<Item> result_columns;
bool send_result_set_metadata();
bool send_data();
bool send_eof();
public:
SELECT_LEX *select;
select_handler *handler;
Pushdown_select(SELECT_LEX *sel, select_handler *h);
~Pushdown_select();
bool init();
int execute();
};
bool test_if_order_compatible(SQL_I_List<ORDER> &a, SQL_I_List<ORDER> &b); bool test_if_order_compatible(SQL_I_List<ORDER> &a, SQL_I_List<ORDER> &b);
int test_if_group_changed(List<Cached_item> &list); int test_if_group_changed(List<Cached_item> &list);
int create_sort_index(THD *thd, JOIN *join, JOIN_TAB *tab, Filesort *fsort); int create_sort_index(THD *thd, JOIN *join, JOIN_TAB *tab, Filesort *fsort);

View file

@ -55,6 +55,8 @@ class Virtual_column_info;
class Table_triggers_list; class Table_triggers_list;
class TMP_TABLE_PARAM; class TMP_TABLE_PARAM;
class SEQUENCE; class SEQUENCE;
class derived_handler;
class Pushdown_derived;
/* /*
Used to identify NESTED_JOIN structures within a join (applicable only to Used to identify NESTED_JOIN structures within a join (applicable only to
@ -2118,6 +2120,8 @@ struct TABLE_LIST
TABLE_LIST * next_with_rec_ref; TABLE_LIST * next_with_rec_ref;
bool is_derived_with_recursive_reference; bool is_derived_with_recursive_reference;
bool block_handle_derived; bool block_handle_derived;
derived_handler *dt_handler;
Pushdown_derived *pushdown_derived;
ST_SCHEMA_TABLE *schema_table; /* Information_schema table */ ST_SCHEMA_TABLE *schema_table; /* Information_schema table */
st_select_lex *schema_select_lex; st_select_lex *schema_select_lex;
/* /*
@ -2584,6 +2588,9 @@ struct TABLE_LIST
} }
void set_lock_type(THD* thd, enum thr_lock_type lock); void set_lock_type(THD* thd, enum thr_lock_type lock);
derived_handler *find_derived_handler(THD *thd);
TABLE_LIST *get_first_table();
private: private:
bool prep_check_option(THD *thd, uint8 check_opt_type); bool prep_check_option(THD *thd, uint8 check_opt_type);
bool prep_where(THD *thd, Item **conds, bool no_where_clause); bool prep_where(THD *thd, Item **conds, bool no_where_clause);

View file

@ -0,0 +1,273 @@
/*
Copyright (c) 2019 MariaDB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
/* !!! For inclusion into ha_federatedx.cc */
static derived_handler*
create_federatedx_derived_handler(THD* thd, TABLE_LIST *derived)
{
ha_federatedx_derived_handler* handler = NULL;
handlerton *ht= 0;
SELECT_LEX_UNIT *unit= derived->derived;
for (SELECT_LEX *sl= unit->first_select(); sl; sl= sl->next_select())
{
if (!(sl->join))
return 0;
for (TABLE_LIST *tbl= sl->join->tables_list; tbl; tbl= tbl->next_local)
{
if (!tbl->table)
return 0;
if (!ht)
ht= tbl->table->file->partition_ht();
else if (ht != tbl->table->file->partition_ht())
return 0;
}
}
handler= new ha_federatedx_derived_handler(thd, derived);
return handler;
}
/*
Implementation class of the derived_handler interface for FEDERATEDX:
class implementation
*/
ha_federatedx_derived_handler::ha_federatedx_derived_handler(THD *thd,
TABLE_LIST *dt)
: derived_handler(thd, federatedx_hton),
share(NULL), txn(NULL), iop(NULL), stored_result(NULL)
{
derived= dt;
}
ha_federatedx_derived_handler::~ha_federatedx_derived_handler() {}
int ha_federatedx_derived_handler::init_scan()
{
char query_buff[4096];
THD *thd;
int rc= 0;
DBUG_ENTER("ha_federatedx_derived_handler::init_scan");
TABLE *table= derived->get_first_table()->table;
ha_federatedx *h= (ha_federatedx *) table->file;
iop= &h->io;
share= get_share(table->s->table_name.str, table);
thd= table->in_use;
txn= h->get_txn(thd);
if ((rc= txn->acquire(share, thd, TRUE, iop)))
DBUG_RETURN(rc);
String derived_query(query_buff, sizeof(query_buff), thd->charset());
derived_query.length(0);
derived->derived->print(&derived_query, QT_ORDINARY);
if ((*iop)->query(derived_query.ptr(), derived_query.length()))
goto err;
stored_result= (*iop)->store_result();
if (!stored_result)
goto err;
DBUG_RETURN(0);
err:
DBUG_RETURN(HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM);
}
int ha_federatedx_derived_handler::next_row()
{
int rc;
FEDERATEDX_IO_ROW *row;
ulong *lengths;
Field **field;
int column= 0;
Time_zone *saved_time_zone= table->in_use->variables.time_zone;
DBUG_ENTER("ha_federatedx_derived_handler::next_row");
if ((rc= txn->acquire(share, table->in_use, TRUE, iop)))
DBUG_RETURN(rc);
if (!(row= (*iop)->fetch_row(stored_result)))
DBUG_RETURN(HA_ERR_END_OF_FILE);
/* Convert row to internal format */
table->in_use->variables.time_zone= UTC;
lengths= (*iop)->fetch_lengths(stored_result);
for (field= table->field; *field; field++, column++)
{
if ((*iop)->is_column_null(row, column))
(*field)->set_null();
else
{
(*field)->set_notnull();
(*field)->store((*iop)->get_column_data(row, column),
lengths[column], &my_charset_bin);
}
}
table->in_use->variables.time_zone= saved_time_zone;
DBUG_RETURN(rc);
}
int ha_federatedx_derived_handler::end_scan()
{
DBUG_ENTER("ha_federatedx_derived_handler::end_scan");
(*iop)->free_result(stored_result);
free_share(txn, share);
DBUG_RETURN(0);
}
void ha_federatedx_derived_handler::print_error(int, unsigned long)
{
}
static select_handler*
create_federatedx_select_handler(THD* thd, SELECT_LEX *sel)
{
ha_federatedx_select_handler* handler = NULL;
handlerton *ht= 0;
for (TABLE_LIST *tbl= thd->lex->query_tables; tbl; tbl= tbl->next_global)
{
if (!tbl->table)
return 0;
if (!ht)
ht= tbl->table->file->partition_ht();
else if (ht != tbl->table->file->partition_ht())
return 0;
}
handler= new ha_federatedx_select_handler(thd, sel);
return handler;
}
/*
Implementation class of the select_handler interface for FEDERATEDX:
class implementation
*/
ha_federatedx_select_handler::ha_federatedx_select_handler(THD *thd,
SELECT_LEX *sel)
: select_handler(thd, federatedx_hton),
share(NULL), txn(NULL), iop(NULL), stored_result(NULL)
{
select= sel;
}
ha_federatedx_select_handler::~ha_federatedx_select_handler() {}
int ha_federatedx_select_handler::init_scan()
{
int rc= 0;
DBUG_ENTER("ha_federatedx_select_handler::init_scan");
TABLE *table= 0;
for (TABLE_LIST *tbl= thd->lex->query_tables; tbl; tbl= tbl->next_global)
{
if (!tbl->table)
continue;
table= tbl->table;
break;
}
ha_federatedx *h= (ha_federatedx *) table->file;
iop= &h->io;
share= get_share(table->s->table_name.str, table);
txn= h->get_txn(thd);
if ((rc= txn->acquire(share, thd, TRUE, iop)))
DBUG_RETURN(rc);
if ((*iop)->query(thd->query(), thd->query_length()))
goto err;
stored_result= (*iop)->store_result();
if (!stored_result)
goto err;
DBUG_RETURN(0);
err:
DBUG_RETURN(HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM);
}
int ha_federatedx_select_handler::next_row()
{
int rc= 0;
FEDERATEDX_IO_ROW *row;
ulong *lengths;
Field **field;
int column= 0;
Time_zone *saved_time_zone= table->in_use->variables.time_zone;
DBUG_ENTER("ha_federatedx_select_handler::next_row");
if ((rc= txn->acquire(share, table->in_use, TRUE, iop)))
DBUG_RETURN(rc);
if (!(row= (*iop)->fetch_row(stored_result)))
DBUG_RETURN(HA_ERR_END_OF_FILE);
/* Convert row to internal format */
table->in_use->variables.time_zone= UTC;
lengths= (*iop)->fetch_lengths(stored_result);
for (field= table->field; *field; field++, column++)
{
if ((*iop)->is_column_null(row, column))
(*field)->set_null();
else
{
(*field)->set_notnull();
(*field)->store((*iop)->get_column_data(row, column),
lengths[column], &my_charset_bin);
}
}
table->in_use->variables.time_zone= saved_time_zone;
DBUG_RETURN(rc);
}
int ha_federatedx_select_handler::end_scan()
{
DBUG_ENTER("ha_federatedx_derived_handler::end_scan");
free_tmp_table(thd, table);
table= 0;
(*iop)->free_result(stored_result);
free_share(txn, share);
DBUG_RETURN(0);
}
void ha_federatedx_select_handler::print_error(int, unsigned long)
{
}

View file

@ -0,0 +1,63 @@
/*
Copyright (c) 2019 MariaDB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "derived_handler.h"
#include "select_handler.h"
/*
Implementation class of the derived_handler interface for FEDERATEDX:
class declaration
*/
class ha_federatedx_derived_handler: public derived_handler
{
private:
FEDERATEDX_SHARE *share;
federatedx_txn *txn;
federatedx_io **iop;
FEDERATEDX_IO_RESULT *stored_result;
public:
ha_federatedx_derived_handler(THD* thd_arg, TABLE_LIST *tbl);
~ha_federatedx_derived_handler();
int init_scan();
int next_row();
int end_scan();
void print_error(int, unsigned long);
};
/*
Implementation class of the select_handler interface for FEDERATEDX:
class declaration
*/
class ha_federatedx_select_handler: public select_handler
{
private:
FEDERATEDX_SHARE *share;
federatedx_txn *txn;
federatedx_io **iop;
FEDERATEDX_IO_RESULT *stored_result;
public:
ha_federatedx_select_handler(THD* thd_arg, SELECT_LEX *sel);
~ha_federatedx_select_handler();
int init_scan();
int next_row();
int end_scan();
void print_error(int, unsigned long);
};

View file

@ -319,6 +319,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "sql_analyse.h" // append_escaped() #include "sql_analyse.h" // append_escaped()
#include "sql_show.h" // append_identifier() #include "sql_show.h" // append_identifier()
#include "tztime.h" // my_tz_find() #include "tztime.h" // my_tz_find()
#include "sql_select.h"
#ifdef I_AM_PARANOID #ifdef I_AM_PARANOID
#define MIN_PORT 1023 #define MIN_PORT 1023
@ -401,6 +402,12 @@ static void init_federated_psi_keys(void)
#define init_federated_psi_keys() /* no-op */ #define init_federated_psi_keys() /* no-op */
#endif /* HAVE_PSI_INTERFACE */ #endif /* HAVE_PSI_INTERFACE */
handlerton* federatedx_hton;
static derived_handler*
create_federatedx_derived_handler(THD* thd, TABLE_LIST *derived);
static select_handler*
create_federatedx_select_handler(THD* thd, SELECT_LEX *sel);
/* /*
Initialize the federatedx handler. Initialize the federatedx handler.
@ -418,7 +425,7 @@ int federatedx_db_init(void *p)
{ {
DBUG_ENTER("federatedx_db_init"); DBUG_ENTER("federatedx_db_init");
init_federated_psi_keys(); init_federated_psi_keys();
handlerton *federatedx_hton= (handlerton *)p; federatedx_hton= (handlerton *)p;
federatedx_hton->state= SHOW_OPTION_YES; federatedx_hton->state= SHOW_OPTION_YES;
/* Needed to work with old .frm files */ /* Needed to work with old .frm files */
federatedx_hton->db_type= DB_TYPE_FEDERATED_DB; federatedx_hton->db_type= DB_TYPE_FEDERATED_DB;
@ -432,6 +439,8 @@ int federatedx_db_init(void *p)
federatedx_hton->discover_table_structure= ha_federatedx::discover_assisted; federatedx_hton->discover_table_structure= ha_federatedx::discover_assisted;
federatedx_hton->create= federatedx_create_handler; federatedx_hton->create= federatedx_create_handler;
federatedx_hton->flags= HTON_ALTER_NOT_SUPPORTED; federatedx_hton->flags= HTON_ALTER_NOT_SUPPORTED;
federatedx_hton->create_derived= create_federatedx_derived_handler;
federatedx_hton->create_select= create_federatedx_select_handler;
if (mysql_mutex_init(fe_key_mutex_federatedx, if (mysql_mutex_init(fe_key_mutex_federatedx,
&federatedx_mutex, MY_MUTEX_INIT_FAST)) &federatedx_mutex, MY_MUTEX_INIT_FAST))
@ -3668,6 +3677,7 @@ err1:
return error; return error;
} }
#include "federatedx_pushdown.cc"
struct st_mysql_storage_engine federatedx_storage_engine= struct st_mysql_storage_engine federatedx_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION }; { MYSQL_HANDLERTON_INTERFACE_VERSION };
@ -3689,3 +3699,4 @@ maria_declare_plugin(federatedx)
MariaDB_PLUGIN_MATURITY_STABLE /* maturity */ MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
} }
maria_declare_plugin_end; maria_declare_plugin_end;

View file

@ -1,3 +1,5 @@
#ifndef HA_FEDERATEDX_INCLUDED
#define HA_FEDERATEDX_INCLUDED
/* /*
Copyright (c) 2008, Patrick Galbraith Copyright (c) 2008, Patrick Galbraith
All rights reserved. All rights reserved.
@ -445,6 +447,9 @@ public:
int external_lock(THD *thd, int lock_type); int external_lock(THD *thd, int lock_type);
int reset(void); int reset(void);
int free_result(void); int free_result(void);
friend class ha_federatedx_derived_handler;
friend class ha_federatedx_select_handler;
}; };
extern const char ident_quote_char; // Character for quoting extern const char ident_quote_char; // Character for quoting
@ -460,3 +465,7 @@ extern federatedx_io *instantiate_io_mysql(MEM_ROOT *server_root,
FEDERATEDX_SERVER *server); FEDERATEDX_SERVER *server);
extern federatedx_io *instantiate_io_null(MEM_ROOT *server_root, extern federatedx_io *instantiate_io_null(MEM_ROOT *server_root,
FEDERATEDX_SERVER *server); FEDERATEDX_SERVER *server);
#include "federatedx_pushdown.h"
#endif /* HA_FEDERATEDX_INCLUDED */