Fixed ability to read without read lock acquiring. (BUG#39665 related)

storage/maria/ma_pagecache.c:
  Fixed ability to read without read lock acquiring.
storage/maria/unittest/CMakeLists.txt:
  New unit test which tests simple read and prolonged writes consistency added.
storage/maria/unittest/Makefile.am:
  New unit test which tests simple read and prolonged writes consistency added.
storage/maria/unittest/ma_pagecache_rwconsist2.c:
  New unit test which tests simple read and prolonged writes consistency added.
This commit is contained in:
unknown 2008-10-16 22:44:12 +03:00
parent 8ecda6cd26
commit b70f7317fa
4 changed files with 551 additions and 59 deletions

View file

@ -2775,7 +2775,7 @@ void pagecache_unlock(PAGECACHE *pagecache,
inc_counter_for_resize_op(pagecache);
/* See NOTE for pagecache_unlock about registering requests */
block= find_block(pagecache, file, pageno, 0, 0,
test(pin == PAGECACHE_PIN_LEFT_UNPINNED), &page_st);
pin == PAGECACHE_PIN_LEFT_UNPINNED, &page_st);
PCBLOCK_INFO(block);
DBUG_ASSERT(block != 0 && page_st == PAGE_READ);
if (first_REDO_LSN_for_page)
@ -3080,6 +3080,146 @@ void pagecache_unpin_by_link(PAGECACHE *pagecache,
DBUG_VOID_RETURN;
}
/* description of how to change lock before and after read/write */
struct rw_lock_change
{
my_bool need_lock_change; /* need changing of lock at the end */
enum pagecache_page_lock new_lock; /* lock at the beginning */
enum pagecache_page_lock unlock_lock; /* lock at the end */
};
/* description of how to change pin before and after read/write */
struct rw_pin_change
{
enum pagecache_page_pin new_pin; /* pin status at the beginning */
enum pagecache_page_pin unlock_pin; /* pin status at the end */
};
/**
Depending on the lock which the user wants in pagecache_read(), we
need to acquire a first type of lock at start of pagecache_read(), and
downgrade it to a second type of lock at end. For example, if user
asked for no lock (PAGECACHE_LOCK_LEFT_UNLOCKED) this translates into
taking first a read lock PAGECACHE_LOCK_READ (to rightfully block on
existing write locks) then read then unlock the lock i.e. change lock
to PAGECACHE_LOCK_READ_UNLOCK (the "1" below tells that a change is
needed).
*/
static struct rw_lock_change lock_to_read[8]=
{
{ /*PAGECACHE_LOCK_LEFT_UNLOCKED*/
1,
PAGECACHE_LOCK_READ, PAGECACHE_LOCK_READ_UNLOCK
},
{ /*PAGECACHE_LOCK_LEFT_READLOCKED*/
0,
PAGECACHE_LOCK_LEFT_READLOCKED, PAGECACHE_LOCK_LEFT_READLOCKED
},
{ /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/
0,
PAGECACHE_LOCK_LEFT_WRITELOCKED, PAGECACHE_LOCK_LEFT_WRITELOCKED
},
{ /*PAGECACHE_LOCK_READ*/
1,
PAGECACHE_LOCK_READ, PAGECACHE_LOCK_LEFT_READLOCKED
},
{ /*PAGECACHE_LOCK_WRITE*/
1,
PAGECACHE_LOCK_WRITE, PAGECACHE_LOCK_LEFT_WRITELOCKED
},
{ /*PAGECACHE_LOCK_READ_UNLOCK*/
1,
PAGECACHE_LOCK_LEFT_READLOCKED, PAGECACHE_LOCK_READ_UNLOCK
},
{ /*PAGECACHE_LOCK_WRITE_UNLOCK*/
1,
PAGECACHE_LOCK_LEFT_WRITELOCKED, PAGECACHE_LOCK_WRITE_UNLOCK
},
{ /*PAGECACHE_LOCK_WRITE_TO_READ*/
1,
PAGECACHE_LOCK_LEFT_WRITELOCKED, PAGECACHE_LOCK_WRITE_TO_READ
}
};
/**
Two sets of pin modes (every as for lock upper but for pinning). The
difference between sets if whether we are going to provide caller with
reference on the block or not
*/
static struct rw_pin_change lock_to_pin[2][8]=
{
{
{ /*PAGECACHE_LOCK_LEFT_UNLOCKED*/
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_PIN_LEFT_UNPINNED
},
{ /*PAGECACHE_LOCK_LEFT_READLOCKED*/
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_PIN_LEFT_UNPINNED,
},
{ /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/
PAGECACHE_PIN_LEFT_PINNED,
PAGECACHE_PIN_LEFT_PINNED
},
{ /*PAGECACHE_LOCK_READ*/
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_PIN_LEFT_UNPINNED
},
{ /*PAGECACHE_LOCK_WRITE*/
PAGECACHE_PIN,
PAGECACHE_PIN_LEFT_PINNED
},
{ /*PAGECACHE_LOCK_READ_UNLOCK*/
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_PIN_LEFT_UNPINNED
},
{ /*PAGECACHE_LOCK_WRITE_UNLOCK*/
PAGECACHE_PIN_LEFT_PINNED,
PAGECACHE_UNPIN
},
{ /*PAGECACHE_LOCK_WRITE_TO_READ*/
PAGECACHE_PIN_LEFT_PINNED,
PAGECACHE_UNPIN
}
},
{
{ /*PAGECACHE_LOCK_LEFT_UNLOCKED*/
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_PIN_LEFT_UNPINNED
},
{ /*PAGECACHE_LOCK_LEFT_READLOCKED*/
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_PIN_LEFT_UNPINNED,
},
{ /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/
PAGECACHE_PIN_LEFT_PINNED,
PAGECACHE_PIN_LEFT_PINNED
},
{ /*PAGECACHE_LOCK_READ*/
PAGECACHE_PIN,
PAGECACHE_PIN_LEFT_PINNED
},
{ /*PAGECACHE_LOCK_WRITE*/
PAGECACHE_PIN,
PAGECACHE_PIN_LEFT_PINNED
},
{ /*PAGECACHE_LOCK_READ_UNLOCK*/
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_PIN_LEFT_UNPINNED
},
{ /*PAGECACHE_LOCK_WRITE_UNLOCK*/
PAGECACHE_PIN_LEFT_PINNED,
PAGECACHE_UNPIN
},
{ /*PAGECACHE_LOCK_WRITE_TO_READ*/
PAGECACHE_PIN_LEFT_PINNED,
PAGECACHE_PIN_LEFT_PINNED,
}
}
};
/*
@brief Read a block of data from a cached file into a buffer;
@ -3096,34 +3236,11 @@ void pagecache_unpin_by_link(PAGECACHE *pagecache,
@return address from where the data is placed if successful, 0 - otherwise.
@note Pin will be chosen according to lock parameter (see lock_to_pin)
*/
static enum pagecache_page_pin lock_to_pin[2][8]=
{
{
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_UNLOCKED*/,
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_READLOCKED*/,
PAGECACHE_PIN_LEFT_PINNED /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/,
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ*/,
PAGECACHE_PIN /*PAGECACHE_LOCK_WRITE*/,
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ_UNLOCK*/,
PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_UNLOCK*/,
PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_TO_READ*/
},
{
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_UNLOCKED*/,
PAGECACHE_PIN_LEFT_PINNED /*PAGECACHE_LOCK_LEFT_READLOCKED*/,
PAGECACHE_PIN_LEFT_PINNED /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/,
PAGECACHE_PIN /*PAGECACHE_LOCK_READ*/,
PAGECACHE_PIN /*PAGECACHE_LOCK_WRITE*/,
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ_UNLOCK*/,
PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_UNLOCK*/,
PAGECACHE_PIN_LEFT_PINNED /*PAGECACHE_LOCK_WRITE_TO_READ*/
}
};
/**
@note 'buff', if not NULL, must be long-aligned.
@note If buff==0 then we provide reference on the page so should keep the
page pinned.
*/
uchar *pagecache_read(PAGECACHE *pagecache,
@ -3136,21 +3253,26 @@ uchar *pagecache_read(PAGECACHE *pagecache,
PAGECACHE_BLOCK_LINK **page_link)
{
my_bool error= 0;
enum pagecache_page_pin pin= lock_to_pin[test(buff==0)][lock];
enum pagecache_page_pin
new_pin= lock_to_pin[buff==0][lock].new_pin,
unlock_pin= lock_to_pin[buff==0][lock].unlock_pin;
PAGECACHE_BLOCK_LINK *fake_link;
my_bool reg_request;
#ifndef DBUG_OFF
char llbuf[22];
DBUG_ENTER("pagecache_read");
DBUG_PRINT("enter", ("fd: %u page: %s buffer: 0x%lx level: %u "
"t:%s %s %s",
"t:%s (%d)%s->%s %s->%s",
(uint) file->file, ullstr(pageno, llbuf),
(ulong) buff, level,
page_cache_page_type_str[type],
page_cache_page_lock_str[lock],
page_cache_page_pin_str[pin]));
DBUG_ASSERT(buff != 0 || (buff == 0 && (pin == PAGECACHE_PIN ||
pin == PAGECACHE_PIN_LEFT_PINNED)));
lock_to_read[lock].need_lock_change,
page_cache_page_lock_str[lock_to_read[lock].new_lock],
page_cache_page_lock_str[lock_to_read[lock].unlock_lock],
page_cache_page_pin_str[new_pin],
page_cache_page_pin_str[unlock_pin]));
DBUG_ASSERT(buff != 0 || (buff == 0 && (unlock_pin == PAGECACHE_PIN ||
unlock_pin == PAGECACHE_PIN_LEFT_PINNED)));
DBUG_ASSERT(pageno < ((ULL(1)) << 40));
#endif
@ -3177,10 +3299,10 @@ restart:
inc_counter_for_resize_op(pagecache);
pagecache->global_cache_r_requests++;
/* See NOTE for pagecache_unlock about registering requests. */
reg_request= ((pin == PAGECACHE_PIN_LEFT_UNPINNED) ||
(pin == PAGECACHE_PIN));
reg_request= ((new_pin == PAGECACHE_PIN_LEFT_UNPINNED) ||
(new_pin == PAGECACHE_PIN));
block= find_block(pagecache, file, pageno, level,
test(lock == PAGECACHE_LOCK_WRITE),
lock == PAGECACHE_LOCK_WRITE,
reg_request, &page_st);
DBUG_PRINT("info", ("Block type: %s current type %s",
page_cache_page_type_str[block->type],
@ -3214,7 +3336,8 @@ restart:
block->type == PAGECACHE_EMPTY_PAGE)
block->type= type;
if (make_lock_and_pin(pagecache, block, lock, pin, FALSE))
if (make_lock_and_pin(pagecache, block, lock_to_read[lock].new_lock,
new_pin, FALSE))
{
/*
We failed to write lock the block, cache is unlocked,
@ -3262,12 +3385,20 @@ restart:
}
remove_reader(block);
if (lock_to_read[lock].need_lock_change)
{
if (make_lock_and_pin(pagecache, block,
lock_to_read[lock].unlock_lock,
unlock_pin, FALSE))
DBUG_ASSERT(0);
}
/*
Link the block into the LRU chain if it's the last submitted request
for the block and block will not be pinned.
See NOTE for pagecache_unlock about registering requests.
*/
if (pin == PAGECACHE_PIN_LEFT_UNPINNED || pin == PAGECACHE_UNPIN)
if (unlock_pin == PAGECACHE_PIN_LEFT_UNPINNED ||
unlock_pin == PAGECACHE_UNPIN)
unreg_request(pagecache, block, 1);
else
*page_link= block;
@ -3485,6 +3616,18 @@ void pagecache_add_level_by_link(PAGECACHE_BLOCK_LINK *block,
write locked before) or PAGECACHE_LOCK_WRITE (delete will write
lock page before delete)
*/
static enum pagecache_page_pin lock_to_pin_one_phase[8]=
{
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_UNLOCKED*/,
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_READLOCKED*/,
PAGECACHE_PIN_LEFT_PINNED /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/,
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ*/,
PAGECACHE_PIN /*PAGECACHE_LOCK_WRITE*/,
PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ_UNLOCK*/,
PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_UNLOCK*/,
PAGECACHE_UNPIN /*PAGECACHE_LOCK_WRITE_TO_READ*/
};
my_bool pagecache_delete(PAGECACHE *pagecache,
PAGECACHE_FILE *file,
pgcache_page_no_t pageno,
@ -3492,7 +3635,7 @@ my_bool pagecache_delete(PAGECACHE *pagecache,
my_bool flush)
{
my_bool error= 0;
enum pagecache_page_pin pin= lock_to_pin[0][lock];
enum pagecache_page_pin pin= lock_to_pin_one_phase[lock];
DBUG_ENTER("pagecache_delete");
DBUG_PRINT("enter", ("fd: %u page: %lu %s %s",
(uint) file->file, (ulong) pageno,
@ -3608,15 +3751,7 @@ my_bool pagecache_delete_pages(PAGECACHE *pagecache,
@retval 1 Error.
*/
/* description of how to change lock before and after write */
struct write_lock_change
{
int need_lock_change; /* need changing of lock at the end of write */
enum pagecache_page_lock new_lock; /* lock at the beginning */
enum pagecache_page_lock unlock_lock; /* lock at the end */
};
static struct write_lock_change write_lock_change_table[]=
static struct rw_lock_change write_lock_change_table[]=
{
{1,
PAGECACHE_LOCK_WRITE,
@ -3640,14 +3775,8 @@ static struct write_lock_change write_lock_change_table[]=
PAGECACHE_LOCK_WRITE_TO_READ} /*PAGECACHE_LOCK_WRITE_TO_READ*/
};
/* description of how to change pin before and after write */
struct write_pin_change
{
enum pagecache_page_pin new_pin; /* pin status at the beginning */
enum pagecache_page_pin unlock_pin; /* pin status at the end */
};
static struct write_pin_change write_pin_change_table[]=
static struct rw_pin_change write_pin_change_table[]=
{
{PAGECACHE_PIN_LEFT_PINNED,
PAGECACHE_PIN_LEFT_PINNED} /*PAGECACHE_PIN_LEFT_PINNED*/,
@ -3729,10 +3858,10 @@ restart:
reg_request= ((pin == PAGECACHE_PIN_LEFT_UNPINNED) ||
(pin == PAGECACHE_PIN));
block= find_block(pagecache, file, pageno, level,
test(write_mode != PAGECACHE_WRITE_DONE &&
lock != PAGECACHE_LOCK_LEFT_WRITELOCKED &&
lock != PAGECACHE_LOCK_WRITE_UNLOCK &&
lock != PAGECACHE_LOCK_WRITE_TO_READ),
(write_mode != PAGECACHE_WRITE_DONE &&
lock != PAGECACHE_LOCK_LEFT_WRITELOCKED &&
lock != PAGECACHE_LOCK_WRITE_UNLOCK &&
lock != PAGECACHE_LOCK_WRITE_TO_READ),
reg_request, &page_st);
if (!block)
{

View file

@ -94,3 +94,5 @@ SET_TARGET_PROPERTIES(ma_pagecache_consist_64kWR-t
PROPERTIES COMPILE_FLAGS "${ma_pagecache_common_cppflags} -DTEST_PAGE_SIZE=65536 -DTEST_WRITERS")
ADD_EXECUTABLE(ma_pagecache_rwconsist_1k-t ma_pagecache_rwconsist.c)
SET_TARGET_PROPERTIES(ma_pagecache_rwconsist_1k-t PROPERTIES COMPILE_FLAGS "-DTEST_PAGE_SIZE=1024")
ADD_EXECUTABLE(ma_pagecache_rwconsist2_1k-t ma_pagecache_rwconsist2.c)
SET_TARGET_PROPERTIES(ma_pagecache_rwconsist2_1k-t PROPERTIES COMPILE_FLAGS "-DTEST_PAGE_SIZE=1024")

View file

@ -39,6 +39,7 @@ noinst_PROGRAMS = ma_control_file-t trnman-t \
ma_pagecache_consist_1kWR-t \
ma_pagecache_consist_64kWR-t \
ma_pagecache_rwconsist_1k-t \
ma_pagecache_rwconsist2_1k-t \
ma_test_loghandler-t \
ma_test_loghandler_multigroup-t \
ma_test_loghandler_multithread-t \
@ -101,6 +102,8 @@ ma_pagecache_consist_64kWR_t_CPPFLAGS = $(ma_pagecache_common_cppflags) -DTEST_P
ma_pagecache_rwconsist_1k_t_SOURCES = ma_pagecache_rwconsist.c
ma_pagecache_rwconsist_1k_t_CPPFLAGS = -DTEST_PAGE_SIZE=1024
ma_pagecache_rwconsist2_1k_t_SOURCES = ma_pagecache_rwconsist2.c
ma_pagecache_rwconsist2_1k_t_CPPFLAGS = -DTEST_PAGE_SIZE=1024
# the generic lock manager may not be used in the end and lockman1-t crashes,
# and lockman2-t takes at least quarter an hour,

View file

@ -0,0 +1,358 @@
/* Copyright (C) 2006-2008 MySQL AB, 2008 Sun Microsystems, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/**
@file this unit tests consistence of long block writing under write lock
and simultaneous reading of this block with read request without read lock
requirement.
*/
/*
TODO: use pthread_join instead of wait_for_thread_count_to_be_zero, like in
my_atomic-t.c (see BUG#22320).
*/
#include <tap.h>
#include <my_sys.h>
#include <m_string.h>
#include "test_file.h"
#include <tap.h>
#define PCACHE_SIZE (TEST_PAGE_SIZE*1024*8)
#ifndef DBUG_OFF
static const char* default_dbug_option;
#endif
#define SLEEP my_sleep(5)
static char *file1_name= (char*)"page_cache_test_file_1";
static PAGECACHE_FILE file1;
static pthread_cond_t COND_thread_count;
static pthread_mutex_t LOCK_thread_count;
static uint thread_count= 0;
static PAGECACHE pagecache;
static uint number_of_readers= 5;
static uint number_of_writers= 5;
static uint number_of_read_tests= 20000;
static uint number_of_write_tests= 1000;
static uint report_divisor= 50;
/**
@brief Dummy pagecache callback.
*/
static my_bool
dummy_callback(uchar *page __attribute__((unused)),
pgcache_page_no_t page_no __attribute__((unused)),
uchar* data_ptr __attribute__((unused)))
{
return 0;
}
/**
@brief Dummy pagecache callback.
*/
static void
dummy_fail_callback(uchar* data_ptr __attribute__((unused)))
{
return;
}
/**
@brief Checks page consistency
@param buff pointer to the page content
@param task task ID
*/
void check_page(uchar *buff, int task)
{
uint i;
DBUG_ENTER("check_page");
for (i= 1; i < TEST_PAGE_SIZE; i++)
{
if (buff[0] != buff[i])
goto err;
}
DBUG_VOID_RETURN;
err:
diag("Task %d char #%u '%u' != '%u'", task, i, (uint) buff[0],
(uint) buff[i]);
DBUG_PRINT("err", ("try to flush"));
exit(1);
}
void reader(int num)
{
unsigned char buff[TEST_PAGE_SIZE];
uint i;
for (i= 0; i < number_of_read_tests; i++)
{
if (i % report_divisor == 0)
diag("Reader %d - %u", num, i);
pagecache_read(&pagecache, &file1, 0, 3, buff,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
NULL);
check_page(buff, num);
}
}
void writer(int num)
{
uint i;
uchar *buff;
PAGECACHE_BLOCK_LINK *link;
for (i= 0; i < number_of_write_tests; i++)
{
uchar c= (uchar) rand() % 256;
if (i % report_divisor == 0)
diag("Writer %d - %u", num, i);
buff= pagecache_read(&pagecache, &file1, 0, 3, NULL,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_WRITE,
&link);
check_page(buff, num);
bfill(buff, TEST_PAGE_SIZE / 2, c);
SLEEP;
bfill(buff + TEST_PAGE_SIZE/2, TEST_PAGE_SIZE / 2, c);
check_page(buff, num);
pagecache_unlock_by_link(&pagecache, link,
PAGECACHE_LOCK_WRITE_UNLOCK,
PAGECACHE_UNPIN, 0, 0, 1, FALSE);
SLEEP;
}
}
static void *test_thread_reader(void *arg)
{
int param=*((int*) arg);
my_thread_init();
{
DBUG_ENTER("test_reader");
DBUG_PRINT("enter", ("param: %d", param));
reader(param);
DBUG_PRINT("info", ("Thread %s ended", my_thread_name()));
pthread_mutex_lock(&LOCK_thread_count);
ok(1, "reader%d: done", param);
thread_count--;
VOID(pthread_cond_signal(&COND_thread_count)); /* Tell main we are ready */
pthread_mutex_unlock(&LOCK_thread_count);
free((uchar*) arg);
my_thread_end();
}
return 0;
}
static void *test_thread_writer(void *arg)
{
int param=*((int*) arg);
my_thread_init();
{
DBUG_ENTER("test_writer");
writer(param);
DBUG_PRINT("info", ("Thread %s ended", my_thread_name()));
pthread_mutex_lock(&LOCK_thread_count);
ok(1, "writer%d: done", param);
thread_count--;
VOID(pthread_cond_signal(&COND_thread_count)); /* Tell main we are ready */
pthread_mutex_unlock(&LOCK_thread_count);
free((uchar*) arg);
my_thread_end();
}
return 0;
}
int main(int argc __attribute__((unused)),
char **argv __attribute__((unused)))
{
pthread_t tid;
pthread_attr_t thr_attr;
int *param, error, pagen;
MY_INIT(argv[0]);
#ifndef DBUG_OFF
#if defined(__WIN__)
default_dbug_option= "d:t:i:O,\\test_pagecache_consist.trace";
#else
default_dbug_option= "d:t:i:O,/tmp/test_pagecache_consist.trace";
#endif
if (argc > 1)
{
DBUG_SET(default_dbug_option);
DBUG_SET_INITIAL(default_dbug_option);
}
#endif
{
DBUG_ENTER("main");
DBUG_PRINT("info", ("Main thread: %s\n", my_thread_name()));
plan(number_of_writers + number_of_readers);
SKIP_BIG_TESTS(number_of_writers + number_of_readers)
{
if ((file1.file= my_open(file1_name,
O_CREAT | O_TRUNC | O_RDWR, MYF(0))) == -1)
{
diag( "Got error during file1 creation from open() (errno: %d)\n",
errno);
exit(1);
}
pagecache_file_init(file1, &dummy_callback, &dummy_callback,
&dummy_fail_callback, &dummy_callback, NULL);
DBUG_PRINT("info", ("file1: %d", file1.file));
if (my_chmod(file1_name, S_IRWXU | S_IRWXG | S_IRWXO, MYF(MY_WME)))
exit(1);
my_pwrite(file1.file, (const uchar*) "test file", 9, 0, MYF(0));
if ((error= pthread_cond_init(&COND_thread_count, NULL)))
{
diag( "COND_thread_count: %d from pthread_cond_init (errno: %d)\n",
error, errno);
exit(1);
}
if ((error= pthread_mutex_init(&LOCK_thread_count, MY_MUTEX_INIT_FAST)))
{
diag( "LOCK_thread_count: %d from pthread_cond_init (errno: %d)\n",
error, errno);
exit(1);
}
if ((error= pthread_attr_init(&thr_attr)))
{
diag("Got error: %d from pthread_attr_init (errno: %d)\n",
error,errno);
exit(1);
}
if ((error= pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED)))
{
diag(
"Got error: %d from pthread_attr_setdetachstate (errno: %d)\n",
error,errno);
exit(1);
}
#ifdef HAVE_THR_SETCONCURRENCY
VOID(thr_setconcurrency(2));
#endif
if ((pagen= init_pagecache(&pagecache, PCACHE_SIZE, 0, 0,
TEST_PAGE_SIZE, 0)) == 0)
{
diag("Got error: init_pagecache() (errno: %d)\n",
errno);
exit(1);
}
DBUG_PRINT("info", ("Page cache %d pages", pagen));
{
unsigned char *buffr= malloc(TEST_PAGE_SIZE);
memset(buffr, '\0', TEST_PAGE_SIZE);
pagecache_write(&pagecache, &file1, 0, 3, buffr,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_WRITE_DELAY,
0, LSN_IMPOSSIBLE);
}
pthread_mutex_lock(&LOCK_thread_count);
while (number_of_readers != 0 || number_of_writers != 0)
{
if (number_of_readers != 0)
{
param=(int*) malloc(sizeof(int));
*param= number_of_readers + number_of_writers;
if ((error= pthread_create(&tid, &thr_attr, test_thread_reader,
(void*) param)))
{
diag("Got error: %d from pthread_create (errno: %d)\n",
error,errno);
exit(1);
}
thread_count++;
number_of_readers--;
}
if (number_of_writers != 0)
{
param=(int*) malloc(sizeof(int));
*param= number_of_writers + number_of_readers;
if ((error= pthread_create(&tid, &thr_attr, test_thread_writer,
(void*) param)))
{
diag("Got error: %d from pthread_create (errno: %d)\n",
error,errno);
exit(1);
}
thread_count++;
number_of_writers--;
}
}
DBUG_PRINT("info", ("Thread started"));
pthread_mutex_unlock(&LOCK_thread_count);
pthread_attr_destroy(&thr_attr);
/* wait finishing */
pthread_mutex_lock(&LOCK_thread_count);
while (thread_count)
{
if ((error= pthread_cond_wait(&COND_thread_count, &LOCK_thread_count)))
diag("COND_thread_count: %d from pthread_cond_wait\n", error);
}
pthread_mutex_unlock(&LOCK_thread_count);
DBUG_PRINT("info", ("thread ended"));
end_pagecache(&pagecache, 1);
DBUG_PRINT("info", ("Page cache ended"));
if (my_close(file1.file, MYF(0)) != 0)
{
diag( "Got error during file1 closing from close() (errno: %d)\n",
errno);
exit(1);
}
my_delete(file1_name, MYF(0));
DBUG_PRINT("info", ("file1 (%d) closed", file1.file));
DBUG_PRINT("info", ("Program end"));
} /* SKIP_BIG_TESTS */
my_end(0);
return exit_status();
}
}