/* Copyright (C) 2013 Codership Oy <info@codership.com>

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License along
   with this program; if not, write to the Free Software Foundation, Inc.,
   51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */

#include "mariadb.h"
#include "mysql/service_wsrep.h"
#include "wsrep_binlog.h"
#include "log.h"
#include "slave.h"
#include "log_event.h"
#include "wsrep_applier.h"
#include "wsrep_mysqld.h"

#include "transaction.h"

extern handlerton *binlog_hton;
/*
  Write the contents of a cache to a memory buffer.

  This function quite the same as MYSQL_BIN_LOG::write_cache(),
  with the exception that here we write in buffer instead of log file.
 */
int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len)
{
  *buf= NULL;
  *buf_len= 0;
  my_off_t const saved_pos(my_b_tell(cache));
  DBUG_ENTER("wsrep_write_cache_buf");

  if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
  {
    WSREP_ERROR("failed to initialize io-cache");
    DBUG_RETURN(ER_ERROR_ON_WRITE);
  }

  uint length= my_b_bytes_in_cache(cache);
  if (unlikely(0 == length)) length= my_b_fill(cache);

  size_t total_length= 0;

  if (likely(length > 0)) do
  {
      total_length += length;
      /*
        Bail out if buffer grows too large.
        A temporary fix to avoid allocating indefinitely large buffer,
        not a real limit on a writeset size which includes other things
        like header and keys.
      */
      if (total_length > wsrep_max_ws_size)
      {
          WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
                     wsrep_max_ws_size, total_length);
          goto error;
      }
      uchar* tmp= (uchar *)my_realloc(PSI_INSTRUMENT_ME, *buf, total_length,
                                       MYF(MY_ALLOW_ZERO_PTR));
      if (!tmp)
      {
          WSREP_ERROR("could not (re)allocate buffer: %zu + %u",
                      *buf_len, length);
          goto error;
      }
      *buf= tmp;

      memcpy(*buf + *buf_len, cache->read_pos, length);
      *buf_len= total_length;

      if (cache->file < 0)
      {
        cache->read_pos= cache->read_end;
        break;
      }
  } while ((length= my_b_fill(cache)));

  if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
  {
    WSREP_WARN("failed to initialize io-cache");
    goto cleanup;
  }

  DBUG_RETURN(0);

error:
  if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
  {
    WSREP_WARN("failed to initialize io-cache");
  }
cleanup:
  my_free(*buf);
  *buf= NULL;
  *buf_len= 0;
  DBUG_RETURN(ER_ERROR_ON_WRITE);
}

#define STACK_SIZE 4096 /* 4K - for buffer preallocated on the stack:
                         * many transactions would fit in there
                         * so there is no need to reach for the heap */

/*
  Write the contents of a cache to wsrep provider.

  This function quite the same as MYSQL_BIN_LOG::write_cache(),
  with the exception that here we write in buffer instead of log file.

  This version uses incremental data appending as it reads it from cache.
 */
static int wsrep_write_cache_inc(THD*      const thd,
                                 IO_CACHE* const cache,
                                 size_t*   const len)
{
  DBUG_ENTER("wsrep_write_cache_inc");
  my_off_t const saved_pos(my_b_tell(cache));

  if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().log_position(), 0, 0))
  {
    WSREP_ERROR("failed to initialize io-cache");
    DBUG_RETURN(1);;
  }

  int ret= 0;
  size_t total_length(0);

  uint length(my_b_bytes_in_cache(cache));
  if (unlikely(0 == length)) length= my_b_fill(cache);

  if (likely(length > 0))
  {
    do
    {
      total_length += length;
      /* bail out if buffer grows too large
         not a real limit on a writeset size which includes other things
         like header and keys.
      */
      if (unlikely(total_length > wsrep_max_ws_size))
      {
        WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
                   wsrep_max_ws_size, total_length);
        ret= 1;
        goto cleanup;
      }
      if (thd->wsrep_cs().append_data(wsrep::const_buffer(cache->read_pos, length)))
        goto cleanup;
      cache->read_pos= cache->read_end;
    } while ((cache->file >= 0) && (length= my_b_fill(cache)));
    if (ret == 0)
    {
      assert(total_length + thd->wsrep_sr().log_position() == saved_pos);
    }
  }

cleanup:
  *len= total_length;
  if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
  {
    WSREP_ERROR("failed to reinitialize io-cache");
  }
  DBUG_RETURN(ret);
}

/*
  Write the contents of a cache to wsrep provider.

  This function quite the same as MYSQL_BIN_LOG::write_cache(),
  with the exception that here we write in buffer instead of log file.
 */
int wsrep_write_cache(THD*      const thd,
                      IO_CACHE* const cache,
                      size_t*   const len)
{
  return wsrep_write_cache_inc(thd, cache, len);
}

void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len)
{
  int len= snprintf(NULL, 0, "%s/GRA_%lld_%lld.log",
                    wsrep_data_home_dir, (longlong) thd->thread_id,
                    (longlong) wsrep_thd_trx_seqno(thd));
  if (len < 0)
  {
    WSREP_ERROR("snprintf error: %d, skipping dump.", len);
    return;
  }
  /*
    len doesn't count the \0 end-of-string. Use len+1 below
    to alloc and pass as an argument to snprintf.
  */

  char *filename= (char *) my_malloc(key_memory_WSREP, len+1, 0);
  int len1= snprintf(filename, len+1, "%s/GRA_%lld_%lld.log",
                    wsrep_data_home_dir, (longlong) thd->thread_id,
                    (long long)wsrep_thd_trx_seqno(thd));

  if (len > len1)
  {
    WSREP_ERROR("RBR dump path truncated: %d, skipping dump.", len);
    my_free(filename);
    return;
  }

  FILE *of= fopen(filename, "wb");

  if (of)
  {
    if (fwrite(rbr_buf, buf_len, 1, of) == 0)
       WSREP_ERROR("Failed to write buffer of length %llu to '%s'",
                   (unsigned long long)buf_len, filename);

    fclose(of);
  }
  else
  {
    WSREP_ERROR("Failed to open file '%s': %d (%s)",
                filename, errno, strerror(errno));
  }
  my_free(filename);
}

/* Dump replication buffer along with header to a file. */
void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
                                    size_t buf_len)
{
  DBUG_ENTER("wsrep_dump_rbr_buf_with_header");

  File file;
  IO_CACHE cache;
  Log_event_writer writer(&cache, 0);
  Format_description_log_event *ev= 0;

  longlong thd_trx_seqno= (long long)wsrep_thd_trx_seqno(thd);
  int len= snprintf(NULL, 0, "%s/GRA_%lld_%lld_v2.log",
                    wsrep_data_home_dir, (longlong)thd->thread_id,
                    thd_trx_seqno);
  /*
    len doesn't count the \0 end-of-string. Use len+1 below
    to alloc and pass as an argument to snprintf.
  */
  char *filename;
  if (len < 0 || !(filename= (char*) my_malloc(key_memory_WSREP, len+1, 0)))
  {
    WSREP_ERROR("snprintf error: %d, skipping dump.", len);
    DBUG_VOID_RETURN;
  }

  int len1= snprintf(filename, len+1, "%s/GRA_%lld_%lld_v2.log",
                     wsrep_data_home_dir, (longlong) thd->thread_id,
                     thd_trx_seqno);

  if (len > len1)
  {
    WSREP_ERROR("RBR dump path truncated: %d, skipping dump.", len);
    my_free(filename);
    DBUG_VOID_RETURN;
  }

  if ((file= mysql_file_open(key_file_wsrep_gra_log, filename,
                             O_RDWR | O_CREAT | O_BINARY, MYF(MY_WME))) < 0)
  {
    WSREP_ERROR("Failed to open file '%s' : %d (%s)",
                filename, errno, strerror(errno));
    goto cleanup1;
  }

  if (init_io_cache(&cache, file, 0, WRITE_CACHE, 0, 0, MYF(MY_WME | MY_NABP)))
  {
    goto cleanup2;
  }

  if (my_b_safe_write(&cache, BINLOG_MAGIC, BIN_LOG_HEADER_SIZE))
  {
    goto cleanup2;
  }

  /*
    Instantiate an FDLE object for non-wsrep threads (to be written
    to the dump file).
  */
  ev= (thd->wsrep_applier) ? wsrep_get_apply_format(thd) :
    (new Format_description_log_event(4));

  if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) ||
      flush_io_cache(&cache))
  {
    WSREP_ERROR("Failed to write to '%s'.", filename);
    goto cleanup2;
  }

cleanup2:
  end_io_cache(&cache);

cleanup1:
  my_free(filename);
  mysql_file_close(file, MYF(MY_WME));

  if (!thd->wsrep_applier) delete ev;

  DBUG_VOID_RETURN;
}

int wsrep_write_skip_event(THD* thd)
{
  DBUG_ENTER("wsrep_write_skip_event");
  Ignorable_log_event skip_event(thd);
  int ret= mysql_bin_log.write_event(&skip_event);
  if (ret)
  {
    WSREP_WARN("wsrep_write_skip_event: write to binlog failed: %d", ret);
  }
  if (!ret && (ret= trans_commit_stmt(thd)))
  {
    WSREP_WARN("wsrep_write_skip_event: statt commit failed");
  }
  DBUG_RETURN(ret);
}

int wsrep_write_dummy_event_low(THD *thd, const char *msg)
{
  ::abort();
  return 0;
}

int wsrep_write_dummy_event(THD *orig_thd, const char *msg)
{
  return 0;
}

bool wsrep_commit_will_write_binlog(THD *thd)
{
  return (!wsrep_emulate_bin_log && /* binlog enabled*/
          (wsrep_thd_is_local(thd) || /* local thd*/
           (thd->wsrep_applier_service && /* applier and log-slave-updates */
            opt_log_slave_updates)));
}

/*
  The last THD/commit_for_wait registered for group commit.
*/
static wait_for_commit *commit_order_tail= NULL;

void wsrep_register_for_group_commit(THD *thd)
{
  DBUG_ENTER("wsrep_register_for_group_commit");
  if (wsrep_emulate_bin_log)
  {
    /* Binlog is off, no need to maintain group commit queue */
    DBUG_VOID_RETURN;
  }

  DBUG_ASSERT(thd->wsrep_trx().ordered());

  wait_for_commit *wfc= thd->wait_for_commit_ptr= &thd->wsrep_wfc;

  mysql_mutex_lock(&LOCK_wsrep_group_commit);
  if (commit_order_tail)
  {
    wfc->register_wait_for_prior_commit(commit_order_tail);
  }
  commit_order_tail= thd->wait_for_commit_ptr;
  mysql_mutex_unlock(&LOCK_wsrep_group_commit);

  /*
    Now we have queued for group commit. If the commit will go
    through TC log_and_order(), the commit ordering is done
    by TC group commit. Otherwise the wait for prior
    commits to complete is done in ha_commit_one_phase().
  */
  DBUG_VOID_RETURN;
}

void wsrep_unregister_from_group_commit(THD *thd)
{
  DBUG_ASSERT(thd->wsrep_trx().ordered());
  wait_for_commit *wfc= thd->wait_for_commit_ptr;

  if (wfc)
  {
    mysql_mutex_lock(&LOCK_wsrep_group_commit);
    wfc->unregister_wait_for_prior_commit();
    thd->wakeup_subsequent_commits(0);

    /* The last one queued for group commit has completed commit, it is
       safe to set tail to NULL. */
    if (wfc == commit_order_tail)
      commit_order_tail= NULL;
    mysql_mutex_unlock(&LOCK_wsrep_group_commit);
    thd->wait_for_commit_ptr= NULL;
  }
}