Logo Search packages:      
Sourcecode: firebird1.5 version File versions

walw.cpp

/*
 *        PROGRAM:        JRD Write Ahead Log Writer
 *        MODULE:         walw.c
 *        DESCRIPTION:    Write Ahead Log Writer
 *
 * The contents of this file are subject to the Interbase Public
 * License Version 1.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy
 * of the License at http://www.Inprise.com/IPL.html
 *
 * Software distributed under the License is distributed on an
 * "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express
 * or implied. See the License for the specific language governing
 * rights and limitations under the License.
 *
 * The Original Code was created by Inprise Corporation
 * and its predecessors. Portions created by Inprise Corporation are
 * Copyright (C) Inprise Corporation.
 *
 * All Rights Reserved.
 * Contributor(s): ______________________________________.
 *
 * 2002.02.15 Sean Leyne - Code Cleanup, removed obsolete "DELTA" port
 * 2002.02.15 Sean Leyne - Code Cleanup, removed obsolete "IMP" port
 *
 * 2002.10.29 Sean Leyne - Removed obsolete "Netware" port
 *
 */

#include "firebird.h"
#include "../jrd/ib_stdio.h"
#include <stdlib.h>
#include <string.h>
#include "../jrd/jrd_time.h"
#include "../jrd/jrd.h"
#include "../jrd/isc.h"
#include "../wal/wal.h"
#include "../jrd/dsc.h"
#include "../jrd/jrn.h"
#include "../jrd/iberr.h"
#include "../jrd/llio.h"
#include "gen/codes.h"
#include "../jrd/license.h"
#include "../wal/wal_proto.h"
#include "../wal/walc_proto.h"
#include "../wal/walf_proto.h"
#include "../wal/walw_proto.h"
#include "../jrd/gds_proto.h"
#include "../jrd/iberr_proto.h"
#include "../jrd/isc_i_proto.h"
#include "../jrd/isc_s_proto.h"
#include "../jrd/jrn_proto.h"
#include "../jrd/llio_proto.h"
#include "../jrd/misc_proto.h"
#include "../jrd/divorce.h"
#include <setjmp.h>

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#if !(defined WIN_NT)
#ifndef VMS
#include <sys/types.h>
#include <sys/file.h>
#else
#include <types.h>
#include <file.h>
#endif
#include <errno.h>
#include <signal.h>
#endif

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#ifdef UNIX
#include <sys/stat.h>
#endif

#ifdef WIN_NT
#include <process.h>
#include <io.h>
#include <windows.h>
#ifdef TEXT
#undef TEXT
#endif
#define TEXT      SCHAR
#define ERRNO     GetLastError()
#endif


extern "C" {


/* In a SUPERSERVER build the code in this file does not terminate the
   process: every exit(code) written in it is turned into a plain
   "return (code)" from the enclosing function.  On WIN_NT the writer's
   "pid" is taken from GetCurrentThreadId instead of getpid. */

#ifdef SUPERSERVER
#define exit(code)      return (code)
#ifdef WIN_NT
#define getpid          GetCurrentThreadId
#endif
#endif

/* ERRNO is GetLastError() on WIN_NT (defined above); everywhere else it
   falls back to the C library errno. */

#ifndef ERRNO
#define ERRNO     errno
#endif

/* State private to the WAL writer.  An instance is hung off
   WAL_handle->wal_local_info_ptr and its fields are reached through the
   ENV / LOG_FD / JOURNAL_HANDLE / LOCAL_TIME / LOCAL_FLAGS / DEBUG_FD
   accessor macros. */

typedef struct walwl
{
      jmp_buf walwl_env;                        /* reached through ENV */
      SLONG walwl_log_fd;                       /* descriptor of the currently open log file */
      struct jrn *walwl_journal_handle;   /* journal connection; NULL when there is none */
      SLONG walwl_local_time[2];                /* scratch area that PRINT_TIME fills via time() */
      SLONG walwl_flags;                        /* writer-local flags; presumably holds
                                                               WALW_DISABLING_JRN -- TODO confirm */
      IB_FILE *walwl_debug_fd;                  /* stream that receives the debug messages */
} *WALWL;

/* WALW_WRITER_TIMEOUT_USECS  timeout handed to ISC_event_wait() while the
                              writer is idle
   WALW_DISABLING_JRN         presumably a bit for LOCAL_FLAGS; no use of it
                              is visible in this part of the file
   MAXLOGS                    number of log files examined per batch when
                              walking a chain of linked logs
   WALW_ERROR                 build a gds_walw_err status vector */

#define WALW_WRITER_TIMEOUT_USECS   3000000
#define WALW_DISABLING_JRN          1
#define MAXLOGS                     32
#define WALW_ERROR(status_vector)   IBERR_build_status (status_vector, gds_walw_err, 0)

/* Accessors for the writer's private state (struct walwl).  All of them
   expect a variable named WAL_handle to be in scope. */

#define ENV             ((WALWL)WAL_handle->wal_local_info_ptr)->walwl_env
#define LOG_FD                ((WALWL)WAL_handle->wal_local_info_ptr)->walwl_log_fd
#define JOURNAL_HANDLE        ((WALWL)WAL_handle->wal_local_info_ptr)->walwl_journal_handle
#define LOCAL_TIME            ((WALWL)WAL_handle->wal_local_info_ptr)->walwl_local_time
#define LOCAL_FLAGS           ((WALWL)WAL_handle->wal_local_info_ptr)->walwl_flags
#define DEBUG_FD        ((WALWL)WAL_handle->wal_local_info_ptr)->walwl_debug_fd
#define DBNAME                WAL_handle->wal_dbname

/* True when debug messages were requested; expects a variable named
   WAL_segment to be in scope. */

#define PRINT_DEBUG_MSGS      (WAL_segment->wals_flags2 & WALS2_DEBUG_MSGS)

/* Store the current time in the buffer t (accessed as a time_t) and print
   it, in ctime() format, on fd.  Expands to a braced block, not to a
   single expression. */

#define PRINT_TIME(fd,t)      { time((time_t*) t); ib_fprintf (fd, "%s", ctime((time_t*) t)); }

/* Forward declarations of the helpers private to this file. */

static void close_log(ISC_STATUS *, WAL, SCHAR *, WALFH, SLONG);
static SSHORT discard_prev_logs(ISC_STATUS *, SCHAR *, SCHAR *, SLONG, SSHORT);
static void finishup_checkpoint(WALS);
static SSHORT flush_all_buffers(ISC_STATUS *, WAL);
static SSHORT get_logfile_index(WALS, SCHAR *);
static BOOLEAN get_log_usability(ISC_STATUS *, SCHAR *, SCHAR *, SLONG);
static SSHORT get_next_logname(ISC_STATUS *, WALS, SCHAR *, SLONG *, SLONG *);
static SSHORT get_next_prealloc_logname(ISC_STATUS *, WALS, SCHAR *, SLONG *,
                                                            SLONG *);
static SSHORT get_next_serial_logname(ISC_STATUS *, WALS, SCHAR *, SLONG *,
                                                        SLONG *);
static BOOLEAN get_next_usable_partition(ISC_STATUS *, SCHAR *, SCHAR *, SLONG *);
static SSHORT get_overflow_logname(ISC_STATUS *, WALS, SCHAR *, SLONG *, SLONG *);
static void get_time_stamp(SLONG *);
static SSHORT increase_buffers(ISC_STATUS *, WAL, SSHORT);
static SSHORT init_raw_partitions(ISC_STATUS *, WAL);
static SSHORT journal_connect(ISC_STATUS *, WAL);
static void journal_disable(ISC_STATUS *, WAL, WALFH);
static SSHORT journal_enable(ISC_STATUS *, WAL);
static void prepare_wal_block(WALS, WALBLK *);
static void release_wal_block(WALS, WALBLK *);
static void report_walw_bug_or_error(ISC_STATUS *, struct wal *, SSHORT, ISC_STATUS);
static SSHORT rollover_log(ISC_STATUS *, WAL, WALFH);
static void setup_for_checkpoint(WALS);
static SSHORT setup_log(ISC_STATUS *, WAL, SCHAR *, SLONG, SLONG, SLONG *, WALFH,
                                    SSHORT, SCHAR *, SLONG);
static SSHORT setup_log_header_info(ISC_STATUS *, WAL, SCHAR *, SLONG, SLONG,
                                                      SLONG *, WALFH, SSHORT, SCHAR *, SLONG,
                                                      SSHORT *);
static SSHORT write_log_header_and_reposition(ISC_STATUS *, SCHAR *, SLONG,
                                                                    WALFH);
static SSHORT write_wal_block(ISC_STATUS *, WALBLK *, SCHAR *, SLONG);
static void write_wal_statistics(WAL);

/* Terminator block declared through the WAL_TERMINATOR macro; WALW_writer
   initializes it with WAL_TERMINATOR_SET. */

static WAL_TERMINATOR(log_terminator_block);


#ifdef SUPERSERVER
int main_walw( char **argv)
#else

/* JMB - I didn't like having a "main" symbol in the shared library, so
 * I changed the function name from main to walw_classic_main and
 * added the walw_main.c file.
 */
int CLIB_ROUTINE walw_classic_main( int argc, char **argv)
#endif
{
/**************************************
 *
 *    m a i n
 *
 **************************************
 *
 * Functional description
 *    WAL_writer is used only in the privileged WAL writer
 *    process.  One WAL writer process is started per database.
 *
 *    Arguments:
 *      <name>      a word not starting with '-' is the database name
 *      -D <name>   the next word is the database name
 *      -Z          print the version and stop
 *
 *    Finishes with FINI_OK, or with FINI_ERROR when the uid cannot
 *    be changed (UNIX) or the WAL cannot be initialized.  In a
 *    SUPERSERVER build exit() is a macro for "return", so these
 *    values are returned to the caller instead.
 *
 **************************************/
      ISC_STATUS_ARRAY status_vector;
      WAL WAL_handle;
      SCHAR dbg_file[MAXPATHLEN];
      IB_FILE *debug_fd;
      struct walwl local_info;
      SCHAR *dbname, c, *p, **end;
#ifdef SUPERSERVER
      int argc;

      /* Only argv is passed in; the argument count travels in its
         first slot. */

      argc = (int) argv[0];
#endif

      dbname = "";
      for (end = argv++ + argc; argv < end;) {
            p = *argv++;
            if (*p != '-')
                  dbname = p;
            else {
                  while ((c = *++p) != 0)
                        switch (UPPER(c)) {
                        case 'D':
                              /* Take the following word as the database name, but
                                 only if there is one: a trailing -D must not read
                                 past the end of the argument vector. */
                              if (argv < end)
                                    dbname = *argv++;
                              break;

                        case 'Z':
                              ib_printf("WAL writer version %s\n", GDS_VERSION);
                              exit(FINI_OK);

                        default:
                              /* Unknown switch letters are ignored */
                              break;
                        }
            }
      }

#ifdef UNIX
/* WAL writer has to run as 'root' to be able to access a
   raw device logs.  */

      if (setreuid(0, 0) == -1) {
            ib_printf
                  ("WAL writer: couldn't set uid to 0 for database %s, errno=%d\n",
                   dbname, errno);
            exit(FINI_ERROR);
      }

      divorce_terminal(0);
#endif

/* Create a debug file if not already present.  The open may fail, in
   which case debug_fd is NULL.
   NOTE(review): the debug printing code in this file passes DEBUG_FD to
   ib_fprintf without checking it for NULL. */

      WALC_build_dbg_filename(dbname, dbg_file);

      debug_fd = ib_fopen(dbg_file, "a"); /* Do it after divorce_terminal() */

      WAL_handle = NULL;
      if (WALC_init(status_vector, &WAL_handle, dbname, 0,
                          NULL, 0L, FALSE, 1L, 0, NULL, FALSE) != FB_SUCCESS) {
            gds__log_status(dbname, status_vector);
            gds__print_status(status_vector);

            /* Don't leave the debug file open: under SUPERSERVER the
               exit() below is only a return. */
            if (debug_fd)
                  ib_fclose(debug_fd);
            exit(FINI_ERROR);
      }

      ISC_signal_init();

/* Attach the writer's private state to the handle; the JOURNAL_HANDLE,
   LOCAL_FLAGS and DEBUG_FD macros address fields of local_info from
   here on. */

      WAL_handle->wal_local_info_ptr = (UCHAR *) & local_info;
      JOURNAL_HANDLE = NULL;
      LOCAL_FLAGS = 0L;

      DEBUG_FD = debug_fd;

      WALW_writer(status_vector, WAL_handle);

      WALC_fini(status_vector, &WAL_handle);

      /* Closing a null stream is undefined, so skip it if the open failed */
      if (debug_fd)
            ib_fclose(debug_fd);

      exit(FINI_OK);
}


#ifdef VMS
void ERR_post(stuff)
       ISC_STATUS stuff;
{
/**************************************
 *
 *    E R R _ p o s t
 *
 **************************************
 *
 * Functional description
 *    Post an error sequence to the status vector.  Since an error
 *    sequence can, in theory, be arbitrarily long, pull a cheap
 *    trick to get the address of the argument vector.
 *
 *    The arguments are read by walking memory upwards from the
 *    address of the first parameter, so the routine depends on the
 *    platform's argument layout.  The sequence is copied into a
 *    local status vector, printed, and the routine then exits with
 *    FINI_ERROR.
 *
 *    NOTE(review): nothing limits the number of words copied into
 *    the local status vector.
 *
 **************************************/
      ISC_STATUS *p, *q;
      ISC_STATUS_ARRAY status_vector;
      int type;

/* Get the addresses of the argument vector and the status vector, and do
   word-wise copy. */

      p = status_vector;
      q = &stuff;

/* Copy first argument; it is always stored as a gds code */

      *p++ = gds_arg_gds;
      *p++ = *((SLONG *) q)++;

/* Pick up remaining args.  Each one starts with a type word, which is
   copied as well; a type of zero ends the sequence. */

      while (*p++ = type = *((int *) q)++)
            switch (type) {
            case gds_arg_gds:
                  *p++ = *((ISC_STATUS *) q)++;
                  break;

            case gds_arg_number:
                  *p++ = *((SLONG *) q)++;
                  break;

            case gds_arg_vms:
            case gds_arg_unix:
            case gds_arg_domain:
            case gds_arg_mpexl:
            default:
                  *p++ = *((int *) q)++;
                  break;

            case gds_arg_string:
            case gds_arg_interpreted:
                  *p++ = *q++;
                  break;

            /* A counted string takes two slots: an int followed by a
               full status word */
            case gds_arg_cstring:
                  *p++ = *((int *) q)++;
                  *p++ = *q++;
                  break;
            }

      gds__print_status(status_vector);

      exit(FINI_ERROR);
}
#endif


SSHORT WALW_writer(ISC_STATUS * status_vector, WAL WAL_handle)
{
/**************************************
 *
 *    W A L W _ w r i t e r
 *
 **************************************
 *
 * Functional description
 *    This is the main processing routine for the WAL writer.
 *    This routine simply waits on its semaphore in a loop.  If
 *    the semaphore is poked, or an alarm expires, it wakes up,
 *    looks for buffers to be written to the log file, writes
 *    them, and wakes up everyone waiting for a free buffer
 *    before going back to sleep.
 *
 *    In addition to writing WAL buffers to log files, it also
 *    takes care of such functions as checkpointing, rollover,
 *    enabling and disabling journalling and WAL shutdown.
 *
 *    Returns
 *      FB_SUCCESS  when another writer is already running, or
 *                  after an orderly shutdown;
 *      FB_FAILURE  when the log header buffer cannot be allocated
 *                  or a std::exception reaches the handler at the
 *                  bottom;
 *      the result of setup_log() when the initial log setup fails.
 *
 *    The local "acquired" tracks whether the shared WAL segment is
 *    currently held, so that the exception handler can release it.
 *
 **************************************/
      SSHORT ret;
      WALS WAL_segment;
      WALBLK *wblk;
      SSHORT bufnum, first_logfile;
      WALFH log_header;
      int buffer_full, journal_enable_or_disable, rollover_required;
      BOOLEAN acquired;
      EVENT ptr;
      SLONG value;
      SLONG log_type;
/* Every exit taken after the allocation below goes through this macro so
   that the log header buffer is freed. */
#define WALW_WRITER_RETURN(code) {gds__free((SLONG*)log_header); return(code);}

      acquired = FALSE;
      log_header = (WALFH) gds__alloc(WALFH_LENGTH);
/* NOMEM: return failure, FREE: error returns & macro WALW_WRITER_RETURN */
      if (!log_header) {
            return FB_FAILURE;
      }

      try {

/* If there already is a WAL writer running, return quietly. */

      if (WALC_check_writer(WAL_handle) == FB_SUCCESS)
            WALW_WRITER_RETURN(FB_SUCCESS);

/* Now we are the WAL writer. */

      ret = FB_SUCCESS;
      WAL_TERMINATOR_SET(log_terminator_block);

      WALC_acquire(WAL_handle, &WAL_segment);
      acquired = TRUE;
      if (PRINT_DEBUG_MSGS) {
            ib_fprintf(DEBUG_FD,
                           "====================================================\n");
            PRINT_TIME(DEBUG_FD, LOCAL_TIME);
            ib_fprintf(DEBUG_FD, "WAL writer for database %s starting, pid=%d.\n",
                           DBNAME, getpid());
      }

      /* Register ourselves as the writer and clear any error state left
         in the shared segment */

      WAL_segment->wals_writer_pid = getpid();
      WAL_segment->wals_flags &= ~WALS_ERROR_HAPPENED;
      WAL_segment->wals_last_err = 0;
      WAL_CHECK_BUG_ERROR(WAL_handle, WAL_segment);

      first_logfile =
            (WAL_segment->wals_flags & WALS_FIRST_TIME_LOG) ? TRUE : FALSE;
      log_type = 0L;
      if (first_logfile) {
            /* Initialize raw partitions which need root permission */

            init_raw_partitions(status_vector, WAL_handle);

            /* Get the new log name and partition */

            if (get_next_logname(status_vector, WAL_segment,
                                           WAL_segment->wals_logname,
                                           &WAL_segment->wals_log_partition_offset,
                                           &log_type) != FB_SUCCESS)
                        report_walw_bug_or_error(status_vector, WAL_handle, FB_FAILURE,
                                                             (ISC_STATUS) gds_wal_err_rollover2);
      }

      /* A non-empty journal directory name means a journal connection
         is wanted */

      if (strlen(WAL_segment->wals_jrn_dirname))
            journal_connect(status_vector, WAL_handle);

      if (WAL_segment->wals_ckpted_log_seqno == 0L) {
            /* Initialize checkpoint info.  Current logfile may be assumed
               to have the latest checkpoint */

            WAL_segment->wals_ckpted_log_seqno = WAL_segment->wals_log_seqno;
            strcpy(WAL_segment->wals_ckpt_logname, WAL_segment->wals_logname);
            WAL_segment->wals_ckpt_log_p_offset =
                  WAL_segment->wals_log_partition_offset;
            WAL_segment->wals_ckpted_offset = 0;

            WAL_segment->wals_prev_ckpted_log_seqno = WAL_segment->wals_log_seqno;
            strcpy(WAL_segment->wals_prev_ckpt_logname,
                     WAL_segment->wals_logname);
            WAL_segment->wals_prev_ckpt_log_p_offset =
                  WAL_segment->wals_log_partition_offset;
            WAL_segment->wals_prev_ckpted_offset = 0;
      }

      ret = setup_log(status_vector, WAL_handle, WAL_segment->wals_logname,
                              WAL_segment->wals_log_partition_offset, log_type,
                              &(LOG_FD), log_header, first_logfile, "", 0L);
      if (ret == FB_SUCCESS) {
            WAL_segment->wals_flags |= WALS_WRITER_INITIALIZED;
            WAL_segment->wals_flags &= ~WALS_FIRST_TIME_LOG;
      }
      WALC_release(WAL_handle);
      acquired = FALSE;

      /* The setup failure is handled only now, after the segment has been
         released */

      if (ret != FB_SUCCESS) {
            report_walw_bug_or_error(status_vector, WAL_handle, ret,
                                                 (ISC_STATUS) gds_wal_err_setup);
            if (JOURNAL_HANDLE)
                  JRN_fini(status_vector, &(JOURNAL_HANDLE));
            WALW_WRITER_RETURN(ret);
      }

      for (;;) {
            WALC_acquire(WAL_handle, &WAL_segment);
            acquired = TRUE;
            WAL_CHECK_BUG_ERROR(WAL_handle, WAL_segment);

            /* First find out the work to be done. */

            if (WAL_segment->wals_flags & WALS_SHUTDOWN_WRITER)
                  break;                        /* This is the only way to get out of this loop. */

            /* Check the next buffer to be written. */

            bufnum =
                  (WAL_segment->wals_last_flushed_buf +
                   1) % WAL_segment->wals_maxbufs;
            wblk = WAL_BLOCK(bufnum);
            buffer_full = wblk->walblk_flags & WALBLK_to_be_written;

            /* Check if journalling is enabled or disabled */

            journal_enable_or_disable =
                  (WAL_segment->wals_flags & WALS_ENABLE_JOURNAL) ||
                  (WAL_segment->wals_flags & WALS_DISABLE_JOURNAL);

            /* Check if rollover to a new log file is required */

            rollover_required = WAL_segment->wals_flags & WALS_ROLLOVER_REQUIRED;

            if (!buffer_full && !journal_enable_or_disable && !rollover_required) {
                  /* Nothing to do.  Prepare to wait for a timeout or a wakeup
                     from somebody else. */

                  /* The event is cleared while the segment is still held;
                     the wait itself happens after the release. */

                  ptr = &WAL_EVENTS[WAL_WRITER_WORK_SEM];
                  value = ISC_event_clear(ptr);
                  WAL_segment->wals_flags &= ~WALS_WRITER_INFORMED;
                  WALC_release(WAL_handle);
                  acquired = FALSE;
                  ISC_event_wait(1, &ptr, &value, WALW_WRITER_TIMEOUT_USECS,
                                       reinterpret_cast < void (*)() >
                                       (WALC_alarm_handler), ptr);
                  continue;
            }

            /* Now do the work */

            if (buffer_full) {
                  /* Prepare and flush the associated buffer to log file. */

                  prepare_wal_block(WAL_segment, wblk);

                  /* Do the write after releasing the shared segment so that
                     other concurrent activities may go on in the meantime. */

                  WALC_release(WAL_handle);
                  acquired = FALSE;

                  if ((ret = write_wal_block(status_vector, wblk,
                                                         WAL_segment->wals_logname,
                                                         LOG_FD)) !=
                        FB_SUCCESS) report_walw_bug_or_error(status_vector, WAL_handle,
                                                                          ret,
                                                                          (ISC_STATUS)
                                                                          gds_wal_err_logwrite);

                  WALC_acquire(WAL_handle, &WAL_segment);
                  acquired = TRUE;

                  /* The block pointer is looked up again after the
                     re-acquire before it is used */

                  wblk = WAL_BLOCK(bufnum);
                  release_wal_block(WAL_segment, wblk);

                  /* If the WAL segment needs to be expanded and can be expanded
                     try to do it now because no WAL I/O is in progress and we
                     can move things around. */

                  if (!(WAL_segment->wals_flags2 & WALS2_CANT_EXPAND) &&
                        (WAL_segment->wals_flags2 & WALS2_EXPAND)) {
                        increase_buffers(status_vector, WAL_handle, 1);
                        /* Pick the segment pointer up from the handle again
                           after the expansion */
                        WAL_segment = WAL_handle->wal_segment;
                        WAL_segment->wals_buf_waiting_count = 0;
                        WAL_segment->wals_flags2 &= ~(WALS2_EXPAND);
                  }

                  /* If it is time for checkpointing, then initiate one. */

                  if (
                        (WAL_segment->wals_cur_ckpt_intrvl >
                         WAL_segment->wals_thrshold_intrvl)
                        && !(WAL_segment->wals_flags & WALS_CKPT_START)) {
                        setup_for_checkpoint(WAL_segment);
                  }

                  /* Tell whoever waits on the "work done" event that a
                     buffer has been written */

                  WAL_segment->wals_buf_waiters = 0;
                  ISC_event_post(WAL_EVENTS + WAL_WRITER_WORK_DONE_SEM);
                  ISC_event_clear(WAL_EVENTS + WAL_WRITER_WORK_DONE_SEM);
            }

            /* if a checkpoint has been recorded, discard old log files. */

            if ((WAL_segment->wals_flags & WALS_CKPT_START) &&
                  (WAL_segment->wals_flags & WALS_CKPT_RECORDED)) {
                  discard_prev_logs(status_vector, WAL_segment->wals_dbname,
                                            WAL_segment->wals_prev_ckpt_logname,
                                            WAL_segment->wals_prev_ckpt_log_p_offset,
                                            FALSE);
#ifdef SUPERSERVER
                  /* In Netware, file handles are shared if the file is reopened in the
                     same thread.  discard_prev_logs() may open the current log file to
                     check its header for previously linked log files.  That would change
                     the file pointer to WALFH_LENGTH (2048) causing us to lose log records
                     in the current log file unless we do reseeking. This action won't
                     hurt on other platforms. */
                  lseek(LOG_FD, WAL_segment->wals_flushed_offset, SEEK_SET);
#endif
                  WAL_segment->wals_flags &=
                        ~(WALS_CKPT_START + WALS_CKPT_RECORDED);
            }

            /* If it is time to rollover to the next log file then do it.  But
               make sure that the last rollover information, if any, has
               already been recorded. */

            if (rollover_required) {
                  rollover_log(status_vector, WAL_handle, log_header);
                  WAL_segment->wals_buf_waiters = 0;
                  WAL_segment->wals_flags &= ~WALS_ROLLOVER_REQUIRED;
                  ISC_event_post(WAL_EVENTS + WAL_WRITER_WORK_DONE_SEM);
                  ISC_event_clear(WAL_EVENTS + WAL_WRITER_WORK_DONE_SEM);
            }

            /* If journalling has been enabled or disabled, take care of that
               situation now. */

            if (journal_enable_or_disable) {
                  if ((WAL_segment->wals_flags & WALS_ENABLE_JOURNAL) &&
                        !JOURNAL_HANDLE) journal_enable(status_vector, WAL_handle);
                  else if ((WAL_segment->wals_flags & WALS_DISABLE_JOURNAL) &&
                               JOURNAL_HANDLE)
                              journal_disable(status_vector, WAL_handle, log_header);

                  ISC_event_post(WAL_EVENTS + WAL_WRITER_WORK_DONE_SEM);
                  ISC_event_clear(WAL_EVENTS + WAL_WRITER_WORK_DONE_SEM);
            }

            WALC_release(WAL_handle);
            acquired = FALSE;
      }

/* Do cleanup and shutdown.  The loop is left with the segment still
   acquired; it is released below once the log has been closed. */

      log_header->walfh_length = WAL_segment->wals_flushed_offset;
      log_header->walfh_hibsn = WAL_segment->wals_blkseqno - 1;
      close_log(status_vector, WAL_handle, WAL_segment->wals_logname,
                    log_header,
                    WAL_segment->wals_flags & WALS_INFORM_CLOSE_TO_JOURNAL);
      write_wal_statistics(WAL_handle);
      WAL_segment->wals_flags |= WALS_WRITER_DONE;
      WAL_segment->wals_writer_pid = 0;
      WALC_release(WAL_handle);
      acquired = FALSE;

      if (JOURNAL_HANDLE)
            JRN_fini(status_vector, &(JOURNAL_HANDLE));

      ISC_event_clear(WAL_EVENTS + WAL_WRITER_WORK_DONE_SEM);

      WALW_WRITER_RETURN(FB_SUCCESS);

      }     // try
      catch (const std::exception&) {
            /* Don't leave the shared segment held when bailing out */
            if (acquired) {
                  WALC_release(WAL_handle);
            }
            WALW_WRITER_RETURN(FB_FAILURE);
      }
}


static void close_log(
                                ISC_STATUS * status_vector,
                                WAL WAL_handle,
                                SCHAR * logname, WALFH log_header, SLONG journal_flag)
{
/**************************************
 *
 *    c l o s e _ l o g
 *
 **************************************
 *
 * Functional description
 *    Closes the log file after updating its header with info from
 *    the passed log_header pointers.
 *
 *    status_vector   receives error information
 *    WAL_handle      WAL handle; supplies LOG_FD, JOURNAL_HANDLE and
 *                    the debug stream
 *    logname         name of the log file being closed
 *    log_header      in-memory header; its OPEN flag is cleared, its
 *                    parameter block is rebuilt and its three string
 *                    members are released
 *    journal_flag    non-zero to report the close to the journal
 *                    server (only when a journal connection exists)
 *
 *    Failures of the header write or of the journal notification
 *    are passed to report_walw_bug_or_error().
 *
 **************************************/
      SSHORT ret;
      WALS WAL_segment;       /* referenced by the PRINT_DEBUG_MSGS macro */

      WAL_segment = WAL_handle->wal_segment;
      log_header->walfh_flags &= ~WALFH_OPEN;

/* Pack the names into the header's data area before the separately
   allocated copies are released. */

      log_header->walfh_data_len =
            MISC_build_parameters_block(log_header->walfh_data,
                                                      PARAM_BYTE(WALFH_dbname),
                                                      PARAM_STRING(log_header->walfh_dbname),
                                                      PARAM_BYTE(WALFH_prev_logname),
                                                      PARAM_STRING(log_header->walfh_prev_logname),
                                                      PARAM_BYTE(WALFH_next_logname),
                                                      PARAM_STRING(log_header->walfh_next_logname),
                                                      PARAM_BYTE(WALFH_end), 0);

/* Release the name strings.  The header block itself stays with the
   caller, so clear each pointer to avoid leaving a dangling reference
   that a later close or cleanup could free a second time. */

      if (log_header->walfh_dbname) {
            gds__free((SLONG *) log_header->walfh_dbname);
            log_header->walfh_dbname = NULL;
      }
      if (log_header->walfh_prev_logname) {
            gds__free((SLONG *) log_header->walfh_prev_logname);
            log_header->walfh_prev_logname = NULL;
      }
      if (log_header->walfh_next_logname) {
            gds__free((SLONG *) log_header->walfh_next_logname);
            log_header->walfh_next_logname = NULL;
      }

      if ((ret = write_log_header_and_reposition(status_vector, logname, LOG_FD,
                                                                     log_header)) != FB_SUCCESS)
            report_walw_bug_or_error(status_vector, WAL_handle, ret,
                                                 (ISC_STATUS) gds_wal_err_logwrite);
      LLIO_close(0, LOG_FD);

      if (PRINT_DEBUG_MSGS) {
            PRINT_TIME(DEBUG_FD, LOCAL_TIME);
            ib_fprintf(DEBUG_FD,
                           "Closed seqno=%"SLONGFORMAT", log %s, p_offset=%"SLONGFORMAT
                           ", length=%"SLONGFORMAT"\n",
                           log_header->walfh_seqno, logname,
                           log_header->walfh_log_partition_offset,
                           log_header->walfh_length);
      }

/* Tell the journal server about the closed log, if asked to and if we
   are connected to one. */

      if (journal_flag && JOURNAL_HANDLE) {
            ret = JRN_put_wal_name(status_vector, JOURNAL_HANDLE,
                                             logname, strlen(logname),
                                             log_header->walfh_seqno,
                                             log_header->walfh_length,
                                             log_header->walfh_log_partition_offset,
                                             JRNW_CLOSE);

            if (PRINT_DEBUG_MSGS) {
                  PRINT_TIME(DEBUG_FD, LOCAL_TIME);
                  ib_fprintf(DEBUG_FD,
                                 "After calling JRN_put_wal_name() for seqno=%"SLONGFORMAT
                                 ", ret=%d\n",
                                 log_header->walfh_seqno, ret);
            }

            if (ret != FB_SUCCESS)
                  report_walw_bug_or_error(status_vector, WAL_handle, ret,
                                                       (ISC_STATUS) gds_wal_err_jrn_comm);
      }
}


static SSHORT discard_prev_logs(ISC_STATUS * status_vector,
                                                SCHAR * dbname,
                                                SCHAR * starting_logname,
                                                SLONG starting_log_partition_offset,
                                                SSHORT delete_flag)
{
/**************************************
 *
 *    d i s c a r d _ p r e v _ l o g s
 *
 **************************************
 *
 * Functional description
 *    Walk the chain of log files that precede starting_logname
 *    (the starting one itself is left alone) and mark each of them
 *    as no longer needed for short-term recovery.  A file is
 *    removed instead when it is neither kept for long-term
 *    recovery nor pre-allocated, and it is either an overflow file
 *    or delete_flag is TRUE.
 *
 *    The chain is fetched in batches of at most MAXLOGS entries.
 *
 *    Returns FB_SUCCESS, or a failure code (with status_vector
 *    filled in) when the chain cannot be read or a file cannot be
 *    unlinked.
 *
 **************************************/
      SCHAR chain_name[MAXPATHLEN];       /* log the next batch is fetched from */
      SLONG chain_p_offset;
      SCHAR name_pool[MAXLOGS * MAXPATHLEN];    /* storage behind names[] */
      SCHAR *names[MAXLOGS];
      SLONG p_offsets[MAXLOGS];
      SLONG seqnos[MAXLOGS];
      SLONG lengths[MAXLOGS];
      SLONG flags[MAXLOGS];
      int found, idx;
      SLONG last_seqno = 0;
      SLONG flag;
      SSHORT ret;

      strcpy(chain_name, starting_logname);
      chain_p_offset = starting_log_partition_offset;

      do {
            /* Fetch the next batch of previously linked log files */

            found = 0;
            ret = WALF_get_all_next_logs_info(status_vector, dbname,
                                                              chain_name, chain_p_offset,
                                                              MAXLOGS,
                                                              name_pool, &found,
                                                              names, p_offsets,
                                                              seqnos, lengths,
                                                              flags, -1);
            if (ret != FB_SUCCESS)
                  return ret;

            idx = 0;
            if (found >= MAXLOGS) {
                  /* A full batch: there may be more.  Entry 0 becomes the
                     starting point of the next fetch and is skipped in
                     this pass. */

                  strcpy(chain_name, names[0]);
                  chain_p_offset = p_offsets[0];
                  idx = 1;
            }

            for (; idx < found; idx++) {
                  /* A sequence number that fails to increase marks the end
                     of the log list */

                  if (seqnos[idx] <= last_seqno)
                        return FB_SUCCESS;
                  last_seqno = seqnos[idx];
                  flag = flags[idx];

                  /* Keep the file when it is needed for long-term recovery,
                     when it is a pre-allocated file, or when it is not an
                     overflow file and deletion was not requested. */

                  if ((flag & WALFH_KEEP_FOR_LONG_TERM_RECV) ||
                        (flag & WALFH_PREALLOCATED) ||
                        (!(flag & WALFH_OVERFLOW) && !delete_flag)) {
                        /* Only update the header flag to say that the file is
                           not needed for short-term recovery. */
                        WALF_set_log_header_flag(status_vector, dbname, names[idx],
                                                             p_offsets[idx],
                                                             WALFH_KEEP_FOR_SHORT_TERM_RECV, 0);
                        continue;
                  }

                  /* The file may go: take it out of the chain, then remove it */

                  WALF_delink_log(status_vector, dbname, names[idx],
                                          p_offsets[idx]);
                  if (unlink(names[idx]) != FB_SUCCESS) {
                        WAL_IO_ERROR(status_vector, "logfile unlink", names[idx],
                                           isc_io_delete_err, ERRNO);
                        WALC_save_status_strings(status_vector);
                        return FB_FAILURE;
                  }
            }

            /* A short batch means no more previous log files to check */
      } while (found >= MAXLOGS);

      return FB_SUCCESS;
}


static void finishup_checkpoint(WALS WAL_segment)
{
/**************************************
 *
 *    f i n i s h u p _ c h e c k p o i n t
 *
 **************************************
 *
 * Functional description
 *    Record the checkpoint that has just been taken: the current
 *    checkpoint description moves to the "previous" slot, the
 *    current log file becomes the checkpointed one, and the
 *    checkpoint interval counter starts over.
 *
 **************************************/
      WALS seg = WAL_segment;

/* Step 1: what was the current checkpoint becomes the previous one.
   This has to happen before the current fields are overwritten. */

      strcpy(seg->wals_prev_ckpt_logname, seg->wals_ckpt_logname);
      seg->wals_prev_ckpted_log_seqno = seg->wals_ckpted_log_seqno;
      seg->wals_prev_ckpt_log_p_offset = seg->wals_ckpt_log_p_offset;
      seg->wals_prev_ckpted_offset = seg->wals_ckpted_offset;

/* Step 2: the new checkpoint lives in the current log file. */

      strcpy(seg->wals_ckpt_logname, seg->wals_logname);
      seg->wals_ckpted_log_seqno = seg->wals_log_seqno;
      seg->wals_ckpt_log_p_offset = seg->wals_log_partition_offset;

/* The checkpoint record is the last "empty" record in the current
   block; its offset was saved earlier, so consume the saved value. */

      seg->wals_ckpted_offset = seg->wals_saved_ckpted_offset;
      seg->wals_saved_ckpted_offset = 0L;

/* Step 3: restart the checkpoint interval count. */

      seg->wals_cur_ckpt_intrvl = 0L;
}


static SSHORT flush_all_buffers( ISC_STATUS * status_vector, WAL WAL_handle)
{
/**************************************
 *
 *    f l u s h _ a l l _ b u f f e r s
 *
 **************************************
 *
 * Functional description
 *    Flush all the non-empty buffer blocks to the current
 *    log file.  We need not worry about rollover because we
 *    have already given its log file sequence number for
 *    the records in those buffers.
 *
 *    Returns FB_SUCCESS once the walk over the buffers completes.
 *    A failed block write is handed to report_walw_bug_or_error();
 *    FB_FAILURE is returned if that call comes back.
 *
 **************************************/
      WALS WAL_segment;
      SSHORT bufnum, lastbuf, maxbufs;
      WALBLK *wblk;
      SSHORT ret;

      WAL_segment = WAL_handle->wal_segment;

/* Start with the buffer following the last flushed one and walk the
   ring of buffers until the index reaches lastbuf. */

      maxbufs = WAL_segment->wals_maxbufs;
      bufnum = (WAL_segment->wals_last_flushed_buf + 1) % maxbufs;
      lastbuf = CUR_BUF;
      if (lastbuf == -1)                  /* Meaning all the buffers are full */
            lastbuf = (bufnum + maxbufs - 1) % maxbufs;

      do {
            wblk = WAL_BLOCK(bufnum);
            if (wblk->walblk_cur_offset > BLK_HDROVHD) {
                  /* Prepare and flush the associated buffer to log file. */

                  /* The flag test must be parenthesized: '!' binds tighter
                     than '&', so "!flags & bit" would test (!flags) & bit
                     and skip the setup whenever any other flag is set. */

                  if (!(wblk->walblk_flags & WALBLK_to_be_written))
                        WALC_setup_buffer_block(WAL_segment, wblk, 0);

                  prepare_wal_block(WAL_segment, wblk);
                  if ((ret = write_wal_block(status_vector, wblk,
                                             WAL_segment->wals_logname,
                                             LOG_FD)) != FB_SUCCESS) {
                        report_walw_bug_or_error(status_vector, WAL_handle, ret,
                                                 gds_wal_err_logwrite);
                        return FB_FAILURE;
                  }
                  release_wal_block(WAL_segment, wblk);
            }
            bufnum = (bufnum + 1) % maxbufs;
      } while (bufnum != lastbuf);

/* Since all the log buffers are flushed and empty now, we can start
   fresh. */

      WAL_segment->wals_last_flushed_buf = -1;
      CUR_BUF = 0;

      return FB_SUCCESS;
}


static SSHORT get_logfile_index( WALS WAL_segment, SCHAR * logname)
{
/**************************************
 *
 *    g e t _ l o g f i l e _ i n d e x
 *
 **************************************
 *
 * Functional description
 *    Get the index of the given logname in the list of pre-allocated
 *    log files.  If not found, return -1.
 *
 *    The search starts at the entry after wals_cur_logfile and wraps
 *    around, visiting every entry exactly once.
 *
 **************************************/
      LOGF *logfil;
      SSHORT j, count;

      if (WAL_segment->wals_max_logfiles == 0)
            return -1;

/* Start searching from the next log file onwards */

      j = (WAL_segment->wals_cur_logfile + 1) % WAL_segment->wals_max_logfiles;

/* Bound the scan by the number of entries instead of detecting the
   wrap-around with a counter test, so that a one-entry list is not
   compared twice. */

      for (count = 0; count < WAL_segment->wals_max_logfiles; count++) {
            logfil = LOGF_INFO(j);
            if (!strcmp(logname, LOGF_NAME(logfil)))
                  return j;
            j = (j + 1) % WAL_segment->wals_max_logfiles;
      }

      return -1;                          /* We have gone through the whole list */
}


static BOOLEAN get_log_usability(
                                 ISC_STATUS * status_vector,
                                 SCHAR * dbname,
                                 SCHAR * logname, SLONG log_partition_offset)
{
/**************************************
 *
 *    g e t _ l o g _ u s a b i l i t y
 *
 **************************************
 *
 * Functional description
 *    Decide whether the log file (or partition, identified by
 *    log_partition_offset) named by logname may be reused.
 *
 *    The log is usable when its header cannot be read through
 *    WALF_get_log_info(), or when the header does not carry any of
 *    the WALFH_KEEP_FOR_RECV flag bits.  A usable log is delinked
 *    with WALF_delink_log() before returning.
 *
 *    Returns TRUE if the log is usable, FALSE otherwise.
 *
 **************************************/
      SLONG seqno;
      SLONG length;
      SLONG flags = 0;
      BOOLEAN usable;

      if (WALF_get_log_info(status_vector, dbname, logname,
                            log_partition_offset, &seqno, &length,
                            &flags) != FB_SUCCESS) {
            /* No readable log header: the file has apparently never been
               used for logging, or it is brand new.  So we can use it. */
            usable = TRUE;
      }
      else if (flags & WALFH_KEEP_FOR_RECV) {
            /* Still needed for recovery. */
            usable = FALSE;
      }
      else
            usable = TRUE;

      if (!usable)
            return FALSE;

/* We have found a file which can be reused. */

      WALF_delink_log(status_vector, dbname, logname, log_partition_offset);

      return TRUE;
}


static SSHORT get_next_logname(
                               ISC_STATUS * status_vector,
                               WALS WAL_segment,
                               SCHAR * new_logname,
                               SLONG * new_offset, SLONG * log_type)
{
/**************************************
 *
 *    g e t _ n e x t _ l o g n a m e
 *
 **************************************
 *
 * Functional description
 *    Pick the next log file to write to.  The work is delegated to
 *    get_next_prealloc_logname() when pre-allocated log files are
 *    configured (wals_max_logfiles > 0) and to
 *    get_next_serial_logname() otherwise.
 *
 *    Returns FB_SUCCESS if a usable log filename is found else
 *    returns FB_FAILURE.
 *
 **************************************/

      if (WAL_segment->wals_max_logfiles <= 0) {
            /* No pre-allocated files: get a new serial log file */
            return get_next_serial_logname(status_vector, WAL_segment,
                                           new_logname, new_offset, log_type);
      }

/* Check the pre-allocated files or overflow files */

      return get_next_prealloc_logname(status_vector, WAL_segment,
                                       new_logname, new_offset, log_type);
}


static SSHORT get_next_prealloc_logname(
                                        ISC_STATUS * status_vector,
                                        WALS WAL_segment,
                                        SCHAR * new_logname,
                                        SLONG * new_offset, SLONG * log_type)
{
/**************************************
 *
 *    g e t _ n e x t _ p r e a l l o c _ l o g n a m e
 *
 **************************************
 *
 * Functional description
 *    Get the name of the next usable log file from the list of
 *    the preallocated log files.  If preallocated files are not
 *    available, try an overflow log file.
 *
 *    On success new_logname and new_offset describe the chosen file,
 *    wals_cur_logfile is set to its index and the matching WALFH_*
 *    type bits are or'ed into *log_type.
 *
 *    The caller (get_next_logname) only calls this routine when
 *    wals_max_logfiles is greater than zero.
 *
 *    Returns FB_SUCCESS if a usable log filename is found else
 *    returns FB_FAILURE.
 *
 **************************************/
      LOGF *logfil;
      SSHORT j, count, found;
      SLONG p_offset;

/* Start with the file following the current one. */

      j = (WAL_segment->wals_cur_logfile + 1) % WAL_segment->wals_max_logfiles;
      p_offset = 0L;

/* Probe each preallocated file exactly once.  Bounding the scan by the
   number of files avoids probing a single configured file twice. */

      for (count = 0; count < WAL_segment->wals_max_logfiles; count++) {
            logfil = LOGF_INFO(j);
            if (logfil->logf_flags & LOGF_PARTITIONED)
                  found = get_next_usable_partition(status_vector,
                                                    WAL_segment->wals_dbname,
                                                    LOGF_NAME(logfil), &p_offset);
            else
                  found = get_log_usability(status_vector,
                                            WAL_segment->wals_dbname,
                                            LOGF_NAME(logfil), p_offset);
            if (found) {
                  strcpy(new_logname, LOGF_NAME(logfil));
                  *new_offset = p_offset;
                  WAL_segment->wals_cur_logfile = j;
                  *log_type = *log_type | WALFH_PREALLOCATED;
                  if (logfil->logf_flags & LOGF_PARTITIONED)
                        *log_type = *log_type | WALFH_PARTITIONED;
                  if (logfil->logf_flags & LOGF_RAW)
                        *log_type = *log_type | WALFH_RAW;
                  return FB_SUCCESS;
            }

            /* Check for the next one */

            j = (j + 1) % WAL_segment->wals_max_logfiles;
      }

/* We have exhausted all the preallocated files and none is available,
   so look for an overflow file */

      return get_overflow_logname(status_vector, WAL_segment, new_logname,
                                  new_offset, log_type);
}


static SSHORT get_next_serial_logname(
                                      ISC_STATUS * status_vector,
                                      WALS WAL_segment,
                                      SCHAR * new_logname,
                                      SLONG * new_offset, SLONG * log_type)
{
/**************************************
 *
 *    g e t _ n e x t _ s e r i a l _ l o g n a m e
 *
 **************************************
 *
 * Functional description
 *    Create the next serial log file, named <basename>.log.<seqno>,
 *    trying up to MAX_RETRIES successive sequence numbers.  If the
 *    last linked log file is no longer flagged as needed for recovery
 *    it is delinked and renamed over the newly created file.
 *    WALFH_SERIAL is or'ed into *log_type.
 *
 *    Returns FB_SUCCESS if a usable log filename is found else
 *    returns FB_FAILURE.
 *
 **************************************/
#define MAX_RETRIES     1000
      LOGF *serial_info;
      SLONG new_fd;
      int attempts;
      SSHORT created;
      int linked_count;
      SCHAR oldest_name[MAXPATHLEN];
      SLONG oldest_p_offset;
      SLONG oldest_flags;
      SSHORT archive_pending;

      serial_info = &WAL_segment->wals_log_serial_file_info;
      if (!serial_info->logf_name_offset)
            return FB_FAILURE;

/* Seed the file name sequence number the first time through. */

      if (!serial_info->logf_fname_seqno)
            serial_info->logf_fname_seqno = WAL_segment->wals_log_seqno + 1;

/* Keep building candidate names until one can be created as a new
   file, or the retry budget is used up. */

      created = FALSE;
      attempts = 0;
      while (!created && attempts < MAX_RETRIES) {
            WALC_build_logname(new_logname, LOGF_NAME(serial_info),
                               serial_info->logf_fname_seqno);
            serial_info->logf_fname_seqno++;
            if (LLIO_open(status_vector, new_logname, LLIO_OPEN_NEW_RW, TRUE,
                          &new_fd) == FB_SUCCESS) {
                  LLIO_close(status_vector, new_fd);
                  *new_offset = 0;
                  created = TRUE;
            }
            else
                  attempts++;
      }

      if (!created)
            return FB_FAILURE;

/* Now see if we can reuse an old log file. */

      linked_count = 0;
      WALF_get_linked_logs_info(status_vector, WAL_segment->wals_dbname,
                                WAL_segment->wals_logname,
                                WAL_segment->wals_log_partition_offset,
                                &linked_count, oldest_name,
                                &oldest_p_offset, &oldest_flags,
                                &archive_pending);

      if (linked_count > 0) {
            if (!(oldest_flags & WALFH_KEEP_FOR_RECV)) {
                  /* Not needed for recovery any more: take the old file
                     out of the chain and give it the new name. */

                  WALF_delink_log(status_vector, WAL_segment->wals_dbname,
                                  oldest_name, oldest_p_offset);
#ifdef WIN_NT
                  unlink(new_logname);
                  MoveFile(oldest_name, new_logname);
#else
                  rename(oldest_name, new_logname);
#endif
            }
      }

#ifdef UNIX
/* WAL writer would typically be running as root.  So give the
   ownership of the new log file to the database owner. */
      if (chown(new_logname, WAL_segment->wals_owner_id,
                WAL_segment->wals_group_id) == -1) {
            WAL_IO_ERROR(status_vector, "logfile chown", new_logname,
                         isc_io_open_err, errno);
            return FB_FAILURE;
      }
#endif

      *log_type = *log_type | WALFH_SERIAL;

      return FB_SUCCESS;
}


static BOOLEAN get_next_usable_partition(
                                         ISC_STATUS * status_vector,
                                         SCHAR * dbname,
                                         SCHAR * master_logname,
                                         SLONG * new_offset)
{
/**************************************
 *
 *    g e t _ n e x t _ u s a b l e _ p a r t i t i o n
 *
 **************************************
 *
 * Functional description
 *    Tries to find a usable partition in the master_logname.
 *    Returns TRUE if a usable partition is found else returns FALSE.
 *
 *    On success *new_offset receives the offset of the partition,
 *    and the partitioned log header is rewritten with the partition
 *    recorded as the current one.  *new_offset is left untouched
 *    when no partition is usable.
 *
 **************************************/
      P_LOGFH p_log_header;
      SLONG p_log_fd;
      int j, count;
      BOOLEAN found;
      SLONG p_offset;

      p_log_header = (P_LOGFH) gds__alloc(P_LOGFH_LENGTH);
/* NOMEM: return failure, FREE: by returns in this procedure */
      if (!p_log_header)
            return FALSE;

      if (WALF_open_partitioned_log_file(status_vector, dbname, master_logname,
                                         p_log_header, &p_log_fd) != FB_SUCCESS) {
            gds__free((SLONG *) p_log_header);
            return FALSE;
      }

      found = FALSE;

/* Now check for a free partition.  The partition count comes from the
   log header on disk; refuse to scan when it is not positive rather
   than divide by zero in the modulo below. */

      if (p_log_header->p_logfh_maxp > 0) {
            j = (p_log_header->p_logfh_curp + 1) % p_log_header->p_logfh_maxp;

            /* Probe every partition exactly once, starting after the
               current one. */

            for (count = 0; count < p_log_header->p_logfh_maxp; count++) {
                  p_offset = PARTITION_OFFSET(p_log_header, j);
                  found = get_log_usability(status_vector, dbname,
                                            master_logname, p_offset);
                  if (found) {
                        p_log_header->p_logfh_curp = j;
                        *new_offset = p_offset;
                        WALF_update_partitioned_log_hdr(status_vector,
                                                        master_logname,
                                                        p_log_header, p_log_fd);
                        break;
                  }

                  /* Check for the next partition */

                  j = (j + 1) % p_log_header->p_logfh_maxp;
            }
      }

      LLIO_close(0, p_log_fd);
      gds__free((SLONG *) p_log_header);

      return found;
}


static SSHORT get_overflow_logname(
                                   ISC_STATUS * status_vector,
                                   WALS WAL_segment,
                                   SCHAR * new_logname,
                                   SLONG * new_offset, SLONG * log_type)
{
/**************************************
 *
 *    g e t _ o v e r f l o w _ l o g n a m e
 *
 **************************************
 *
 * Functional description
 *    Get the name of the next overflow log file, if base name specified.
 *    Returns FB_SUCCESS if an overflow log file name can be created else
 *    returns FB_FAILURE.
 *
 *    Successive sequence numbers are tried until a new file can be
 *    created.  The number of attempts is bounded, so a persistent
 *    open failure ends in FB_FAILURE instead of an endless loop;
 *    status_vector is then whatever the last LLIO_open() call left
 *    in it.
 *
 *    On success *new_offset is set to 0 and WALFH_OVERFLOW is or'ed
 *    into *log_type.
 *
 **************************************/
      LOGF *logfil;
      SLONG fd;
      int attempt;
      const int max_attempts = 1000;      /* same bound as used for serial log names */

      logfil = &WAL_segment->wals_log_ovflow_file_info;
      if (logfil->logf_name_offset == 0)
            return FB_FAILURE;            /* No overflow base name specified */

      if (logfil->logf_fname_seqno <= WAL_segment->wals_log_seqno)
            logfil->logf_fname_seqno = WAL_segment->wals_log_seqno + 1;

      for (attempt = 0; attempt < max_attempts; attempt++) {
            /* Now find a non-existent (i.e. new) overflow log file. */

            WALC_build_logname(new_logname, LOGF_NAME(logfil),
                               logfil->logf_fname_seqno);
            logfil->logf_fname_seqno++;
            if (LLIO_open
                  (status_vector, new_logname, LLIO_OPEN_NEW_RW, TRUE,
                   &fd) == FB_SUCCESS) {
                  LLIO_close(status_vector, fd);
                  *new_offset = 0;
                  break;
            }
      }

      if (attempt >= max_attempts)
            return FB_FAILURE;            /* Could not create any overflow file */

      *log_type = *log_type | WALFH_OVERFLOW;

#ifdef UNIX
      /* WAL writer would typically be running as root.  So give the
         ownership of the new log file to the database owner. */
      if (chown(new_logname, WAL_segment->wals_owner_id,
                WAL_segment->wals_group_id) == -1) {
            WAL_IO_ERROR(status_vector, "logfile chown", new_logname,
                         isc_io_open_err, errno);
            return FB_FAILURE;
      }
#endif
      return FB_SUCCESS;
}


static void get_time_stamp( SLONG * date)
{
/**************************************
 *
 *    g e t _ t i m e _ s t a m p
 *
 **************************************
 *
 * Functional description
 *    Store the current local time, encoded by isc_encode_date(),
 *    into the location pointed to by date (accessed as a GDS_QUAD).
 *    (Copied from mov.c)
 *
 **************************************/
      time_t now = time(NULL);

/* Take a private copy of the broken-down time before encoding it. */

      struct tm broken_down = *localtime(&now);

      isc_encode_date(&broken_down, (GDS_QUAD *) date);
}


static SSHORT increase_buffers(
                                             ISC_STATUS * status_vector,
                                             WAL WAL_handle, SSHORT num_buffers)
{
/**************************************
 *
 *    i n c r e a s e _ b u f f e r s
 *
 **************************************
 *
 * Functional description
 *    This function would try to increase the number of buffers in
 *    the shared WAL segment by the given number of buffers.  May
 *    involve remapping (or reallocating) the WAL segment.
 *    Used for performance tuning.   Assumes that acquire() has been
 *    done before calling this routine.
 *
 *    Returns the flush_all_buffers() result if the initial flush
 *    fails.  Returns FB_SUCCESS after a successful expansion, and
 *    also when the remap reports gds_unavailable; in that case the
 *    segment is left as it was and WALS2_CANT_EXPAND is set in
 *    wals_flags2.
 *
 **************************************/
      WALS WAL_segment;
      WALBLK *wblk;
      SLONG old_wal_length, new_wal_length;
      SSHORT i, old_num_buffers;
      SSHORT ret;

/* First flush all the filled buffer blocks to the current log file */

      if ((ret = flush_all_buffers(status_vector, WAL_handle)) != FB_SUCCESS)
            return ret;

      WAL_segment = WAL_handle->wal_segment;

/* The segment grows by one block size (wals_blksize) per added buffer. */

      old_wal_length = WAL_segment->wals_length;
      new_wal_length = old_wal_length + num_buffers * WAL_segment->wals_blksize;
      if (PRINT_DEBUG_MSGS) {
            PRINT_TIME(DEBUG_FD, LOCAL_TIME);
            ib_fprintf(DEBUG_FD, "In increase_buffers(), before expansion:\n");
            ib_fprintf(DEBUG_FD,
                           "WAL length =%ld, WAL segment flags=%ld, flags2=%ld\n",
                           WAL_segment->wals_length, WAL_segment->wals_flags,
                           WAL_segment->wals_flags2);
            ib_fprintf(DEBUG_FD, "maxbufs=%d, cur_buf=%d, last_flushed_buf=%d\n",
                           WAL_segment->wals_maxbufs, CUR_BUF,
                           WAL_segment->wals_last_flushed_buf);
      }

/* From here on WAL_segment is the value returned by the remap;
   WAL_handle->wal_segment is only updated once the new buffer
   blocks have been initialized. */

      WAL_segment = (WALS) ISC_remap_file(status_vector,
                                                            &WAL_handle->wal_shmem_data,
                                                            new_wal_length, TRUE);

/* Expansion is unavailable: continue with the existing segment and
   flag it so. */

      if (status_vector[1] == gds_unavailable) {
            WAL_segment = WAL_handle->wal_segment;
            WAL_segment->wals_flags2 |= WALS2_CANT_EXPAND;
            return FB_SUCCESS;
      }

/* Remap failed.  report_walw_bug_or_error() finishes by calling
   Firebird::status_exception::raise(), so control is presumably not
   expected to reach the WAL_segment dereference that follows.
   NOTE(review): confirm that raise() never returns. */

      if (WAL_segment == (WALS) NULL) {
            WAL_ERROR(status_vector, gds_wal_cant_expand, DBNAME);
            WAL_handle->wal_segment = NULL;
            report_walw_bug_or_error(status_vector, WAL_handle, FB_FAILURE,
                                                 (ISC_STATUS) gds_wal_err_expansion);
      }

      old_num_buffers = WAL_segment->wals_maxbufs;
      WAL_segment->wals_maxbufs += num_buffers;

/* initialize the newly allocated buffer blocks. */

      for (i = old_num_buffers; i < WAL_segment->wals_maxbufs; i++) {
            wblk = WAL_BLOCK(i);
            wblk->walblk_number = i;
            wblk->walblk_flags = 0;
            wblk->walblk_cur_offset = 0;
            wblk->walblk_roundup_offset = 0;
      }

/* Publish the new length and the remapped segment through the handle. */

      WAL_segment->wals_length = new_wal_length;
      WAL_handle->wal_segment = WAL_segment;

      if (PRINT_DEBUG_MSGS) {
            PRINT_TIME(DEBUG_FD, LOCAL_TIME);
            ib_fprintf(DEBUG_FD, "In increase_buffers(), after expansion:\n");
            ib_fprintf(DEBUG_FD,
                           "WAL length =%ld, WAL segment flags=%ld, flags2=%ld\n",
                           WAL_segment->wals_length, WAL_segment->wals_flags,
                           WAL_segment->wals_flags2);
            ib_fprintf(DEBUG_FD, "maxbufs=%d, cur_buf=%d, last_flushed_buf=%d\n",
                           WAL_segment->wals_maxbufs, CUR_BUF,
                           WAL_segment->wals_last_flushed_buf);
            ib_fflush(DEBUG_FD);
      }

      return FB_SUCCESS;
}


static SSHORT init_raw_partitions( ISC_STATUS * status_vector, WAL WAL_handle)
{
/**************************************
 *
 *    i n i t _ r a w _ p a r t i t i o n s
 *
 **************************************
 *
 * Functional description
 *    Walk the configured pre-allocated log files and call
 *    WALF_init_p_log() for each one flagged LOGF_RAW.  A failed
 *    initialization is handed to report_walw_bug_or_error().
 *    Returns FB_SUCCESS when the walk completes.
 *
 **************************************/
      WALS WAL_segment = WAL_handle->wal_segment;
      SSHORT file_no;

      for (file_no = 0; file_no < WAL_segment->wals_max_logfiles; file_no++) {
            LOGF *raw_file = LOGF_INFO(file_no);
            SSHORT status;

            if (!(raw_file->logf_flags & LOGF_RAW))
                  continue;               /* Only raw files are initialized here */

            status = WALF_init_p_log(status_vector, DBNAME,
                                     LOGF_NAME(raw_file),
                                     PARTITIONED_LOG_TOTAL_SIZE(raw_file->logf_partitions,
                                                                raw_file->logf_max_size),
                                     raw_file->logf_partitions);
            if (status == FB_SUCCESS)
                  continue;

            report_walw_bug_or_error(status_vector, WAL_handle, status,
                                     (ISC_STATUS) gds_wal_err_logwrite);
      }

      return FB_SUCCESS;
}


static SSHORT journal_connect( ISC_STATUS * status_vector, WAL WAL_handle)
{
/**************************************
 *
 *    j o u r n a l _ c o n n e c t
 *
 **************************************
 *
 * Functional description
 *    Connects to the journal server by calling JRN_init() with the
 *    journal directory and init data kept in the WAL segment.  On
 *    success WALS_JOURNAL_ENABLED is set in wals_flags; otherwise the
 *    failure is handed to report_walw_bug_or_error().
 *    Returns the JRN_init() result (FB_SUCCESS or FB_FAILURE).
 *
 **************************************/
      WALS WAL_segment = WAL_handle->wal_segment;
      SSHORT jrn_status;

      if (PRINT_DEBUG_MSGS) {
            PRINT_TIME(DEBUG_FD, LOCAL_TIME);
            ib_fprintf(DEBUG_FD, "Calling JRN_init for database %s\n", DBNAME);
      }

/* Always start from an empty journal handle. */

      JOURNAL_HANDLE = NULL;
      jrn_status =
            JRN_init(status_vector,
                     &(JOURNAL_HANDLE),
                     WAL_segment->wals_dbpage_size,
                     reinterpret_cast < UCHAR * >(WAL_segment->wals_jrn_dirname),
                     strlen(WAL_segment->wals_jrn_dirname),
                     WAL_segment->wals_jrn_init_data,
                     WAL_segment->wals_jrn_init_data_len);

      if (PRINT_DEBUG_MSGS) {
            PRINT_TIME(DEBUG_FD, LOCAL_TIME);
            ib_fprintf(DEBUG_FD, "After calling JRN_init(), ret=%d\n", jrn_status);
      }

      if (jrn_status != FB_SUCCESS) {
            report_walw_bug_or_error(status_vector, WAL_handle, jrn_status,
                                     (ISC_STATUS) gds_wal_err_jrn_comm);
      }
      else {
            WAL_segment->wals_flags |= WALS_JOURNAL_ENABLED;
      }

      return jrn_status;
}


static void journal_disable(
                            ISC_STATUS * status_vector,
                            WAL WAL_handle, WALFH log_header)
{
/**************************************
 *
 *    j o u r n a l _ d i s a b l e
 *
 **************************************
 *
 * Functional description
 *    Rolls over to a new log file and then disconnects from the
 *    journal server, so that the current log file does not get
 *    deleted by the journal server and the user can at least recover
 *    up to the journal disable point.  The journal flags in the WAL
 *    segment are cleared only when the journal handle has been
 *    released.
 *
 **************************************/
      WALS WAL_segment;

/* Mark the rollover as being done on behalf of a journal disable. */

      LOCAL_FLAGS |= WALW_DISABLING_JRN;
      rollover_log(status_vector, WAL_handle, log_header);
      LOCAL_FLAGS &= ~(WALW_DISABLING_JRN);

      JRN_fini(status_vector, &(JOURNAL_HANDLE));

      if (JOURNAL_HANDLE)
            return;                       /* Handle still set: disable did not succeed */

/* Disable succeeded */

      WAL_segment = WAL_handle->wal_segment;
      WAL_segment->wals_flags &= ~WALS_JOURNAL_ENABLED;
      WAL_segment->wals_flags &= ~WALS_DISABLE_JOURNAL;
}


static SSHORT journal_enable( ISC_STATUS * status_vector, WAL WAL_handle)
{
/**************************************
 *
 *    j o u r n a l _ e n a b l e
 *
 **************************************
 *
 * Functional description
 *    Connects to the journal server through journal_connect() and,
 *    when that works, sends it the current WAL file info with
 *    JRN_put_wal_name(..., JRNW_OPEN).  On success the
 *    WALS_ENABLE_JOURNAL request flag is cleared; on failure
 *    WALS_JOURNAL_ENABLED is cleared and the failure is handed to
 *    report_walw_bug_or_error().
 *    Returns FB_SUCCESS or FB_FAILURE.
 *
 **************************************/
      WALS WAL_segment = WAL_handle->wal_segment;
      SSHORT status;

      status = journal_connect(status_vector, WAL_handle);

      if (status == FB_SUCCESS) {
            if (PRINT_DEBUG_MSGS) {
                  PRINT_TIME(DEBUG_FD, LOCAL_TIME);
                  ib_fprintf(DEBUG_FD, "Enabling journaling for database %s\n",
                             DBNAME);
                  ib_fprintf(DEBUG_FD,
                             "Sending open for seqno=%" SLONGFORMAT ", log %s, p_offset=%"
                             SLONGFORMAT ", length=%" SLONGFORMAT "\n",
                             WAL_segment->wals_flushed_log_seqno,
                             WAL_segment->wals_logname,
                             WAL_segment->wals_log_partition_offset,
                             WAL_segment->wals_flushed_offset);
            }

            /* Tell the journal server which log file is open now. */

            status = JRN_put_wal_name(status_vector, JOURNAL_HANDLE,
                                      WAL_segment->wals_logname,
                                      strlen(WAL_segment->wals_logname),
                                      WAL_segment->wals_flushed_log_seqno,
                                      WAL_segment->wals_flushed_offset,
                                      WAL_segment->wals_log_partition_offset,
                                      JRNW_OPEN);

            if (PRINT_DEBUG_MSGS) {
                  PRINT_TIME(DEBUG_FD, LOCAL_TIME);
                  ib_fprintf(DEBUG_FD,
                             "After calling JRN_put_wal_name() for seqno=%" SLONGFORMAT
                             ", ret=%d\n",
                             WAL_segment->wals_flushed_log_seqno, status);
            }
      }

      if (status == FB_SUCCESS) {
            /* The enable request has been honoured. */
            WAL_segment->wals_flags &= ~WALS_ENABLE_JOURNAL;
            return status;
      }

/* Either the connect or the open notification failed. */

      WAL_segment->wals_flags &= ~WALS_JOURNAL_ENABLED;
      report_walw_bug_or_error(status_vector, WAL_handle, status,
                               (ISC_STATUS) gds_wal_err_jrn_comm);

      return status;
}


static void prepare_wal_block( WALS WAL_segment, WALBLK * wblk)
{
/**************************************
 *
 *    p r e p a r e _ w a l _ b l o c k
 *
 **************************************
 *
 * Functional description
 *    Stamp the buffer of the given WAL block before it is written:
 *    a block header (block length, used length, block sequence
 *    number, timestamp) is copied to the start of the buffer, and
 *    the first two bytes of the block length are stored
 *    BLK_TAILOVHD bytes before the end of the rounded-up block.
 *    The segment's block sequence number is advanced.
 *
 **************************************/
      WALBLK_HDR hdr;
      USHORT blklen;                      /* Rounded-up block length; already includes the trailing bytes */
      SCHAR *trailer;

      blklen = (USHORT) wblk->walblk_roundup_offset;

/* Build the header in a local structure and copy it into walblk_buf as
   bytes, to avoid datatype misalignment problems on some machines. */

      hdr.walblk_hdr_blklen = blklen;
      hdr.walblk_hdr_len = wblk->walblk_cur_offset;
      hdr.walblk_hdr_bsn = WAL_segment->wals_blkseqno++;
      get_time_stamp(hdr.walblk_hdr_timestamp);
      memcpy(wblk->walblk_buf, (SCHAR *) & hdr, BLK_HDROVHD);

/* Store the length again at the tail of the block, byte by byte for
   the same alignment reason. */

      trailer = (SCHAR *) & wblk->walblk_buf[(int) blklen - BLK_TAILOVHD];
      memcpy(trailer, (SCHAR *) & blklen, 2);
}


static void release_wal_block( WALS WAL_segment, WALBLK * wblk)
{
/**************************************
 *
 *    r e l e a s e _ w a l _ b l o c k
 *
 **************************************
 *
 * Functional description
 *    Called after a block has been written to disk.  Empties the
 *    block, adds the number of bytes written to the segment's
 *    flushed offset, checkpoint interval and I/O statistics, and
 *    finishes a checkpoint if the block carried one.  If no current
 *    buffer is selected (CUR_BUF is -1) this block becomes the
 *    current buffer.
 *
 **************************************/
      const USHORT written = (USHORT) wblk->walblk_roundup_offset;  /* Bytes that went to the log file */

/* The block is empty again. */

      wblk->walblk_roundup_offset = 0;
      wblk->walblk_cur_offset = 0;
      wblk->walblk_flags &= ~WALBLK_to_be_written;

/* Account for the write in the segment. */

      WAL_segment->wals_last_flushed_buf = wblk->walblk_number;
      WAL_segment->wals_IO_count++;
      WAL_segment->wals_total_IO_bytes += written;
      WAL_segment->wals_flushed_offset += written;
      WAL_segment->wals_cur_ckpt_intrvl += written;

/* This buffer finished a checkpoint: record it, which also restarts
   the checkpoint interval updated just above. */

      if (wblk->walblk_flags & WALBLK_checkpoint) {
            finishup_checkpoint(WAL_segment);
            wblk->walblk_flags &= ~WALBLK_checkpoint;
      }

      if (CUR_BUF == -1)
            CUR_BUF = wblk->walblk_number;
}


static void report_walw_bug_or_error(
                                                       ISC_STATUS * status_vector,
                                                       WAL WAL_handle,
                                                       SSHORT failure_type, ISC_STATUS code)
{
/**************************************
 *
 *     r e p o r t _ w a l w _ b u g _ o r _ e r r o r
 *
 **************************************
 *
 * Functional description
 *    Handle bug or error for the WAL writer.
 *
 *    failure_type selects the handling:
 *      > 0   an error; status_vector is logged as passed in.
 *      < 0   a bug; IBERR_bugcheck() is called with -failure_type.
 *      == 0  an error whose status_vector is built here from code
 *            by IBERR_error(), then logged.
 *    When the WAL segment is available, the matching "happened" flag
 *    is set in wals_flags and code is stored in wals_last_err or
 *    wals_last_bug.
 *
 *    The routine finishes by calling
 *    Firebird::status_exception::raise(status_vector[1]).
 *    NOTE(review): callers appear to rely on that call not
 *    returning -- confirm.
 *
 **************************************/
      ISC_STATUS_ARRAY local_status;
      TEXT errbuf[MAX_ERRMSG_LEN];
      WALS WAL_segment;

/* May be NULL (increase_buffers() clears it before reporting a failed
   remap), hence the checks below. */

      WAL_segment = WAL_handle->wal_segment;

/* First give a generic WAL writer error message */

      WALW_ERROR(local_status);
      gds__log_status(DBNAME, local_status);
      gds__print_status(local_status);

/* Now give the error message for the particular error or bug */

      if (failure_type > 0) {
            /* An error happened. status_vector is already initialized */

            if (WAL_segment) {
                  WAL_segment->wals_flags |= WALS_ERROR_HAPPENED;
                  WAL_segment->wals_last_err = code;
            }
            gds__log_status(DBNAME, status_vector);
      }
      else if (failure_type < 0) {
            /* A bug happened. status_vector is already initialized */

            if (WAL_segment) {
                  WAL_segment->wals_flags |= WALS_BUG_HAPPENED;
                  WAL_segment->wals_last_bug = code;
            }
            IBERR_bugcheck(status_vector, DBNAME, NULL, -1 * failure_type,
                                 errbuf);
            WALC_save_status_strings(status_vector);
      }
      else {
            /* An error happened. status_vector needs to be initialized */

            if (WAL_segment) {
                  WAL_segment->wals_flags |= WALS_ERROR_HAPPENED;
                  WAL_segment->wals_last_err = code;
            }
            IBERR_error(status_vector, DBNAME, NULL, code, errbuf);
            WALC_save_status_strings(status_vector);
            gds__log_status(DBNAME, status_vector);
      }
      gds__print_status(status_vector);

      if (PRINT_DEBUG_MSGS) {
            PRINT_TIME(DEBUG_FD, LOCAL_TIME);
            ib_fprintf(DEBUG_FD, "WAL writer encountered error, code=%ld\n", code);
      }

/* Propagate the failure to the caller as an exception. */

      Firebird::status_exception::raise(status_vector[1]);
}


static SSHORT rollover_log(
                                       ISC_STATUS * status_vector,
                                       WAL WAL_handle, WALFH log_header)
{
/**************************************
 *
 *    r o l l o v e r _ l o g
 *
 **************************************
 *
 * Functional description
 *    The current log file has reached its limit length.  So move on
 *    to the next one.  Update WAL_segment and log_header.  WAL_segment
 *    is assumed to have already been acquired.
 *
 *    On success the caller's log_header is overwritten with the header
 *    of the new log file, LOG_FD is set to the new file descriptor and
 *    WALS_ROLLOVER_HAPPENED is set in the segment flags.
 *
 *    Returns FB_SUCCESS, the flush_all_buffers() result when the
 *    initial flush fails, or FB_FAILURE.
 *
 *    report_walw_bug_or_error() finishes by raising a status exception,
 *    so control does not come back after it is called.  The scratch
 *    header allocated here is therefore released BEFORE each such call;
 *    otherwise it would be leaked.
 *
 **************************************/
      WALFH new_log_header;
      SCHAR saved_logname[MAXPATHLEN];
      SCHAR new_logname[MAXPATHLEN];
      SLONG new_log_fd;
      SSHORT ret;
      SLONG saved_flushed_offset;
      SLONG new_log_partition_offset;
      WALS WAL_segment;
      SLONG log_type;
#define ROLLOVER_LOG_RETURN(code) {gds__free((SLONG*)new_log_header); return(code);}

/* First flush all the filled buffer blocks to the current log file
   because we have already given its log file sequence number for
   the records in those buffers. */

      if ((ret = flush_all_buffers(status_vector, WAL_handle)) != FB_SUCCESS)
            return ret;

      WAL_segment = WAL_handle->wal_segment;
      new_log_header = (WALFH) gds__alloc(WALFH_LENGTH);
/* NOMEM: return failure, FREE: by macro ROLLOVER_LOG_RETURN() or
   explicitly ahead of report_walw_bug_or_error() */
      if (!new_log_header)
            return FB_FAILURE;

/* Remember where the current log ends and what it is called; setup_log()
   below overwrites wals_logname with the new name. */

      saved_flushed_offset = WAL_segment->wals_flushed_offset;
      strcpy(saved_logname, WAL_segment->wals_logname);

      log_type = 0L;
      if (get_next_logname(status_vector, WAL_segment, new_logname,
                                     &new_log_partition_offset, &log_type) != FB_SUCCESS) {
            WAL_ERROR_APPEND(status_vector, gds_wal_err_rollover, new_logname);
            /* Release first: the report routine raises and never returns. */
            gds__free((SLONG *) new_log_header);
            report_walw_bug_or_error(status_vector, WAL_handle, FB_FAILURE,
                                                 gds_wal_err_rollover2);
            return FB_FAILURE;
      }

      ret = setup_log(status_vector, WAL_handle, new_logname,
                              new_log_partition_offset, log_type,
                              &new_log_fd, new_log_header, TRUE,
                              WAL_segment->wals_logname,
                              WAL_segment->wals_log_partition_offset);
      if (ret != FB_SUCCESS) {
            /* Release first: the report routine raises and never returns. */
            gds__free((SLONG *) new_log_header);
            report_walw_bug_or_error(status_vector, WAL_handle, ret,
                                                 gds_wal_err_rollover2);
            return FB_FAILURE;
      }

/* Update the current log file with the name of the next log file name
   and close it. */

      if (log_header->walfh_next_logname)
            gds__free((SLONG *) log_header->walfh_next_logname);
      STRING_DUP(log_header->walfh_next_logname, new_logname);
      /* NOMEM: failure return  FREE: unknown */
      /* NOTE(review): new_log_fd stays open on this path -- confirm whether
         the caller is expected to recover from it. */
      if (!log_header->walfh_next_logname) {
            ROLLOVER_LOG_RETURN(FB_FAILURE);
      }

      log_header->walfh_next_log_partition_offset = new_log_partition_offset;

/* The previous log name of the current log file may happen to be the
   new_logname if there are 2 round-robin log files and we are re-using
   the new_logname.  The same could happen even if we have one partitioned
   log file with only 2 partitions.  In those cases, make the prev_logname
   of the current log file to be empty to avoid a potential infinite loop. */

      if ((strcmp(log_header->walfh_prev_logname, new_logname) == 0) &&
            (log_header->walfh_prev_log_partition_offset ==
             new_log_partition_offset)) {
            *log_header->walfh_prev_logname = 0;
            log_header->walfh_prev_log_partition_offset = 0L;
      }

      log_header->walfh_length = saved_flushed_offset;
      log_header->walfh_hibsn = WAL_segment->wals_blkseqno - 1;
      close_log(status_vector, WAL_handle, saved_logname, log_header, TRUE);

/* This is a good place to inform the long term journal server
   that we have rolled over to a new log file.  Note that the close
   info of the previous log file has already been sent during
   close_log() call made above. */

      if (JOURNAL_HANDLE && !(LOCAL_FLAGS & WALW_DISABLING_JRN)) {
            if (PRINT_DEBUG_MSGS) {
                  PRINT_TIME(DEBUG_FD, LOCAL_TIME);
                  ib_fprintf(DEBUG_FD,
                                 "Opened seqno=%" SLONGFORMAT ", log %s, p_offset=%"
                                 SLONGFORMAT ", length=%" SLONGFORMAT "\n",
                                 new_log_header->walfh_seqno, new_logname,
                                 new_log_header->walfh_log_partition_offset,
                                 new_log_header->walfh_length);
            }

            ret = JRN_put_wal_name(status_vector, JOURNAL_HANDLE,
                                             new_logname, strlen(new_logname),
                                             new_log_header->walfh_seqno,
                                             new_log_header->walfh_length,
                                             new_log_header->walfh_log_partition_offset,
                                             JRNW_OPEN);

            if (PRINT_DEBUG_MSGS) {
                  PRINT_TIME(DEBUG_FD, LOCAL_TIME);
                  ib_fprintf(DEBUG_FD,
                                 "After calling JRN_put_wal_name() for seqno=%"
                                 SLONGFORMAT ", ret=%d\n",
                                 new_log_header->walfh_seqno, ret);
            }

            if (ret != FB_SUCCESS) {
                  /* Release first: the report routine raises and never
                     returns.  The return below is only a safety net so
                     that the freed header can never be touched again. */
                  gds__free((SLONG *) new_log_header);
                  report_walw_bug_or_error(status_vector, WAL_handle, ret,
                                                       gds_wal_err_jrn_comm);
                  return FB_FAILURE;
            }
      }

      WAL_segment->wals_flags |= WALS_ROLLOVER_HAPPENED;

/* Update the context variables for the caller. */

      LOG_FD = new_log_fd;
      memcpy((SCHAR *) log_header, (SCHAR *) new_log_header, WALFH_LENGTH);

      ROLLOVER_LOG_RETURN(FB_SUCCESS);
}


static void setup_for_checkpoint( WALS WAL_segment)
{
/**************************************
 *
 *    s e t u p _ f o r _ c h e c k p o i n t
 *
 **************************************
 *
 * Functional description
 *    Tell the Asynchronous Buffer writer that checkpointing
 *    should begin.  The caller is expected to have done
 *    acquire() before calling this routine.
 ***************************************/

/* The AB writer looks for the "start" flag from its main loop. */

      WAL_segment->wals_flags = WAL_segment->wals_flags | WALS_CKPT_START;

/* Always drop the "recorded" flag, whatever its previous state. */

      WAL_segment->wals_flags = WAL_segment->wals_flags & ~WALS_CKPT_RECORDED;
}


static SSHORT setup_log(
                                    ISC_STATUS * status_vector,
                                    WAL WAL_handle,
                                    SCHAR * logname,
                                    SLONG log_partition_offset,
                                    SLONG log_type,
                                    SLONG * logfile_fd,
                                    WALFH log_header,
                                    SSHORT rollover,
                                    SCHAR * prev_logname,
                                    SLONG prev_log_partition_offset)
{
/**************************************
 *
 *    s e t u p _ l o g
 *
 **************************************
 *
 * Functional description
 *    Initialize the log file header information and
 *    the corresponding WAL_segment information.
 *
 *    The header work is delegated to setup_log_header_info(); when it
 *    succeeds, the log name, partition offset, rollover threshold and
 *    roundup size of WAL_segment are refreshed from the header and from
 *    the matching log file description.  The sequence numbers and
 *    offsets of WAL_segment are refreshed too, unless a takeover of an
 *    already open log was detected.
 *
 *    Returns the result of setup_log_header_info() unchanged.
 *
 **************************************/
      SSHORT ret;
      LOGF *logfil;
      SSHORT takeover;
      WALS WAL_segment;

      WAL_segment = WAL_handle->wal_segment;

      ret = setup_log_header_info(status_vector, WAL_handle, logname,
                                                log_partition_offset, log_type, logfile_fd,
                                                log_header, rollover, prev_logname,
                                                prev_log_partition_offset, &takeover);
      if (ret != FB_SUCCESS)
            return ret;

/* Now update the appropriate fields of WAL_segment from the given
   log_header.  */

      strcpy(WAL_segment->wals_logname, logname);
      WAL_segment->wals_log_partition_offset =
            log_header->walfh_log_partition_offset;

      if (!takeover) {
            /* If a new WAL writer is taking over, we don't want to change the
               wals_buf_offset etc. which might reflect the uptodate WAL buffer
               position. */

            WAL_segment->wals_log_seqno = log_header->walfh_seqno;
            WAL_segment->wals_buf_offset = log_header->walfh_length;
            WAL_segment->wals_blkseqno = log_header->walfh_hibsn + 1;
            WAL_segment->wals_flushed_offset = log_header->walfh_length;
            WAL_segment->wals_flushed_log_seqno = log_header->walfh_seqno;
      }

/* Pick the log file description that governs size limits. */

      if (WAL_segment->wals_max_logfiles > 0) {
            /* Get the index of this logfile in the list of pre-allocated
               log files. */

            WAL_segment->wals_cur_logfile =
                  get_logfile_index(WAL_segment, logname);
            if (WAL_segment->wals_cur_logfile == -1) {
                  /* we are using an overflow log */
                  logfil = &WAL_segment->wals_log_ovflow_file_info;
            }
            else {
                  logfil = LOGF_INFO(WAL_segment->wals_cur_logfile);
            }
      }
      else {
            logfil = &WAL_segment->wals_log_serial_file_info;
      }

/* A per-file maximum size, when set, takes precedence over the
   segment-wide maximum log length. */

      WAL_segment->wals_rollover_threshold =
            (logfil->logf_max_size ? logfil->logf_max_size :
                                                 WAL_segment->wals_max_log_length);
      WAL_segment->wals_roundup_size = logfil->logf_roundup_size;

      if ((PRINT_DEBUG_MSGS) && rollover) {
            PRINT_TIME(DEBUG_FD, LOCAL_TIME);
            /* The partition offset is printed with SLONGFORMAT, as it is
               everywhere else in this file, so that the conversion matches
               the argument on every platform. */
            ib_fprintf(DEBUG_FD,
                           "new log %s, partition offset=%" SLONGFORMAT "\n",
                           WAL_segment->wals_logname,
                           WAL_segment->wals_log_partition_offset);
      }

      return ret;
}


static SSHORT setup_log_header_info(
                                                      ISC_STATUS * status_vector,
                                                      WAL WAL_handle,
                                                      SCHAR * logname,
                                                      SLONG log_partition_offset,
                                                      SLONG log_type,
                                                      SLONG * logfile_fd,
                                                      WALFH log_header,
                                                      SSHORT rollover,
                                                      SCHAR * prev_logname,
                                                      SLONG prev_log_partition_offset,
                                                      SSHORT * takeover)
{
/**************************************
 *
 *    s e t u p _ l o g _ h e a d e r _ i n f o
 *
 **************************************
 *
 * Functional description
 *    Open a given log file.
 *    Initialize the log file header information.
 *    If 'rollover' flag is TRUE, then we are going to open a new log
 *    file.  So use the 'prev_logname' as the previous log file
 *    name for the new log file.
 *    If we are opening an existing log file and it has a 'next' log
 *    file, then open the 'next' log file automatically using a recursive
 *    call.  In that case the caller's 'logname' buffer is overwritten
 *    with the name of the next log file.
 *
 *    If we determine that this is a takeover situation, set the takeover
 *    parameter to TRUE.
 *
 *    On success *logfile_fd receives the open descriptor and the header
 *    has been written to the file.  On the failure paths handled in this
 *    routine the descriptor opened here is closed again.
 *
 *    Returns FB_SUCCESS or FB_FAILURE.
 *
 **************************************/
      WALS WAL_segment;
      SLONG log_fd;
      SLONG read_len = 0;           /* only filled in when !rollover */
      SCHAR prev_logfname[MAXPATHLEN];
      SLONG prev_logfpartition_offset;

      WAL_segment = WAL_handle->wal_segment;

/* Open the Log file */

      if (LLIO_open
            (status_vector, logname, LLIO_OPEN_WITH_SYNC_RW, TRUE,
             &log_fd)) return FB_FAILURE;

      *takeover = FALSE;

      if (!rollover) {
            /* Try to read the logfile header and take appropriate steps
               afterwards. */

            if (LLIO_read(status_vector, log_fd, logname,
                                log_partition_offset, LLIO_SEEK_BEGIN,
                                (UCHAR *) log_header, WALFH_LENGTH, &read_len)) {
                  LLIO_close(0, log_fd);
                  return FB_FAILURE;
            }
      }

      if (rollover || (read_len == 0)) {
            /* The logfile is brand new */

            log_header->walfh_version = WALFH_VERSION;
            log_header->walfh_wals_id = WAL_segment->wals_id;
            log_header->walfh_seqno = WAL_segment->wals_log_seqno + 1;
            log_header->walfh_log_partition_offset = log_partition_offset;
            log_header->walfh_length = (SLONG) WALFH_LENGTH;
            log_header->walfh_lowbsn = WAL_segment->wals_blkseqno;      /* What the lowbsn would be */
            log_header->walfh_hibsn = WAL_segment->wals_blkseqno - 1;   /* So far */
            log_header->walfh_flags = log_type + WALFH_OPEN;
            log_header->walfh_offset = (SLONG) WALFH_LENGTH;
            log_header->walfh_ckpt1 = (SLONG) 0;
            log_header->walfh_ckpt2 = (SLONG) 0;
            log_header->walfh_reserved1 = (SLONG) 0;
            log_header->walfh_reserved2 = (SLONG) 0;
            log_header->walfh_data[0] = 0;

            /* Each out-of-memory exit below closes the descriptor opened
               above, like every other failure path of this routine. */

            STRING_DUP(log_header->walfh_dbname, WAL_segment->wals_dbname);
            /* NOMEM: failure, FREE: unknown */
            if (!log_header->walfh_dbname) {
                  LLIO_close(0, log_fd);
                  return FB_FAILURE;
            }
            STRING_DUP(log_header->walfh_prev_logname, prev_logname);
            /* NOMEM: failure, FREE: unknown */
            if (!log_header->walfh_prev_logname) {
                  LLIO_close(0, log_fd);
                  return FB_FAILURE;
            }
            log_header->walfh_prev_log_partition_offset =
                  prev_log_partition_offset;
            STRING_DUP(log_header->walfh_next_logname, "");
            /* NOMEM: failure, FREE: unknown */
            if (!log_header->walfh_next_logname) {
                  LLIO_close(0, log_fd);
                  return FB_FAILURE;
            }

            /* Pack the three names into the variable data area of the
               header. */

            log_header->walfh_data_len =
                  MISC_build_parameters_block(log_header->walfh_data,
                                                            PARAM_BYTE(WALFH_dbname),
                                                            PARAM_STRING(log_header->
                                                                               walfh_dbname),
                                                            PARAM_BYTE(WALFH_prev_logname),
                                                            PARAM_STRING(log_header->
                                                                               walfh_prev_logname),
                                                            PARAM_BYTE(WALFH_next_logname),
                                                            PARAM_STRING(log_header->
                                                                               walfh_next_logname),
                                                            PARAM_BYTE(WALFH_end), 0);
      }
      else if (read_len < (SLONG) sizeof(struct walfh)) {
            /* Something was read, but not even a complete fixed header. */

            LLIO_close(0, log_fd);
            WAL_ERROR(status_vector, gds_logh_small, logname);
            return FB_FAILURE;
      }
      else if (log_header->walfh_version != WALFH_VERSION) {
            LLIO_close(0, log_fd);
            WAL_ERROR(status_vector, gds_logh_inv_version, logname);
            return FB_FAILURE;
      }
      else if (log_header->walfh_flags & WALFH_OPEN) {
            if ((log_header->walfh_wals_id == WAL_segment->wals_id) &&
                  (log_header->walfh_seqno == WAL_segment->wals_log_seqno)) {
                  /* The file had been in use, may be the previous WAL writer process
                     died.  So, take over.  Do some sanity checks though. */

                  *takeover = TRUE;
                  log_header->walfh_length = WAL_segment->wals_flushed_offset;
                  log_header->walfh_dbname = NULL;    /* will be updated below */
                  log_header->walfh_prev_logname = NULL;    /* will be updated below */
                  log_header->walfh_next_logname = NULL;    /* will be updated below */
                  WALF_upd_log_hdr_frm_walfh_data(log_header,
                                                                  log_header->walfh_data);

                  /* Well, in this case, walfh_next_logname should be empty. Make
                     sure that it is, else give an error message for using an earlier
                     log file. */

                  if (strlen(log_header->walfh_next_logname) != 0) {
                        LLIO_close(0, log_fd);
                        WAL_ERROR(status_vector, gds_logh_open_flag, logname);
                        return FB_FAILURE;
                  }
            }
            else {
                  /* This situation should not arise.   Before this, a recovery should
                     have taken place.  However, a graceful way of handling it
                     could be to scan the log and position at the end of the last
                     'valid looking' block. */

                  LLIO_close(0, log_fd);
                  WAL_ERROR(status_vector, gds_logh_open_flag2, logname);
                  return FB_FAILURE;
            }
      }
      else {
            /* We are opening a logfile after it was properly shutdown last time.
               Prepare the existing log file for new blocks. */

            log_header->walfh_dbname = NULL;
            log_header->walfh_prev_logname = NULL;
            log_header->walfh_next_logname = NULL;
            WALF_upd_log_hdr_frm_walfh_data(log_header, log_header->walfh_data);

            /* In this case, walfh_next_logname should be empty.  But there is
               a small window wherein the rollover to a new logfile occurred but
               that fact was not recorded at higher level (in database header page).
               In that case,  walfh_next_logname would not be empty and it would be
               reasonable for us to open and use the next log file. */

            if (strlen(log_header->walfh_next_logname) != 0) {
                  /* Done with this file: remember it as the predecessor and
                     retry with its successor.  Note that this overwrites the
                     caller's logname buffer. */

                  LLIO_close(0, log_fd);
                  strcpy(prev_logfname, logname);
                  strcpy(logname, log_header->walfh_next_logname);
                  prev_logfpartition_offset = log_partition_offset;
                  return setup_log_header_info(status_vector, WAL_handle, logname,
                                                             log_header->
                                                             walfh_next_log_partition_offset, 0L,
                                                             logfile_fd, log_header, rollover,
                                                             prev_logfname,
                                                             prev_logfpartition_offset, takeover);
            }
      }

/* Hand the descriptor to the caller, mark the log as open and in use by
   this WAL segment, and write the header out. */

      *logfile_fd = log_fd;
      log_header->walfh_wals_id = WAL_segment->wals_id;
      log_header->walfh_flags |= WALFH_OPEN;
      log_header->walfh_flags |= WALFH_KEEP_FOR_SHORT_TERM_RECV;
      if (JOURNAL_HANDLE && !(LOCAL_FLAGS & WALW_DISABLING_JRN))
            log_header->walfh_flags |= WALFH_KEEP_FOR_LONG_TERM_RECV;

      return write_log_header_and_reposition(status_vector, logname, log_fd,
                                                               log_header);
}


static SSHORT write_log_header_and_reposition(
                                                                    ISC_STATUS * status_vector,
                                                                    SCHAR * logname,
                                                                    SLONG log_fd, WALFH log_header)
{
/**************************************
 *
 *    w r i t e _ l o g _ h e a d e r _ a n d _ r e p o s i t i o n
 *
 **************************************
 *
 * Functional description
 *    Store log_header at the partition offset of the log file,
 *    place a terminator block walfh_length bytes further on, and
 *    then step the file position back over that terminator.
 *    Returns FB_SUCCESS when all three steps work, FB_FAILURE
 *    as soon as one of them fails.
 *
 **************************************/

/* Step 1: the header itself. */

      if (!LLIO_write(status_vector, log_fd, logname,
                            log_header->walfh_log_partition_offset, LLIO_SEEK_BEGIN,
                            (UCHAR *) log_header, WALFH_LENGTH, 0)) {

            /* Step 2: an invalid block header at the logical end, so that
               stale data of a re-used log file is never picked up. */

            if (!LLIO_write(status_vector, log_fd, logname,
                                  log_header->walfh_log_partition_offset +
                                  log_header->walfh_length, LLIO_SEEK_BEGIN,
                                  (UCHAR *) log_terminator_block,
                                  WAL_TERMINATOR_BLKLEN, 0)) {

                  /* Step 3: move back by the length of the terminator. */

                  if (!LLIO_seek(status_vector, log_fd, logname,
                                       -1 * WAL_TERMINATOR_BLKLEN, LLIO_SEEK_CURRENT)) {
                        return FB_SUCCESS;
                  }
            }
      }

      return FB_FAILURE;
}


static SSHORT write_wal_block(
                                            ISC_STATUS * status_vector,
                                            WALBLK * wblk, SCHAR * logname, SLONG log_fd)
{
/**************************************
 *
 *    w r i t e _ w a l _ b l o c k
 *
 **************************************
 *
 * Functional description
 *    Hand the prepared wal buffer to LLIO_write() for the WAL
 *    file.  walblk_roundup_offset bytes of walblk_buf are written
 *    and no seek is requested (LLIO_SEEK_NONE).
 *    Returns FB_SUCCESS or FB_FAILURE.
 *
 **************************************/

      return LLIO_write(status_vector, log_fd, logname, 0L, LLIO_SEEK_NONE,
                              wblk->walblk_buf, wblk->walblk_roundup_offset, 0)
            ? FB_FAILURE : FB_SUCCESS;
}


static void write_wal_statistics( WAL WAL_handle)
{
/**************************************
 *
 *    w r i t e _ w a l _ s t a t i s t i c s
 *
 **************************************
 *
 * Functional description
 *    Writes the performance statistics for the current WAL session
 *    to DEBUG_FD.  Nothing is written unless PRINT_DEBUG_MSGS is set.
 *
 *    these messages were considered for translation but left as is for now
 *    since they are likely only to be seen by DBA's and technical types
 *
 *    Both averages guard their divisor: a session without any IO or
 *    without any commit reports an average of 0 instead of dividing
 *    by zero.
 *
 **************************************/
      WALS WAL_segment;
      IB_FILE *stat_file;

      WAL_segment = WAL_handle->wal_segment;
      if (!(PRINT_DEBUG_MSGS))
            return;

      stat_file = DEBUG_FD;

      ib_fprintf(stat_file,
                     "-----------------------------------------------\n");
      PRINT_TIME(stat_file, LOCAL_TIME);
      ib_fprintf(stat_file,
                     "WAL writer (pid=%d) for database %s, shutdown statistics:\n",
                     getpid(), DBNAME);
      ib_fprintf(stat_file,
                     "WAL buffer size=%d, total buffers=%d, original buffers=%d\n",
                     WAL_segment->wals_bufsize, WAL_segment->wals_maxbufs,
                     WAL_segment->wals_initial_maxbufs);

      /* NOTE(review): the "%ld" conversions below are kept as they were;
         if these counters are SLONG they should presumably use
         SLONGFORMAT like the other lines -- confirm against wal.h. */
      ib_fprintf(stat_file, "Max ckpt interval=%ld\n",
                     WAL_segment->wals_max_ckpt_intrvl);
      ib_fprintf(stat_file,
                     "WAL segment acquire count=%ld, number of WAL_put() calls=%ld\n",
                     WAL_segment->wals_acquire_count,
                     WAL_segment->wals_put_count);

      /* Average IO size: divide by 1 when no IO has been done. */
      ib_fprintf(stat_file,
                     "Total IOs=%" SLONGFORMAT ", Avg IO size=%" SLONGFORMAT
                     ", last blk_seqno=%" SLONGFORMAT "\n",
                     WAL_segment->wals_IO_count,
                     WAL_segment->wals_total_IO_bytes /
                     (WAL_segment->wals_IO_count ? WAL_segment->
                        wals_IO_count : 1), WAL_segment->wals_blkseqno - 1);
      ib_fprintf(stat_file, "grpc wait micro-seconds=%" SLONGFORMAT "\n",
                     WAL_segment->wals_grpc_wait_usecs);

      /* Average group-commit size: commits per group commit.  Without any
         group commit the commit count itself is the divisor (average 1);
         without any commit at all the divisor falls back to 1, so the
         result is 0 rather than 0/0. */
      ib_fprintf(stat_file,
                     "Total commits=%" SLONGFORMAT ", Number of group-commits=%"
                     SLONGFORMAT ", Avg grpc size=%f\n",
                     WAL_segment->wals_commit_count,
                     WAL_segment->wals_grpc_count,
                     (WAL_segment->wals_commit_count * 1.0) /
                     (WAL_segment->wals_grpc_count ? WAL_segment->wals_grpc_count :
                        (WAL_segment->wals_commit_count ? WAL_segment->
                           wals_commit_count : 1)));
      ib_fprintf(stat_file, "current log seqno=%ld, logfile=%s\n",
                     WAL_segment->wals_log_seqno, WAL_segment->wals_logname);
      ib_fprintf(stat_file,
                     "log partition offset=%" SLONGFORMAT ", current offset=%"
                     SLONGFORMAT "\n",
                     WAL_segment->wals_log_partition_offset,
                     WAL_segment->wals_buf_offset);
      ib_fprintf(stat_file,
                     "-----------------------------------------------\n");
}


} // extern "C"


Generated by  Doxygen 1.6.0   Back to index