WL#1791 Handler: support update of primary key

This commit is contained in:
mskold@mysql.com 2004-07-22 18:35:51 +02:00
parent 42df1f0686
commit 217bd8b6bd
6 changed files with 191 additions and 81 deletions

View file

@ -1,98 +1,105 @@
DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7;
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL
attr1 INT NOT NULL,
attr2 INT,
attr3 VARCHAR(10)
) ENGINE=ndbcluster;
INSERT INTO t1 VALUES (9410,9412);
INSERT INTO t1 VALUES (9410,9412, NULL, '9412'), (9411,9413, 17, '9413');
SELECT pk1 FROM t1;
pk1
9410
9411
SELECT * FROM t1;
pk1 attr1
9410 9412
pk1 attr1 attr2 attr3
9410 9412 NULL 9412
9411 9413 17 9413
SELECT t1.* FROM t1;
pk1 attr1
9410 9412
pk1 attr1 attr2 attr3
9410 9412 NULL 9412
9411 9413 17 9413
UPDATE t1 SET attr1=1 WHERE pk1=9410;
SELECT * FROM t1;
pk1 attr1
9410 1
pk1 attr1 attr2 attr3
9410 1 NULL 9412
9411 9413 17 9413
UPDATE t1 SET pk1=2 WHERE attr1=1;
ERROR 42000: Table 't1' uses an extension that doesn't exist in this MySQL version
SELECT * FROM t1;
pk1 attr1
9410 1
pk1 attr1 attr2 attr3
2 1 NULL 9412
9411 9413 17 9413
UPDATE t1 SET pk1=pk1 + 1;
SELECT * FROM t1;
pk1 attr1 attr2 attr3
9412 9413 17 9413
3 1 NULL 9412
DELETE FROM t1;
SELECT * FROM t1;
pk1 attr1
INSERT INTO t1 VALUES (9410,9412), (9411, 9413), (9408, 8765),
(7,8), (8,9), (9,10), (10,11), (11,12), (12,13), (13,14);
pk1 attr1 attr2 attr3
INSERT INTO t1 VALUES (9410,9412, NULL, '9412'), (9408, 8765, NULL, '8765'),
(7,8, NULL, NULL), (8,9, NULL, NULL), (9,10, NULL, NULL), (10,11, NULL, NULL), (11,12, NULL, NULL), (12,13, NULL, NULL), (13,14, NULL, NULL);
UPDATE t1 SET attr1 = 9999;
SELECT * FROM t1 ORDER BY pk1;
pk1 attr1
7 9999
8 9999
9 9999
10 9999
11 9999
12 9999
13 9999
9408 9999
9410 9999
9411 9999
pk1 attr1 attr2 attr3
7 9999 NULL NULL
8 9999 NULL NULL
9 9999 NULL NULL
10 9999 NULL NULL
11 9999 NULL NULL
12 9999 NULL NULL
13 9999 NULL NULL
9408 9999 NULL 8765
9410 9999 NULL 9412
UPDATE t1 SET attr1 = 9998 WHERE pk1 < 1000;
SELECT * FROM t1 ORDER BY pk1;
pk1 attr1
7 9998
8 9998
9 9998
10 9998
11 9998
12 9998
13 9998
9408 9999
9410 9999
9411 9999
pk1 attr1 attr2 attr3
7 9998 NULL NULL
8 9998 NULL NULL
9 9998 NULL NULL
10 9998 NULL NULL
11 9998 NULL NULL
12 9998 NULL NULL
13 9998 NULL NULL
9408 9999 NULL 8765
9410 9999 NULL 9412
UPDATE t1 SET attr1 = 9997 WHERE attr1 = 9999;
SELECT * FROM t1 ORDER BY pk1;
pk1 attr1
7 9998
8 9998
9 9998
10 9998
11 9998
12 9998
13 9998
9408 9997
9410 9997
9411 9997
pk1 attr1 attr2 attr3
7 9998 NULL NULL
8 9998 NULL NULL
9 9998 NULL NULL
10 9998 NULL NULL
11 9998 NULL NULL
12 9998 NULL NULL
13 9998 NULL NULL
9408 9997 NULL 8765
9410 9997 NULL 9412
DELETE FROM t1 WHERE pk1 = 9410;
SELECT * FROM t1 ORDER BY pk1;
pk1 attr1
7 9998
8 9998
9 9998
10 9998
11 9998
12 9998
13 9998
9408 9997
9411 9997
pk1 attr1 attr2 attr3
7 9998 NULL NULL
8 9998 NULL NULL
9 9998 NULL NULL
10 9998 NULL NULL
11 9998 NULL NULL
12 9998 NULL NULL
13 9998 NULL NULL
9408 9997 NULL 8765
DELETE FROM t1;
SELECT * FROM t1;
pk1 attr1
INSERT INTO t1 values (1, 4), (2, 4), (3, 5), (4, 4), (5, 5);
pk1 attr1 attr2 attr3
INSERT INTO t1 values (1, 4, NULL, NULL), (2, 4, NULL, NULL), (3, 5, NULL, NULL), (4, 4, NULL, NULL), (5, 5, NULL, NULL);
DELETE FROM t1 WHERE attr1=4;
SELECT * FROM t1 order by pk1;
pk1 attr1
3 5
5 5
pk1 attr1 attr2 attr3
3 5 NULL NULL
5 5 NULL NULL
DELETE FROM t1;
INSERT INTO t1 VALUES (9410,9412), (9411, 9413);
INSERT INTO t1 VALUES (9410,9412, NULL, NULL), (9411, 9413, NULL, NULL);
DELETE FROM t1 WHERE pk1 = 9410;
SELECT * FROM t1;
pk1 attr1
9411 9413
pk1 attr1 attr2 attr3
9411 9413 NULL NULL
DROP TABLE t1;
CREATE TABLE t1 (id INT, id2 int) engine=ndbcluster;
INSERT INTO t1 values(3456, 7890);

View file

@ -91,6 +91,15 @@ a b c
4 6 12
5 7 12
6 7 12
update t1 set a = a + 10 where b > 1 and b < 7;
select * from t1 order by a;
a b c
5 7 12
6 7 12
11 2 13
12 3 13
13 4 12
14 6 12
drop table t1;
CREATE TABLE t1 (
a int unsigned NOT NULL PRIMARY KEY,

View file

@ -14,10 +14,12 @@ DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7;
#
CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY,
attr1 INT NOT NULL
attr1 INT NOT NULL,
attr2 INT,
attr3 VARCHAR(10)
) ENGINE=ndbcluster;
INSERT INTO t1 VALUES (9410,9412);
INSERT INTO t1 VALUES (9410,9412, NULL, '9412'), (9411,9413, 17, '9413');
SELECT pk1 FROM t1;
SELECT * FROM t1;
@ -27,18 +29,19 @@ SELECT t1.* FROM t1;
UPDATE t1 SET attr1=1 WHERE pk1=9410;
SELECT * FROM t1;
# Can't UPDATE PK! Test that correct error is returned
-- error 1112
# Update primary key
UPDATE t1 SET pk1=2 WHERE attr1=1;
SELECT * FROM t1;
UPDATE t1 SET pk1=pk1 + 1;
SELECT * FROM t1;
# Delete the record
DELETE FROM t1;
SELECT * FROM t1;
# Insert more records and update them all at once
INSERT INTO t1 VALUES (9410,9412), (9411, 9413), (9408, 8765),
(7,8), (8,9), (9,10), (10,11), (11,12), (12,13), (13,14);
INSERT INTO t1 VALUES (9410,9412, NULL, '9412'), (9408, 8765, NULL, '8765'),
(7,8, NULL, NULL), (8,9, NULL, NULL), (9,10, NULL, NULL), (10,11, NULL, NULL), (11,12, NULL, NULL), (12,13, NULL, NULL), (13,14, NULL, NULL);
UPDATE t1 SET attr1 = 9999;
SELECT * FROM t1 ORDER BY pk1;
@ -58,13 +61,13 @@ SELECT * FROM t1;
# Insert three records with attr1=4 and two with attr1=5
# Delete all with attr1=4
INSERT INTO t1 values (1, 4), (2, 4), (3, 5), (4, 4), (5, 5);
INSERT INTO t1 values (1, 4, NULL, NULL), (2, 4, NULL, NULL), (3, 5, NULL, NULL), (4, 4, NULL, NULL), (5, 5, NULL, NULL);
DELETE FROM t1 WHERE attr1=4;
SELECT * FROM t1 order by pk1;
DELETE FROM t1;
# Insert two records and delete one
INSERT INTO t1 VALUES (9410,9412), (9411, 9413);
INSERT INTO t1 VALUES (9410,9412, NULL, NULL), (9411, 9413, NULL, NULL);
DELETE FROM t1 WHERE pk1 = 9410;
SELECT * FROM t1;
DROP TABLE t1;

View file

@ -44,6 +44,9 @@ update t1 set c = 13 where b <= 3;
select * from t1 order by a;
update t1 set b = b + 1 where b > 4 and b < 7;
select * from t1 order by a;
-- Update primary key
update t1 set a = a + 10 where b > 1 and b < 7;
select * from t1 order by a;
#
# Delete using ordered index scan

View file

@ -593,7 +593,7 @@ int ha_ndbcluster::set_primary_key(NdbOperation *op)
Read one record from NDB using primary key
*/
int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
{
uint no_fields= table->fields, i;
NdbConnection *trans= m_active_trans;
@ -624,11 +624,11 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
return res;
}
// Read non-key field(s) unless HA_EXTRA_RETRIEVE_ALL_COLS
// Read all wanted non-key field(s) unless HA_EXTRA_RETRIEVE_ALL_COLS
for (i= 0; i < no_fields; i++)
{
Field *field= table->field[i];
if ((thd->query_id == field->query_id) ||
if ((thd->query_id == field->query_id) ||
retrieve_all_fields)
{
if (get_ndb_value(op, i, field->ptr))
@ -657,6 +657,62 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
}
/*
Read one complementing record from NDB using primary key from old_data
*/
int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
{
uint no_fields= table->fields, i;
NdbConnection *trans= m_active_trans;
NdbOperation *op;
THD *thd= current_thd;
DBUG_ENTER("complemented_pk_read");
if (retrieve_all_fields)
// We have allready retrieved all fields, nothing to complement
DBUG_RETURN(0);
if (!(op= trans->getNdbOperation(m_tabname)) || op->readTuple() != 0)
goto err;
int res;
if (res= set_primary_key_from_old_data(op, old_data))
return res;
// Read all unreferenced non-key field(s)
for (i= 0; i < no_fields; i++)
{
Field *field= table->field[i];
if (!(field->flags & PRI_KEY_FLAG) &&
(thd->query_id != field->query_id))
{
if (get_ndb_value(op, i, field->ptr))
goto err;
}
else
{
// Attribute was not to be read
m_value[i]= NULL;
}
}
if (trans->execute(NoCommit, IgnoreError) != 0)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
}
// The value have now been fetched from NDB
unpack_record(new_data);
table->status= 0;
DBUG_RETURN(0);
err:
ERR_RETURN(trans->getNdbError());
}
/*
Read one record from NDB using unique secondary index
*/
@ -1173,10 +1229,43 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if (table->timestamp_on_update_now)
update_timestamp(new_data+table->timestamp_on_update_now-1);
/* Check for update of primary key and return error */
/* Check for update of primary key for special handling */
if ((table->primary_key != MAX_KEY) &&
(key_cmp(table->primary_key, old_data, new_data)))
DBUG_RETURN(HA_ERR_UNSUPPORTED);
{
DBUG_PRINT("info", ("primary key update, doing pk read+insert+delete"));
// Get all old fields, since we optimize away fields not in query
int read_res = complemented_pk_read(old_data, new_data);
if (read_res)
{
DBUG_PRINT("info", ("pk read failed"));
DBUG_RETURN(read_res);
}
// Insert new row
int insert_res = write_row(new_data);
if (!insert_res)
{
// Delete old row
DBUG_PRINT("info", ("insert succeded"));
int delete_res = delete_row(old_data);
if (!delete_res)
{
DBUG_PRINT("info", ("insert+delete succeeded"));
DBUG_RETURN(0);
}
else
{
DBUG_PRINT("info", ("delete failed"));
DBUG_RETURN(delete_row(new_data));
}
}
else
{
DBUG_PRINT("info", ("insert failed"));
DBUG_RETURN(insert_res);
}
}
if (cursor)
{
@ -1350,7 +1439,6 @@ void ha_ndbcluster::unpack_record(byte* buf)
DBUG_VOID_RETURN;
}
/*
Utility function to print/dump the fetched field
*/

View file

@ -156,8 +156,8 @@ class ha_ndbcluster: public handler
NDB_INDEX_TYPE get_index_type(uint idx_no) const;
NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const;
int pk_read(const byte *key, uint key_len,
byte *buf);
int pk_read(const byte *key, uint key_len, byte *buf);
int complemented_pk_read(const byte *old_data, byte *new_data);
int unique_index_read(const byte *key, uint key_len,
byte *buf);
int ordered_index_scan(const key_range *start_key,