Header And Logo

PostgreSQL
| The world's most advanced open source database.

Data Structures | Defines | Typedefs | Enumerations | Functions | Variables

pgbench.c File Reference

#include "postgres_fe.h"
#include "getopt_long.h"
#include "libpq-fe.h"
#include "portability/instr_time.h"
#include <ctype.h>
#include <math.h>
#include <signal.h>
#include <sys/time.h>
#include <unistd.h>
#include <sys/wait.h>
Include dependency graph for pgbench.c:

Go to the source code of this file.

Data Structures

struct  Variable
struct  CState
struct  TState
struct  TResult
struct  Command
struct  AggVals
struct  fork_pthread

Defines

#define INT64_MAX   INT64CONST(0x7FFFFFFFFFFFFFFF)
#define pthread_t   pg_pthread_t
#define pthread_attr_t   pg_pthread_attr_t
#define pthread_create   pg_pthread_create
#define pthread_join   pg_pthread_join
#define MAXCLIENTS   1024
#define LOG_STEP_SECONDS   5
#define DEFAULT_NXACTS   10
#define nbranches   1
#define ntellers   10
#define naccounts   100000
#define SCALE_32BIT_THRESHOLD   20000
#define MAX_FILES   128
#define SHELL_COMMAND_SIZE   256
#define INVALID_THREAD   ((pthread_t) 0)
#define SQL_COMMAND   1
#define META_COMMAND   2
#define MAX_ARGS   10
#define PARAMS_ARRAY_SIZE   7
#define MAX_PREPARE_NAME   32
#define SCALE_32BIT_THRESHOLD   20000
#define COMMANDS_ALLOC_NUM   128
#define COMMANDS_ALLOC_NUM   128

Typedefs

typedef struct fork_pthread * pthread_t
typedef int pthread_attr_t
typedef enum QueryMode QueryMode
typedef struct fork_pthread fork_pthread

Enumerations

enum  QueryMode { QUERY_SIMPLE, QUERY_EXTENDED, QUERY_PREPARED, NUM_QUERYMODE }

Functions

static int pthread_create (pthread_t *thread, pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
static int pthread_join (pthread_t th, void **thread_return)
static void setalarm (int seconds)
static void * threadRun (void *arg)
static void usage (void)
static int64 strtoint64 (const char *str)
static int64 getrand (TState *thread, int64 min, int64 max)
static void executeStatement (PGconn *con, const char *sql)
static PGconn * doConnect (void)
static void discard_response (CState *state)
static int compareVariables (const void *v1, const void *v2)
static char * getVariable (CState *st, char *name)
static bool isLegalVariableName (const char *name)
static int putVariable (CState *st, const char *context, char *name, char *value)
static char * parseVariable (const char *sql, int *eaten)
static char * replaceVariable (char **sql, char *param, int len, char *value)
static char * assignVariables (CState *st, char *sql)
static void getQueryParams (CState *st, const Command *command, const char **params)
static bool runShellCommand (CState *st, char *variable, char **argv, int argc)
static void preparedStatementName (char *buffer, int file, int state)
static bool clientDone (CState *st, bool ok)
static void agg_vals_init (AggVals *aggs, instr_time start)
static bool doCustom (TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
static void disconnect_all (CState *state, int length)
static void init (bool is_no_vacuum)
static bool parseQuery (Command *cmd, const char *raw_sql)
static Command * process_commands (char *buf)
static int process_file (char *filename)
static Command ** process_builtin (char *tb)
static void printResults (int ttype, int normal_xacts, int nclients, TState *threads, int nthreads, instr_time total_time, instr_time conn_total_time)
int main (int argc, char **argv)
static void handle_sig_alarm (SIGNAL_ARGS)

Variables

char * optarg
int optind
int nxacts = 0
int duration = 0
int scale = 1
int fillfactor = 100
int foreign_keys = 0
int unlogged_tables = 0
double sample_rate = 0.0
char * tablespace = NULL
char * index_tablespace = NULL
bool use_log
bool use_quiet
int agg_interval
bool is_connect
bool is_latencies
int main_pid
char * pghost = ""
char * pgport = ""
char * login = NULL
char * dbName
const char * progname
volatile bool timer_exceeded = false
static QueryMode querymode = QUERY_SIMPLE
static const char * QUERYMODE [] = {"simple", "extended", "prepared"}
static Command ** sql_files [MAX_FILES]
static int num_files
static int num_commands = 0
static int debug = 0
static char * tpc_b
static char * simple_update
static char * select_only

Define Documentation

#define COMMANDS_ALLOC_NUM   128
#define COMMANDS_ALLOC_NUM   128
#define DEFAULT_NXACTS   10

Definition at line 107 of file pgbench.c.

#define INT64_MAX   INT64CONST(0x7FFFFFFFFFFFFFFF)

Definition at line 58 of file pgbench.c.

Referenced by threadRun().

#define INVALID_THREAD   ((pthread_t) 0)

Definition at line 226 of file pgbench.c.

Referenced by main().

#define LOG_STEP_SECONDS   5

Definition at line 106 of file pgbench.c.

Referenced by init().

#define MAX_ARGS   10

Definition at line 239 of file pgbench.c.

Referenced by parseQuery().

#define MAX_FILES   128

Definition at line 185 of file pgbench.c.

Referenced by process_file().

#define MAX_PREPARE_NAME   32

Definition at line 856 of file pgbench.c.

#define MAXCLIENTS   1024

Definition at line 103 of file pgbench.c.

Referenced by main().

#define META_COMMAND   2

Definition at line 238 of file pgbench.c.

Referenced by doCustom(), and threadRun().

#define naccounts   100000

Definition at line 152 of file pgbench.c.

Referenced by init().

#define nbranches   1

Definition at line 149 of file pgbench.c.

Referenced by init().

#define ntellers   10

Definition at line 151 of file pgbench.c.

Referenced by init().

#define PARAMS_ARRAY_SIZE   7
#define pthread_attr_t   pg_pthread_attr_t

Definition at line 81 of file pgbench.c.

#define pthread_create   pg_pthread_create

Definition at line 82 of file pgbench.c.

Referenced by main().

#define pthread_join   pg_pthread_join

Definition at line 83 of file pgbench.c.

Referenced by main().

#define pthread_t   pg_pthread_t

Definition at line 80 of file pgbench.c.

#define SCALE_32BIT_THRESHOLD   20000

Definition at line 161 of file pgbench.c.

Referenced by init().

#define SCALE_32BIT_THRESHOLD   20000

Definition at line 161 of file pgbench.c.

#define SHELL_COMMAND_SIZE   256

Definition at line 186 of file pgbench.c.

Referenced by runShellCommand().

#define SQL_COMMAND   1

Definition at line 237 of file pgbench.c.

Referenced by doCustom().


Typedef Documentation

typedef struct fork_pthread fork_pthread
typedef int pthread_attr_t

Definition at line 86 of file pgbench.c.

typedef struct fork_pthread* pthread_t

Definition at line 85 of file pgbench.c.

typedef enum QueryMode QueryMode

Enumeration Type Documentation

enum QueryMode
Enumerator:
QUERY_SIMPLE 
QUERY_EXTENDED 
QUERY_PREPARED 
NUM_QUERYMODE 

Definition at line 241 of file pgbench.c.

{
    QUERY_SIMPLE,               /* simple query */
    QUERY_EXTENDED,             /* extended query */
    QUERY_PREPARED,             /* extended query with prepared statements */
    NUM_QUERYMODE               /* last enumerator; presumably the number of real modes -- confirm */
} QueryMode;


Function Documentation

static void agg_vals_init ( AggVals *  aggs,
instr_time  start 
) [static]

Definition at line 877 of file pgbench.c.

References AggVals::cnt, INSTR_TIME_GET_DOUBLE, AggVals::max_duration, AggVals::min_duration, AggVals::start_time, AggVals::sum, and AggVals::sum2.

Referenced by threadRun().

{
    /* the interval being aggregated begins at "start" */
    aggs->start_time = INSTR_TIME_GET_DOUBLE(start);

    /* shortest and longest transaction duration seen so far */
    aggs->max_duration = 0;
    aggs->min_duration = 0;

    /*
     * running totals: SUM(duration*duration), SUM(duration) and the number
     * of transactions
     */
    aggs->sum2 = 0;
    aggs->sum = 0;
    aggs->cnt = 0;
}

static char* assignVariables ( CState *  st,
char *  sql 
) [static]

Definition at line 706 of file pgbench.c.

References free, getVariable(), name, NULL, parseVariable(), replaceVariable(), and val.

Referenced by doCustom().

{
    char       *cursor = sql;

    /*
     * Walk over every ':' in the statement.  replaceVariable() is handed
     * &sql, so the string we return is whatever it leaves there.
     */
    for (;;)
    {
        int         consumed;
        char       *varname;
        char       *value;

        cursor = strchr(cursor, ':');
        if (cursor == NULL)
            break;

        varname = parseVariable(cursor, &consumed);
        if (varname == NULL)
        {
            /* no variable name here: step past the whole run of colons */
            while (*cursor == ':')
                cursor++;
            continue;
        }

        value = getVariable(st, varname);
        free(varname);

        if (value != NULL)
            cursor = replaceVariable(&sql, cursor, consumed, value);
        else
            cursor++;           /* unknown variable: leave the text alone */
    }

    return sql;
}

static bool clientDone ( CState *  st,
bool  ok 
) [static]

Definition at line 864 of file pgbench.c.

References CState::con, NULL, and PQfinish().

Referenced by doCustom().

{
    /* close the client's connection, if it still has one */
    if (st->con)
    {
        PQfinish(st->con);
        st->con = NULL;
    }

    /* "ok" is accepted but never consulted */
    (void) ok;

    /* the result is false regardless of "ok" */
    return false;
}

static int compareVariables ( const void *  v1,
const void *  v2 
) [static]

Definition at line 558 of file pgbench.c.

References name.

Referenced by putVariable().

{
    const Variable *lhs = (const Variable *) v1;
    const Variable *rhs = (const Variable *) v2;

    /* variables are ordered by name, exactly as strcmp() orders them */
    return strcmp(lhs->name, rhs->name);
}

static void discard_response ( CState *  state  )  [static]

Definition at line 545 of file pgbench.c.

References CState::con, PQclear(), and PQgetResult().

Referenced by doCustom().

{
    PGresult   *pending;

    /* fetch and free results until PQgetResult() returns NULL */
    while ((pending = PQgetResult(state->con)) != NULL)
        PQclear(pending);
}

static void disconnect_all ( CState *  state,
int  length 
) [static]

Definition at line 1403 of file pgbench.c.

References CState::con, i, and PQfinish().

Referenced by main(), and threadRun().

{
    int         idx = 0;

    /* close every connection still open among the first "length" clients */
    while (idx < length)
    {
        CState     *client = &state[idx++];

        if (client->con == NULL)
            continue;

        PQfinish(client->con);
        client->con = NULL;
    }
}

static PGconn* doConnect ( void   )  [static]

Definition at line 478 of file pgbench.c.

References conn, CONNECTION_BAD, NULL, PQconnectdbParams(), PQconnectionNeedsPassword(), PQerrorMessage(), PQfinish(), PQstatus(), simple_prompt(), and values.

Referenced by doCustom(), init(), main(), and threadRun().

{
    static char *password = NULL;   /* static, so it survives across calls */
    PGconn     *conn;

    /*
     * Try to connect.  If the attempt fails only because a password is
     * needed and we do not have one yet, prompt for it and try again.
     */
    for (;;)
    {
#define PARAMS_ARRAY_SIZE   7

        const char *keywords[PARAMS_ARRAY_SIZE];
        const char *values[PARAMS_ARRAY_SIZE];
        int         slot = 0;

        keywords[slot] = "host";
        values[slot++] = pghost;

        keywords[slot] = "port";
        values[slot++] = pgport;

        keywords[slot] = "user";
        values[slot++] = login;

        keywords[slot] = "password";
        values[slot++] = password;

        keywords[slot] = "dbname";
        values[slot++] = dbName;

        keywords[slot] = "fallback_application_name";
        values[slot++] = progname;

        /* both arrays end with a NULL entry */
        keywords[slot] = NULL;
        values[slot] = NULL;

        conn = PQconnectdbParams(keywords, values, true);
        if (conn == NULL)
        {
            fprintf(stderr, "Connection to database \"%s\" failed\n",
                    dbName);
            return NULL;
        }

        /* retry only for a bad connection that wants a password we lack */
        if (PQstatus(conn) != CONNECTION_BAD ||
            !PQconnectionNeedsPassword(conn) ||
            password != NULL)
            break;

        PQfinish(conn);
        password = simple_prompt("Password: ", 100, false);
    }

    /* a connection object exists; make sure it is actually usable */
    if (PQstatus(conn) != CONNECTION_BAD)
        return conn;

    fprintf(stderr, "Connection to database \"%s\" failed:\n%s",
            dbName, PQerrorMessage(conn));
    PQfinish(conn);
    return NULL;
}

static bool doCustom ( TState *  thread,
CState *  st,
instr_time *  conn_time,
FILE *  logfile,
AggVals *  agg 
) [static]

Definition at line 894 of file pgbench.c.

References Command::argc, Command::argv, assignVariables(), clientDone(), CState::cnt, AggVals::cnt, Command::command_num, CState::con, discard_response(), doConnect(), CState::ecnt, TState::exec_count, TState::exec_elapsed, free, getQueryParams(), getrand(), getVariable(), i, CState::id, INSTR_TIME_ACCUM_DIFF, INSTR_TIME_GET_DOUBLE, INSTR_TIME_GET_MICROSEC, INSTR_TIME_SET_CURRENT, INSTR_TIME_SUBTRACT, CState::listen, AggVals::max_duration, META_COMMAND, AggVals::min_duration, name, NULL, pg_erand48(), pg_strcasecmp(), pg_strdup(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQconsumeInput(), PQerrorMessage(), PQfinish(), PQgetResult(), PQisBusy(), PQprepare(), PQresultStatus(), PQsendQuery(), PQsendQueryParams(), PQsendQueryPrepared(), CState::prepared, preparedStatementName(), putVariable(), QUERY_EXTENDED, QUERY_PREPARED, QUERY_SIMPLE, TState::random_state, runShellCommand(), CState::sleeping, snprintf(), SQL_COMMAND, AggVals::start_time, CState::state, CState::stmt_begin, strtoint64(), AggVals::sum, AggVals::sum2, CState::txn_begin, Command::type, CState::until, and CState::use_file.

Referenced by threadRun().

{
    /*
     * Drive one client (st) forward: if it is waiting on a command
     * (st->listen), finish that command -- collecting latency and log data
     * as requested -- and step to the next one; then make sure a connection
     * exists and issue the command at st->state.  Meta commands are executed
     * inline and control jumps back to "top" to process the next command.
     *
     * thread     per-thread data (random state, per-command latency totals)
     * st         the client being advanced
     * conn_time  accumulates the time spent inside doConnect()
     * logfile    transaction log, or NULL when logging is off
     * agg        aggregation state, used only when agg_interval > 0
     *
     * Returns false only through clientDone(), which closes the client's
     * connection; every other exit path returns true.
     */
    PGresult   *res;
    Command   **commands;

top:
    commands = sql_files[st->use_file];

    if (st->sleeping)
    {                           /* are we sleeping? */
        instr_time  now;

        INSTR_TIME_SET_CURRENT(now);
        if (st->until <= INSTR_TIME_GET_MICROSEC(now))
            st->sleeping = 0;   /* Done sleeping, go ahead with next command */
        else
            return true;        /* Still sleeping, nothing to do here */
    }

    if (st->listen)
    {                           /* are we receiver? */
        if (commands[st->state]->type == SQL_COMMAND)
        {
            if (debug)
                fprintf(stderr, "client %d receiving\n", st->id);
            if (!PQconsumeInput(st->con))
            {                   /* there's something wrong */
                fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
                return clientDone(st, false);
            }
            if (PQisBusy(st->con))
                return true;    /* don't have the whole result yet */
        }

        /*
         * command finished: accumulate per-command execution times in
         * thread-local data structure, if per-command latencies are requested
         */
        if (is_latencies)
        {
            instr_time  now;
            int         cnum = commands[st->state]->command_num;

            INSTR_TIME_SET_CURRENT(now);
            INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
                                  now, st->stmt_begin);
            thread->exec_count[cnum]++;
        }

        /*
         * if transaction finished, record the time it took in the log
         */
        if (logfile && commands[st->state + 1] == NULL)
        {
            instr_time  now;
            instr_time  diff;
            double      usec;

            /*
             * write the log entry if this row belongs to the random sample,
             * or no sampling rate was given which means log everything.
             */
            if (sample_rate == 0.0 ||
                pg_erand48(thread->random_state) <= sample_rate)
            {
                INSTR_TIME_SET_CURRENT(now);
                diff = now;
                INSTR_TIME_SUBTRACT(diff, st->txn_begin);
                usec = (double) INSTR_TIME_GET_MICROSEC(diff);

                /* should we aggregate the results or not? */
                if (agg_interval > 0)
                {
                    /* are we still in the same interval? if yes, accumulate the
                    * values (print them otherwise) */
                    if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(now))
                    {
                        agg->cnt += 1;
                        agg->sum  += usec;
                        agg->sum2 += usec * usec;

                        /* first in this aggregation interval */
                        if ((agg->cnt == 1) || (usec < agg->min_duration))
                            agg->min_duration =  usec;

                        if ((agg->cnt == 1) || (usec > agg->max_duration))
                            agg->max_duration = usec;
                    }
                    else
                    {
                        /* Loop until we reach the interval of the current transaction (and
                         * print all the empty intervals in between). */
                        while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(now))
                        {
                            /* This is a non-Windows branch (thanks to the ifdef in usage), so
                             * we don't need to handle this in a special way (see below). */
                            fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f\n",
                                    agg->start_time, agg->cnt, agg->sum, agg->sum2,
                                    agg->min_duration, agg->max_duration);

                            /* move to the next interval */
                            agg->start_time = agg->start_time + agg_interval;

                            /* reset for "no transaction" intervals */
                            agg->cnt = 0;
                            agg->min_duration = 0;
                            agg->max_duration = 0;
                            agg->sum = 0;
                            agg->sum2 = 0;
                        }

                        /* and now update the reset values (include the current) */
                        agg->cnt = 1;
                        agg->min_duration = usec;
                        agg->max_duration = usec;
                        agg->sum = usec;
                        agg->sum2 = usec * usec;
                    }
                }
                else
                {
                    /* no, print raw transactions */
#ifndef WIN32
                    /* This is more than we really ought to know about instr_time */
                    fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
                            st->id, st->cnt, usec, st->use_file,
                            (long) now.tv_sec, (long) now.tv_usec);
#else
                    /* On Windows, instr_time doesn't provide a timestamp anyway */
                    fprintf(logfile, "%d %d %.0f %d 0 0\n",
                            st->id, st->cnt, usec, st->use_file);
#endif
                }
            }
        }

        if (commands[st->state]->type == SQL_COMMAND)
        {
            /*
             * Read and discard the query result; note this is not included in
             * the statement latency numbers.
             */
            res = PQgetResult(st->con);
            switch (PQresultStatus(res))
            {
                case PGRES_COMMAND_OK:
                case PGRES_TUPLES_OK:
                    break;      /* OK */
                default:
                    fprintf(stderr, "Client %d aborted in state %d: %s",
                            st->id, st->state, PQerrorMessage(st->con));
                    PQclear(res);
                    return clientDone(st, false);
            }
            PQclear(res);
            discard_response(st);
        }

        /* last command of the script: the transaction is complete */
        if (commands[st->state + 1] == NULL)
        {
            if (is_connect)
            {
                PQfinish(st->con);
                st->con = NULL;
            }

            ++st->cnt;
            if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
                return clientDone(st, true);    /* exit success */
        }

        /* increment state counter */
        st->state++;
        if (commands[st->state] == NULL)
        {
            /* ran off the end of the script: pick a script for the next round */
            st->state = 0;
            st->use_file = (int) getrand(thread, 0, num_files - 1);
            commands = sql_files[st->use_file];
        }
    }

    /* (re)connect if needed, charging the elapsed time to *conn_time */
    if (st->con == NULL)
    {
        instr_time  start,
                    end;

        INSTR_TIME_SET_CURRENT(start);
        if ((st->con = doConnect()) == NULL)
        {
            fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
            return clientDone(st, false);
        }
        INSTR_TIME_SET_CURRENT(end);
        INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
    }

    /* Record transaction start time if logging is enabled */
    if (logfile && st->state == 0)
        INSTR_TIME_SET_CURRENT(st->txn_begin);

    /* Record statement start time if per-command latencies are requested */
    if (is_latencies)
        INSTR_TIME_SET_CURRENT(st->stmt_begin);

    if (commands[st->state]->type == SQL_COMMAND)
    {
        const Command *command = commands[st->state];
        int         r;

        if (querymode == QUERY_SIMPLE)
        {
            char       *sql;

            /* work on a private copy: assignVariables() edits the text */
            sql = pg_strdup(command->argv[0]);
            sql = assignVariables(st, sql);

            if (debug)
                fprintf(stderr, "client %d sending %s\n", st->id, sql);
            r = PQsendQuery(st->con, sql);
            free(sql);
        }
        else if (querymode == QUERY_EXTENDED)
        {
            const char *sql = command->argv[0];
            const char *params[MAX_ARGS];

            getQueryParams(st, command, params);

            if (debug)
                fprintf(stderr, "client %d sending %s\n", st->id, sql);
            r = PQsendQueryParams(st->con, sql, command->argc - 1,
                                  NULL, params, NULL, NULL, 0);
        }
        else if (querymode == QUERY_PREPARED)
        {
            char        name[MAX_PREPARE_NAME];
            const char *params[MAX_ARGS];

            /* first use of this script by this client: prepare every SQL command in it */
            if (!st->prepared[st->use_file])
            {
                int         j;

                for (j = 0; commands[j] != NULL; j++)
                {
                    PGresult   *res;
                    char        name[MAX_PREPARE_NAME];

                    if (commands[j]->type != SQL_COMMAND)
                        continue;
                    preparedStatementName(name, st->use_file, j);
                    res = PQprepare(st->con, name,
                          commands[j]->argv[0], commands[j]->argc - 1, NULL);
                    if (PQresultStatus(res) != PGRES_COMMAND_OK)
                        fprintf(stderr, "%s", PQerrorMessage(st->con));
                    PQclear(res);
                }
                st->prepared[st->use_file] = true;
            }

            getQueryParams(st, command, params);
            preparedStatementName(name, st->use_file, st->state);

            if (debug)
                fprintf(stderr, "client %d sending %s\n", st->id, name);
            r = PQsendQueryPrepared(st->con, name, command->argc - 1,
                                    params, NULL, NULL, 0);
        }
        else    /* unknown sql mode */
            r = 0;

        if (r == 0)
        {
            if (debug)
                fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
            st->ecnt++;
        }
        else
            st->listen = 1;     /* flags that should be listened */
    }
    else if (commands[st->state]->type == META_COMMAND)
    {
        int         argc = commands[st->state]->argc,
                    i;
        char      **argv = commands[st->state]->argv;

        if (debug)
        {
            fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
            for (i = 1; i < argc; i++)
                fprintf(stderr, " %s", argv[i]);
            fprintf(stderr, "\n");
        }

        if (pg_strcasecmp(argv[0], "setrandom") == 0)
        {
            char       *var;
            int64       min,
                        max;
            char        res[64];

            /* each bound is either a literal or a ":variable" reference */
            if (*argv[2] == ':')
            {
                if ((var = getVariable(st, argv[2] + 1)) == NULL)
                {
                    fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
                    st->ecnt++;
                    return true;
                }
                min = strtoint64(var);
            }
            else
                min = strtoint64(argv[2]);

#ifdef NOT_USED
            if (min < 0)
            {
                fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
                st->ecnt++;
                return;
            }
#endif

            if (*argv[3] == ':')
            {
                if ((var = getVariable(st, argv[3] + 1)) == NULL)
                {
                    fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
                    st->ecnt++;
                    return true;
                }
                max = strtoint64(var);
            }
            else
                max = strtoint64(argv[3]);

            if (max < min)
            {
                fprintf(stderr, "%s: maximum is less than minimum\n", argv[0]);
                st->ecnt++;
                return true;
            }

            /*
             * getrand() needs to be able to subtract min from max and add
             * one to the result without overflowing.  Since we know max >= min,
             * we can detect overflow just by checking for a negative result.
             * But we must check both that the subtraction doesn't overflow,
             * and that adding one to the result doesn't overflow either.
             *
             * NOTE(review): this test relies on signed overflow wrapping
             * around, which standard C does not guarantee -- confirm the
             * build options make that safe.
             */
            if (max - min < 0 || (max - min) + 1 < 0)
            {
                fprintf(stderr, "%s: range too large\n", argv[0]);
                st->ecnt++;
                return true;
            }

#ifdef DEBUG
            printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
#endif
            snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));

            if (!putVariable(st, argv[0], argv[1], res))
            {
                st->ecnt++;
                return true;
            }

            st->listen = 1;
        }
        else if (pg_strcasecmp(argv[0], "set") == 0)
        {
            char       *var;
            int64       ope1,
                        ope2;
            char        res[64];

            if (*argv[2] == ':')
            {
                if ((var = getVariable(st, argv[2] + 1)) == NULL)
                {
                    fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
                    st->ecnt++;
                    return true;
                }
                ope1 = strtoint64(var);
            }
            else
                ope1 = strtoint64(argv[2]);

            /* fewer than five words: plain assignment, no operator */
            if (argc < 5)
                snprintf(res, sizeof(res), INT64_FORMAT, ope1);
            else
            {
                if (*argv[4] == ':')
                {
                    if ((var = getVariable(st, argv[4] + 1)) == NULL)
                    {
                        fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
                        st->ecnt++;
                        return true;
                    }
                    ope2 = strtoint64(var);
                }
                else
                    ope2 = strtoint64(argv[4]);

                if (strcmp(argv[3], "+") == 0)
                    snprintf(res, sizeof(res), INT64_FORMAT, ope1 + ope2);
                else if (strcmp(argv[3], "-") == 0)
                    snprintf(res, sizeof(res), INT64_FORMAT, ope1 - ope2);
                else if (strcmp(argv[3], "*") == 0)
                    snprintf(res, sizeof(res), INT64_FORMAT, ope1 * ope2);
                else if (strcmp(argv[3], "/") == 0)
                {
                    if (ope2 == 0)
                    {
                        fprintf(stderr, "%s: division by zero\n", argv[0]);
                        st->ecnt++;
                        return true;
                    }
                    snprintf(res, sizeof(res), INT64_FORMAT, ope1 / ope2);
                }
                else
                {
                    fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
                    st->ecnt++;
                    return true;
                }
            }

            if (!putVariable(st, argv[0], argv[1], res))
            {
                st->ecnt++;
                return true;
            }

            st->listen = 1;
        }
        else if (pg_strcasecmp(argv[0], "sleep") == 0)
        {
            char       *var;
            int         usec;
            instr_time  now;

            if (*argv[1] == ':')
            {
                if ((var = getVariable(st, argv[1] + 1)) == NULL)
                {
                    fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
                    st->ecnt++;
                    return true;
                }
                usec = atoi(var);
            }
            else
                usec = atoi(argv[1]);

            /*
             * Scale to microseconds: "ms" and "s" are converted, any other
             * unit word leaves the number as given, and no unit at all means
             * seconds.
             *
             * NOTE(review): usec is a plain int, so with a 32-bit int the
             * multiplications below overflow for long sleeps -- confirm
             * whether that matters for supported inputs.
             */
            if (argc > 2)
            {
                if (pg_strcasecmp(argv[2], "ms") == 0)
                    usec *= 1000;
                else if (pg_strcasecmp(argv[2], "s") == 0)
                    usec *= 1000000;
            }
            else
                usec *= 1000000;

            INSTR_TIME_SET_CURRENT(now);
            st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
            st->sleeping = 1;

            st->listen = 1;
        }
        else if (pg_strcasecmp(argv[0], "setshell") == 0)
        {
            bool        ret = runShellCommand(st, argv[1], argv + 2, argc - 2);

            if (timer_exceeded) /* timeout */
                return clientDone(st, true);
            else if (!ret)      /* on error */
            {
                st->ecnt++;
                return true;
            }
            else    /* succeeded */
                st->listen = 1;
        }
        else if (pg_strcasecmp(argv[0], "shell") == 0)
        {
            bool        ret = runShellCommand(st, NULL, argv + 1, argc - 1);

            if (timer_exceeded) /* timeout */
                return clientDone(st, true);
            else if (!ret)      /* on error */
            {
                st->ecnt++;
                return true;
            }
            else    /* succeeded */
                st->listen = 1;
        }
        /* meta command handled: loop back to finish it and run what follows */
        goto top;
    }

    return true;
}

static void executeStatement ( PGconn *  con,
const char *  sql 
) [static]

Definition at line 463 of file pgbench.c.

References PGRES_COMMAND_OK, PQclear(), PQerrorMessage(), PQexec(), and PQresultStatus().

Referenced by init(), and main().

{
    PGresult   *result = PQexec(con, sql);

    if (PQresultStatus(result) == PGRES_COMMAND_OK)
    {
        PQclear(result);
        return;
    }

    /* any status other than PGRES_COMMAND_OK is fatal: report it and quit */
    fprintf(stderr, "%s", PQerrorMessage(con));
    exit(1);
}

static void getQueryParams ( CState *  st,
const Command *  command,
const char **  params 
) [static]

Definition at line 742 of file pgbench.c.

References Command::argc, Command::argv, getVariable(), and i.

Referenced by doCustom().

{
    int         nparams = command->argc - 1;
    int         k;

    /*
     * argv[0] is skipped; argv[1] onward are passed to getVariable() as
     * names, and each result (possibly NULL) fills the matching params slot.
     */
    for (k = 1; k <= nparams; k++)
        params[k - 1] = getVariable(st, command->argv[k]);
}

static int64 getrand ( TState *  thread,
int64  min,
int64  max 
) [static]

Definition at line 447 of file pgbench.c.

References pg_erand48(), and TState::random_state.

Referenced by doCustom(), and threadRun().

{
    /*
     * The draw is scaled by (max - min + 1) and then truncated, so that the
     * two end points come up about as often as the values between them.
     *
     * The random source is pg_erand48() rather than random(): the former is
     * thread-safe and concurrent, while glibc's random() is non-reentrant
     * and hence guarded by a mutex, which becomes a bottleneck on machines
     * with many CPUs.
     */
    int64       span = max - min + 1;
    double      draw = pg_erand48(thread->random_state);

    return min + (int64) (span * draw);
}

static char* getVariable ( CState *  st,
char *  name 
) [static]

Definition at line 565 of file pgbench.c.

References Variable::name, NULL, CState::nvariables, Variable::value, and CState::variables.

Referenced by assignVariables(), doCustom(), getQueryParams(), main(), and runShellCommand().

{
    /*
     * Look up a variable by name in the client's variable array using
     * bsearch() with compareVariables.  Returns the variable's value string,
     * or NULL when the client has no variables or the name is not found.
     */
    Variable    probe;
    Variable   *found;

    /* On some versions of Solaris, bsearch of zero items dumps core */
    if (st->nvariables <= 0)
        return NULL;

    probe.name = name;
    found = (Variable *) bsearch((void *) &probe,
                                 (void *) st->variables,
                                 st->nvariables,
                                 sizeof(Variable),
                                 compareVariables);

    return (found != NULL) ? found->value : NULL;
}

static void handle_sig_alarm ( SIGNAL_ARGS   )  [static]

Definition at line 2854 of file pgbench.c.

Referenced by setalarm().

{
    /*
     * Signal handler: only records that the alarm fired.  Code elsewhere in
     * this file tests timer_exceeded to detect the timeout.
     */
    timer_exceeded = true;
}

static void init ( bool  is_no_vacuum  )  [static]

Definition at line 1419 of file pgbench.c.

References doConnect(), executeStatement(), i, INSTR_TIME_GET_DOUBLE, INSTR_TIME_SET_CURRENT, INSTR_TIME_SUBTRACT, lengthof, LOG_STEP_SECONDS, naccounts, nbranches, ntellers, NULL, PGRES_COPY_IN, PQclear(), PQendcopy(), PQerrorMessage(), PQescapeIdentifier(), PQexec(), PQfinish(), PQfreemem(), PQputline(), PQresultStatus(), SCALE_32BIT_THRESHOLD, and snprintf().

Referenced by compat_find_digest(), from_char_parse_int_len(), and main().

{
    /*
     * Create and populate the pgbench tables (initialization mode).
     *
     * Steps: connect; drop and re-create the four pgbench tables; insert the
     * branch and teller rows; bulk-load pgbench_accounts through COPY while
     * reporting progress on stderr; optionally vacuum (skipped when
     * is_no_vacuum is true); add primary keys and, if requested, foreign
     * keys.  Any failure is fatal and exits with status 1.
     */

/*
 * The scale factor at/beyond which 32-bit integers are insufficient for
 * storing account IDs, so the aid columns are declared bigint instead.
 *
 * Although the actual threshold is 21474, we use 20000 because it is easier to
 * document and remember, and isn't that far away from the real threshold.
 */
#define SCALE_32BIT_THRESHOLD 20000

    /*
     * Note: TPC-B requires at least 100 bytes per row, and the "filler"
     * fields in these table declarations were intended to comply with that.
     * But because they default to NULLs, they don't actually take any space.
     * We could fix that by giving them non-null default values. However, that
     * would completely break comparability of pgbench results with prior
     * versions.  Since pgbench has never pretended to be fully TPC-B
     * compliant anyway, we stick with the historical behavior.
     */
    struct ddlinfo
    {
        char       *table;
        char       *cols;
        int         declare_fillfactor;
    };
    struct ddlinfo DDLs[] = {
        {
            "pgbench_history",
            scale >= SCALE_32BIT_THRESHOLD
                ? "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)"
                : "tid int,bid int,aid    int,delta int,mtime timestamp,filler char(22)",
            0
        },
        {
            "pgbench_tellers",
            "tid int not null,bid int,tbalance int,filler char(84)",
            1
        },
        {
            "pgbench_accounts",
            scale >= SCALE_32BIT_THRESHOLD
                ? "aid bigint not null,bid int,abalance int,filler char(84)"
                : "aid    int not null,bid int,abalance int,filler char(84)",
            1
        },
        {
            "pgbench_branches",
            "bid int not null,bbalance int,filler char(88)",
            1
        }
    };
    static char *DDLAFTERs[] = {
        "alter table pgbench_branches add primary key (bid)",
        "alter table pgbench_tellers add primary key (tid)",
        "alter table pgbench_accounts add primary key (aid)"
    };
    static char *DDLKEYs[] = {
        "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
        "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
        "alter table pgbench_history add foreign key (bid) references pgbench_branches",
        "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
        "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
    };

    PGconn     *con;
    PGresult   *res;
    char        sql[256];
    int         i;
    int64       k;

    /*
     * Total number of pgbench_accounts rows, computed once in 64-bit
     * arithmetic.  The plain int product naccounts * scale overflows once
     * scale exceeds 21474, which is exactly the range the bigint columns
     * above are meant to support.
     */
    int64       total_rows = (int64) naccounts * scale;

    /* used to track elapsed time and estimate of the remaining time */
    instr_time  start, diff;
    double      elapsed_sec, remaining_sec;
    int         log_interval = 1;

    if ((con = doConnect()) == NULL)
        exit(1);

    for (i = 0; i < lengthof(DDLs); i++)
    {
        char        opts[256];
        char        buffer[256];
        struct ddlinfo *ddl = &DDLs[i];

        /* Remove old table, if it exists. */
        snprintf(buffer, 256, "drop table if exists %s", ddl->table);
        executeStatement(con, buffer);

        /* Construct new create table statement. */
        opts[0] = '\0';
        if (ddl->declare_fillfactor)
            snprintf(opts + strlen(opts), 256 - strlen(opts),
                     " with (fillfactor=%d)", fillfactor);
        if (tablespace != NULL)
        {
            char       *escape_tablespace;

            escape_tablespace = PQescapeIdentifier(con, tablespace,
                                                   strlen(tablespace));
            /* PQescapeIdentifier returns NULL on failure; don't print it */
            if (escape_tablespace == NULL)
            {
                fprintf(stderr, "%s", PQerrorMessage(con));
                exit(1);
            }
            snprintf(opts + strlen(opts), 256 - strlen(opts),
                     " tablespace %s", escape_tablespace);
            PQfreemem(escape_tablespace);
        }
        snprintf(buffer, 256, "create%s table %s(%s)%s",
                 unlogged_tables ? " unlogged" : "",
                 ddl->table, ddl->cols, opts);

        executeStatement(con, buffer);
    }

    executeStatement(con, "begin");

    for (i = 0; i < nbranches * scale; i++)
    {
        snprintf(sql, 256, "insert into pgbench_branches(bid,bbalance) values(%d,0)", i + 1);
        executeStatement(con, sql);
    }

    for (i = 0; i < ntellers * scale; i++)
    {
        snprintf(sql, 256, "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
                 i + 1, i / ntellers + 1);
        executeStatement(con, sql);
    }

    executeStatement(con, "commit");

    /*
     * fill the pgbench_accounts table with some data
     */
    fprintf(stderr, "creating tables...\n");

    executeStatement(con, "begin");
    executeStatement(con, "truncate pgbench_accounts");

    res = PQexec(con, "copy pgbench_accounts from stdin");
    if (PQresultStatus(res) != PGRES_COPY_IN)
    {
        fprintf(stderr, "%s", PQerrorMessage(con));
        exit(1);
    }
    PQclear(res);

    INSTR_TIME_SET_CURRENT(start);

    for (k = 0; k < total_rows; k++)
    {
        int64       j = k + 1;

        snprintf(sql, 256, INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n", j, k / naccounts + 1, 0);
        if (PQputline(con, sql))
        {
            fprintf(stderr, "PQputline failed\n");
            exit(1);
        }

        /* If we want to stick with the original logging, print a message each
         * 100k inserted rows. */
        if ((! use_quiet) && (j % 100000 == 0))
        {
            INSTR_TIME_SET_CURRENT(diff);
            INSTR_TIME_SUBTRACT(diff, start);

            elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
            remaining_sec = (total_rows - j) * elapsed_sec / j;

            fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
                            j, total_rows,
                            (int) ((j * 100) / total_rows),
                            elapsed_sec, remaining_sec);
        }
        /* let's not call the timing for each row, but only each 100 rows */
        else if (use_quiet && (j % 100 == 0))
        {
            INSTR_TIME_SET_CURRENT(diff);
            INSTR_TIME_SUBTRACT(diff, start);

            elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
            remaining_sec = (total_rows - j) * elapsed_sec / j;

            /* have we reached the next interval (or end)? */
            if ((j == total_rows) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS)) {

                fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
                        j, total_rows,
                        (int) ((j * 100) / total_rows), elapsed_sec, remaining_sec);

                /* skip to the next interval */
                log_interval = (int)ceil(elapsed_sec/LOG_STEP_SECONDS);
            }
        }

    }
    if (PQputline(con, "\\.\n"))
    {
        fprintf(stderr, "very last PQputline failed\n");
        exit(1);
    }
    if (PQendcopy(con))
    {
        fprintf(stderr, "PQendcopy failed\n");
        exit(1);
    }
    executeStatement(con, "commit");

    /* vacuum */
    if (!is_no_vacuum)
    {
        fprintf(stderr, "vacuum...\n");
        executeStatement(con, "vacuum analyze pgbench_branches");
        executeStatement(con, "vacuum analyze pgbench_tellers");
        executeStatement(con, "vacuum analyze pgbench_accounts");
        executeStatement(con, "vacuum analyze pgbench_history");
    }

    /*
     * create indexes
     */
    fprintf(stderr, "set primary keys...\n");
    for (i = 0; i < lengthof(DDLAFTERs); i++)
    {
        char        buffer[256];

        /* snprintf always NUL-terminates, unlike strncpy */
        snprintf(buffer, sizeof(buffer), "%s", DDLAFTERs[i]);

        if (index_tablespace != NULL)
        {
            char       *escape_tablespace;

            escape_tablespace = PQescapeIdentifier(con, index_tablespace,
                                                   strlen(index_tablespace));
            /* PQescapeIdentifier returns NULL on failure; don't print it */
            if (escape_tablespace == NULL)
            {
                fprintf(stderr, "%s", PQerrorMessage(con));
                exit(1);
            }
            snprintf(buffer + strlen(buffer), 256 - strlen(buffer),
                     " using index tablespace %s", escape_tablespace);
            PQfreemem(escape_tablespace);
        }

        executeStatement(con, buffer);
    }

    /*
     * create foreign keys
     */
    if (foreign_keys)
    {
        fprintf(stderr, "set foreign keys...\n");
        for (i = 0; i < lengthof(DDLKEYs); i++)
        {
            executeStatement(con, DDLKEYs[i]);
        }
    }


    fprintf(stderr, "done.\n");
    PQfinish(con);
}

static bool isLegalVariableName ( const char *  name  )  [static]

Definition at line 588 of file pgbench.c.

References i.

Referenced by putVariable().

{
    /*
     * Return true if name is a legal variable name: at least one character,
     * all of them alphanumeric (per isalnum) or underscore.
     */
    int         i;

    /* A zero-length name is not legal. */
    if (name[0] == '\0')
        return false;

    for (i = 0; name[i] != '\0'; i++)
    {
        /* cast to unsigned char: isalnum is undefined for negative values */
        if (!isalnum((unsigned char) name[i]) && name[i] != '_')
            return false;
    }

    return true;
}

int main ( int  argc,
char **  argv 
)

Definition at line 2080 of file pgbench.c.

References _, TResult::conn_time, CONNECTION_BAD, disconnect_all(), doConnect(), TState::exec_count, TState::exec_elapsed, executeStatement(), filename, free, get_progname(), getopt_long(), getVariable(), i, CState::id, init(), INSTR_TIME_ADD, INSTR_TIME_GET_MICROSEC, INSTR_TIME_SET_CURRENT, INSTR_TIME_SET_ZERO, INSTR_TIME_SUBTRACT, INVALID_THREAD, MAXCLIENTS, name, TState::nstate, NULL, CState::nvariables, pg_malloc(), pg_realloc(), pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexec(), PQfinish(), PQgetvalue(), PQresultStatus(), PQstatus(), printResults(), process_builtin(), process_file(), pthread_create, pthread_join, putVariable(), random(), TState::random_state, setalarm(), snprintf(), srandom(), TState::start_time, start_time, TState::state, strerror(), TState::thread, threadRun(), TState::tid, usage(), val, value, and TResult::xacts.

{
    /*
     * Program entry point.
     *
     * Reads connection defaults from the environment, parses command-line
     * options, and then either runs init() and exits (when -i was given) or
     * runs the benchmark: validate option combinations, connect, determine
     * the scale factor, optionally vacuum, set up per-thread state, run the
     * threads, and print the accumulated results.  Invalid options and
     * connection/query failures exit with status 1.
     */
    static struct option long_options[] = {
        {"foreign-keys", no_argument, &foreign_keys, 1},
        {"index-tablespace", required_argument, NULL, 3},
        {"tablespace", required_argument, NULL, 2},
        {"unlogged-tables", no_argument, &unlogged_tables, 1},
        {"sampling-rate", required_argument, NULL, 4},
        {"aggregate-interval", required_argument, NULL, 5},
        {NULL, 0, NULL, 0}
    };

    int         c;
    int         nclients = 1;   /* default number of simulated clients */
    int         nthreads = 1;   /* default number of threads */
    int         is_init_mode = 0;       /* initialize mode? */
    int         is_no_vacuum = 0;       /* no vacuum at all before testing? */
    int         do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
    int         ttype = 0;      /* transaction type. 0: TPC-B, 1: SELECT only,
                                 * 2: skip update of branches and tellers,
                                 * 3: custom script file(s) given with -f */
    int         optindex;
    char       *filename = NULL;
    bool        scale_given = false;

    CState     *state;          /* status of clients */
    TState     *threads;        /* array of thread */

    instr_time  start_time;     /* start up time */
    instr_time  total_time;
    instr_time  conn_total_time;
    int         total_xacts;

    int         i;

#ifdef HAVE_GETRLIMIT
    struct rlimit rlim;
#endif

    PGconn     *con;
    PGresult   *res;
    char       *env;

    char        val[64];

    progname = get_progname(argv[0]);

    if (argc > 1)
    {
        if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
        {
            usage();
            exit(0);
        }
        if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
        {
            puts("pgbench (PostgreSQL) " PG_VERSION);
            exit(0);
        }
    }

#ifdef WIN32
    /* stderr is buffered on Win32. */
    setvbuf(stderr, NULL, _IONBF, 0);
#endif

    /*
     * Each environment variable is consulted independently of the others;
     * in particular PGUSER must be honored even when PGPORT is also set.
     */
    if ((env = getenv("PGHOST")) != NULL && *env != '\0')
        pghost = env;
    if ((env = getenv("PGPORT")) != NULL && *env != '\0')
        pgport = env;
    if ((env = getenv("PGUSER")) != NULL && *env != '\0')
        login = env;

    state = (CState *) pg_malloc(sizeof(CState));
    memset(state, 0, sizeof(CState));

    while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
    {
        switch (c)
        {
            case 'i':
                is_init_mode++;
                break;
            case 'h':
                pghost = pg_strdup(optarg);
                break;
            case 'n':
                is_no_vacuum++;
                break;
            case 'v':
                do_vacuum_accounts++;
                break;
            case 'p':
                pgport = pg_strdup(optarg);
                break;
            case 'd':
                debug++;
                break;
            case 'S':
                ttype = 1;
                break;
            case 'N':
                ttype = 2;
                break;
            case 'c':
                nclients = atoi(optarg);
                if (nclients <= 0 || nclients > MAXCLIENTS)
                {
                    fprintf(stderr, "invalid number of clients: %d\n", nclients);
                    exit(1);
                }
#ifdef HAVE_GETRLIMIT
#ifdef RLIMIT_NOFILE            /* most platforms use RLIMIT_NOFILE */
                if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
#else                           /* but BSD doesn't ... */
                if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
#endif   /* RLIMIT_NOFILE */
                {
                    fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
                    exit(1);
                }
                if (rlim.rlim_cur <= (nclients + 2))
                {
                    fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
                    fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n");
                    exit(1);
                }
#endif   /* HAVE_GETRLIMIT */
                break;
            case 'j':           /* jobs */
                nthreads = atoi(optarg);
                if (nthreads <= 0)
                {
                    fprintf(stderr, "invalid number of threads: %d\n", nthreads);
                    exit(1);
                }
                break;
            case 'C':
                is_connect = true;
                break;
            case 'r':
                is_latencies = true;
                break;
            case 's':
                scale_given = true;
                scale = atoi(optarg);
                if (scale <= 0)
                {
                    fprintf(stderr, "invalid scaling factor: %d\n", scale);
                    exit(1);
                }
                break;
            case 't':
                if (duration > 0)
                {
                    fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
                    exit(1);
                }
                nxacts = atoi(optarg);
                if (nxacts <= 0)
                {
                    fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
                    exit(1);
                }
                break;
            case 'T':
                if (nxacts > 0)
                {
                    fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
                    exit(1);
                }
                duration = atoi(optarg);
                if (duration <= 0)
                {
                    fprintf(stderr, "invalid duration: %d\n", duration);
                    exit(1);
                }
                break;
            case 'U':
                login = pg_strdup(optarg);
                break;
            case 'l':
                use_log = true;
                break;
            case 'q':
                use_quiet = true;
                break;
            case 'f':
                ttype = 3;
                filename = pg_strdup(optarg);
                if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
                    exit(1);
                break;
            case 'D':
                {
                    char       *p;

                    /* require the form name=value with both parts non-empty */
                    if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
                    {
                        fprintf(stderr, "invalid variable definition: %s\n", optarg);
                        exit(1);
                    }

                    /* split in place: optarg becomes the name, p the value */
                    *p++ = '\0';
                    if (!putVariable(&state[0], "option", optarg, p))
                        exit(1);
                }
                break;
            case 'F':
                fillfactor = atoi(optarg);
                if ((fillfactor < 10) || (fillfactor > 100))
                {
                    fprintf(stderr, "invalid fillfactor: %d\n", fillfactor);
                    exit(1);
                }
                break;
            case 'M':
                if (num_files > 0)
                {
                    fprintf(stderr, "query mode (-M) should be specifiled before transaction scripts (-f)\n");
                    exit(1);
                }
                for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
                    if (strcmp(optarg, QUERYMODE[querymode]) == 0)
                        break;
                if (querymode >= NUM_QUERYMODE)
                {
                    fprintf(stderr, "invalid query mode (-M): %s\n", optarg);
                    exit(1);
                }
                break;
            case 0:
                /* This covers long options which take no argument. */
                break;
            case 2:             /* tablespace */
                tablespace = pg_strdup(optarg);
                break;
            case 3:             /* index-tablespace */
                index_tablespace = pg_strdup(optarg);
                break;
            case 4:             /* sampling-rate */
                sample_rate = atof(optarg);
                if (sample_rate <= 0.0 || sample_rate > 1.0)
                {
                    fprintf(stderr, "invalid sampling rate: %f\n", sample_rate);
                    exit(1);
                }
                break;
            case 5:             /* aggregate-interval */
#ifdef WIN32
                fprintf(stderr, "--aggregate-interval is not currently supported on Windows");
                exit(1);
#else
                agg_interval = atoi(optarg);
                if (agg_interval <= 0)
                {
                    fprintf(stderr, "invalid number of seconds for aggregation: %d\n", agg_interval);
                    exit(1);
                }
#endif
                break;
            default:
                fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
                exit(1);
                break;
        }
    }

    /* database name: first non-option argument, else PGDATABASE, else login */
    if (argc > optind)
        dbName = argv[optind];
    else
    {
        if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
            dbName = env;
        else if (login != NULL && *login != '\0')
            dbName = login;
        else
            dbName = "";
    }

    if (is_init_mode)
    {
        init(is_no_vacuum);
        exit(0);
    }

    /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
    if (nxacts <= 0 && duration <= 0)
        nxacts = DEFAULT_NXACTS;

    if (nclients % nthreads != 0)
    {
        fprintf(stderr, "number of clients (%d) must be a multiple of number of threads (%d)\n", nclients, nthreads);
        exit(1);
    }

    /* --sampling-rate may be used only with -l */
    if (sample_rate > 0.0 && !use_log)
    {
        fprintf(stderr, "log sampling rate is allowed only when logging transactions (-l) \n");
        exit(1);
    }

    /* -q may be used only with -i */
    if (use_quiet && !is_init_mode)
    {
        fprintf(stderr, "quiet-logging is allowed only in initialization mode (-i)\n");
        exit(1);
    }

    /* --sampling-rate must not be used with --aggregate-interval */
    if (sample_rate > 0.0 && agg_interval > 0)
    {
        fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) can't be used at the same time\n");
        exit(1);
    }

    if (agg_interval > 0 && (! use_log)) {
        fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
        exit(1);
    }

    if ((duration > 0) && (agg_interval > duration)) {
        fprintf(stderr, "number of seconds for aggregation (%d) must not be higher that test duration (%d)\n", agg_interval, duration);
        exit(1);
    }

    if ((duration > 0) && (agg_interval > 0) && (duration % agg_interval != 0)) {
        fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
        exit(1);
    }

    /*
     * is_latencies only works with multiple threads in thread-based
     * implementations, not fork-based ones, because it supposes that the
     * parent can see changes made to the per-thread execution stats by child
     * threads.  It seems useful enough to accept despite this limitation, but
     * perhaps we should FIXME someday (by passing the stats data back up
     * through the parent-to-child pipes).
     */
#ifndef ENABLE_THREAD_SAFETY
    if (is_latencies && nthreads > 1)
    {
        fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
        exit(1);
    }
#endif

    /*
     * save main process id in the global variable because process id will be
     * changed after fork.
     */
    main_pid = (int) getpid();

    if (nclients > 1)
    {
        state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
        memset(state + 1, 0, sizeof(CState) * (nclients - 1));

        /* copy any -D switch values to all clients */
        for (i = 1; i < nclients; i++)
        {
            int         j;

            state[i].id = i;
            for (j = 0; j < state[0].nvariables; j++)
            {
                if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
                    exit(1);
            }
        }
    }

    if (debug)
    {
        if (duration <= 0)
            printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
                   pghost, pgport, nclients, nxacts, dbName);
        else
            printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
                   pghost, pgport, nclients, duration, dbName);
    }

    /* opening connection... */
    con = doConnect();
    if (con == NULL)
        exit(1);

    if (PQstatus(con) == CONNECTION_BAD)
    {
        fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
        fprintf(stderr, "%s", PQerrorMessage(con));
        exit(1);
    }

    if (ttype != 3)
    {
        /*
         * get the scaling factor that should be same as count(*) from
         * pgbench_branches if this is not a custom query
         */
        res = PQexec(con, "select count(*) from pgbench_branches");
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
            fprintf(stderr, "%s", PQerrorMessage(con));
            exit(1);
        }
        scale = atoi(PQgetvalue(res, 0, 0));
        if (scale < 0)
        {
            fprintf(stderr, "count(*) from pgbench_branches invalid (%d)\n", scale);
            exit(1);
        }
        PQclear(res);

        /* warn if we override user-given -s switch */
        if (scale_given)
            fprintf(stderr,
            "Scale option ignored, using pgbench_branches table count = %d\n",
                    scale);
    }

    /*
     * :scale variables normally get -s or database scale, but don't override
     * an explicit -D switch
     */
    if (getVariable(&state[0], "scale") == NULL)
    {
        snprintf(val, sizeof(val), "%d", scale);
        for (i = 0; i < nclients; i++)
        {
            if (!putVariable(&state[i], "startup", "scale", val))
                exit(1);
        }
    }

    if (!is_no_vacuum)
    {
        fprintf(stderr, "starting vacuum...");
        executeStatement(con, "vacuum pgbench_branches");
        executeStatement(con, "vacuum pgbench_tellers");
        executeStatement(con, "truncate pgbench_history");
        fprintf(stderr, "end.\n");

        if (do_vacuum_accounts)
        {
            fprintf(stderr, "starting vacuum pgbench_accounts...");
            executeStatement(con, "vacuum analyze pgbench_accounts");
            fprintf(stderr, "end.\n");
        }
    }
    PQfinish(con);

    /* set random seed */
    INSTR_TIME_SET_CURRENT(start_time);
    srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));

    /* process builtin SQL scripts */
    switch (ttype)
    {
        case 0:
            sql_files[0] = process_builtin(tpc_b);
            num_files = 1;
            break;

        case 1:
            sql_files[0] = process_builtin(select_only);
            num_files = 1;
            break;

        case 2:
            sql_files[0] = process_builtin(simple_update);
            num_files = 1;
            break;

        default:
            break;
    }

    /* set up thread data structures */
    threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
    for (i = 0; i < nthreads; i++)
    {
        TState     *thread = &threads[i];

        /* each thread gets a contiguous slice of nclients / nthreads clients */
        thread->tid = i;
        thread->state = &state[nclients / nthreads * i];
        thread->nstate = nclients / nthreads;
        thread->random_state[0] = random();
        thread->random_state[1] = random();
        thread->random_state[2] = random();

        if (is_latencies)
        {
            /* Reserve memory for the thread to store per-command latencies */
            int         t;

            thread->exec_elapsed = (instr_time *)
                pg_malloc(sizeof(instr_time) * num_commands);
            thread->exec_count = (int *)
                pg_malloc(sizeof(int) * num_commands);

            for (t = 0; t < num_commands; t++)
            {
                INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
                thread->exec_count[t] = 0;
            }
        }
        else
        {
            thread->exec_elapsed = NULL;
            thread->exec_count = NULL;
        }
    }

    /* get start up time */
    INSTR_TIME_SET_CURRENT(start_time);

    /* set alarm if duration is specified. */
    if (duration > 0)
        setalarm(duration);

    /* start threads */
    for (i = 0; i < nthreads; i++)
    {
        TState     *thread = &threads[i];

        INSTR_TIME_SET_CURRENT(thread->start_time);

        /* the first thread (i = 0) is executed by main thread */
        if (i > 0)
        {
            int         err = pthread_create(&thread->thread, NULL, threadRun, thread);

            if (err != 0 || thread->thread == INVALID_THREAD)
            {
                fprintf(stderr, "cannot create thread: %s\n", strerror(err));
                exit(1);
            }
        }
        else
        {
            thread->thread = INVALID_THREAD;
        }
    }

    /* wait for threads and accumulate results */
    total_xacts = 0;
    INSTR_TIME_SET_ZERO(conn_total_time);
    for (i = 0; i < nthreads; i++)
    {
        void       *ret = NULL;

        /* INVALID_THREAD marks the slot that runs in the main thread */
        if (threads[i].thread == INVALID_THREAD)
            ret = threadRun(&threads[i]);
        else
            pthread_join(threads[i].thread, &ret);

        if (ret != NULL)
        {
            TResult    *r = (TResult *) ret;

            total_xacts += r->xacts;
            INSTR_TIME_ADD(conn_total_time, r->conn_time);
            free(ret);
        }
    }
    disconnect_all(state, nclients);

    /* get end time */
    INSTR_TIME_SET_CURRENT(total_time);
    INSTR_TIME_SUBTRACT(total_time, start_time);
    printResults(ttype, total_xacts, nclients, threads, nthreads,
                 total_time, conn_total_time);

    return 0;
}

static bool parseQuery ( Command *  cmd,
const char *  raw_sql 
) [static]

Definition at line 1680 of file pgbench.c.

References Command::argc, Command::argv, MAX_ARGS, name, NULL, parseVariable(), pg_strdup(), and replaceVariable().

Referenced by process_commands().

{
    /*
     * Rewrite raw_sql so that each ":name" variable reference becomes a
     * numbered placeholder ("$1", "$2", ...).
     *
     * On success, cmd->argv[0] holds the rewritten SQL, cmd->argv[1..] hold
     * the variable names in placeholder order, cmd->argc is one more than
     * the number of variables, and true is returned.  Returns false (after
     * printing a message) when the statement references more than
     * MAX_ARGS - 1 variables.
     */
    char       *sql,
               *p;

    sql = pg_strdup(raw_sql);
    cmd->argc = 1;

    p = sql;
    while ((p = strchr(p, ':')) != NULL)
    {
        char        var[12];
        char       *name;
        int         eaten;

        name = parseVariable(p, &eaten);
        if (name == NULL)
        {
            /* no variable name follows: step over this run of colons */
            while (*p == ':')
            {
                p++;
            }
            continue;
        }

        if (cmd->argc >= MAX_ARGS)
        {
            fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);

            /*
             * Neither buffer has been stored in cmd yet, so nobody else can
             * release them; free both to avoid leaking on this error path.
             */
            free(name);
            free(sql);
            return false;
        }

        snprintf(var, sizeof(var), "$%d", cmd->argc);
        p = replaceVariable(&sql, p, eaten, var);

        cmd->argv[cmd->argc] = name;
        cmd->argc++;
    }

    cmd->argv[0] = sql;
    return true;
}

static char* parseVariable ( const char *  sql,
int *  eaten 
) [static]

Definition at line 665 of file pgbench.c.

References i, name, and pg_malloc().

Referenced by assignVariables(), and parseQuery().

{
    /*
     * Extract the variable name that follows sql[0].  The name is the run of
     * alphanumeric/underscore characters starting at sql[1].
     *
     * Returns a newly allocated copy of the name and stores in *eaten the
     * number of characters consumed, counting sql[0].  Returns NULL (leaving
     * *eaten untouched) when no name character follows sql[0].
     */
    int         len = 1;        /* sql[0] is always consumed */
    char       *result;

    while (isalnum((unsigned char) sql[len]) || sql[len] == '_')
        len++;

    if (len == 1)
        return NULL;

    /* len - 1 name characters plus the terminating NUL */
    result = pg_malloc(len);
    memcpy(result, sql + 1, len - 1);
    result[len - 1] = '\0';

    *eaten = len;
    return result;
}

static void preparedStatementName ( char *  buffer,
int  file,
int  state 
) [static]

Definition at line 858 of file pgbench.c.

Referenced by doCustom().

{
    /*
     * Write the statement name "P<file>_<state>" into buffer.  sprintf does
     * no bounds checking, so the caller must supply a large enough buffer
     * (NOTE(review): presumably MAX_PREPARE_NAME bytes -- confirm at the
     * call site).
     */
    sprintf(buffer, "P%d_%d", file, state);
}

static void printResults ( int  ttype,
int  normal_xacts,
int  nclients,
TState *  threads,
int  nthreads,
instr_time  total_time,
instr_time  conn_total_time 
) [static]

Definition at line 1989 of file pgbench.c.

References Command::command_num, TState::exec_count, TState::exec_elapsed, i, INSTR_TIME_ADD, INSTR_TIME_GET_DOUBLE, INSTR_TIME_GET_MILLISEC, INSTR_TIME_SET_ZERO, and Command::line.

Referenced by main().

{
    /*
     * Print the end-of-run summary: test description, client/thread counts,
     * transaction totals and the two tps figures (with and without the
     * connection-establishment time, which is averaged over the threads).
     * When is_latencies is set, also print the average latency in
     * milliseconds of every command of every script, summed over all
     * threads.
     */
    double      elapsed_sec;
    double      tps_with_conn;
    double      tps_without_conn;
    const char *label;
    int         i;

    elapsed_sec = INSTR_TIME_GET_DOUBLE(total_time);
    tps_with_conn = normal_xacts / elapsed_sec;
    tps_without_conn = normal_xacts /
        (elapsed_sec - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));

    switch (ttype)
    {
        case 0:
            label = "TPC-B (sort of)";
            break;
        case 1:
            label = "SELECT only";
            break;
        case 2:
            label = "Update only pgbench_accounts";
            break;
        default:
            label = "Custom query";
            break;
    }

    printf("transaction type: %s\n", label);
    printf("scaling factor: %d\n", scale);
    printf("query mode: %s\n", QUERYMODE[querymode]);
    printf("number of clients: %d\n", nclients);
    printf("number of threads: %d\n", nthreads);
    if (duration > 0)
    {
        /* time-limited run: there is no per-client transaction target */
        printf("duration: %d s\n", duration);
        printf("number of transactions actually processed: %d\n",
               normal_xacts);
    }
    else
    {
        printf("number of transactions per client: %d\n", nxacts);
        printf("number of transactions actually processed: %d/%d\n",
               normal_xacts, nxacts * nclients);
    }
    printf("tps = %f (including connections establishing)\n", tps_with_conn);
    printf("tps = %f (excluding connections establishing)\n", tps_without_conn);

    /* Everything below is the optional per-command latency report */
    if (!is_latencies)
        return;

    for (i = 0; i < num_files; i++)
    {
        Command   **script = sql_files[i];
        int         c;

        if (num_files <= 1)
            printf("statement latencies in milliseconds:\n");
        else
            printf("statement latencies in milliseconds, file %d:\n", i + 1);

        /* each script is a NULL-terminated array of commands */
        for (c = 0; script[c] != NULL; c++)
        {
            Command    *command = script[c];
            int         cnum = command->command_num;
            instr_time  elapsed_sum;
            int         count_sum = 0;
            double      avg_msec;
            int         t;

            /* Fold the per-thread counters for this command together */
            INSTR_TIME_SET_ZERO(elapsed_sum);
            for (t = 0; t < nthreads; t++)
            {
                INSTR_TIME_ADD(elapsed_sum, threads[t].exec_elapsed[cnum]);
                count_sum += threads[t].exec_count[cnum];
            }

            /* a command that never ran is reported with zero latency */
            if (count_sum > 0)
                avg_msec = INSTR_TIME_GET_MILLISEC(elapsed_sum) / (double) count_sum;
            else
                avg_msec = 0.0;

            printf("\t%f\t%s\n", avg_msec, command->line);
        }
    }
}

static Command** process_builtin ( char *  tb  )  [static]

Definition at line 1937 of file pgbench.c.

References buf, NULL, pg_malloc(), pg_realloc(), and process_commands().

Referenced by main().

{
    /*
     * Split the built-in script text tb into newline-terminated lines, parse
     * each with process_commands(), and return the resulting commands as a
     * NULL-terminated, dynamically grown array.
     *
     * Only lines that end in '\n' are processed: text after the final
     * newline is dropped when the terminating NUL is reached.  A line that
     * does not fit in the local buffer is reported and the program exits,
     * instead of overrunning the buffer.
     */
#define COMMANDS_ALLOC_NUM 128

    Command   **my_commands;
    int         lineno;
    char        buf[BUFSIZ];
    int         alloc_num;

    alloc_num = COMMANDS_ALLOC_NUM;
    my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);

    lineno = 0;

    for (;;)
    {
        char       *p;
        Command    *command;

        /* copy one line, always leaving room for the terminating NUL */
        p = buf;
        while (*tb && *tb != '\n')
        {
            if (p >= buf + sizeof(buf) - 1)
            {
                fprintf(stderr, "built-in script line is too long (maximum is %d characters)\n",
                        (int) sizeof(buf) - 1);
                exit(1);
            }
            *p++ = *tb++;
        }

        if (*tb == '\0')
            break;

        if (*tb == '\n')
            tb++;

        *p = '\0';

        /* NULL means a blank or comment line; nothing to store */
        command = process_commands(buf);
        if (command == NULL)
            continue;

        my_commands[lineno] = command;
        lineno++;

        /* grow before the next store so the NULL terminator always fits */
        if (lineno >= alloc_num)
        {
            alloc_num += COMMANDS_ALLOC_NUM;
            my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
        }
    }

    my_commands[lineno] = NULL;

    return my_commands;
}

static Command* process_commands ( char *  buf  )  [static]

Definition at line 1724 of file pgbench.c.

References Command::argc, Command::argv, Command::command_num, Command::line, NULL, parseQuery(), pg_malloc(), pg_strcasecmp(), pg_strdup(), QUERY_EXTENDED, QUERY_PREPARED, QUERY_SIMPLE, and Command::type.

Referenced by process_builtin(), and process_file().

{
    /*
     * Parse one script line into a newly allocated Command.
     *
     * buf is modified in place: it is cut at the first newline, and for
     * meta-commands it is tokenized with strtok().  Returns NULL for a blank
     * line or a "--" comment.  A line starting with a backslash becomes a
     * META_COMMAND whose whitespace-separated words are stored in argv;
     * anything else becomes an SQL_COMMAND, with ":name" references turned
     * into parameters unless the query mode is QUERY_SIMPLE.  Malformed
     * input is reported on stderr and terminates the program.
     */
    const char  delim[] = " \f\n\r\t\v";

    Command    *my_commands;
    int         j;
    char       *p,
               *tok;

    /* Make the string buf end at the next newline */
    if ((p = strchr(buf, '\n')) != NULL)
        *p = '\0';

    /* Skip leading whitespace */
    p = buf;
    while (isspace((unsigned char) *p))
        p++;

    /* If the line is empty or actually a comment, we're done */
    if (*p == '\0' || strncmp(p, "--", 2) == 0)
        return NULL;

    /* Allocate and initialize Command structure */
    my_commands = (Command *) pg_malloc(sizeof(Command));
    my_commands->line = pg_strdup(buf);
    my_commands->command_num = num_commands++;
    my_commands->type = 0;      /* until set */
    my_commands->argc = 0;

    if (*p == '\\')
    {
        my_commands->type = META_COMMAND;

        j = 0;
        tok = strtok(++p, delim);

        while (tok != NULL)
        {
            /*
             * Never store more than MAX_ARGS words; argv is assumed to hold
             * MAX_ARGS entries, the same limit parseQuery() enforces.
             */
            if (j >= MAX_ARGS)
            {
                fprintf(stderr, "%s: too many arguments (maximum is %d)\n",
                        my_commands->argv[0], MAX_ARGS - 1);
                exit(1);
            }

            my_commands->argv[j++] = pg_strdup(tok);
            my_commands->argc++;
            tok = strtok(NULL, delim);
        }

        /* A lone backslash yields no words; argv[0] must not be read then */
        if (my_commands->argc == 0)
        {
            fprintf(stderr, "missing meta-command name: %s\n", my_commands->line);
            exit(1);
        }

        if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
        {
            if (my_commands->argc < 4)
            {
                fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
                exit(1);
            }

            for (j = 4; j < my_commands->argc; j++)
                fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
                        my_commands->argv[0], my_commands->argv[j]);
        }
        else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
        {
            if (my_commands->argc < 3)
            {
                fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
                exit(1);
            }

            /* with fewer than 5 words only the first 3 are used, else 5 */
            for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++)
                fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
                        my_commands->argv[0], my_commands->argv[j]);
        }
        else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
        {
            if (my_commands->argc < 2)
            {
                fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
                exit(1);
            }

            /*
             * Split argument into number and unit to allow "sleep 1ms" etc.
             * We don't have to terminate the number argument with null
             * because it will be parsed with atoi, which ignores trailing
             * non-digit characters.
             */
            if (my_commands->argv[1][0] != ':')
            {
                char       *c = my_commands->argv[1];

                while (isdigit((unsigned char) *c))
                    c++;
                if (*c)
                {
                    /* argv[2] points into argv[1]'s storage, not a copy */
                    my_commands->argv[2] = c;
                    if (my_commands->argc < 3)
                        my_commands->argc = 3;
                }
            }

            if (my_commands->argc >= 3)
            {
                if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
                    pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
                    pg_strcasecmp(my_commands->argv[2], "s") != 0)
                {
                    fprintf(stderr, "%s: unknown time unit '%s' - must be us, ms or s\n",
                            my_commands->argv[0], my_commands->argv[2]);
                    exit(1);
                }
            }

            for (j = 3; j < my_commands->argc; j++)
                fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
                        my_commands->argv[0], my_commands->argv[j]);
        }
        else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
        {
            if (my_commands->argc < 3)
            {
                fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
                exit(1);
            }
        }
        else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
        {
            /* argv[0] is "shell" itself, so a command needs a second word */
            if (my_commands->argc < 2)
            {
                fprintf(stderr, "%s: missing command\n", my_commands->argv[0]);
                exit(1);
            }
        }
        else
        {
            fprintf(stderr, "Invalid command %s\n", my_commands->argv[0]);
            exit(1);
        }
    }
    else
    {
        my_commands->type = SQL_COMMAND;

        switch (querymode)
        {
            case QUERY_SIMPLE:
                /* the statement text is sent as-is */
                my_commands->argv[0] = pg_strdup(p);
                my_commands->argc++;
                break;
            case QUERY_EXTENDED:
            case QUERY_PREPARED:
                /* parseQuery reports its own error before we exit */
                if (!parseQuery(my_commands, p))
                    exit(1);
                break;
            default:
                exit(1);
        }
    }

    return my_commands;
}

static int process_file ( char *  filename  )  [static]

Definition at line 1881 of file pgbench.c.

References buf, MAX_FILES, NULL, pg_malloc(), pg_realloc(), process_commands(), and strerror().

Referenced by exec_command(), main(), and process_psqlrc_file().

{
    /*
     * Read a script from filename ("-" means stdin), parse every line with
     * process_commands(), and register the resulting NULL-terminated command
     * array as the next entry of sql_files.
     *
     * Returns true on success and false if the file cannot be opened.
     * Exits if MAX_FILES scripts are already registered.  The command array
     * is allocated only after the file has been opened, so the open-failure
     * path has nothing to release.
     */
#define COMMANDS_ALLOC_NUM 128

    Command   **my_commands;
    FILE       *fd;
    int         lineno;
    char        buf[BUFSIZ];
    int         alloc_num;

    if (num_files >= MAX_FILES)
    {
        fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
        exit(1);
    }

    if (strcmp(filename, "-") == 0)
        fd = stdin;
    else if ((fd = fopen(filename, "r")) == NULL)
    {
        fprintf(stderr, "%s: %s\n", filename, strerror(errno));
        return false;
    }

    alloc_num = COMMANDS_ALLOC_NUM;
    my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);

    lineno = 0;

    /*
     * fgets() returns at most sizeof(buf) - 1 characters per call, so a
     * longer input line reaches process_commands() in several pieces.
     */
    while (fgets(buf, sizeof(buf), fd) != NULL)
    {
        Command    *command;

        /* NULL means a blank or comment line; nothing to store */
        command = process_commands(buf);
        if (command == NULL)
            continue;

        my_commands[lineno] = command;
        lineno++;

        /* grow before the next store so the NULL terminator always fits */
        if (lineno >= alloc_num)
        {
            alloc_num += COMMANDS_ALLOC_NUM;
            my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
        }
    }
    fclose(fd);

    my_commands[lineno] = NULL;

    sql_files[num_files++] = my_commands;

    return true;
}

static int pthread_create ( pthread_t *  thread,
pthread_attr_t *  attr,
void *(*)(void *)  start_routine,
void *  arg 
) [static]

Definition at line 2879 of file pgbench.c.

References close, free, pg_malloc(), fork_pthread::pid, fork_pthread::pipes, setalarm(), and write.

{
    /*
     * fork()-based stand-in for pthread_create.  The "thread" is a child
     * process that runs start_routine(arg) and sends sizeof(TResult) bytes
     * of its return value back to the parent through a pipe before exiting.
     *
     * In the parent, returns 0 and stores the bookkeeping record in *thread,
     * or returns the errno of the failing pipe()/fork() call.  The child
     * never returns from this function.  attr is not used.
     */
    fork_pthread *th;
    void       *ret;
    int         save_errno;

    th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
    if (pipe(th->pipes) < 0)
    {
        /* capture errno before any other call can overwrite it */
        save_errno = errno;
        free(th);
        return save_errno;
    }

    th->pid = fork();
    if (th->pid == -1)          /* error */
    {
        save_errno = errno;
        /* the pipe was created successfully, so both ends must be closed */
        close(th->pipes[0]);
        close(th->pipes[1]);
        free(th);
        return save_errno;
    }
    if (th->pid != 0)           /* in parent process */
    {
        /* parent only reads; drop the write end */
        close(th->pipes[1]);
        *thread = th;
        return 0;
    }

    /* in child process */
    close(th->pipes[0]);

    /* set alarm again because the child does not inherit timers */
    if (duration > 0)
        setalarm(duration);

    ret = start_routine(arg);
    /* the result is assumed to be a TResult, matching pthread_join's read */
    write(th->pipes[1], ret, sizeof(TResult));
    close(th->pipes[1]);
    free(th);
    exit(0);
}

static int pthread_join ( pthread_t  th,
void **  thread_return 
) [static]

Definition at line 2922 of file pgbench.c.

References close, EINTR, free, NULL, pg_malloc(), fork_pthread::pid, fork_pthread::pipes, and read.

{
    /*
     * fork()-based stand-in for pthread_join.  Waits for the child process
     * recorded in th to exit and, if thread_return is not NULL, reads the
     * TResult the child wrote to the pipe into newly allocated memory.
     * *thread_return is set to NULL when a complete TResult could not be
     * read.  Returns 0 on success, or errno if waitpid() fails for a reason
     * other than EINTR (in which case th is left untouched).
     */
    int         status;

    /* Reap the child, retrying whenever a signal interrupts the wait */
    for (;;)
    {
        if (waitpid(th->pid, &status, 0) == th->pid)
            break;
        if (errno != EINTR)
            return errno;
    }

    if (thread_return != NULL)
    {
        /* assume result is TResult */
        void       *received = pg_malloc(sizeof(TResult));

        if (read(th->pipes[0], received, sizeof(TResult)) == sizeof(TResult))
            *thread_return = received;
        else
        {
            /* short or failed read: hand back nothing */
            free(received);
            *thread_return = NULL;
        }
    }

    close(th->pipes[0]);
    free(th);

    return 0;
}

static int putVariable ( CState *  st,
const char *  context,
char *  name,
char *  value 
) [static]

Definition at line 602 of file pgbench.c.

References compareVariables(), free, isLegalVariableName(), Variable::name, NULL, CState::nvariables, pg_malloc(), pg_realloc(), pg_strdup(), qsort, val, Variable::value, and CState::variables.

Referenced by doCustom(), main(), and runShellCommand().

{
    /*
     * Set variable "name" of client st to a copy of "value", creating the
     * variable if it does not exist yet.  The variable array is kept sorted
     * with compareVariables so that it can be searched with bsearch().
     *
     * Returns true on success.  Returns false, after printing a message
     * prefixed with "context", if a new variable would have an illegal name.
     */
    Variable    probe;
    Variable   *slot = NULL;

    probe.name = name;
    /* On some versions of Solaris, bsearch of zero items dumps core */
    if (st->nvariables > 0)
        slot = (Variable *) bsearch((void *) &probe,
                                    (void *) st->variables,
                                    st->nvariables,
                                    sizeof(Variable),
                                    compareVariables);

    if (slot != NULL)
    {
        /*
         * Existing variable: copy the new value before releasing the old
         * one, because "value" may point at the old value itself.
         */
        char       *copy = pg_strdup(value);

        free(slot->value);
        slot->value = copy;
        return true;
    }

    /*
     * The name is validated only when a variable is created, which keeps
     * the check off the update path.
     */
    if (!isLegalVariableName(name))
    {
        fprintf(stderr, "%s: invalid variable name '%s'\n", context, name);
        return false;
    }

    /* Make room for one more entry at the end of the array */
    if (st->variables)
        st->variables = (Variable *) pg_realloc(st->variables,
                                    (st->nvariables + 1) * sizeof(Variable));
    else
        st->variables = (Variable *) pg_malloc(sizeof(Variable));

    slot = &st->variables[st->nvariables];
    slot->name = pg_strdup(name);
    slot->value = pg_strdup(value);
    st->nvariables++;

    /* restore the sort order that the bsearch above relies on */
    qsort((void *) st->variables, st->nvariables, sizeof(Variable),
          compareVariables);

    return true;
}

static char* replaceVariable ( char **  sql,
char *  param,
int  len,
char *  value 
) [static]

Definition at line 686 of file pgbench.c.

References memmove, and pg_realloc().

Referenced by assignVariables(), and parseQuery().

{
    /*
     * Replace the len characters starting at param (which points inside the
     * string *sql) with the text of value.  *sql is enlarged when the
     * replacement is longer than the text it replaces, so *sql may change.
     * Returns a pointer just past the inserted text, within the possibly
     * relocated string.
     */
    int         newlen = strlen(value);

    if (newlen > len)
    {
        /* remember the position as an offset: realloc may move the string */
        size_t      pos = param - *sql;

        *sql = pg_realloc(*sql, strlen(*sql) - len + newlen + 1);
        param = *sql + pos;
    }

    /* shift the tail, including its NUL, to open or close the gap */
    if (newlen != len)
        memmove(param + newlen, param + len, strlen(param + len) + 1);

    /* drop the replacement in place; the tail already supplies the NUL */
    memcpy(param, value, newlen);

    return param + newlen;
}

static bool runShellCommand ( CState *  st,
char *  variable,
char **  argv,
int  argc 
) [static]

Definition at line 755 of file pgbench.c.

References arg, getVariable(), i, NULL, putVariable(), SHELL_COMMAND_SIZE, snprintf(), and system().

Referenced by doCustom().

{
    /*
     * Build a shell command line from argv[0..argc-1] and run it.
     *
     * If variable is NULL the command is run with system() and only its
     * success matters.  Otherwise the command is run with popen(), the first
     * line of its output must be an integer, and that integer is stored in
     * the named variable of client st.
     *
     * Returns true on success, false (after printing a message, unless the
     * failure coincides with timer_exceeded) otherwise.  The popen() stream
     * is closed on the read-failure path as well, so no stream or child
     * process is left behind.
     */
    char        command[SHELL_COMMAND_SIZE];
    int         i,
                len = 0;
    FILE       *fp;
    char        res[64];
    char       *endptr;
    int         retval;

    /*----------
     * Join arguments with whitespace separators. Arguments starting with
     * exactly one colon are treated as variables:
     *  name - append a string "name"
     *  :var - append a variable named 'var'
     *  ::name - append a string ":name"
     *----------
     */
    for (i = 0; i < argc; i++)
    {
        char       *arg;
        int         arglen;

        if (argv[i][0] != ':')
        {
            arg = argv[i];      /* a string literal */
        }
        else if (argv[i][1] == ':')
        {
            arg = argv[i] + 1;  /* a string literal starting with colons */
        }
        else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
        {
            fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[i]);
            return false;
        }

        /* refuse anything that would not fit, separator included */
        arglen = strlen(arg);
        if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
        {
            fprintf(stderr, "%s: too long shell command\n", argv[0]);
            return false;
        }

        if (i > 0)
            command[len++] = ' ';
        memcpy(command + len, arg, arglen);
        len += arglen;
    }

    command[len] = '\0';

    /* Fast path for non-assignment case */
    if (variable == NULL)
    {
        if (system(command))
        {
            if (!timer_exceeded)
                fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
            return false;
        }
        return true;
    }

    /* Execute the command with pipe and read the standard output. */
    if ((fp = popen(command, "r")) == NULL)
    {
        fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
        return false;
    }
    if (fgets(res, sizeof(res), fp) == NULL)
    {
        if (!timer_exceeded)
            fprintf(stderr, "%s: cannot read the result\n", argv[0]);
        /* still close the stream; its status no longer matters here */
        (void) pclose(fp);
        return false;
    }
    if (pclose(fp) < 0)
    {
        fprintf(stderr, "%s: cannot close shell command\n", argv[0]);
        return false;
    }

    /* Check whether the result is an integer and assign it to the variable */
    retval = (int) strtol(res, &endptr, 10);
    while (*endptr != '\0' && isspace((unsigned char) *endptr))
        endptr++;
    if (*res == '\0' || *endptr != '\0')
    {
        fprintf(stderr, "%s: must return an integer ('%s' returned)\n", argv[0], res);
        return false;
    }
    /* store the value in canonical decimal form */
    snprintf(res, sizeof(res), "%d", retval);
    if (!putVariable(st, "setshell", variable, res))
        return false;

#ifdef DEBUG
    printf("shell parameter name: %s, value: %s\n", argv[1], res);
#endif
    return true;
}

static void setalarm ( int  seconds  )  [static]

Definition at line 2860 of file pgbench.c.

References handle_sig_alarm(), pqsignal(), and SIGALRM.

Referenced by main(), and pthread_create().

{
    /*
     * Arrange for SIGALRM to be delivered after "seconds" seconds.  The
     * handler is installed first so that it is already in place when the
     * alarm fires.
     */
    pqsignal(SIGALRM, handle_sig_alarm);
    alarm(seconds);
}

static int64 strtoint64 ( const char *  str  )  [static]

Definition at line 384 of file pgbench.c.

References INT64CONST, and sign.

Referenced by doCustom().

{
    /*
     * Convert the decimal string str to an int64.  Leading and trailing
     * whitespace and an optional sign are accepted.
     *
     * Errors are reported on stderr only; there is no error return.  With no
     * digits the result is 0.  A value that does not fit in int64 is
     * reported once and clamped to the largest magnitude representable with
     * the given sign, rather than being computed through signed overflow,
     * which is undefined behavior.
     */
    const char *ptr = str;
    int64       result = 0;
    int         sign = 1;

    /*
     * Do our own scan, rather than relying on sscanf which might be broken
     * for long long.
     */

    /* skip leading spaces */
    while (*ptr && isspace((unsigned char) *ptr))
        ptr++;

    /* handle sign */
    if (*ptr == '-')
    {
        ptr++;

        /*
         * Do an explicit check for INT64_MIN.  Ugly though this is, it's
         * cleaner than trying to get the loop below to handle it portably.
         */
        if (strncmp(ptr, "9223372036854775808", 19) == 0)
        {
            result = -INT64CONST(0x7fffffffffffffff) - 1;
            ptr += 19;
            goto gotdigits;
        }
        sign = -1;
    }
    else if (*ptr == '+')
        ptr++;

    /* require at least one digit */
    if (!isdigit((unsigned char) *ptr))
        fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);

    /* process digits */
    while (*ptr && isdigit((unsigned char) *ptr))
    {
        int         digit = *ptr++ - '0';

        /* would result * 10 + digit exceed the maximum?  test before doing it */
        if (result > (INT64CONST(0x7fffffffffffffff) - digit) / 10)
        {
            fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
            result = INT64CONST(0x7fffffffffffffff);
            /* consume the remaining digits so they are not reported again */
            while (isdigit((unsigned char) *ptr))
                ptr++;
            break;
        }
        result = result * 10 + digit;
    }

gotdigits:

    /* allow trailing whitespace, but not other trailing chars */
    while (*ptr != '\0' && isspace((unsigned char) *ptr))
        ptr++;

    if (*ptr != '\0')
        fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);

    return ((sign < 0) ? -result : result);
}

static void * threadRun ( void *  arg  )  [static]

Definition at line 2658 of file pgbench.c.

References agg_vals_init(), CState::cnt, CState::con, TResult::conn_time, disconnect_all(), doConnect(), doCustom(), CState::ecnt, EINTR, getrand(), i, INSTR_TIME_ACCUM_DIFF, INSTR_TIME_GET_MICROSEC, INSTR_TIME_SET_CURRENT, INSTR_TIME_SET_ZERO, INSTR_TIME_SUBTRACT, INT64_MAX, logfile, META_COMMAND, TState::nstate, NULL, pg_malloc(), PQfinish(), PQsocket(), select, CState::sleeping, snprintf(), TState::start_time, CState::state, TState::state, strerror(), TState::tid, Command::type, CState::until, CState::use_file, and TResult::xacts.

Referenced by main().

{
    /*
     * Body of one benchmark thread.  arg is the thread's TState.  The thread
     * optionally opens its log file and its clients' connections, starts
     * every client, and then loops, waiting with select() for server replies
     * or sleep expirations and advancing clients with doCustom(), until no
     * client remains.  Returns a newly allocated TResult holding the
     * transaction count and the accumulated connection time.
     */
    TState     *thread = (TState *) arg;
    CState     *state = thread->state;
    TResult    *result;
    FILE       *logfile = NULL; /* per-thread log file */
    instr_time  start,
                end;
    int         nstate = thread->nstate;
    int         remains = nstate;       /* number of remaining clients */
    int         i;

    AggVals     aggs;

    result = pg_malloc(sizeof(TResult));

    INSTR_TIME_SET_ZERO(result->conn_time);

    /* open log file if requested */
    if (use_log)
    {
        char        logpath[64];

        if (thread->tid == 0)
            snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
        else
            snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
        logfile = fopen(logpath, "w");

        if (logfile == NULL)
        {
            fprintf(stderr, "Couldn't open logfile \"%s\": %s\n", logpath, strerror(errno));
            goto done;
        }
    }

    if (!is_connect)
    {
        /* make connections to the database */
        for (i = 0; i < nstate; i++)
        {
            if ((state[i].con = doConnect()) == NULL)
                goto done;
        }
    }

    /* time after thread and connections set up */
    INSTR_TIME_SET_CURRENT(result->conn_time);
    INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);

    agg_vals_init(&aggs, thread->start_time);

    /* send start up queries in async manner */
    for (i = 0; i < nstate; i++)
    {
        CState     *st = &state[i];
        Command   **commands;
        int         prev_ecnt = st->ecnt;

        /*
         * Pick the script first and only then look up its command list, so
         * that the meta-command check below inspects the script this client
         * is actually running.
         */
        st->use_file = getrand(thread, 0, num_files - 1);
        commands = sql_files[st->use_file];

        if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
            remains--;          /* I've aborted */

        if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
        {
            fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
            remains--;          /* I've aborted */
            PQfinish(st->con);
            st->con = NULL;
        }
    }

    while (remains > 0)
    {
        fd_set      input_mask;
        int         maxsock;    /* max socket number to be waited */
        int64       now_usec = 0;
        int64       min_usec;

        FD_ZERO(&input_mask);

        maxsock = -1;
        min_usec = INT64_MAX;
        for (i = 0; i < nstate; i++)
        {
            CState     *st = &state[i];
            Command   **commands = sql_files[st->use_file];
            int         sock;

            if (st->sleeping)
            {
                /* int64, so a long remaining sleep is not truncated */
                int64       this_usec;

                /* read the clock only for the first sleeper of this pass */
                if (min_usec == INT64_MAX)
                {
                    instr_time  now;

                    INSTR_TIME_SET_CURRENT(now);
                    now_usec = INSTR_TIME_GET_MICROSEC(now);
                }

                this_usec = st->until - now_usec;
                if (min_usec > this_usec)
                    min_usec = this_usec;
            }
            else if (st->con == NULL)
            {
                continue;
            }
            else if (commands[st->state]->type == META_COMMAND)
            {
                min_usec = 0;   /* the connection is ready to run */
                break;
            }

            sock = PQsocket(st->con);
            if (sock < 0)
            {
                fprintf(stderr, "bad socket: %s\n", strerror(errno));
                goto done;
            }

            FD_SET(sock, &input_mask);

            if (maxsock < sock)
                maxsock = sock;
        }

        /* wait only if nobody is ready now and there is a socket to watch */
        if (min_usec > 0 && maxsock != -1)
        {
            int         nsocks; /* return from select(2) */

            if (min_usec != INT64_MAX)
            {
                struct timeval timeout;

                timeout.tv_sec = min_usec / 1000000;
                timeout.tv_usec = min_usec % 1000000;
                nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
            }
            else
                nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
            if (nsocks < 0)
            {
                if (errno == EINTR)
                    continue;
                /* must be something wrong */
                fprintf(stderr, "select failed: %s\n", strerror(errno));
                goto done;
            }
        }

        /* ok, backend returns reply */
        for (i = 0; i < nstate; i++)
        {
            CState     *st = &state[i];
            Command   **commands = sql_files[st->use_file];
            int         prev_ecnt = st->ecnt;

            if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
                            || commands[st->state]->type == META_COMMAND))
            {
                if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
                    remains--;  /* I've aborted */
            }

            if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
            {
                fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
                remains--;      /* I've aborted */
                PQfinish(st->con);
                st->con = NULL;
            }
        }
    }

done:
    /* the time spent disconnecting is charged to conn_time as well */
    INSTR_TIME_SET_CURRENT(start);
    disconnect_all(state, nstate);
    result->xacts = 0;
    for (i = 0; i < nstate; i++)
        result->xacts += state[i].cnt;
    INSTR_TIME_SET_CURRENT(end);
    INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
    if (logfile)
        fclose(logfile);
    return result;
}

static void usage ( void   )  [static]

Definition at line 324 of file pgbench.c.

{
    /*
     * Print the command-line help text to stdout.  progname is substituted
     * twice: in the description line and in the usage line.
     */
    printf("%s is a benchmarking tool for PostgreSQL.\n\n"
           "Usage:\n"
           "  %s [OPTION]... [DBNAME]\n"
           "\nInitialization options:\n"
           "  -i           invokes initialization mode\n"
           "  -n           do not run VACUUM after initialization\n"
           "  -F NUM       fill factor\n"
           "  -s NUM       scaling factor\n"
           "  -q           quiet logging (one message each 5 seconds)\n"
           "  --foreign-keys\n"
           "               create foreign key constraints between tables\n"
           "  --index-tablespace=TABLESPACE\n"
           "               create indexes in the specified tablespace\n"
           "  --tablespace=TABLESPACE\n"
           "               create tables in the specified tablespace\n"
           "  --unlogged-tables\n"
           "               create tables as unlogged tables\n"
           "\nBenchmarking options:\n"
           "  -c NUM       number of concurrent database clients (default: 1)\n"
           "  -C           establish new connection for each transaction\n"
           "  -D VARNAME=VALUE\n"
           "               define variable for use by custom script\n"
           "  -f FILENAME  read transaction script from FILENAME\n"
           "  -j NUM       number of threads (default: 1)\n"
           "  -l           write transaction times to log file\n"
           "  --sampling-rate NUM\n"
           "               fraction of transactions to log (e.g. 0.01 for 1%% sample)\n"
           "  --aggregate-interval NUM\n"
           "               aggregate data over NUM seconds\n"
           "  -M simple|extended|prepared\n"
           "               protocol for submitting queries to server (default: simple)\n"
           "  -n           do not run VACUUM before tests\n"
           "  -N           do not update tables \"pgbench_tellers\" and \"pgbench_branches\"\n"
           "  -r           report average latency per command\n"
           "  -s NUM       report this scale factor in output\n"
           "  -S           perform SELECT-only transactions\n"
           "  -t NUM       number of transactions each client runs (default: 10)\n"
           "  -T NUM       duration of benchmark test in seconds\n"
           "  -v           vacuum all four standard tables before tests\n"
           "\nCommon options:\n"
           "  -d             print debugging output\n"
           "  -h HOSTNAME    database server host or socket directory\n"
           "  -p PORT        database server port number\n"
           "  -U USERNAME    connect as specified database user\n"
           "  -V, --version  output version information, then exit\n"
           "  -?, --help     show this help, then exit\n"
           "\n"
           "Report bugs to <pgsql-bugs@postgresql.org>.\n",
           progname, progname);
}


Variable Documentation

Definition at line 165 of file pgbench.c.

char* dbName

Definition at line 173 of file pgbench.c.

Referenced by pg_database_size_name().

int debug = 0 [static]

Definition at line 276 of file pgbench.c.

int duration = 0

Definition at line 110 of file pgbench.c.

Referenced by pgss_ProcessUtility().

int fillfactor = 100

Definition at line 122 of file pgbench.c.

Referenced by default_reloptions(), gistbuild(), and gistoptions().

int foreign_keys = 0

Definition at line 127 of file pgbench.c.

char* index_tablespace = NULL

Definition at line 143 of file pgbench.c.

Definition at line 166 of file pgbench.c.

Definition at line 167 of file pgbench.c.

char* login = NULL

Definition at line 172 of file pgbench.c.

Referenced by main().

int main_pid

Definition at line 168 of file pgbench.c.

int num_commands = 0 [static]

Definition at line 275 of file pgbench.c.

int num_files [static]

Definition at line 274 of file pgbench.c.

int nxacts = 0

Definition at line 109 of file pgbench.c.

char* optarg

Definition at line 51 of file getopt.c.

int optind

Definition at line 49 of file getopt.c.

char* pghost = ""

Definition at line 170 of file pgbench.c.

Referenced by CloneArchive(), initialize_environment(), and main().

char* pgport = ""

Definition at line 171 of file pgbench.c.

Referenced by CloneArchive(), initialize_environment(), and main().

const char* progname

Definition at line 174 of file pgbench.c.

const char* QUERYMODE[] = {"simple", "extended", "prepared"} [static]

Definition at line 250 of file pgbench.c.

QueryMode querymode = QUERY_SIMPLE [static]

Definition at line 249 of file pgbench.c.

double sample_rate = 0.0

Definition at line 137 of file pgbench.c.

int scale = 1

char* select_only [static]
Initial value:
 {
    "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
    "\\setrandom aid 1 :naccounts\n"
    "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
}

Definition at line 313 of file pgbench.c.

char* simple_update [static]
Initial value:
 {
    "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
    "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
    "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
    "\\setrandom aid 1 :naccounts\n"
    "\\setrandom bid 1 :nbranches\n"
    "\\setrandom tid 1 :ntellers\n"
    "\\setrandom delta -5000 5000\n"
    "BEGIN;\n"
    "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
    "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
    "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
    "END;\n"
}

Definition at line 297 of file pgbench.c.

Command** sql_files[MAX_FILES] [static]

Definition at line 273 of file pgbench.c.

char* tablespace = NULL

Definition at line 142 of file pgbench.c.

Referenced by describeOneTableDetails(), dumpDatabase(), main(), and shdepLockAndCheckObject().

volatile bool timer_exceeded = false

Definition at line 176 of file pgbench.c.

char* tpc_b [static]
Initial value:
 {
    "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
    "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
    "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
    "\\setrandom aid 1 :naccounts\n"
    "\\setrandom bid 1 :nbranches\n"
    "\\setrandom tid 1 :ntellers\n"
    "\\setrandom delta -5000 5000\n"
    "BEGIN;\n"
    "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
    "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
    "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
    "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
    "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
    "END;\n"
}

Definition at line 279 of file pgbench.c.

int unlogged_tables = 0

Definition at line 132 of file pgbench.c.

bool use_log

Definition at line 163 of file pgbench.c.

bool use_quiet

Definition at line 164 of file pgbench.c.