diff --git a/sql/field.h b/sql/field.h index 76e18eeaeb4..c747bac2f8d 100644 --- a/sql/field.h +++ b/sql/field.h @@ -253,7 +253,15 @@ public: ptr-=row_offset; return tmp; } - + inline longlong val_int(char *new_ptr) + { + char *old_ptr= ptr; + longlong return_value; + ptr= new_ptr; + return_value= val_int(); + ptr= old_ptr; + return return_value; + } inline String *val_str(String *str, char *new_ptr) { char *old_ptr= ptr; diff --git a/sql/ha_archive.cc b/sql/ha_archive.cc index dfc01d01a80..aa93b09f212 100644 --- a/sql/ha_archive.cc +++ b/sql/ha_archive.cc @@ -104,6 +104,7 @@ rows - This is an unsigned long long which is the number of rows in the data file. check point - Reserved for future use + auto increment - MAX value for autoincrement dirty - Status of the file, whether or not its values are the latest. This flag is what causes a repair to occur @@ -125,9 +126,11 @@ static HASH archive_open_tables; #define ARN ".ARN" // Files used during an optimize call #define ARM ".ARM" // Meta file /* - uchar + uchar + ulonglong + ulonglong + uchar + uchar + uchar + ulonglong + ulonglong + ulonglong + uchar */ -#define META_BUFFER_SIZE 19 // Size of the data used in the meta file +#define META_BUFFER_SIZE sizeof(uchar) + sizeof(uchar) + sizeof(ulonglong) \ + + sizeof(ulonglong) + sizeof(ulonglong) + sizeof(uchar) + /* uchar + uchar */ @@ -300,9 +303,11 @@ error: This method reads the header of a meta file and returns whether or not it was successful. *rows will contain the current number of rows in the data file upon success. */ -int ha_archive::read_meta_file(File meta_file, ha_rows *rows) +int ha_archive::read_meta_file(File meta_file, ha_rows *rows, + ulonglong *auto_increment) { uchar meta_buffer[META_BUFFER_SIZE]; + uchar *ptr= meta_buffer; ulonglong check_point; DBUG_ENTER("ha_archive::read_meta_file"); @@ -314,17 +319,24 @@ int ha_archive::read_meta_file(File meta_file, ha_rows *rows) /* Parse out the meta data, we ignore version at the moment */ - *rows= (ha_rows)uint8korr(meta_buffer + 2); - check_point= uint8korr(meta_buffer + 10); + + ptr+= sizeof(uchar)*2; // Move past header + *rows= (ha_rows)uint8korr(ptr); + ptr+= sizeof(ulonglong); // Move past rows + check_point= uint8korr(ptr); + ptr+= sizeof(ulonglong); // Move past check_point + *auto_increment= uint8korr(ptr); + ptr+= sizeof(ulonglong); // Move past auto_increment DBUG_PRINT("ha_archive::read_meta_file", ("Check %d", (uint)meta_buffer[0])); DBUG_PRINT("ha_archive::read_meta_file", ("Version %d", (uint)meta_buffer[1])); - DBUG_PRINT("ha_archive::read_meta_file", ("Rows %lld", *rows)); - DBUG_PRINT("ha_archive::read_meta_file", ("Checkpoint %lld", check_point)); - DBUG_PRINT("ha_archive::read_meta_file", ("Dirty %d", (int)meta_buffer[18])); + DBUG_PRINT("ha_archive::read_meta_file", ("Rows %llu", *rows)); + DBUG_PRINT("ha_archive::read_meta_file", ("Checkpoint %llu", check_point)); + DBUG_PRINT("ha_archive::read_meta_file", ("Auto-Increment %llu", *auto_increment)); + DBUG_PRINT("ha_archive::read_meta_file", ("Dirty %d", (int)(*ptr))); if ((meta_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) || - ((bool)meta_buffer[18] == TRUE)) + ((bool)(*ptr)== TRUE)) DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); my_sync(meta_file, MYF(MY_WME)); @@ -337,22 +349,34 @@ int ha_archive::read_meta_file(File meta_file, ha_rows *rows) By setting dirty you say whether or not the file represents the actual state of the data file. Upon ::open() we set to dirty, and upon ::close() we set to clean. */ -int ha_archive::write_meta_file(File meta_file, ha_rows rows, bool dirty) +int ha_archive::write_meta_file(File meta_file, ha_rows rows, + ulonglong auto_increment, bool dirty) { uchar meta_buffer[META_BUFFER_SIZE]; + uchar *ptr= meta_buffer; ulonglong check_point= 0; //Reserved for the future DBUG_ENTER("ha_archive::write_meta_file"); - meta_buffer[0]= (uchar)ARCHIVE_CHECK_HEADER; - meta_buffer[1]= (uchar)ARCHIVE_VERSION; - int8store(meta_buffer + 2, (ulonglong)rows); - int8store(meta_buffer + 10, check_point); - *(meta_buffer + 18)= (uchar)dirty; - DBUG_PRINT("ha_archive::write_meta_file", ("Check %d", (uint)ARCHIVE_CHECK_HEADER)); - DBUG_PRINT("ha_archive::write_meta_file", ("Version %d", (uint)ARCHIVE_VERSION)); + *ptr= (uchar)ARCHIVE_CHECK_HEADER; + ptr += sizeof(uchar); + *ptr= (uchar)ARCHIVE_VERSION; + ptr += sizeof(uchar); + int8store(ptr, (ulonglong)rows); + ptr += sizeof(ulonglong); + int8store(ptr, check_point); + ptr += sizeof(ulonglong); + int8store(ptr, auto_increment); + ptr += sizeof(ulonglong); + *ptr= (uchar)dirty; + DBUG_PRINT("ha_archive::write_meta_file", ("Check %d", + (uint)ARCHIVE_CHECK_HEADER)); + DBUG_PRINT("ha_archive::write_meta_file", ("Version %d", + (uint)ARCHIVE_VERSION)); DBUG_PRINT("ha_archive::write_meta_file", ("Rows %llu", (ulonglong)rows)); DBUG_PRINT("ha_archive::write_meta_file", ("Checkpoint %llu", check_point)); + DBUG_PRINT("ha_archive::write_meta_file", ("Auto Increment %llu", + auto_increment)); DBUG_PRINT("ha_archive::write_meta_file", ("Dirty %d", (uint)dirty)); VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0))); @@ -414,17 +438,19 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, TABLE *table) opposite. If the meta file will not open we assume it is crashed and leave it up to the user to fix. */ - if (read_meta_file(share->meta_file, &share->rows_recorded)) + if (read_meta_file(share->meta_file, &share->rows_recorded, + &share->auto_increment_value)) share->crashed= TRUE; else - (void)write_meta_file(share->meta_file, share->rows_recorded, TRUE); - + (void)write_meta_file(share->meta_file, share->rows_recorded, + share->auto_increment_value, TRUE); /* It is expensive to open and close the data files and since you can't have a gzip file that can be both read and written we keep a writer open that is shared amoung all open tables. */ - if (!(azopen(&(share->archive_write), share->data_file_name, O_WRONLY|O_APPEND|O_BINARY))) + if (!(azopen(&(share->archive_write), share->data_file_name, + O_WRONLY|O_APPEND|O_BINARY))) { DBUG_PRINT("info", ("Could not open archive write file")); share->crashed= TRUE; @@ -452,7 +478,8 @@ int ha_archive::free_share(ARCHIVE_SHARE *share) hash_delete(&archive_open_tables, (byte*) share); thr_lock_delete(&share->lock); VOID(pthread_mutex_destroy(&share->mutex)); - (void)write_meta_file(share->meta_file, share->rows_recorded, FALSE); + (void)write_meta_file(share->meta_file, share->rows_recorded, + share->auto_increment_value, FALSE); if (azclose(&(share->archive_write))) rc= 1; if (my_close(share->meta_file, MYF(0))) @@ -561,7 +588,27 @@ int ha_archive::create(const char *name, TABLE *table_arg, error= my_errno; goto error; } - write_meta_file(create_file, 0, FALSE); + + for (uint key= 0; key < table_arg->s->keys; key++) + { + KEY *pos= table_arg->key_info+key; + KEY_PART_INFO *key_part= pos->key_part; + KEY_PART_INFO *key_part_end= key_part + pos->key_parts; + + for (; key_part != key_part_end; key_part++) + { + Field *field= key_part->field; + + DBUG_PRINT("info", ("Looking at field index%s", field->field_name)); + if (!(field->flags & AUTO_INCREMENT_FLAG)) + { + error= -1; + goto error; + } + } + } + + write_meta_file(create_file, 0, 0, FALSE); my_close(create_file,MYF(0)); /* @@ -614,7 +661,8 @@ int ha_archive::real_write_row(byte *buf, azio_stream *writer) DBUG_ENTER("ha_archive::real_write_row"); written= azwrite(writer, buf, table->s->reclength); - DBUG_PRINT("ha_archive::real_write_row", ("Wrote %d bytes expected %d", written, table->s->reclength)); + DBUG_PRINT("ha_archive::real_write_row", ("Wrote %d bytes expected %d", + written, table->s->reclength)); if (!delayed_insert || !bulk_insert) share->dirty= TRUE; @@ -655,6 +703,8 @@ int ha_archive::real_write_row(byte *buf, azio_stream *writer) int ha_archive::write_row(byte *buf) { int rc; + byte *read_buf= NULL; + ulonglong temp_auto; DBUG_ENTER("ha_archive::write_row"); if (share->crashed) @@ -664,13 +714,164 @@ int ha_archive::write_row(byte *buf) if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) table->timestamp_field->set_time(); pthread_mutex_lock(&share->mutex); + + if (table->next_number_field) + { + KEY *mkey= &table->s->key_info[0]; // We only support one key right now + update_auto_increment(); + temp_auto= table->next_number_field->val_int(); + DBUG_PRINT("info", ("archive would see %d and %d", + temp_auto, share->auto_increment_value)); + + /* + Bad news, this will cause a search for the unique value which is very + expensive since we will have to do a table scan which will lock up + all other writers during this period. This could perhaps be optimized + in the future. + */ + if (temp_auto == share->auto_increment_value && + mkey->flags & HA_NOSAME) + { + rc= HA_ERR_FOUND_DUPP_KEY; + goto error; + } + + if (temp_auto < share->auto_increment_value && + mkey->flags & HA_NOSAME) + { + /* + First we create a buffer that we can use for reading rows, and can pass + to get_row(). + */ + if (!(read_buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME)))) + { + rc= HA_ERR_OUT_OF_MEM; + goto error; + } + /* + All of the buffer must be written out or we won't see all of the + data + */ + azflush(&(share->archive_write), Z_SYNC_FLUSH); + /* + Set the position of the local read thread to the beginning postion. + */ + if (read_data_header(&archive)) + { + rc= HA_ERR_CRASHED_ON_USAGE; + goto error; + } + + /* + Now we read and check all of the rows. + if (!memcmp(table->next_number_field->ptr, mfield->ptr, mfield->max_length())) + */ + Field *mfield= table->next_number_field; + + while (!(get_row(&archive, read_buf))) + { + if ((longlong)temp_auto == + mfield->val_int((char*)(read_buf + mfield->offset()))) + { + rc= HA_ERR_FOUND_DUPP_KEY; + goto error; + } + } + } + else + { + auto_increment_value= share->auto_increment_value= temp_auto; + } + } + + /* + Notice that the global auto_increment has been increased. + In case of a failed row write, we will never try to reuse the value. + */ + share->rows_recorded++; rc= real_write_row(buf, &(share->archive_write)); +error: pthread_mutex_unlock(&share->mutex); + if (read_buf) + my_free(read_buf, MYF(0)); DBUG_RETURN(rc); } + +ulonglong ha_archive::get_auto_increment() +{ + return auto_increment_value= ++share->auto_increment_value; +} + +/* Initialized at each key walk (called multiple times unlike rnd_init()) */ +int ha_archive::index_init(uint keynr, bool sorted) +{ + DBUG_ENTER("ha_archive::index_init"); + active_index= keynr; + DBUG_RETURN(0); +} + + +/* + No indexes, so if we get a request for an index search since we tell + the optimizer that we have unique indexes, we scan +*/ +int ha_archive::index_read(byte *buf, const byte *key, + uint key_len, enum ha_rkey_function find_flag) +{ + int rc; + DBUG_ENTER("ha_archive::index_read"); + rc= index_read_idx(buf, active_index, key, key_len, find_flag); + DBUG_RETURN(rc); +} + + +int ha_archive::index_read_idx(byte *buf, uint index, const byte *key, + uint key_len, enum ha_rkey_function find_flag) +{ + int rc= 0; + bool found= 0; + KEY *mkey= &table->s->key_info[index]; + uint k_offset= mkey->key_part->offset; + + + DBUG_ENTER("ha_archive::index_read_idx"); + + /* + All of the buffer must be written out or we won't see all of the + data + */ + pthread_mutex_lock(&share->mutex); + azflush(&(share->archive_write), Z_SYNC_FLUSH); + pthread_mutex_unlock(&share->mutex); + + /* + Set the position of the local read thread to the beginning postion. + */ + if (read_data_header(&archive)) + { + rc= HA_ERR_CRASHED_ON_USAGE; + goto error; + } + + while (!(get_row(&archive, buf))) + { + if (!memcmp(key, buf+k_offset, key_len)) + { + found= 1; + break; + } + } + + if (found) + DBUG_RETURN(0); + +error: + DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE); +} + /* All calls that need to scan the table start with this method. If we are told that it is a table scan we rewind the file to the beginning, otherwise @@ -726,7 +927,8 @@ int ha_archive::get_row(azio_stream *file_to_read, byte *buf) DBUG_ENTER("ha_archive::get_row"); read= azread(file_to_read, buf, table->s->reclength); - DBUG_PRINT("ha_archive::get_row", ("Read %d bytes expected %d", read, table->s->reclength)); + DBUG_PRINT("ha_archive::get_row", ("Read %d bytes expected %d", read, + table->s->reclength)); if (read == Z_STREAM_ERROR) DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); @@ -912,9 +1114,18 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) if (!rc) { share->rows_recorded= 0; + auto_increment_value= share->auto_increment_value= 0; while (!(rc= get_row(&archive, buf))) { real_write_row(buf, &writer); + if (table->found_next_number_field) + { + Field *field= table->found_next_number_field; + if (share->auto_increment_value < + field->val_int((char*)(buf + field->offset()))) + auto_increment_value= share->auto_increment_value= + field->val_int((char*)(buf + field->offset())); + } share->rows_recorded++; } } @@ -1028,6 +1239,9 @@ void ha_archive::info(uint flag) delete_length= 0; index_file_length=0; + if (flag & HA_STATUS_AUTO) + auto_increment_value= share->auto_increment_value; + DBUG_VOID_RETURN; } diff --git a/sql/ha_archive.h b/sql/ha_archive.h index 7b957060f34..ec30ca7c10f 100644 --- a/sql/ha_archive.h +++ b/sql/ha_archive.h @@ -18,6 +18,7 @@ #pragma interface /* gcc class implementation */ #endif +#include #include #include "../storage/archive/azlib.h" @@ -38,13 +39,14 @@ typedef struct st_archive_share { bool dirty; /* Flag for if a flush should occur */ bool crashed; /* Meta file is crashed */ ha_rows rows_recorded; /* Number of rows in tables */ + ulonglong auto_increment_value; } ARCHIVE_SHARE; /* Version for file format. 1 - Initial Version */ -#define ARCHIVE_VERSION 1 +#define ARCHIVE_VERSION 2 class ha_archive: public handler { @@ -68,13 +70,22 @@ public: const char **bas_ext() const; ulong table_flags() const { - return (HA_REC_NOT_IN_SEQ | HA_NOT_EXACT_COUNT | HA_NO_AUTO_INCREMENT | + return (HA_REC_NOT_IN_SEQ | HA_NOT_EXACT_COUNT | HA_FILE_BASED | HA_CAN_INSERT_DELAYED | HA_CAN_GEOMETRY); } ulong index_flags(uint idx, uint part, bool all_parts) const { - return 0; + return HA_ONLY_WHOLE_INDEX; } + ulonglong get_auto_increment(); + uint max_supported_keys() const { return 1; } + uint max_supported_key_length() const { return sizeof(ulonglong); } + uint max_supported_key_part_length() const { return sizeof(ulonglong); } + int index_init(uint keynr, bool sorted); + virtual int index_read(byte * buf, const byte * key, + uint key_len, enum ha_rkey_function find_flag); + virtual int index_read_idx(byte * buf, uint index, const byte * key, + uint key_len, enum ha_rkey_function find_flag); int open(const char *name, int mode, uint test_if_locked); int close(void); int write_row(byte * buf); @@ -84,8 +95,9 @@ public: int rnd_next(byte *buf); int rnd_pos(byte * buf, byte *pos); int get_row(azio_stream *file_to_read, byte *buf); - int read_meta_file(File meta_file, ha_rows *rows); - int write_meta_file(File meta_file, ha_rows rows, bool dirty); + int read_meta_file(File meta_file, ha_rows *rows, ulonglong *auto_increment); + int write_meta_file(File meta_file, ha_rows rows, + ulonglong auto_increment, bool dirty); ARCHIVE_SHARE *get_share(const char *table_name, TABLE *table); int free_share(ARCHIVE_SHARE *share); bool auto_repair() const { return 1; } // For the moment we just do this