Only write full transactions to binary log

A lot of new functions for BDB tables
Fix for DROP DATABASE on windows
Default server_id variables
This commit is contained in:
monty@donna.mysql.com 2000-12-07 14:08:48 +02:00
commit 87d9388e52
67 changed files with 887 additions and 322 deletions

View file

@ -37,12 +37,15 @@
transaction ?)
- When using ALTER TABLE IGNORE, we should not start a transaction, but do
everything without transactions.
- When we do rollback, we need to subtract the number of changed rows
from the updated tables.
Testing of:
- ALTER TABLE
- LOCK TABLES
- CHAR keys
- BLOBS
- Mark tables that participate in a transaction so that they are not
closed during the transaction. We need to test what happens if
MySQL closes a table that is updated by a not yet committed transaction.
*/
@ -58,19 +61,27 @@
#include <hash.h>
#include "ha_berkeley.h"
#include "sql_manager.h"
#include <stdarg.h>
#define HA_BERKELEY_ROWS_IN_TABLE 10000 /* to get optimization right */
#define HA_BERKELEY_RANGE_COUNT 100
#define HA_BERKELEY_MAX_ROWS 10000000 /* Max rows in table */
/* extra rows for estimate_number_of_rows() */
#define HA_BERKELEY_EXTRA_ROWS 100
/* Bits for share->status */
#define STATUS_PRIMARY_KEY_INIT 1
#define STATUS_ROW_COUNT_INIT 2
#define STATUS_BDB_ANALYZE 4
const char *ha_berkeley_ext=".db";
bool berkeley_skip=0;
u_int32_t berkeley_init_flags=0,berkeley_lock_type=DB_LOCK_DEFAULT;
bool berkeley_skip=0,berkeley_shared_data=0;
u_int32_t berkeley_init_flags= DB_PRIVATE, berkeley_lock_type=DB_LOCK_DEFAULT;
ulong berkeley_cache_size;
char *berkeley_home, *berkeley_tmpdir, *berkeley_logdir;
long berkeley_lock_scan_time=0;
ulong berkeley_trans_retry=5;
ulong berkeley_lock_max;
ulong berkeley_max_lock;
pthread_mutex_t bdb_mutex;
static DB_ENV *db_env;
@ -86,11 +97,13 @@ TYPELIB berkeley_lock_typelib= {array_elements(berkeley_lock_names),"",
static void berkeley_print_error(const char *db_errpfx, char *buffer);
static byte* bdb_get_key(BDB_SHARE *share,uint *length,
my_bool not_used __attribute__((unused)));
static BDB_SHARE *get_share(const char *table_name);
static void free_share(BDB_SHARE *share);
static BDB_SHARE *get_share(const char *table_name, TABLE *table);
static void free_share(BDB_SHARE *share, TABLE *table);
static void update_status(BDB_SHARE *share, TABLE *table);
static void berkeley_noticecall(DB_ENV *db_env, db_notices notice);
/* General functions */
bool berkeley_init(void)
@ -121,14 +134,14 @@ bool berkeley_init(void)
db_env->set_cachesize(db_env, 0, berkeley_cache_size, 0);
db_env->set_lk_detect(db_env, berkeley_lock_type);
if (berkeley_lock_max)
db_env->set_lk_max(db_env, berkeley_lock_max);
if (berkeley_max_lock)
db_env->set_lk_max(db_env, berkeley_max_lock);
if (db_env->open(db_env,
berkeley_home,
berkeley_init_flags | DB_INIT_LOCK |
DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN |
DB_CREATE | DB_THREAD | DB_PRIVATE, 0666))
DB_CREATE | DB_THREAD, 0666))
{
db_env->close(db_env,0);
db_env=0;
@ -336,7 +349,7 @@ berkeley_key_cmp(TABLE *table, KEY *key_info, const char *key, uint key_length)
if (*key != (table->record[0][key_part->null_offset] &
key_part->null_bit) ? 0 : 1)
return 1;
if (!*key++) // Null value
if (!*key++) // Null value
continue;
}
if ((cmp=key_part->field->pack_cmp(key,key_part->length)))
@ -345,7 +358,7 @@ berkeley_key_cmp(TABLE *table, KEY *key_info, const char *key, uint key_length)
key+=length;
key_length-=length;
}
return 0;
return 0; // Identical keys
}
@ -387,7 +400,7 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked)
}
/* Init table lock structure */
if (!(share=get_share(name)))
if (!(share=get_share(name,table)))
{
my_free(rec_buff,MYF(0));
my_free(alloc_ptr,MYF(0));
@ -397,7 +410,7 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked)
if ((error=db_create(&file, db_env, 0)))
{
free_share(share);
free_share(share,table);
my_free(rec_buff,MYF(0));
my_free(alloc_ptr,MYF(0));
my_errno=error;
@ -413,7 +426,7 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked)
2 | 4),
"main", DB_BTREE, open_mode,0))))
{
free_share(share);
free_share(share,table);
my_free(rec_buff,MYF(0));
my_free(alloc_ptr,MYF(0));
my_errno=error;
@ -459,7 +472,6 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked)
}
}
}
/* Calculate pack_length of primary key */
if (!hidden_primary_key)
{
@ -470,12 +482,9 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked)
ref_length+= key_part->field->max_packed_col_length(key_part->length);
fixed_length_primary_key=
(ref_length == table->key_info[primary_key].key_length);
share->status|=STATUS_PRIMARY_KEY_INIT;
}
else
{
if (!share->primary_key_inited)
update_auto_primary_key();
}
get_status();
DBUG_RETURN(0);
}
@ -491,7 +500,7 @@ int ha_berkeley::close(void)
if (key_file[i] && (error=key_file[i]->close(key_file[i],0)))
result=error;
}
free_share(share);
free_share(share,table);
my_free(rec_buff,MYF(MY_ALLOW_ZERO_PTR));
my_free(alloc_ptr,MYF(MY_ALLOW_ZERO_PTR));
if (result)
@ -632,8 +641,8 @@ void ha_berkeley::unpack_key(char *record, DBT *key, uint index)
This will never fail as the key buffer is pre allocated.
*/
DBT *ha_berkeley::pack_key(DBT *key, uint keynr, char *buff,
const byte *record)
DBT *ha_berkeley::create_key(DBT *key, uint keynr, char *buff,
const byte *record, int key_length)
{
bzero((char*) key,sizeof(*key));
@ -647,11 +656,11 @@ DBT *ha_berkeley::pack_key(DBT *key, uint keynr, char *buff,
KEY *key_info=table->key_info+keynr;
KEY_PART_INFO *key_part=key_info->key_part;
KEY_PART_INFO *end=key_part+key_info->key_parts;
DBUG_ENTER("pack_key");
DBUG_ENTER("create_key");
key->data=buff;
for ( ; key_part != end ; key_part++)
for ( ; key_part != end && key_length > 0; key_part++)
{
if (key_part->null_bit)
{
@ -666,6 +675,7 @@ DBT *ha_berkeley::pack_key(DBT *key, uint keynr, char *buff,
}
buff=key_part->field->pack(buff,record + key_part->offset,
key_part->length);
key_length-=key_part->length;
}
key->size= (buff - (char*) key->data);
DBUG_DUMP("key",(char*) key->data, key->size);
@ -729,8 +739,8 @@ int ha_berkeley::write_row(byte * record)
if (table->keys == 1)
{
error=file->put(file, transaction, pack_key(&prim_key, primary_key,
key_buff, record),
error=file->put(file, transaction, create_key(&prim_key, primary_key,
key_buff, record),
&row, key_type[primary_key]);
}
else
@ -742,8 +752,8 @@ int ha_berkeley::write_row(byte * record)
if ((error=txn_begin(db_env, transaction, &sub_trans, 0)))
break;
DBUG_PRINT("trans",("starting subtransaction"));
if (!(error=file->put(file, sub_trans, pack_key(&prim_key, primary_key,
key_buff, record),
if (!(error=file->put(file, sub_trans, create_key(&prim_key, primary_key,
key_buff, record),
&row, key_type[primary_key])))
{
for (keynr=0 ; keynr < table->keys ; keynr++)
@ -751,8 +761,8 @@ int ha_berkeley::write_row(byte * record)
if (keynr == primary_key)
continue;
if ((error=key_file[keynr]->put(key_file[keynr], sub_trans,
pack_key(&key, keynr, key_buff2,
record),
create_key(&key, keynr, key_buff2,
record),
&prim_key, key_type[keynr])))
{
last_dup_key=keynr;
@ -783,6 +793,8 @@ int ha_berkeley::write_row(byte * record)
}
if (error == DB_KEYEXIST)
error=HA_ERR_FOUND_DUPP_KEY;
else if (!error)
changed_rows++;
DBUG_RETURN(error);
}
@ -838,7 +850,7 @@ int ha_berkeley::update_primary_key(DB_TXN *trans, bool primary_key_changed,
{
// Primary key changed or we are updating a key that can have duplicates.
// Delete the old row and add a new one
pack_key(&old_key, primary_key, key_buff2, old_row);
create_key(&old_key, primary_key, key_buff2, old_row);
if ((error=remove_key(trans, primary_key, old_row, (DBT *) 0, &old_key)))
DBUG_RETURN(error); // This should always succeed
if ((error=pack_row(&row, new_row, 0)))
@ -893,10 +905,10 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row)
}
else
{
pack_key(&prim_key, primary_key, key_buff, new_row);
create_key(&prim_key, primary_key, key_buff, new_row);
if ((primary_key_changed=key_cmp(primary_key, old_row, new_row)))
pack_key(&old_prim_key, primary_key, primary_key_buff, old_row);
create_key(&old_prim_key, primary_key, primary_key_buff, old_row);
else
old_prim_key=prim_key;
}
@ -921,7 +933,7 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row)
if ((error=remove_key(sub_trans, keynr, old_row, (DBT*) 0,
&old_prim_key)) ||
(error=key_file[keynr]->put(key_file[keynr], sub_trans,
pack_key(&key, keynr, key_buff2,
create_key(&key, keynr, key_buff2,
new_row),
&prim_key, key_type[keynr])))
{
@ -980,7 +992,7 @@ int ha_berkeley::remove_key(DB_TXN *sub_trans, uint keynr, const byte *record,
error=key_file[keynr]->del(key_file[keynr], sub_trans,
keynr == primary_key ?
prim_key :
pack_key(&key, keynr, key_buff2, record),
create_key(&key, keynr, key_buff2, record),
0);
}
else
@ -997,7 +1009,7 @@ int ha_berkeley::remove_key(DB_TXN *sub_trans, uint keynr, const byte *record,
if (!(error=cursor->c_get(cursor,
(keynr == primary_key ?
prim_key :
pack_key(&key, keynr, key_buff2, record)),
create_key(&key, keynr, key_buff2, record)),
(keynr == primary_key ?
packed_record : prim_key),
DB_GET_BOTH)))
@ -1046,7 +1058,7 @@ int ha_berkeley::delete_row(const byte * record)
if ((error=pack_row(&row, record, 0)))
DBUG_RETURN((error));
pack_key(&prim_key, primary_key, key_buff, record);
create_key(&prim_key, primary_key, key_buff, record);
if (hidden_primary_key)
keys|= (key_map) 1 << primary_key;
@ -1078,7 +1090,9 @@ int ha_berkeley::delete_row(const byte * record)
if (error != DB_LOCK_DEADLOCK)
break;
}
DBUG_RETURN(0);
if (!error)
changed_rows--;
DBUG_RETURN(error);
}
@ -1090,7 +1104,7 @@ int ha_berkeley::index_init(uint keynr)
dbug_assert(cursor == 0);
if ((error=file->cursor(key_file[keynr], transaction, &cursor,
table->reginfo.lock_type > TL_WRITE_ALLOW_READ ?
0 : 0)))
DB_RMW : 0)))
cursor=0; // Safety
bzero((char*) &last_key,sizeof(last_key));
DBUG_RETURN(error);
@ -1336,7 +1350,7 @@ void ha_berkeley::position(const byte *record)
memcpy_fixed(ref, (char*) current_ident, BDB_HIDDEN_PRIMARY_KEY_LENGTH);
}
else
pack_key(&key, primary_key, ref, record);
create_key(&key, primary_key, ref, record);
}
@ -1345,9 +1359,18 @@ void ha_berkeley::info(uint flag)
DBUG_ENTER("info");
if (flag & HA_STATUS_VARIABLE)
{
records = estimate_number_of_rows(); // Just to get optimisations right
records = share->rows; // Just to get optimisations right
deleted = 0;
}
if ((flag & HA_STATUS_CONST) || version != share->version)
{
version=share->version;
for (uint i=0 ; i < table->keys ; i++)
{
table->key_info[i].rec_per_key[table->key_info[i].key_parts-1]=
share->rec_per_key[i];
}
}
else if (flag & HA_STATUS_ERRKEY)
errkey=last_dup_key;
DBUG_VOID_RETURN;
@ -1424,6 +1447,7 @@ int ha_berkeley::external_lock(THD *thd, int lock_type)
}
}
transaction= (DB_TXN*) thd->transaction.stmt.bdb_tid;
changed_rows=0;
}
else
{
@ -1437,6 +1461,7 @@ int ha_berkeley::external_lock(THD *thd, int lock_type)
current_row.data=0;
}
}
thread_safe_add(share->rows, changed_rows, &share->mutex);
current_row.data=0;
if (!--thd->transaction.bdb_lock_count)
{
@ -1607,6 +1632,142 @@ ha_rows ha_berkeley::records_in_range(int keynr,
DBUG_RETURN(rows <= 1.0 ? (ha_rows) 1 : (ha_rows) rows);
}
/*
  Return the next value to use for an auto_increment column.

  If the auto_increment column is the first part of its key
  (next_number_key_offset == 0), the answer is simply the last key in the
  index + 1.  Otherwise the current row buffer (table->record[0]) supplies
  the key prefix, and we locate the biggest existing key with that prefix:
  position at the prefix (DB_SET_RANGE with bdb_return_if_eq = -1), step to
  the next distinct key (DB_NEXT_NODUP), then one step back (DB_PREV).

  RETURN
    Largest matching value + 1, or 1 on error / empty key range.
*/

longlong ha_berkeley::get_auto_increment()
{
  longlong nr=1;				// Default if error or new key
  int error;
  (void) ha_berkeley::extra(HA_EXTRA_KEYREAD);
  ha_berkeley::index_init(table->next_number_index);

  if (!table->next_number_key_offset)
  {						// Autoincrement at key-start
    error=ha_berkeley::index_last(table->record[1]);
  }
  else
  {
    DBT row;
    bzero((char*) &row,sizeof(row));
    uint key_len;
    KEY *key_info= &table->key_info[active_index];

    /* Reading next available number for a sub key */
    ha_berkeley::create_key(&last_key, active_index,
                            key_buff, table->record[0],
                            table->next_number_key_offset);
    /* Store for compare */
    memcpy(key_buff2, key_buff, (key_len=last_key.size));
    /* Make DB_SET_RANGE fail on an exact match so we land past the prefix */
    key_info->handler.bdb_return_if_eq= -1;
    error=read_row(cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE),
                   table->record[1], active_index, &row, (DBT*) 0, 0);
    key_info->handler.bdb_return_if_eq= 0;
    if (!error && !berkeley_key_cmp(table, key_info, key_buff2, key_len))
    {
      /*
        Found matching key; Now search after next key, go one step back
        and then we should have found the biggest key with the given
        prefix
      */
      (void) read_row(cursor->c_get(cursor, &last_key, &row, DB_NEXT_NODUP),
                      table->record[1], active_index, &row, (DBT*) 0, 0);
      if (read_row(cursor->c_get(cursor, &last_key, &row, DB_PREV),
                   table->record[1], active_index, &row, (DBT*) 0, 0) ||
          berkeley_key_cmp(table, key_info, key_buff2, key_len))
        error=1;				// Something went wrong
    }
  }
  /*
    NOTE(review): nr is computed from table->record[1] even when error is
    set — presumably the record buffer then still holds usable defaults;
    confirm this is intended.
  */
  nr=(longlong)
    table->next_number_field->val_int_offset(table->rec_buff_length)+1;
  ha_berkeley::index_end();
  (void) ha_berkeley::extra(HA_EXTRA_NO_KEYREAD);
  return nr;
}
/****************************************************************************
Analyzing, checking, and optimizing tables
****************************************************************************/
/*
  Send one result row to the client for table-administration commands
  (ANALYZE/CHECK etc.).  The row contains four columns: table name,
  operation name, message type and the printf-formatted message text.
  If the network write fails, thd->killed is set so the statement aborts.

  NOTE(review): va_start() is not paired with va_end() here — harmless on
  most ABIs but formally required; confirm against the file's conventions.
*/

static void print_msg(THD *thd, const char *table_name, const char *op_name,
                      const char *msg_type, const char *fmt, ...)
{
  String* packet = &thd->packet;
  packet->length(0);
  char msgbuf[256];
  msgbuf[0] = 0;
  va_list args;
  va_start(args,fmt);

  my_vsnprintf(msgbuf, sizeof(msgbuf), fmt, args);
  msgbuf[sizeof(msgbuf) - 1] = 0; // healthy paranoia

  DBUG_PRINT(msg_type,("message: %s",msgbuf));

  /* One row: table, operation, message type, message text */
  net_store_data(packet, table_name);
  net_store_data(packet, op_name);
  net_store_data(packet, msg_type);
  net_store_data(packet, msgbuf);
  if (my_net_write(&thd->net, (char*)thd->packet.ptr(),
                   thd->packet.length()))
    thd->killed=1;
}
/*
  ANALYZE TABLE for BDB tables.

  Collects B-tree statistics for every index and caches, per key, the
  average number of data entries per distinct key (rec_per_key).  The row
  count and the STATUS_BDB_ANALYZE flag are then stored in the shared
  structure and flushed to the 'status' sub database via update_status().

  RETURN
    HA_ADMIN_OK      statistics were written (update_status() cleared
                     the STATUS_BDB_ANALYZE bit)
    HA_ADMIN_FAILED  the status could not be persisted (bit still set)

  NOTE(review): stat.bt_nkeys == 0 for an empty index would divide by
  zero below — confirm BDB guarantees a non-zero value here.
  NOTE(review): when hidden_primary_key is not set, 'stat' at the
  share->rows assignment holds the stats of the LAST index from the loop
  (and is uninitialized if table->keys == 0) — confirm this is intended.
*/

int ha_berkeley::analyze(THD* thd, HA_CHECK_OPT* check_opt)
{
  DB_BTREE_STAT stat;
  uint i;

  for (i=0 ; i < table->keys ; i++)
  {
    file->stat(key_file[i], (void*) &stat, 0, 0);
    share->rec_per_key[i]= stat.bt_ndata / stat.bt_nkeys;
  }
  /* If hidden primary key */
  if (hidden_primary_key)
    file->stat(file, (void*) &stat, 0, 0);
  pthread_mutex_lock(&share->mutex);
  share->rows=stat.bt_ndata;
  share->status|=STATUS_BDB_ANALYZE;		// Save status on close
  share->version++;				// Update stat in table
  pthread_mutex_unlock(&share->mutex);
  update_status(share,table);			// Write status to file
  return ((share->status & STATUS_BDB_ANALYZE) ? HA_ADMIN_FAILED :
          HA_ADMIN_OK);
}
/*
  OPTIMIZE TABLE is mapped to ANALYZE TABLE for BDB tables; no separate
  compaction step is implemented here.
*/

int ha_berkeley::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{
  return ha_berkeley::analyze(thd,check_opt);
}
/*
  CHECK TABLE for BDB tables.

  First verifies the physical structure of the table file (ordering check
  skipped via DB_NOORDERCHK unless the table uses a hidden primary key),
  then verifies the sort order of every index with DB_ORDERCHKONLY.
  Errors are reported to the client through print_msg().

  RETURN
    HA_ADMIN_OK       everything verified
    HA_ADMIN_CORRUPT  verification failed

  NOTE(review): the "Key %d was not in order" message prints the BDB
  error code, not the key number i — confirm which was intended.
*/

int ha_berkeley::check(THD* thd, HA_CHECK_OPT* check_opt)
{
  char name_buff[FN_REFLEN];
  int error;
  fn_format(name_buff,share->table_name,"", ha_berkeley_ext, 2 | 4);
  if ((error=file->verify(file, name_buff, NullS, (FILE*) 0,
                          hidden_primary_key ? 0 : DB_NOORDERCHK)))
  {
    print_msg(thd, table->real_name, "check", "error",
              "Got error %d checking file structure",error);
    return HA_ADMIN_CORRUPT;
  }
  for (uint i=0 ; i < table->keys ; i++)
  {
    if ((error=file->verify(key_file[i], name_buff, NullS, (FILE*) 0,
                            DB_ORDERCHKONLY)))
    {
      print_msg(thd, table->real_name, "check", "error",
                "Key %d was not in order",error);
      return HA_ADMIN_CORRUPT;
    }
  }
  return HA_ADMIN_OK;
}
/****************************************************************************
Handling the shared BDB_SHARE structure that is needed to provide table
locking.
@ -1619,19 +1780,21 @@ static byte* bdb_get_key(BDB_SHARE *share,uint *length,
return (byte*) share->table_name;
}
static BDB_SHARE *get_share(const char *table_name)
static BDB_SHARE *get_share(const char *table_name, TABLE *table)
{
BDB_SHARE *share;
pthread_mutex_lock(&bdb_mutex);
uint length=(uint) strlen(table_name);
if (!(share=(BDB_SHARE*) hash_search(&bdb_open_tables, table_name, length)))
{
if ((share=(BDB_SHARE *) my_malloc(sizeof(*share)+length+1,
if ((share=(BDB_SHARE *) my_malloc(sizeof(*share)+length+1 +
sizeof(ha_rows)* table->keys,
MYF(MY_WME | MY_ZEROFILL))))
{
share->table_name_length=length;
share->table_name=(char*) (share+1);
strmov(share->table_name,table_name);
share->rec_per_key= (ha_rows*) (share+1);
if (hash_insert(&bdb_open_tables, (char*) share))
{
pthread_mutex_unlock(&bdb_mutex);
@ -1647,11 +1810,14 @@ static BDB_SHARE *get_share(const char *table_name)
return share;
}
static void free_share(BDB_SHARE *share)
static void free_share(BDB_SHARE *share, TABLE *table)
{
pthread_mutex_lock(&bdb_mutex);
if (!--share->use_count)
{
update_status(share,table);
if (share->status_block)
share->status_block->close(share->status_block,0);
hash_delete(&bdb_open_tables, (gptr) share);
thr_lock_delete(&share->lock);
pthread_mutex_destroy(&share->mutex);
@ -1660,20 +1826,124 @@ static void free_share(BDB_SHARE *share)
pthread_mutex_unlock(&bdb_mutex);
}
/*
Get status information that is stored in the 'status' sub database
and the max used value for the hidden primary key.
*/
void ha_berkeley::update_auto_primary_key()
void ha_berkeley::get_status()
{
pthread_mutex_lock(&share->mutex);
if (!share->primary_key_inited)
if (!test_all_bits(share->status,(STATUS_PRIMARY_KEY_INIT |
STATUS_ROW_COUNT_INIT)))
{
(void) extra(HA_EXTRA_KEYREAD);
index_init(primary_key);
if (!index_last(table->record[1]))
share->auto_ident=uint5korr(current_ident);
index_end();
(void) extra(HA_EXTRA_NO_KEYREAD);
pthread_mutex_lock(&share->mutex);
if (!(share->status & STATUS_PRIMARY_KEY_INIT))
{
(void) extra(HA_EXTRA_KEYREAD);
index_init(primary_key);
if (!index_last(table->record[1]))
share->auto_ident=uint5korr(current_ident);
index_end();
(void) extra(HA_EXTRA_NO_KEYREAD);
}
if (! share->status_block)
{
char name_buff[FN_REFLEN];
uint open_mode= (((table->db_stat & HA_READ_ONLY) ? DB_RDONLY : 0)
| DB_THREAD);
fn_format(name_buff, share->table_name,"", ha_berkeley_ext, 2 | 4);
if (!db_create(&share->status_block, db_env, 0))
{
if (!share->status_block->open(share->status_block, name_buff,
"status", DB_BTREE, open_mode, 0))
{
share->status_block->close(share->status_block, 0);
share->status_block=0;
}
}
}
if (!(share->status & STATUS_ROW_COUNT_INIT) && share->status_block)
{
share->org_rows=share->rows=
table->max_rows ? table->max_rows : HA_BERKELEY_MAX_ROWS;
if (!file->cursor(share->status_block, 0, &cursor, 0))
{
DBT row;
char rec_buff[64],*pos=rec_buff;
bzero((char*) &row,sizeof(row));
bzero((char*) &last_key,sizeof(last_key));
row.data=rec_buff;
row.size=sizeof(rec_buff);
row.flags=DB_DBT_USERMEM;
if (!cursor->c_get(cursor, &last_key, &row, DB_FIRST))
{
uint i;
share->org_rows=share->rows=uint4korr(pos); pos+=4;
for (i=0 ; i < table->keys ; i++)
{
share->rec_per_key[i]=uint4korr(pos); pos+=4;
}
}
cursor->c_close(cursor);
}
cursor=0; // Safety
}
share->status|= STATUS_PRIMARY_KEY_INIT | STATUS_ROW_COUNT_INIT;
pthread_mutex_unlock(&share->mutex);
}
pthread_mutex_unlock(&share->mutex);
}
/*
  Persist cached table statistics (row count and per-key record counts)
  to the 'status' sub database of the table file, so they survive a
  table close.  Only writes when the row count changed since the status
  was read (share->rows != share->org_rows) or after ANALYZE TABLE
  (STATUS_BDB_ANALYZE set).  On success the STATUS_BDB_ANALYZE bit is
  cleared; on any create/open failure we silently skip the write.

  Record layout: 4 bytes row count, then 4 bytes rec_per_key per index.
*/

static void update_status(BDB_SHARE *share, TABLE *table)
{
  DBUG_ENTER("update_status");
  if (share->rows != share->org_rows ||
      (share->status & STATUS_BDB_ANALYZE))
  {
    pthread_mutex_lock(&share->mutex);
    if (!share->status_block)
    {
      /*
	Create sub database 'status' if it doesn't exist from before
	(This '*should*' always exist for table created with MySQL)
      */
      char name_buff[FN_REFLEN];
      if (db_create(&share->status_block, db_env, 0))
	goto end;
      share->status_block->set_flags(share->status_block,0);
      if (share->status_block->open(share->status_block,
				    fn_format(name_buff,share->table_name,"",
					      ha_berkeley_ext,2 | 4),
				    "status", DB_BTREE,
				    DB_THREAD | DB_CREATE, my_umask))
	goto end;
    }
    {
      uint i;
      DBT row,key;
      char rec_buff[4+MAX_KEY*sizeof(ulong)], *pos=rec_buff;
      const char *key_buff="status";

      bzero((char*) &row,sizeof(row));
      bzero((char*) &key,sizeof(key));
      row.data=rec_buff;
      key.data=(void*) key_buff;
      /*
	Bug fix: sizeof(key_buff) is the size of the pointer (4 or 8),
	not the length of the "status" string, and reading that many
	bytes runs past the end of the literal.  Use the string length.
      */
      key.size=(uint) strlen(key_buff);
      row.flags=key.flags=DB_DBT_USERMEM;
      int4store(pos,share->rows); pos+=4;
      for (i=0 ; i < table->keys ; i++)
      {
	int4store(pos,share->rec_per_key[i]); pos+=4;
      }
      row.size=(uint) (pos-rec_buff);
      (void) share->status_block->put(share->status_block, 0, &key, &row, 0);
      share->status&= ~STATUS_BDB_ANALYZE;	// Stats are now saved
    }
end:
    pthread_mutex_unlock(&share->mutex);
  }
  DBUG_VOID_RETURN;
}
/*
@ -1683,14 +1953,7 @@ void ha_berkeley::update_auto_primary_key()
ha_rows ha_berkeley::estimate_number_of_rows()
{
ulonglong max_ident;
ulonglong max_rows=table->max_rows ? table->max_rows : HA_BERKELEY_MAX_ROWS;
if (!hidden_primary_key)
return (ha_rows) max_rows;
pthread_mutex_lock(&share->mutex);
max_ident=share->auto_ident+EXTRA_RECORDS;
pthread_mutex_unlock(&share->mutex);
return (ha_rows) min(max_ident,max_rows);
return share->rows + HA_BERKELEY_EXTRA_ROWS;
}
#endif /* HAVE_BERKELEY_DB */