Header And Logo

PostgreSQL
| The world's most advanced open source database.

pgbench.c

Go to the documentation of this file.
00001 /*
00002  * pgbench.c
00003  *
00004  * A simple benchmark program for PostgreSQL
00005  * Originally written by Tatsuo Ishii and enhanced by many contributors.
00006  *
00007  * contrib/pgbench/pgbench.c
00008  * Copyright (c) 2000-2013, PostgreSQL Global Development Group
00009  * ALL RIGHTS RESERVED;
00010  *
00011  * Permission to use, copy, modify, and distribute this software and its
00012  * documentation for any purpose, without fee, and without a written agreement
00013  * is hereby granted, provided that the above copyright notice and this
00014  * paragraph and the following two paragraphs appear in all copies.
00015  *
00016  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
00017  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
00018  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
00019  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
00020  * POSSIBILITY OF SUCH DAMAGE.
00021  *
00022  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
00023  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
00024  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
00025  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
00026  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
00027  *
00028  */
00029 
00030 #ifdef WIN32
00031 #define FD_SETSIZE 1024         /* set before winsock2.h is included */
00032 #endif   /* ! WIN32 */
00033 
00034 #include "postgres_fe.h"
00035 
00036 #include "getopt_long.h"
00037 #include "libpq-fe.h"
00038 #include "portability/instr_time.h"
00039 
00040 #include <ctype.h>
00041 #include <math.h>
00042 #include <signal.h>
00043 
00044 #ifndef WIN32
00045 #include <sys/time.h>
00046 #include <unistd.h>
00047 #endif   /* ! WIN32 */
00048 
00049 #ifdef HAVE_SYS_SELECT_H
00050 #include <sys/select.h>
00051 #endif
00052 
00053 #ifdef HAVE_SYS_RESOURCE_H
00054 #include <sys/resource.h>       /* for getrlimit */
00055 #endif
00056 
/* Supply INT64_MAX ourselves when the system headers did not. */
#ifndef INT64_MAX
#define INT64_MAX   INT64CONST(0x7FFFFFFFFFFFFFFF)
#endif

/*
 * Threading support.
 *
 * The rest of the program only ever uses pthread_create() and
 * pthread_join().  Where those come from depends on the platform:
 *
 *   WIN32                  - thin wrappers over native Windows threads;
 *   ENABLE_THREAD_SAFETY   - the platform's own <pthread.h>;
 *   anything else          - an emulation built on fork(), with every
 *                            pthread identifier renamed to a pg_ variant so
 *                            it cannot collide with system declarations.
 *
 * The wrapper/emulation functions are static, so their definitions must
 * appear elsewhere in this translation unit.
 */
#if defined(WIN32)

typedef struct win32_pthread *pthread_t;
typedef int pthread_attr_t;

static int  pthread_create(pthread_t *thread, pthread_attr_t *attr,
                           void *(*start_routine) (void *), void *arg);
static int  pthread_join(pthread_t th, void **thread_return);

#elif defined(ENABLE_THREAD_SAFETY)

#include <pthread.h>

#else                           /* fork()-based emulation */

#include <sys/wait.h>

#define pthread_t               pg_pthread_t
#define pthread_attr_t          pg_pthread_attr_t
#define pthread_create          pg_pthread_create
#define pthread_join            pg_pthread_join

typedef struct fork_pthread *pthread_t;
typedef int pthread_attr_t;

static int  pthread_create(pthread_t *thread, pthread_attr_t *attr,
                           void *(*start_routine) (void *), void *arg);
static int  pthread_join(pthread_t th, void **thread_return);

#endif

/* getopt() state, declared explicitly here. */
extern char *optarg;
extern int  optind;
00094 
00095 
00096 /********************************************************************
00097  * some configurable parameters */
00098 
00099 /* max number of clients allowed */
00100 #ifdef FD_SETSIZE
00101 #define MAXCLIENTS  (FD_SETSIZE - 10)
00102 #else
00103 #define MAXCLIENTS  1024
00104 #endif
00105 
00106 #define LOG_STEP_SECONDS    5   /* seconds between log messages */
00107 #define DEFAULT_NXACTS  10      /* default nxacts */
00108 
00109 int         nxacts = 0;         /* number of transactions per client */
00110 int         duration = 0;       /* duration in seconds */
00111 
00112 /*
00113  * scaling factor. for example, scale = 10 will make 1000000 tuples in
00114  * pgbench_accounts table.
00115  */
00116 int         scale = 1;
00117 
00118 /*
00119  * fillfactor. for example, fillfactor = 90 will use only 90 percent
00120  * space during inserts and leave 10 percent free.
00121  */
00122 int         fillfactor = 100;
00123 
00124 /*
00125  * create foreign key constraints on the tables?
00126  */
00127 int         foreign_keys = 0;
00128 
00129 /*
00130  * use unlogged tables?
00131  */
00132 int         unlogged_tables = 0;
00133 
00134 /*
00135  * log sampling rate (1.0 = log everything, 0.0 = option not given)
00136  */
00137 double      sample_rate = 0.0;
00138 
00139 /*
00140  * tablespace selection
00141  */
00142 char       *tablespace = NULL;
00143 char       *index_tablespace = NULL;
00144 
00145 /*
00146  * end of configurable parameters
00147  *********************************************************************/
00148 
00149 #define nbranches   1           /* Makes little sense to change this.  Change
00150                                  * -s instead */
00151 #define ntellers    10
00152 #define naccounts   100000
00153 
00154 /*
00155  * The scale factor at/beyond which 32bit integers are incapable of storing
00156  * 64bit values.
00157  *
00158  * Although the actual threshold is 21474, we use 20000 because it is easier to
00159  * document and remember, and isn't that far away from the real threshold.
00160  */
00161 #define SCALE_32BIT_THRESHOLD 20000
00162 
00163 bool        use_log;            /* log transaction latencies to a file */
00164 bool        use_quiet;          /* quiet logging onto stderr */
00165 int         agg_interval;       /* log aggregates instead of individual transactions */
00166 bool        is_connect;         /* establish connection for each transaction */
00167 bool        is_latencies;       /* report per-command latencies */
00168 int         main_pid;           /* main process id used in log filename */
00169 
00170 char       *pghost = "";
00171 char       *pgport = "";
00172 char       *login = NULL;
00173 char       *dbName;
00174 const char *progname;
00175 
00176 volatile bool timer_exceeded = false;   /* flag from signal handler */
00177 
00178 /* variable definitions */
00179 typedef struct
00180 {
00181     char       *name;           /* variable name */
00182     char       *value;          /* its value */
00183 } Variable;
00184 
00185 #define MAX_FILES       128     /* max number of SQL script files allowed */
00186 #define SHELL_COMMAND_SIZE  256 /* maximum size allowed for shell command */
00187 
00188 /*
00189  * structures used in custom query mode
00190  */
00191 
00192 typedef struct
00193 {
00194     PGconn     *con;            /* connection handle to DB */
00195     int         id;             /* client No. */
00196     int         state;          /* state No. */
00197     int         cnt;            /* xacts count */
00198     int         ecnt;           /* error count */
00199     int         listen;         /* 0 indicates that an async query has been
00200                                  * sent */
00201     int         sleeping;       /* 1 indicates that the client is napping */
00202     int64       until;          /* napping until (usec) */
00203     Variable   *variables;      /* array of variable definitions */
00204     int         nvariables;
00205     instr_time  txn_begin;      /* used for measuring transaction latencies */
00206     instr_time  stmt_begin;     /* used for measuring statement latencies */
00207     int         use_file;       /* index in sql_files for this client */
00208     bool        prepared[MAX_FILES];
00209 } CState;
00210 
00211 /*
00212  * Thread state and result
00213  */
00214 typedef struct
00215 {
00216     int         tid;            /* thread id */
00217     pthread_t   thread;         /* thread handle */
00218     CState     *state;          /* array of CState */
00219     int         nstate;         /* length of state[] */
00220     instr_time  start_time;     /* thread start time */
00221     instr_time *exec_elapsed;   /* time spent executing cmds (per Command) */
00222     int        *exec_count;     /* number of cmd executions (per Command) */
00223     unsigned short random_state[3];     /* separate randomness for each thread */
00224 } TState;
00225 
00226 #define INVALID_THREAD      ((pthread_t) 0)
00227 
00228 typedef struct
00229 {
00230     instr_time  conn_time;
00231     int         xacts;
00232 } TResult;
00233 
/*
 * Script commands read from files (or from the built-in scripts).  Each one
 * is either SQL for the server or a meta-command such as "\set" that
 * pgbench interprets itself.
 */
#define SQL_COMMAND     1
#define META_COMMAND    2
#define MAX_ARGS        10      /* most words a single command may have */

/* How SQL commands are submitted to the server. */
typedef enum QueryMode
{
    QUERY_SIMPLE,               /* simple query protocol */
    QUERY_EXTENDED,             /* extended query protocol */
    QUERY_PREPARED,             /* extended protocol, prepared statements */
    NUM_QUERYMODE               /* number of modes; not a mode itself */
} QueryMode;

static QueryMode querymode = QUERY_SIMPLE;

/* Spelling of each QueryMode, indexed by its enum value. */
static const char *QUERYMODE[] = {
    "simple",
    "extended",
    "prepared"
};

/* One parsed script command. */
typedef struct
{
    char       *line;           /* the command's full source text */
    int         command_num;    /* unique index of this Command struct */
    int         type;           /* SQL_COMMAND or META_COMMAND */
    int         argc;           /* how many entries of argv[] are used */
    char       *argv[MAX_ARGS]; /* the command's words */
} Command;

/*
 * Running statistics for one aggregation interval.
 */
typedef struct
{
    long        start_time;     /* when the interval starts */
    int         cnt;            /* transactions counted so far */
    double      min_duration;   /* shortest duration seen */
    double      max_duration;   /* longest duration seen */
    double      sum;            /* sum(duration), for estimates */
    double      sum2;           /* sum(duration^2), for estimates */
} AggVals;
00272 
00273 static Command **sql_files[MAX_FILES];  /* SQL script files */
00274 static int  num_files;          /* number of script files */
00275 static int  num_commands = 0;   /* total number of Command structs */
00276 static int  debug = 0;          /* debug flag */
00277 
00278 /* default scenario */
00279 static char *tpc_b = {
00280     "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
00281     "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
00282     "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
00283     "\\setrandom aid 1 :naccounts\n"
00284     "\\setrandom bid 1 :nbranches\n"
00285     "\\setrandom tid 1 :ntellers\n"
00286     "\\setrandom delta -5000 5000\n"
00287     "BEGIN;\n"
00288     "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
00289     "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
00290     "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
00291     "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
00292     "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
00293     "END;\n"
00294 };
00295 
00296 /* -N case */
00297 static char *simple_update = {
00298     "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
00299     "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
00300     "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
00301     "\\setrandom aid 1 :naccounts\n"
00302     "\\setrandom bid 1 :nbranches\n"
00303     "\\setrandom tid 1 :ntellers\n"
00304     "\\setrandom delta -5000 5000\n"
00305     "BEGIN;\n"
00306     "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
00307     "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
00308     "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
00309     "END;\n"
00310 };
00311 
00312 /* -S case */
00313 static char *select_only = {
00314     "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
00315     "\\setrandom aid 1 :naccounts\n"
00316     "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
00317 };
00318 
00319 /* Function prototypes */
00320 static void setalarm(int seconds);
00321 static void *threadRun(void *arg);
00322 
00323 static void
00324 usage(void)
00325 {
00326     printf("%s is a benchmarking tool for PostgreSQL.\n\n"
00327            "Usage:\n"
00328            "  %s [OPTION]... [DBNAME]\n"
00329            "\nInitialization options:\n"
00330            "  -i           invokes initialization mode\n"
00331            "  -n           do not run VACUUM after initialization\n"
00332            "  -F NUM       fill factor\n"
00333            "  -s NUM       scaling factor\n"
00334            "  -q           quiet logging (one message each 5 seconds)\n"
00335            "  --foreign-keys\n"
00336            "               create foreign key constraints between tables\n"
00337            "  --index-tablespace=TABLESPACE\n"
00338            "               create indexes in the specified tablespace\n"
00339            "  --tablespace=TABLESPACE\n"
00340            "               create tables in the specified tablespace\n"
00341            "  --unlogged-tables\n"
00342            "               create tables as unlogged tables\n"
00343            "\nBenchmarking options:\n"
00344         "  -c NUM       number of concurrent database clients (default: 1)\n"
00345            "  -C           establish new connection for each transaction\n"
00346            "  -D VARNAME=VALUE\n"
00347            "               define variable for use by custom script\n"
00348            "  -f FILENAME  read transaction script from FILENAME\n"
00349            "  -j NUM       number of threads (default: 1)\n"
00350            "  -l           write transaction times to log file\n"
00351            "  --sampling-rate NUM\n"
00352            "               fraction of transactions to log (e.g. 0.01 for 1%% sample)\n"
00353            "  --aggregate-interval NUM\n"
00354            "               aggregate data over NUM seconds\n"
00355            "  -M simple|extended|prepared\n"
00356            "               protocol for submitting queries to server (default: simple)\n"
00357            "  -n           do not run VACUUM before tests\n"
00358            "  -N           do not update tables \"pgbench_tellers\" and \"pgbench_branches\"\n"
00359            "  -r           report average latency per command\n"
00360            "  -s NUM       report this scale factor in output\n"
00361            "  -S           perform SELECT-only transactions\n"
00362      "  -t NUM       number of transactions each client runs (default: 10)\n"
00363            "  -T NUM       duration of benchmark test in seconds\n"
00364            "  -v           vacuum all four standard tables before tests\n"
00365            "\nCommon options:\n"
00366            "  -d             print debugging output\n"
00367            "  -h HOSTNAME    database server host or socket directory\n"
00368            "  -p PORT        database server port number\n"
00369            "  -U USERNAME    connect as specified database user\n"
00370            "  -V, --version  output version information, then exit\n"
00371            "  -?, --help     show this help, then exit\n"
00372            "\n"
00373            "Report bugs to <[email protected]>.\n",
00374            progname, progname);
00375 }
00376 
00377 /*
00378  * strtoint64 -- convert a string to 64-bit integer
00379  *
00380  * This function is a modified version of scanint8() from
00381  * src/backend/utils/adt/int8.c.
00382  */
00383 static int64
00384 strtoint64(const char *str)
00385 {
00386     const char *ptr = str;
00387     int64       result = 0;
00388     int         sign = 1;
00389 
00390     /*
00391      * Do our own scan, rather than relying on sscanf which might be broken
00392      * for long long.
00393      */
00394 
00395     /* skip leading spaces */
00396     while (*ptr && isspace((unsigned char) *ptr))
00397         ptr++;
00398 
00399     /* handle sign */
00400     if (*ptr == '-')
00401     {
00402         ptr++;
00403 
00404         /*
00405          * Do an explicit check for INT64_MIN.  Ugly though this is, it's
00406          * cleaner than trying to get the loop below to handle it portably.
00407          */
00408         if (strncmp(ptr, "9223372036854775808", 19) == 0)
00409         {
00410             result = -INT64CONST(0x7fffffffffffffff) - 1;
00411             ptr += 19;
00412             goto gotdigits;
00413         }
00414         sign = -1;
00415     }
00416     else if (*ptr == '+')
00417         ptr++;
00418 
00419     /* require at least one digit */
00420     if (!isdigit((unsigned char) *ptr))
00421         fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
00422 
00423     /* process digits */
00424     while (*ptr && isdigit((unsigned char) *ptr))
00425     {
00426         int64       tmp = result * 10 + (*ptr++ - '0');
00427 
00428         if ((tmp / 10) != result)       /* overflow? */
00429             fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
00430         result = tmp;
00431     }
00432 
00433 gotdigits:
00434 
00435     /* allow trailing whitespace, but not other trailing chars */
00436     while (*ptr != '\0' && isspace((unsigned char) *ptr))
00437         ptr++;
00438 
00439     if (*ptr != '\0')
00440         fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
00441 
00442     return ((sign < 0) ? -result : result);
00443 }
00444 
00445 /* random number generator: uniform distribution from min to max inclusive */
00446 static int64
00447 getrand(TState *thread, int64 min, int64 max)
00448 {
00449     /*
00450      * Odd coding is so that min and max have approximately the same chance of
00451      * being selected as do numbers between them.
00452      *
00453      * pg_erand48() is thread-safe and concurrent, which is why we use it
00454      * rather than random(), which in glibc is non-reentrant, and therefore
00455      * protected by a mutex, and therefore a bottleneck on machines with many
00456      * CPUs.
00457      */
00458     return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
00459 }
00460 
00461 /* call PQexec() and exit() on failure */
00462 static void
00463 executeStatement(PGconn *con, const char *sql)
00464 {
00465     PGresult   *res;
00466 
00467     res = PQexec(con, sql);
00468     if (PQresultStatus(res) != PGRES_COMMAND_OK)
00469     {
00470         fprintf(stderr, "%s", PQerrorMessage(con));
00471         exit(1);
00472     }
00473     PQclear(res);
00474 }
00475 
00476 /* set up a connection to the backend */
00477 static PGconn *
00478 doConnect(void)
00479 {
00480     PGconn     *conn;
00481     static char *password = NULL;
00482     bool        new_pass;
00483 
00484     /*
00485      * Start the connection.  Loop until we have a password if requested by
00486      * backend.
00487      */
00488     do
00489     {
00490 #define PARAMS_ARRAY_SIZE   7
00491 
00492         const char *keywords[PARAMS_ARRAY_SIZE];
00493         const char *values[PARAMS_ARRAY_SIZE];
00494 
00495         keywords[0] = "host";
00496         values[0] = pghost;
00497         keywords[1] = "port";
00498         values[1] = pgport;
00499         keywords[2] = "user";
00500         values[2] = login;
00501         keywords[3] = "password";
00502         values[3] = password;
00503         keywords[4] = "dbname";
00504         values[4] = dbName;
00505         keywords[5] = "fallback_application_name";
00506         values[5] = progname;
00507         keywords[6] = NULL;
00508         values[6] = NULL;
00509 
00510         new_pass = false;
00511 
00512         conn = PQconnectdbParams(keywords, values, true);
00513 
00514         if (!conn)
00515         {
00516             fprintf(stderr, "Connection to database \"%s\" failed\n",
00517                     dbName);
00518             return NULL;
00519         }
00520 
00521         if (PQstatus(conn) == CONNECTION_BAD &&
00522             PQconnectionNeedsPassword(conn) &&
00523             password == NULL)
00524         {
00525             PQfinish(conn);
00526             password = simple_prompt("Password: ", 100, false);
00527             new_pass = true;
00528         }
00529     } while (new_pass);
00530 
00531     /* check to see that the backend connection was successfully made */
00532     if (PQstatus(conn) == CONNECTION_BAD)
00533     {
00534         fprintf(stderr, "Connection to database \"%s\" failed:\n%s",
00535                 dbName, PQerrorMessage(conn));
00536         PQfinish(conn);
00537         return NULL;
00538     }
00539 
00540     return conn;
00541 }
00542 
00543 /* throw away response from backend */
00544 static void
00545 discard_response(CState *state)
00546 {
00547     PGresult   *res;
00548 
00549     do
00550     {
00551         res = PQgetResult(state->con);
00552         if (res)
00553             PQclear(res);
00554     } while (res);
00555 }
00556 
00557 static int
00558 compareVariables(const void *v1, const void *v2)
00559 {
00560     return strcmp(((const Variable *) v1)->name,
00561                   ((const Variable *) v2)->name);
00562 }
00563 
00564 static char *
00565 getVariable(CState *st, char *name)
00566 {
00567     Variable    key,
00568                *var;
00569 
00570     /* On some versions of Solaris, bsearch of zero items dumps core */
00571     if (st->nvariables <= 0)
00572         return NULL;
00573 
00574     key.name = name;
00575     var = (Variable *) bsearch((void *) &key,
00576                                (void *) st->variables,
00577                                st->nvariables,
00578                                sizeof(Variable),
00579                                compareVariables);
00580     if (var != NULL)
00581         return var->value;
00582     else
00583         return NULL;
00584 }
00585 
/*
 * Check whether a variable name is acceptable: non-empty and made only of
 * ASCII letters, digits and underscores (per isalnum()).
 *
 * The empty name is rejected because it could never be referenced: a bare
 * ':' is not parsed as a variable reference.
 */
static bool
isLegalVariableName(const char *name)
{
    const char *p;

    if (*name == '\0')
        return false;

    for (p = name; *p != '\0'; p++)
    {
        if (!isalnum((unsigned char) *p) && *p != '_')
            return false;
    }

    return true;
}
00600 
00601 static int
00602 putVariable(CState *st, const char *context, char *name, char *value)
00603 {
00604     Variable    key,
00605                *var;
00606 
00607     key.name = name;
00608     /* On some versions of Solaris, bsearch of zero items dumps core */
00609     if (st->nvariables > 0)
00610         var = (Variable *) bsearch((void *) &key,
00611                                    (void *) st->variables,
00612                                    st->nvariables,
00613                                    sizeof(Variable),
00614                                    compareVariables);
00615     else
00616         var = NULL;
00617 
00618     if (var == NULL)
00619     {
00620         Variable   *newvars;
00621 
00622         /*
00623          * Check for the name only when declaring a new variable to avoid
00624          * overhead.
00625          */
00626         if (!isLegalVariableName(name))
00627         {
00628             fprintf(stderr, "%s: invalid variable name '%s'\n", context, name);
00629             return false;
00630         }
00631 
00632         if (st->variables)
00633             newvars = (Variable *) pg_realloc(st->variables,
00634                                     (st->nvariables + 1) * sizeof(Variable));
00635         else
00636             newvars = (Variable *) pg_malloc(sizeof(Variable));
00637 
00638         st->variables = newvars;
00639 
00640         var = &newvars[st->nvariables];
00641 
00642         var->name = pg_strdup(name);
00643         var->value = pg_strdup(value);
00644 
00645         st->nvariables++;
00646 
00647         qsort((void *) st->variables, st->nvariables, sizeof(Variable),
00648               compareVariables);
00649     }
00650     else
00651     {
00652         char       *val;
00653 
00654         /* dup then free, in case value is pointing at this variable */
00655         val = pg_strdup(value);
00656 
00657         free(var->value);
00658         var->value = val;
00659     }
00660 
00661     return true;
00662 }
00663 
/*
 * Parse a ":name" reference at the start of sql (sql[0] is expected to be
 * the colon and is not examined).  The name is the longest run of
 * alphanumerics/underscores after it.
 *
 * Returns a newly allocated copy of the name and sets *eaten to the number
 * of characters consumed, colon included.  Returns NULL (leaving *eaten
 * untouched) if no name follows the colon.
 */
static char *
parseVariable(const char *sql, int *eaten)
{
    int         namelen = 0;
    char       *name;

    while (isalnum((unsigned char) sql[namelen + 1]) ||
           sql[namelen + 1] == '_')
        namelen++;

    if (namelen == 0)
        return NULL;

    name = pg_malloc(namelen + 1);
    memcpy(name, sql + 1, namelen);
    name[namelen] = '\0';

    *eaten = namelen + 1;
    return name;
}
00684 
/*
 * Replace the len characters at param (inside the string *sql) with value.
 *
 * If the value is longer than the text it replaces, *sql is reallocated (so
 * param is recomputed from its offset).  Returns a pointer just past the
 * inserted value inside the (possibly moved) string, so scanning can resume
 * there.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
    size_t      valueln = strlen(value);
    size_t      oldlen = (size_t) len;

    if (valueln > oldlen)
    {
        size_t      offset = param - *sql;

        *sql = pg_realloc(*sql, strlen(*sql) - oldlen + valueln + 1);
        param = *sql + offset;
    }

    /* shift the tail (including its terminating NUL) into place */
    if (valueln != oldlen)
        memmove(param + valueln, param + oldlen, strlen(param + oldlen) + 1);

    /*
     * Exactly valueln bytes, no terminator: the string stays terminated by
     * the tail moved above.  memcpy states that intent; strncpy with n ==
     * strlen(src) did the same but reads like a truncation bug.
     */
    memcpy(param, value, valueln);

    return param + valueln;
}
00704 
00705 static char *
00706 assignVariables(CState *st, char *sql)
00707 {
00708     char       *p,
00709                *name,
00710                *val;
00711 
00712     p = sql;
00713     while ((p = strchr(p, ':')) != NULL)
00714     {
00715         int         eaten;
00716 
00717         name = parseVariable(p, &eaten);
00718         if (name == NULL)
00719         {
00720             while (*p == ':')
00721             {
00722                 p++;
00723             }
00724             continue;
00725         }
00726 
00727         val = getVariable(st, name);
00728         free(name);
00729         if (val == NULL)
00730         {
00731             p++;
00732             continue;
00733         }
00734 
00735         p = replaceVariable(&sql, p, eaten, val);
00736     }
00737 
00738     return sql;
00739 }
00740 
00741 static void
00742 getQueryParams(CState *st, const Command *command, const char **params)
00743 {
00744     int         i;
00745 
00746     for (i = 0; i < command->argc - 1; i++)
00747         params[i] = getVariable(st, command->argv[i + 1]);
00748 }
00749 
00750 /*
00751  * Run a shell command. The result is assigned to the variable if not NULL.
00752  * Return true if succeeded, or false on error.
00753  */
00754 static bool
00755 runShellCommand(CState *st, char *variable, char **argv, int argc)
00756 {
00757     char        command[SHELL_COMMAND_SIZE];
00758     int         i,
00759                 len = 0;
00760     FILE       *fp;
00761     char        res[64];
00762     char       *endptr;
00763     int         retval;
00764 
00765     /*----------
00766      * Join arguments with whitespace separators. Arguments starting with
00767      * exactly one colon are treated as variables:
00768      *  name - append a string "name"
00769      *  :var - append a variable named 'var'
00770      *  ::name - append a string ":name"
00771      *----------
00772      */
00773     for (i = 0; i < argc; i++)
00774     {
00775         char       *arg;
00776         int         arglen;
00777 
00778         if (argv[i][0] != ':')
00779         {
00780             arg = argv[i];      /* a string literal */
00781         }
00782         else if (argv[i][1] == ':')
00783         {
00784             arg = argv[i] + 1;  /* a string literal starting with colons */
00785         }
00786         else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
00787         {
00788             fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[i]);
00789             return false;
00790         }
00791 
00792         arglen = strlen(arg);
00793         if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
00794         {
00795             fprintf(stderr, "%s: too long shell command\n", argv[0]);
00796             return false;
00797         }
00798 
00799         if (i > 0)
00800             command[len++] = ' ';
00801         memcpy(command + len, arg, arglen);
00802         len += arglen;
00803     }
00804 
00805     command[len] = '\0';
00806 
00807     /* Fast path for non-assignment case */
00808     if (variable == NULL)
00809     {
00810         if (system(command))
00811         {
00812             if (!timer_exceeded)
00813                 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
00814             return false;
00815         }
00816         return true;
00817     }
00818 
00819     /* Execute the command with pipe and read the standard output. */
00820     if ((fp = popen(command, "r")) == NULL)
00821     {
00822         fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
00823         return false;
00824     }
00825     if (fgets(res, sizeof(res), fp) == NULL)
00826     {
00827         if (!timer_exceeded)
00828             fprintf(stderr, "%s: cannot read the result\n", argv[0]);
00829         return false;
00830     }
00831     if (pclose(fp) < 0)
00832     {
00833         fprintf(stderr, "%s: cannot close shell command\n", argv[0]);
00834         return false;
00835     }
00836 
00837     /* Check whether the result is an integer and assign it to the variable */
00838     retval = (int) strtol(res, &endptr, 10);
00839     while (*endptr != '\0' && isspace((unsigned char) *endptr))
00840         endptr++;
00841     if (*res == '\0' || *endptr != '\0')
00842     {
00843         fprintf(stderr, "%s: must return an integer ('%s' returned)\n", argv[0], res);
00844         return false;
00845     }
00846     snprintf(res, sizeof(res), "%d", retval);
00847     if (!putVariable(st, "setshell", variable, res))
00848         return false;
00849 
00850 #ifdef DEBUG
00851     printf("shell parameter name: %s, value: %s\n", argv[1], res);
00852 #endif
00853     return true;
00854 }
00855 
#define MAX_PREPARE_NAME        32

/*
 * Build the prepared-statement name for command number "state" of script
 * file "file", e.g. "P0_3".
 *
 * "buffer" must provide at least MAX_PREPARE_NAME bytes.  Any pair of ints
 * fits well within that, but the write is bounded explicitly with snprintf
 * rather than relying on the arithmetic.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
    snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
00862 
00863 static bool
00864 clientDone(CState *st, bool ok)
00865 {
00866     (void) ok;                  /* unused */
00867 
00868     if (st->con != NULL)
00869     {
00870         PQfinish(st->con);
00871         st->con = NULL;
00872     }
00873     return false;               /* always false */
00874 }
00875 
00876 static
00877 void agg_vals_init(AggVals * aggs, instr_time start)
00878 {
00879     /* basic counters */
00880     aggs->cnt = 0;      /* number of transactions */
00881     aggs->sum = 0;      /* SUM(duration) */
00882     aggs->sum2 = 0;     /* SUM(duration*duration) */
00883 
00884     /* min and max transaction duration */
00885     aggs->min_duration = 0;
00886     aggs->max_duration = 0;
00887 
00888     /* start of the current interval */
00889     aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
00890 }
00891 
00892 /* return false iff client should be disconnected */
00893 static bool
00894 doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals * agg)
00895 {
00896     PGresult   *res;
00897     Command   **commands;
00898 
00899 top:
00900     commands = sql_files[st->use_file];
00901 
00902     if (st->sleeping)
00903     {                           /* are we sleeping? */
00904         instr_time  now;
00905 
00906         INSTR_TIME_SET_CURRENT(now);
00907         if (st->until <= INSTR_TIME_GET_MICROSEC(now))
00908             st->sleeping = 0;   /* Done sleeping, go ahead with next command */
00909         else
00910             return true;        /* Still sleeping, nothing to do here */
00911     }
00912 
00913     if (st->listen)
00914     {                           /* are we receiver? */
00915         if (commands[st->state]->type == SQL_COMMAND)
00916         {
00917             if (debug)
00918                 fprintf(stderr, "client %d receiving\n", st->id);
00919             if (!PQconsumeInput(st->con))
00920             {                   /* there's something wrong */
00921                 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
00922                 return clientDone(st, false);
00923             }
00924             if (PQisBusy(st->con))
00925                 return true;    /* don't have the whole result yet */
00926         }
00927 
00928         /*
00929          * command finished: accumulate per-command execution times in
00930          * thread-local data structure, if per-command latencies are requested
00931          */
00932         if (is_latencies)
00933         {
00934             instr_time  now;
00935             int         cnum = commands[st->state]->command_num;
00936 
00937             INSTR_TIME_SET_CURRENT(now);
00938             INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
00939                                   now, st->stmt_begin);
00940             thread->exec_count[cnum]++;
00941         }
00942 
00943         /*
00944          * if transaction finished, record the time it took in the log
00945          */
00946         if (logfile && commands[st->state + 1] == NULL)
00947         {
00948             instr_time  now;
00949             instr_time  diff;
00950             double      usec;
00951 
00952             /*
00953              * write the log entry if this row belongs to the random sample,
00954              * or no sampling rate was given which means log everything.
00955              */
00956             if (sample_rate == 0.0 ||
00957                 pg_erand48(thread->random_state) <= sample_rate)
00958             {
00959                 INSTR_TIME_SET_CURRENT(now);
00960                 diff = now;
00961                 INSTR_TIME_SUBTRACT(diff, st->txn_begin);
00962                 usec = (double) INSTR_TIME_GET_MICROSEC(diff);
00963 
00964                 /* should we aggregate the results or not? */
00965                 if (agg_interval > 0)
00966                 {
00967                     /* are we still in the same interval? if yes, accumulate the
00968                     * values (print them otherwise) */
00969                     if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(now))
00970                     {
00971                         agg->cnt += 1;
00972                         agg->sum  += usec;
00973                         agg->sum2 += usec * usec;
00974 
00975                         /* first in this aggregation interval */
00976                         if ((agg->cnt == 1) || (usec < agg->min_duration))
00977                             agg->min_duration =  usec;
00978 
00979                         if ((agg->cnt == 1) || (usec > agg->max_duration))
00980                             agg->max_duration = usec;
00981                     }
00982                     else
00983                     {
00984                         /* Loop until we reach the interval of the current transaction (and
00985                          * print all the empty intervals in between). */
00986                         while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(now))
00987                         {
00988                             /* This is a non-Windows branch (thanks to the ifdef in usage), so
00989                              * we don't need to handle this in a special way (see below). */
00990                             fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f\n",
00991                                     agg->start_time, agg->cnt, agg->sum, agg->sum2,
00992                                     agg->min_duration, agg->max_duration);
00993 
00994                             /* move to the next inteval */
00995                             agg->start_time = agg->start_time + agg_interval;
00996 
00997                             /* reset for "no transaction" intervals */
00998                             agg->cnt = 0;
00999                             agg->min_duration = 0;
01000                             agg->max_duration = 0;
01001                             agg->sum = 0;
01002                             agg->sum2 = 0;
01003                         }
01004 
01005                         /* and now update the reset values (include the current) */
01006                         agg->cnt = 1;
01007                         agg->min_duration = usec;
01008                         agg->max_duration = usec;
01009                         agg->sum = usec;
01010                         agg->sum2 = usec * usec;
01011                     }
01012                 }
01013                 else
01014                 {
01015                     /* no, print raw transactions */
01016 #ifndef WIN32
01017                     /* This is more than we really ought to know about instr_time */
01018                     fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
01019                             st->id, st->cnt, usec, st->use_file,
01020                             (long) now.tv_sec, (long) now.tv_usec);
01021 #else
01022                     /* On Windows, instr_time doesn't provide a timestamp anyway */
01023                     fprintf(logfile, "%d %d %.0f %d 0 0\n",
01024                             st->id, st->cnt, usec, st->use_file);
01025 #endif
01026                 }
01027             }
01028         }
01029 
01030         if (commands[st->state]->type == SQL_COMMAND)
01031         {
01032             /*
01033              * Read and discard the query result; note this is not included in
01034              * the statement latency numbers.
01035              */
01036             res = PQgetResult(st->con);
01037             switch (PQresultStatus(res))
01038             {
01039                 case PGRES_COMMAND_OK:
01040                 case PGRES_TUPLES_OK:
01041                     break;      /* OK */
01042                 default:
01043                     fprintf(stderr, "Client %d aborted in state %d: %s",
01044                             st->id, st->state, PQerrorMessage(st->con));
01045                     PQclear(res);
01046                     return clientDone(st, false);
01047             }
01048             PQclear(res);
01049             discard_response(st);
01050         }
01051 
01052         if (commands[st->state + 1] == NULL)
01053         {
01054             if (is_connect)
01055             {
01056                 PQfinish(st->con);
01057                 st->con = NULL;
01058             }
01059 
01060             ++st->cnt;
01061             if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
01062                 return clientDone(st, true);    /* exit success */
01063         }
01064 
01065         /* increment state counter */
01066         st->state++;
01067         if (commands[st->state] == NULL)
01068         {
01069             st->state = 0;
01070             st->use_file = (int) getrand(thread, 0, num_files - 1);
01071             commands = sql_files[st->use_file];
01072         }
01073     }
01074 
01075     if (st->con == NULL)
01076     {
01077         instr_time  start,
01078                     end;
01079 
01080         INSTR_TIME_SET_CURRENT(start);
01081         if ((st->con = doConnect()) == NULL)
01082         {
01083             fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
01084             return clientDone(st, false);
01085         }
01086         INSTR_TIME_SET_CURRENT(end);
01087         INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
01088     }
01089 
01090     /* Record transaction start time if logging is enabled */
01091     if (logfile && st->state == 0)
01092         INSTR_TIME_SET_CURRENT(st->txn_begin);
01093 
01094     /* Record statement start time if per-command latencies are requested */
01095     if (is_latencies)
01096         INSTR_TIME_SET_CURRENT(st->stmt_begin);
01097 
01098     if (commands[st->state]->type == SQL_COMMAND)
01099     {
01100         const Command *command = commands[st->state];
01101         int         r;
01102 
01103         if (querymode == QUERY_SIMPLE)
01104         {
01105             char       *sql;
01106 
01107             sql = pg_strdup(command->argv[0]);
01108             sql = assignVariables(st, sql);
01109 
01110             if (debug)
01111                 fprintf(stderr, "client %d sending %s\n", st->id, sql);
01112             r = PQsendQuery(st->con, sql);
01113             free(sql);
01114         }
01115         else if (querymode == QUERY_EXTENDED)
01116         {
01117             const char *sql = command->argv[0];
01118             const char *params[MAX_ARGS];
01119 
01120             getQueryParams(st, command, params);
01121 
01122             if (debug)
01123                 fprintf(stderr, "client %d sending %s\n", st->id, sql);
01124             r = PQsendQueryParams(st->con, sql, command->argc - 1,
01125                                   NULL, params, NULL, NULL, 0);
01126         }
01127         else if (querymode == QUERY_PREPARED)
01128         {
01129             char        name[MAX_PREPARE_NAME];
01130             const char *params[MAX_ARGS];
01131 
01132             if (!st->prepared[st->use_file])
01133             {
01134                 int         j;
01135 
01136                 for (j = 0; commands[j] != NULL; j++)
01137                 {
01138                     PGresult   *res;
01139                     char        name[MAX_PREPARE_NAME];
01140 
01141                     if (commands[j]->type != SQL_COMMAND)
01142                         continue;
01143                     preparedStatementName(name, st->use_file, j);
01144                     res = PQprepare(st->con, name,
01145                           commands[j]->argv[0], commands[j]->argc - 1, NULL);
01146                     if (PQresultStatus(res) != PGRES_COMMAND_OK)
01147                         fprintf(stderr, "%s", PQerrorMessage(st->con));
01148                     PQclear(res);
01149                 }
01150                 st->prepared[st->use_file] = true;
01151             }
01152 
01153             getQueryParams(st, command, params);
01154             preparedStatementName(name, st->use_file, st->state);
01155 
01156             if (debug)
01157                 fprintf(stderr, "client %d sending %s\n", st->id, name);
01158             r = PQsendQueryPrepared(st->con, name, command->argc - 1,
01159                                     params, NULL, NULL, 0);
01160         }
01161         else    /* unknown sql mode */
01162             r = 0;
01163 
01164         if (r == 0)
01165         {
01166             if (debug)
01167                 fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
01168             st->ecnt++;
01169         }
01170         else
01171             st->listen = 1;     /* flags that should be listened */
01172     }
01173     else if (commands[st->state]->type == META_COMMAND)
01174     {
01175         int         argc = commands[st->state]->argc,
01176                     i;
01177         char      **argv = commands[st->state]->argv;
01178 
01179         if (debug)
01180         {
01181             fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
01182             for (i = 1; i < argc; i++)
01183                 fprintf(stderr, " %s", argv[i]);
01184             fprintf(stderr, "\n");
01185         }
01186 
01187         if (pg_strcasecmp(argv[0], "setrandom") == 0)
01188         {
01189             char       *var;
01190             int64       min,
01191                         max;
01192             char        res[64];
01193 
01194             if (*argv[2] == ':')
01195             {
01196                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
01197                 {
01198                     fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
01199                     st->ecnt++;
01200                     return true;
01201                 }
01202                 min = strtoint64(var);
01203             }
01204             else
01205                 min = strtoint64(argv[2]);
01206 
01207 #ifdef NOT_USED
01208             if (min < 0)
01209             {
01210                 fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
01211                 st->ecnt++;
01212                 return;
01213             }
01214 #endif
01215 
01216             if (*argv[3] == ':')
01217             {
01218                 if ((var = getVariable(st, argv[3] + 1)) == NULL)
01219                 {
01220                     fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
01221                     st->ecnt++;
01222                     return true;
01223                 }
01224                 max = strtoint64(var);
01225             }
01226             else
01227                 max = strtoint64(argv[3]);
01228 
01229             if (max < min)
01230             {
01231                 fprintf(stderr, "%s: maximum is less than minimum\n", argv[0]);
01232                 st->ecnt++;
01233                 return true;
01234             }
01235 
01236             /*
01237              * getrand() needs to be able to subtract max from min and add
01238              * one to the result without overflowing.  Since we know max > min,
01239              * we can detect overflow just by checking for a negative result.
01240              * But we must check both that the subtraction doesn't overflow,
01241              * and that adding one to the result doesn't overflow either.
01242              */
01243             if (max - min < 0 || (max - min) + 1 < 0)
01244             {
01245                 fprintf(stderr, "%s: range too large\n", argv[0]);
01246                 st->ecnt++;
01247                 return true;
01248             }
01249 
01250 #ifdef DEBUG
01251             printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
01252 #endif
01253             snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));
01254 
01255             if (!putVariable(st, argv[0], argv[1], res))
01256             {
01257                 st->ecnt++;
01258                 return true;
01259             }
01260 
01261             st->listen = 1;
01262         }
01263         else if (pg_strcasecmp(argv[0], "set") == 0)
01264         {
01265             char       *var;
01266             int64       ope1,
01267                         ope2;
01268             char        res[64];
01269 
01270             if (*argv[2] == ':')
01271             {
01272                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
01273                 {
01274                     fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
01275                     st->ecnt++;
01276                     return true;
01277                 }
01278                 ope1 = strtoint64(var);
01279             }
01280             else
01281                 ope1 = strtoint64(argv[2]);
01282 
01283             if (argc < 5)
01284                 snprintf(res, sizeof(res), INT64_FORMAT, ope1);
01285             else
01286             {
01287                 if (*argv[4] == ':')
01288                 {
01289                     if ((var = getVariable(st, argv[4] + 1)) == NULL)
01290                     {
01291                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
01292                         st->ecnt++;
01293                         return true;
01294                     }
01295                     ope2 = strtoint64(var);
01296                 }
01297                 else
01298                     ope2 = strtoint64(argv[4]);
01299 
01300                 if (strcmp(argv[3], "+") == 0)
01301                     snprintf(res, sizeof(res), INT64_FORMAT, ope1 + ope2);
01302                 else if (strcmp(argv[3], "-") == 0)
01303                     snprintf(res, sizeof(res), INT64_FORMAT, ope1 - ope2);
01304                 else if (strcmp(argv[3], "*") == 0)
01305                     snprintf(res, sizeof(res), INT64_FORMAT, ope1 * ope2);
01306                 else if (strcmp(argv[3], "/") == 0)
01307                 {
01308                     if (ope2 == 0)
01309                     {
01310                         fprintf(stderr, "%s: division by zero\n", argv[0]);
01311                         st->ecnt++;
01312                         return true;
01313                     }
01314                     snprintf(res, sizeof(res), INT64_FORMAT, ope1 / ope2);
01315                 }
01316                 else
01317                 {
01318                     fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
01319                     st->ecnt++;
01320                     return true;
01321                 }
01322             }
01323 
01324             if (!putVariable(st, argv[0], argv[1], res))
01325             {
01326                 st->ecnt++;
01327                 return true;
01328             }
01329 
01330             st->listen = 1;
01331         }
01332         else if (pg_strcasecmp(argv[0], "sleep") == 0)
01333         {
01334             char       *var;
01335             int         usec;
01336             instr_time  now;
01337 
01338             if (*argv[1] == ':')
01339             {
01340                 if ((var = getVariable(st, argv[1] + 1)) == NULL)
01341                 {
01342                     fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
01343                     st->ecnt++;
01344                     return true;
01345                 }
01346                 usec = atoi(var);
01347             }
01348             else
01349                 usec = atoi(argv[1]);
01350 
01351             if (argc > 2)
01352             {
01353                 if (pg_strcasecmp(argv[2], "ms") == 0)
01354                     usec *= 1000;
01355                 else if (pg_strcasecmp(argv[2], "s") == 0)
01356                     usec *= 1000000;
01357             }
01358             else
01359                 usec *= 1000000;
01360 
01361             INSTR_TIME_SET_CURRENT(now);
01362             st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
01363             st->sleeping = 1;
01364 
01365             st->listen = 1;
01366         }
01367         else if (pg_strcasecmp(argv[0], "setshell") == 0)
01368         {
01369             bool        ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
01370 
01371             if (timer_exceeded) /* timeout */
01372                 return clientDone(st, true);
01373             else if (!ret)      /* on error */
01374             {
01375                 st->ecnt++;
01376                 return true;
01377             }
01378             else    /* succeeded */
01379                 st->listen = 1;
01380         }
01381         else if (pg_strcasecmp(argv[0], "shell") == 0)
01382         {
01383             bool        ret = runShellCommand(st, NULL, argv + 1, argc - 1);
01384 
01385             if (timer_exceeded) /* timeout */
01386                 return clientDone(st, true);
01387             else if (!ret)      /* on error */
01388             {
01389                 st->ecnt++;
01390                 return true;
01391             }
01392             else    /* succeeded */
01393                 st->listen = 1;
01394         }
01395         goto top;
01396     }
01397 
01398     return true;
01399 }
01400 
01401 /* discard connections */
01402 static void
01403 disconnect_all(CState *state, int length)
01404 {
01405     int         i;
01406 
01407     for (i = 0; i < length; i++)
01408     {
01409         if (state[i].con)
01410         {
01411             PQfinish(state[i].con);
01412             state[i].con = NULL;
01413         }
01414     }
01415 }
01416 
01417 /* create tables and setup data */
01418 static void
01419 init(bool is_no_vacuum)
01420 {
01421 
01422 /* The scale factor at/beyond which 32bit integers are incapable of storing
01423  * 64bit values.
01424  *
01425  * Although the actual threshold is 21474, we use 20000 because it is easier to
01426  * document and remember, and isn't that far away from the real threshold.
01427  */
01428 #define SCALE_32BIT_THRESHOLD 20000
01429 
01430     /*
01431      * Note: TPC-B requires at least 100 bytes per row, and the "filler"
01432      * fields in these table declarations were intended to comply with that.
01433      * But because they default to NULLs, they don't actually take any space.
01434      * We could fix that by giving them non-null default values. However, that
01435      * would completely break comparability of pgbench results with prior
01436      * versions.  Since pgbench has never pretended to be fully TPC-B
01437      * compliant anyway, we stick with the historical behavior.
01438      */
01439     struct ddlinfo
01440     {
01441         char       *table;
01442         char       *cols;
01443         int         declare_fillfactor;
01444     };
01445     struct ddlinfo DDLs[] = {
01446         {
01447             "pgbench_history",
01448             scale >= SCALE_32BIT_THRESHOLD
01449                 ? "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)"
01450                 : "tid int,bid int,aid    int,delta int,mtime timestamp,filler char(22)",
01451             0
01452         },
01453         {
01454             "pgbench_tellers",
01455             "tid int not null,bid int,tbalance int,filler char(84)",
01456             1
01457         },
01458         {
01459             "pgbench_accounts",
01460             scale >= SCALE_32BIT_THRESHOLD
01461                 ? "aid bigint not null,bid int,abalance int,filler char(84)"
01462                 : "aid    int not null,bid int,abalance int,filler char(84)",
01463             1
01464         },
01465         {
01466             "pgbench_branches",
01467             "bid int not null,bbalance int,filler char(88)",
01468             1
01469         }
01470     };
01471     static char *DDLAFTERs[] = {
01472         "alter table pgbench_branches add primary key (bid)",
01473         "alter table pgbench_tellers add primary key (tid)",
01474         "alter table pgbench_accounts add primary key (aid)"
01475     };
01476     static char *DDLKEYs[] = {
01477         "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
01478         "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
01479         "alter table pgbench_history add foreign key (bid) references pgbench_branches",
01480         "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
01481         "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
01482     };
01483 
01484     PGconn     *con;
01485     PGresult   *res;
01486     char        sql[256];
01487     int         i;
01488     int64       k;
01489 
01490     /* used to track elapsed time and estimate of the remaining time */
01491     instr_time  start, diff;
01492     double      elapsed_sec, remaining_sec;
01493     int         log_interval = 1;
01494 
01495     if ((con = doConnect()) == NULL)
01496         exit(1);
01497 
01498     for (i = 0; i < lengthof(DDLs); i++)
01499     {
01500         char        opts[256];
01501         char        buffer[256];
01502         struct ddlinfo *ddl = &DDLs[i];
01503 
01504         /* Remove old table, if it exists. */
01505         snprintf(buffer, 256, "drop table if exists %s", ddl->table);
01506         executeStatement(con, buffer);
01507 
01508         /* Construct new create table statement. */
01509         opts[0] = '\0';
01510         if (ddl->declare_fillfactor)
01511             snprintf(opts + strlen(opts), 256 - strlen(opts),
01512                      " with (fillfactor=%d)", fillfactor);
01513         if (tablespace != NULL)
01514         {
01515             char       *escape_tablespace;
01516 
01517             escape_tablespace = PQescapeIdentifier(con, tablespace,
01518                                                    strlen(tablespace));
01519             snprintf(opts + strlen(opts), 256 - strlen(opts),
01520                      " tablespace %s", escape_tablespace);
01521             PQfreemem(escape_tablespace);
01522         }
01523         snprintf(buffer, 256, "create%s table %s(%s)%s",
01524                  unlogged_tables ? " unlogged" : "",
01525                  ddl->table, ddl->cols, opts);
01526 
01527         executeStatement(con, buffer);
01528     }
01529 
01530     executeStatement(con, "begin");
01531 
01532     for (i = 0; i < nbranches * scale; i++)
01533     {
01534         snprintf(sql, 256, "insert into pgbench_branches(bid,bbalance) values(%d,0)", i + 1);
01535         executeStatement(con, sql);
01536     }
01537 
01538     for (i = 0; i < ntellers * scale; i++)
01539     {
01540         snprintf(sql, 256, "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
01541                  i + 1, i / ntellers + 1);
01542         executeStatement(con, sql);
01543     }
01544 
01545     executeStatement(con, "commit");
01546 
01547     /*
01548      * fill the pgbench_accounts table with some data
01549      */
01550     fprintf(stderr, "creating tables...\n");
01551 
01552     executeStatement(con, "begin");
01553     executeStatement(con, "truncate pgbench_accounts");
01554 
01555     res = PQexec(con, "copy pgbench_accounts from stdin");
01556     if (PQresultStatus(res) != PGRES_COPY_IN)
01557     {
01558         fprintf(stderr, "%s", PQerrorMessage(con));
01559         exit(1);
01560     }
01561     PQclear(res);
01562 
01563     INSTR_TIME_SET_CURRENT(start);
01564 
01565     for (k = 0; k < (int64) naccounts * scale; k++)
01566     {
01567         int64       j = k + 1;
01568 
01569         snprintf(sql, 256, INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n", j, k / naccounts + 1, 0);
01570         if (PQputline(con, sql))
01571         {
01572             fprintf(stderr, "PQputline failed\n");
01573             exit(1);
01574         }
01575 
01576         /* If we want to stick with the original logging, print a message each
01577          * 100k inserted rows. */
01578         if ((! use_quiet) && (j % 100000 == 0))
01579         {
01580             INSTR_TIME_SET_CURRENT(diff);
01581             INSTR_TIME_SUBTRACT(diff, start);
01582 
01583             elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
01584             remaining_sec = (scale * naccounts - j) * elapsed_sec / j;
01585 
01586             fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
01587                             j, (int64)naccounts * scale,
01588                             (int) (((int64) j * 100) / (naccounts * scale)),
01589                             elapsed_sec, remaining_sec);
01590         }
01591         /* let's not call the timing for each row, but only each 100 rows */
01592         else if (use_quiet && (j % 100 == 0))
01593         {
01594             INSTR_TIME_SET_CURRENT(diff);
01595             INSTR_TIME_SUBTRACT(diff, start);
01596 
01597             elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
01598             remaining_sec = (scale * naccounts - j) * elapsed_sec / j;
01599 
01600             /* have we reached the next interval (or end)? */
01601             if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS)) {
01602 
01603                 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
01604                         j, (int64)naccounts * scale,
01605                         (int) (((int64) j * 100) / (naccounts * scale)), elapsed_sec, remaining_sec);
01606 
01607                 /* skip to the next interval */
01608                 log_interval = (int)ceil(elapsed_sec/LOG_STEP_SECONDS);
01609             }
01610         }
01611 
01612     }
01613     if (PQputline(con, "\\.\n"))
01614     {
01615         fprintf(stderr, "very last PQputline failed\n");
01616         exit(1);
01617     }
01618     if (PQendcopy(con))
01619     {
01620         fprintf(stderr, "PQendcopy failed\n");
01621         exit(1);
01622     }
01623     executeStatement(con, "commit");
01624 
01625     /* vacuum */
01626     if (!is_no_vacuum)
01627     {
01628         fprintf(stderr, "vacuum...\n");
01629         executeStatement(con, "vacuum analyze pgbench_branches");
01630         executeStatement(con, "vacuum analyze pgbench_tellers");
01631         executeStatement(con, "vacuum analyze pgbench_accounts");
01632         executeStatement(con, "vacuum analyze pgbench_history");
01633     }
01634 
01635     /*
01636      * create indexes
01637      */
01638     fprintf(stderr, "set primary keys...\n");
01639     for (i = 0; i < lengthof(DDLAFTERs); i++)
01640     {
01641         char        buffer[256];
01642 
01643         strncpy(buffer, DDLAFTERs[i], 256);
01644 
01645         if (index_tablespace != NULL)
01646         {
01647             char       *escape_tablespace;
01648 
01649             escape_tablespace = PQescapeIdentifier(con, index_tablespace,
01650                                                    strlen(index_tablespace));
01651             snprintf(buffer + strlen(buffer), 256 - strlen(buffer),
01652                      " using index tablespace %s", escape_tablespace);
01653             PQfreemem(escape_tablespace);
01654         }
01655 
01656         executeStatement(con, buffer);
01657     }
01658 
01659     /*
01660      * create foreign keys
01661      */
01662     if (foreign_keys)
01663     {
01664         fprintf(stderr, "set foreign keys...\n");
01665         for (i = 0; i < lengthof(DDLKEYs); i++)
01666         {
01667             executeStatement(con, DDLKEYs[i]);
01668         }
01669     }
01670 
01671 
01672     fprintf(stderr, "done.\n");
01673     PQfinish(con);
01674 }
01675 
01676 /*
01677  * Parse the raw sql and replace :param to $n.
01678  */
01679 static bool
01680 parseQuery(Command *cmd, const char *raw_sql)
01681 {
01682     char       *sql,
01683                *p;
01684 
01685     sql = pg_strdup(raw_sql);
01686     cmd->argc = 1;
01687 
01688     p = sql;
01689     while ((p = strchr(p, ':')) != NULL)
01690     {
01691         char        var[12];
01692         char       *name;
01693         int         eaten;
01694 
01695         name = parseVariable(p, &eaten);
01696         if (name == NULL)
01697         {
01698             while (*p == ':')
01699             {
01700                 p++;
01701             }
01702             continue;
01703         }
01704 
01705         if (cmd->argc >= MAX_ARGS)
01706         {
01707             fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
01708             return false;
01709         }
01710 
01711         sprintf(var, "$%d", cmd->argc);
01712         p = replaceVariable(&sql, p, eaten, var);
01713 
01714         cmd->argv[cmd->argc] = name;
01715         cmd->argc++;
01716     }
01717 
01718     cmd->argv[0] = sql;
01719     return true;
01720 }
01721 
01722 /* Parse a command; return a Command struct, or NULL if it's a comment */
01723 static Command *
01724 process_commands(char *buf)
01725 {
01726     const char  delim[] = " \f\n\r\t\v";
01727 
01728     Command    *my_commands;
01729     int         j;
01730     char       *p,
01731                *tok;
01732 
01733     /* Make the string buf end at the next newline */
01734     if ((p = strchr(buf, '\n')) != NULL)
01735         *p = '\0';
01736 
01737     /* Skip leading whitespace */
01738     p = buf;
01739     while (isspace((unsigned char) *p))
01740         p++;
01741 
01742     /* If the line is empty or actually a comment, we're done */
01743     if (*p == '\0' || strncmp(p, "--", 2) == 0)
01744         return NULL;
01745 
01746     /* Allocate and initialize Command structure */
01747     my_commands = (Command *) pg_malloc(sizeof(Command));
01748     my_commands->line = pg_strdup(buf);
01749     my_commands->command_num = num_commands++;
01750     my_commands->type = 0;      /* until set */
01751     my_commands->argc = 0;
01752 
01753     if (*p == '\\')
01754     {
01755         my_commands->type = META_COMMAND;
01756 
01757         j = 0;
01758         tok = strtok(++p, delim);
01759 
01760         while (tok != NULL)
01761         {
01762             my_commands->argv[j++] = pg_strdup(tok);
01763             my_commands->argc++;
01764             tok = strtok(NULL, delim);
01765         }
01766 
01767         if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
01768         {
01769             if (my_commands->argc < 4)
01770             {
01771                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
01772                 exit(1);
01773             }
01774 
01775             for (j = 4; j < my_commands->argc; j++)
01776                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
01777                         my_commands->argv[0], my_commands->argv[j]);
01778         }
01779         else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
01780         {
01781             if (my_commands->argc < 3)
01782             {
01783                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
01784                 exit(1);
01785             }
01786 
01787             for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++)
01788                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
01789                         my_commands->argv[0], my_commands->argv[j]);
01790         }
01791         else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
01792         {
01793             if (my_commands->argc < 2)
01794             {
01795                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
01796                 exit(1);
01797             }
01798 
01799             /*
01800              * Split argument into number and unit to allow "sleep 1ms" etc.
01801              * We don't have to terminate the number argument with null
01802              * because it will be parsed with atoi, which ignores trailing
01803              * non-digit characters.
01804              */
01805             if (my_commands->argv[1][0] != ':')
01806             {
01807                 char       *c = my_commands->argv[1];
01808 
01809                 while (isdigit((unsigned char) *c))
01810                     c++;
01811                 if (*c)
01812                 {
01813                     my_commands->argv[2] = c;
01814                     if (my_commands->argc < 3)
01815                         my_commands->argc = 3;
01816                 }
01817             }
01818 
01819             if (my_commands->argc >= 3)
01820             {
01821                 if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
01822                     pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
01823                     pg_strcasecmp(my_commands->argv[2], "s") != 0)
01824                 {
01825                     fprintf(stderr, "%s: unknown time unit '%s' - must be us, ms or s\n",
01826                             my_commands->argv[0], my_commands->argv[2]);
01827                     exit(1);
01828                 }
01829             }
01830 
01831             for (j = 3; j < my_commands->argc; j++)
01832                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
01833                         my_commands->argv[0], my_commands->argv[j]);
01834         }
01835         else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
01836         {
01837             if (my_commands->argc < 3)
01838             {
01839                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
01840                 exit(1);
01841             }
01842         }
01843         else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
01844         {
01845             if (my_commands->argc < 1)
01846             {
01847                 fprintf(stderr, "%s: missing command\n", my_commands->argv[0]);
01848                 exit(1);
01849             }
01850         }
01851         else
01852         {
01853             fprintf(stderr, "Invalid command %s\n", my_commands->argv[0]);
01854             exit(1);
01855         }
01856     }
01857     else
01858     {
01859         my_commands->type = SQL_COMMAND;
01860 
01861         switch (querymode)
01862         {
01863             case QUERY_SIMPLE:
01864                 my_commands->argv[0] = pg_strdup(p);
01865                 my_commands->argc++;
01866                 break;
01867             case QUERY_EXTENDED:
01868             case QUERY_PREPARED:
01869                 if (!parseQuery(my_commands, p))
01870                     exit(1);
01871                 break;
01872             default:
01873                 exit(1);
01874         }
01875     }
01876 
01877     return my_commands;
01878 }
01879 
01880 static int
01881 process_file(char *filename)
01882 {
01883 #define COMMANDS_ALLOC_NUM 128
01884 
01885     Command   **my_commands;
01886     FILE       *fd;
01887     int         lineno;
01888     char        buf[BUFSIZ];
01889     int         alloc_num;
01890 
01891     if (num_files >= MAX_FILES)
01892     {
01893         fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
01894         exit(1);
01895     }
01896 
01897     alloc_num = COMMANDS_ALLOC_NUM;
01898     my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
01899 
01900     if (strcmp(filename, "-") == 0)
01901         fd = stdin;
01902     else if ((fd = fopen(filename, "r")) == NULL)
01903     {
01904         fprintf(stderr, "%s: %s\n", filename, strerror(errno));
01905         return false;
01906     }
01907 
01908     lineno = 0;
01909 
01910     while (fgets(buf, sizeof(buf), fd) != NULL)
01911     {
01912         Command    *command;
01913 
01914         command = process_commands(buf);
01915         if (command == NULL)
01916             continue;
01917 
01918         my_commands[lineno] = command;
01919         lineno++;
01920 
01921         if (lineno >= alloc_num)
01922         {
01923             alloc_num += COMMANDS_ALLOC_NUM;
01924             my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
01925         }
01926     }
01927     fclose(fd);
01928 
01929     my_commands[lineno] = NULL;
01930 
01931     sql_files[num_files++] = my_commands;
01932 
01933     return true;
01934 }
01935 
01936 static Command **
01937 process_builtin(char *tb)
01938 {
01939 #define COMMANDS_ALLOC_NUM 128
01940 
01941     Command   **my_commands;
01942     int         lineno;
01943     char        buf[BUFSIZ];
01944     int         alloc_num;
01945 
01946     alloc_num = COMMANDS_ALLOC_NUM;
01947     my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
01948 
01949     lineno = 0;
01950 
01951     for (;;)
01952     {
01953         char       *p;
01954         Command    *command;
01955 
01956         p = buf;
01957         while (*tb && *tb != '\n')
01958             *p++ = *tb++;
01959 
01960         if (*tb == '\0')
01961             break;
01962 
01963         if (*tb == '\n')
01964             tb++;
01965 
01966         *p = '\0';
01967 
01968         command = process_commands(buf);
01969         if (command == NULL)
01970             continue;
01971 
01972         my_commands[lineno] = command;
01973         lineno++;
01974 
01975         if (lineno >= alloc_num)
01976         {
01977             alloc_num += COMMANDS_ALLOC_NUM;
01978             my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
01979         }
01980     }
01981 
01982     my_commands[lineno] = NULL;
01983 
01984     return my_commands;
01985 }
01986 
01987 /* print out results */
01988 static void
01989 printResults(int ttype, int normal_xacts, int nclients,
01990              TState *threads, int nthreads,
01991              instr_time total_time, instr_time conn_total_time)
01992 {
01993     double      time_include,
01994                 tps_include,
01995                 tps_exclude;
01996     char       *s;
01997 
01998     time_include = INSTR_TIME_GET_DOUBLE(total_time);
01999     tps_include = normal_xacts / time_include;
02000     tps_exclude = normal_xacts / (time_include -
02001                         (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
02002 
02003     if (ttype == 0)
02004         s = "TPC-B (sort of)";
02005     else if (ttype == 2)
02006         s = "Update only pgbench_accounts";
02007     else if (ttype == 1)
02008         s = "SELECT only";
02009     else
02010         s = "Custom query";
02011 
02012     printf("transaction type: %s\n", s);
02013     printf("scaling factor: %d\n", scale);
02014     printf("query mode: %s\n", QUERYMODE[querymode]);
02015     printf("number of clients: %d\n", nclients);
02016     printf("number of threads: %d\n", nthreads);
02017     if (duration <= 0)
02018     {
02019         printf("number of transactions per client: %d\n", nxacts);
02020         printf("number of transactions actually processed: %d/%d\n",
02021                normal_xacts, nxacts * nclients);
02022     }
02023     else
02024     {
02025         printf("duration: %d s\n", duration);
02026         printf("number of transactions actually processed: %d\n",
02027                normal_xacts);
02028     }
02029     printf("tps = %f (including connections establishing)\n", tps_include);
02030     printf("tps = %f (excluding connections establishing)\n", tps_exclude);
02031 
02032     /* Report per-command latencies */
02033     if (is_latencies)
02034     {
02035         int         i;
02036 
02037         for (i = 0; i < num_files; i++)
02038         {
02039             Command   **commands;
02040 
02041             if (num_files > 1)
02042                 printf("statement latencies in milliseconds, file %d:\n", i + 1);
02043             else
02044                 printf("statement latencies in milliseconds:\n");
02045 
02046             for (commands = sql_files[i]; *commands != NULL; commands++)
02047             {
02048                 Command    *command = *commands;
02049                 int         cnum = command->command_num;
02050                 double      total_time;
02051                 instr_time  total_exec_elapsed;
02052                 int         total_exec_count;
02053                 int         t;
02054 
02055                 /* Accumulate per-thread data for command */
02056                 INSTR_TIME_SET_ZERO(total_exec_elapsed);
02057                 total_exec_count = 0;
02058                 for (t = 0; t < nthreads; t++)
02059                 {
02060                     TState     *thread = &threads[t];
02061 
02062                     INSTR_TIME_ADD(total_exec_elapsed,
02063                                    thread->exec_elapsed[cnum]);
02064                     total_exec_count += thread->exec_count[cnum];
02065                 }
02066 
02067                 if (total_exec_count > 0)
02068                     total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
02069                 else
02070                     total_time = 0.0;
02071 
02072                 printf("\t%f\t%s\n", total_time, command->line);
02073             }
02074         }
02075     }
02076 }
02077 
02078 
02079 int
02080 main(int argc, char **argv)
02081 {
02082     static struct option long_options[] = {
02083         {"foreign-keys", no_argument, &foreign_keys, 1},
02084         {"index-tablespace", required_argument, NULL, 3},
02085         {"tablespace", required_argument, NULL, 2},
02086         {"unlogged-tables", no_argument, &unlogged_tables, 1},
02087         {"sampling-rate", required_argument, NULL, 4},
02088         {"aggregate-interval", required_argument, NULL, 5},
02089         {NULL, 0, NULL, 0}
02090     };
02091 
02092     int         c;
02093     int         nclients = 1;   /* default number of simulated clients */
02094     int         nthreads = 1;   /* default number of threads */
02095     int         is_init_mode = 0;       /* initialize mode? */
02096     int         is_no_vacuum = 0;       /* no vacuum at all before testing? */
02097     int         do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
02098     int         ttype = 0;      /* transaction type. 0: TPC-B, 1: SELECT only,
02099                                  * 2: skip update of branches and tellers */
02100     int         optindex;
02101     char       *filename = NULL;
02102     bool        scale_given = false;
02103 
02104     CState     *state;          /* status of clients */
02105     TState     *threads;        /* array of thread */
02106 
02107     instr_time  start_time;     /* start up time */
02108     instr_time  total_time;
02109     instr_time  conn_total_time;
02110     int         total_xacts;
02111 
02112     int         i;
02113 
02114 #ifdef HAVE_GETRLIMIT
02115     struct rlimit rlim;
02116 #endif
02117 
02118     PGconn     *con;
02119     PGresult   *res;
02120     char       *env;
02121 
02122     char        val[64];
02123 
02124     progname = get_progname(argv[0]);
02125 
02126     if (argc > 1)
02127     {
02128         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
02129         {
02130             usage();
02131             exit(0);
02132         }
02133         if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
02134         {
02135             puts("pgbench (PostgreSQL) " PG_VERSION);
02136             exit(0);
02137         }
02138     }
02139 
02140 #ifdef WIN32
02141     /* stderr is buffered on Win32. */
02142     setvbuf(stderr, NULL, _IONBF, 0);
02143 #endif
02144 
02145     if ((env = getenv("PGHOST")) != NULL && *env != '\0')
02146         pghost = env;
02147     if ((env = getenv("PGPORT")) != NULL && *env != '\0')
02148         pgport = env;
02149     else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
02150         login = env;
02151 
02152     state = (CState *) pg_malloc(sizeof(CState));
02153     memset(state, 0, sizeof(CState));
02154 
02155     while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
02156     {
02157         switch (c)
02158         {
02159             case 'i':
02160                 is_init_mode++;
02161                 break;
02162             case 'h':
02163                 pghost = pg_strdup(optarg);
02164                 break;
02165             case 'n':
02166                 is_no_vacuum++;
02167                 break;
02168             case 'v':
02169                 do_vacuum_accounts++;
02170                 break;
02171             case 'p':
02172                 pgport = pg_strdup(optarg);
02173                 break;
02174             case 'd':
02175                 debug++;
02176                 break;
02177             case 'S':
02178                 ttype = 1;
02179                 break;
02180             case 'N':
02181                 ttype = 2;
02182                 break;
02183             case 'c':
02184                 nclients = atoi(optarg);
02185                 if (nclients <= 0 || nclients > MAXCLIENTS)
02186                 {
02187                     fprintf(stderr, "invalid number of clients: %d\n", nclients);
02188                     exit(1);
02189                 }
02190 #ifdef HAVE_GETRLIMIT
02191 #ifdef RLIMIT_NOFILE            /* most platforms use RLIMIT_NOFILE */
02192                 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
02193 #else                           /* but BSD doesn't ... */
02194                 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
02195 #endif   /* RLIMIT_NOFILE */
02196                 {
02197                     fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
02198                     exit(1);
02199                 }
02200                 if (rlim.rlim_cur <= (nclients + 2))
02201                 {
02202                     fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
02203                     fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n");
02204                     exit(1);
02205                 }
02206 #endif   /* HAVE_GETRLIMIT */
02207                 break;
02208             case 'j':           /* jobs */
02209                 nthreads = atoi(optarg);
02210                 if (nthreads <= 0)
02211                 {
02212                     fprintf(stderr, "invalid number of threads: %d\n", nthreads);
02213                     exit(1);
02214                 }
02215                 break;
02216             case 'C':
02217                 is_connect = true;
02218                 break;
02219             case 'r':
02220                 is_latencies = true;
02221                 break;
02222             case 's':
02223                 scale_given = true;
02224                 scale = atoi(optarg);
02225                 if (scale <= 0)
02226                 {
02227                     fprintf(stderr, "invalid scaling factor: %d\n", scale);
02228                     exit(1);
02229                 }
02230                 break;
02231             case 't':
02232                 if (duration > 0)
02233                 {
02234                     fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
02235                     exit(1);
02236                 }
02237                 nxacts = atoi(optarg);
02238                 if (nxacts <= 0)
02239                 {
02240                     fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
02241                     exit(1);
02242                 }
02243                 break;
02244             case 'T':
02245                 if (nxacts > 0)
02246                 {
02247                     fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
02248                     exit(1);
02249                 }
02250                 duration = atoi(optarg);
02251                 if (duration <= 0)
02252                 {
02253                     fprintf(stderr, "invalid duration: %d\n", duration);
02254                     exit(1);
02255                 }
02256                 break;
02257             case 'U':
02258                 login = pg_strdup(optarg);
02259                 break;
02260             case 'l':
02261                 use_log = true;
02262                 break;
02263             case 'q':
02264                 use_quiet = true;
02265                 break;
02266             case 'f':
02267                 ttype = 3;
02268                 filename = pg_strdup(optarg);
02269                 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
02270                     exit(1);
02271                 break;
02272             case 'D':
02273                 {
02274                     char       *p;
02275 
02276                     if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
02277                     {
02278                         fprintf(stderr, "invalid variable definition: %s\n", optarg);
02279                         exit(1);
02280                     }
02281 
02282                     *p++ = '\0';
02283                     if (!putVariable(&state[0], "option", optarg, p))
02284                         exit(1);
02285                 }
02286                 break;
02287             case 'F':
02288                 fillfactor = atoi(optarg);
02289                 if ((fillfactor < 10) || (fillfactor > 100))
02290                 {
02291                     fprintf(stderr, "invalid fillfactor: %d\n", fillfactor);
02292                     exit(1);
02293                 }
02294                 break;
02295             case 'M':
02296                 if (num_files > 0)
02297                 {
02298                     fprintf(stderr, "query mode (-M) should be specifiled before transaction scripts (-f)\n");
02299                     exit(1);
02300                 }
02301                 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
02302                     if (strcmp(optarg, QUERYMODE[querymode]) == 0)
02303                         break;
02304                 if (querymode >= NUM_QUERYMODE)
02305                 {
02306                     fprintf(stderr, "invalid query mode (-M): %s\n", optarg);
02307                     exit(1);
02308                 }
02309                 break;
02310             case 0:
02311                 /* This covers long options which take no argument. */
02312                 break;
02313             case 2:             /* tablespace */
02314                 tablespace = pg_strdup(optarg);
02315                 break;
02316             case 3:             /* index-tablespace */
02317                 index_tablespace = pg_strdup(optarg);
02318                 break;
02319             case 4:
02320                 sample_rate = atof(optarg);
02321                 if (sample_rate <= 0.0 || sample_rate > 1.0)
02322                 {
02323                     fprintf(stderr, "invalid sampling rate: %f\n", sample_rate);
02324                     exit(1);
02325                 }
02326                 break;
02327             case 5:
02328 #ifdef WIN32
02329                 fprintf(stderr, "--aggregate-interval is not currently supported on Windows");
02330                 exit(1);
02331 #else
02332                 agg_interval = atoi(optarg);
02333                 if (agg_interval <= 0)
02334                 {
02335                     fprintf(stderr, "invalid number of seconds for aggregation: %d\n", agg_interval);
02336                     exit(1);
02337                 }
02338 #endif
02339                 break;
02340             default:
02341                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
02342                 exit(1);
02343                 break;
02344         }
02345     }
02346 
02347     if (argc > optind)
02348         dbName = argv[optind];
02349     else
02350     {
02351         if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
02352             dbName = env;
02353         else if (login != NULL && *login != '\0')
02354             dbName = login;
02355         else
02356             dbName = "";
02357     }
02358 
02359     if (is_init_mode)
02360     {
02361         init(is_no_vacuum);
02362         exit(0);
02363     }
02364 
02365     /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
02366     if (nxacts <= 0 && duration <= 0)
02367         nxacts = DEFAULT_NXACTS;
02368 
02369     if (nclients % nthreads != 0)
02370     {
02371         fprintf(stderr, "number of clients (%d) must be a multiple of number of threads (%d)\n", nclients, nthreads);
02372         exit(1);
02373     }
02374 
02375     /* --sampling-rate may be used only with -l */
02376     if (sample_rate > 0.0 && !use_log)
02377     {
02378         fprintf(stderr, "log sampling rate is allowed only when logging transactions (-l) \n");
02379         exit(1);
02380     }
02381 
02382     /* -q may be used only with -i */
02383     if (use_quiet && !is_init_mode)
02384     {
02385         fprintf(stderr, "quiet-logging is allowed only in initialization mode (-i)\n");
02386         exit(1);
02387     }
02388 
02389     /* --sampling-rate may must not be used with --aggregate-interval */
02390     if (sample_rate > 0.0 && agg_interval > 0)
02391     {
02392         fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) can't be used at the same time\n");
02393         exit(1);
02394     }
02395 
02396     if (agg_interval > 0 && (! use_log)) {
02397         fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
02398         exit(1);
02399     }
02400 
02401     if ((duration > 0) && (agg_interval > duration)) {
02402         fprintf(stderr, "number of seconds for aggregation (%d) must not be higher that test duration (%d)\n", agg_interval, duration);
02403         exit(1);
02404     }
02405 
02406     if ((duration > 0) && (agg_interval > 0) && (duration % agg_interval != 0)) {
02407         fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
02408         exit(1);
02409     }
02410 
02411     /*
02412      * is_latencies only works with multiple threads in thread-based
02413      * implementations, not fork-based ones, because it supposes that the
02414      * parent can see changes made to the per-thread execution stats by child
02415      * threads.  It seems useful enough to accept despite this limitation, but
02416      * perhaps we should FIXME someday (by passing the stats data back up
02417      * through the parent-to-child pipes).
02418      */
02419 #ifndef ENABLE_THREAD_SAFETY
02420     if (is_latencies && nthreads > 1)
02421     {
02422         fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
02423         exit(1);
02424     }
02425 #endif
02426 
02427     /*
02428      * save main process id in the global variable because process id will be
02429      * changed after fork.
02430      */
02431     main_pid = (int) getpid();
02432 
02433     if (nclients > 1)
02434     {
02435         state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
02436         memset(state + 1, 0, sizeof(CState) * (nclients - 1));
02437 
02438         /* copy any -D switch values to all clients */
02439         for (i = 1; i < nclients; i++)
02440         {
02441             int         j;
02442 
02443             state[i].id = i;
02444             for (j = 0; j < state[0].nvariables; j++)
02445             {
02446                 if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
02447                     exit(1);
02448             }
02449         }
02450     }
02451 
02452     if (debug)
02453     {
02454         if (duration <= 0)
02455             printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
02456                    pghost, pgport, nclients, nxacts, dbName);
02457         else
02458             printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
02459                    pghost, pgport, nclients, duration, dbName);
02460     }
02461 
02462     /* opening connection... */
02463     con = doConnect();
02464     if (con == NULL)
02465         exit(1);
02466 
02467     if (PQstatus(con) == CONNECTION_BAD)
02468     {
02469         fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
02470         fprintf(stderr, "%s", PQerrorMessage(con));
02471         exit(1);
02472     }
02473 
02474     if (ttype != 3)
02475     {
02476         /*
02477          * get the scaling factor that should be same as count(*) from
02478          * pgbench_branches if this is not a custom query
02479          */
02480         res = PQexec(con, "select count(*) from pgbench_branches");
02481         if (PQresultStatus(res) != PGRES_TUPLES_OK)
02482         {
02483             fprintf(stderr, "%s", PQerrorMessage(con));
02484             exit(1);
02485         }
02486         scale = atoi(PQgetvalue(res, 0, 0));
02487         if (scale < 0)
02488         {
02489             fprintf(stderr, "count(*) from pgbench_branches invalid (%d)\n", scale);
02490             exit(1);
02491         }
02492         PQclear(res);
02493 
02494         /* warn if we override user-given -s switch */
02495         if (scale_given)
02496             fprintf(stderr,
02497             "Scale option ignored, using pgbench_branches table count = %d\n",
02498                     scale);
02499     }
02500 
02501     /*
02502      * :scale variables normally get -s or database scale, but don't override
02503      * an explicit -D switch
02504      */
02505     if (getVariable(&state[0], "scale") == NULL)
02506     {
02507         snprintf(val, sizeof(val), "%d", scale);
02508         for (i = 0; i < nclients; i++)
02509         {
02510             if (!putVariable(&state[i], "startup", "scale", val))
02511                 exit(1);
02512         }
02513     }
02514 
02515     if (!is_no_vacuum)
02516     {
02517         fprintf(stderr, "starting vacuum...");
02518         executeStatement(con, "vacuum pgbench_branches");
02519         executeStatement(con, "vacuum pgbench_tellers");
02520         executeStatement(con, "truncate pgbench_history");
02521         fprintf(stderr, "end.\n");
02522 
02523         if (do_vacuum_accounts)
02524         {
02525             fprintf(stderr, "starting vacuum pgbench_accounts...");
02526             executeStatement(con, "vacuum analyze pgbench_accounts");
02527             fprintf(stderr, "end.\n");
02528         }
02529     }
02530     PQfinish(con);
02531 
02532     /* set random seed */
02533     INSTR_TIME_SET_CURRENT(start_time);
02534     srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
02535 
02536     /* process builtin SQL scripts */
02537     switch (ttype)
02538     {
02539         case 0:
02540             sql_files[0] = process_builtin(tpc_b);
02541             num_files = 1;
02542             break;
02543 
02544         case 1:
02545             sql_files[0] = process_builtin(select_only);
02546             num_files = 1;
02547             break;
02548 
02549         case 2:
02550             sql_files[0] = process_builtin(simple_update);
02551             num_files = 1;
02552             break;
02553 
02554         default:
02555             break;
02556     }
02557 
02558     /* set up thread data structures */
02559     threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
02560     for (i = 0; i < nthreads; i++)
02561     {
02562         TState     *thread = &threads[i];
02563 
02564         thread->tid = i;
02565         thread->state = &state[nclients / nthreads * i];
02566         thread->nstate = nclients / nthreads;
02567         thread->random_state[0] = random();
02568         thread->random_state[1] = random();
02569         thread->random_state[2] = random();
02570 
02571         if (is_latencies)
02572         {
02573             /* Reserve memory for the thread to store per-command latencies */
02574             int         t;
02575 
02576             thread->exec_elapsed = (instr_time *)
02577                 pg_malloc(sizeof(instr_time) * num_commands);
02578             thread->exec_count = (int *)
02579                 pg_malloc(sizeof(int) * num_commands);
02580 
02581             for (t = 0; t < num_commands; t++)
02582             {
02583                 INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
02584                 thread->exec_count[t] = 0;
02585             }
02586         }
02587         else
02588         {
02589             thread->exec_elapsed = NULL;
02590             thread->exec_count = NULL;
02591         }
02592     }
02593 
02594     /* get start up time */
02595     INSTR_TIME_SET_CURRENT(start_time);
02596 
02597     /* set alarm if duration is specified. */
02598     if (duration > 0)
02599         setalarm(duration);
02600 
02601     /* start threads */
02602     for (i = 0; i < nthreads; i++)
02603     {
02604         TState     *thread = &threads[i];
02605 
02606         INSTR_TIME_SET_CURRENT(thread->start_time);
02607 
02608         /* the first thread (i = 0) is executed by main thread */
02609         if (i > 0)
02610         {
02611             int         err = pthread_create(&thread->thread, NULL, threadRun, thread);
02612 
02613             if (err != 0 || thread->thread == INVALID_THREAD)
02614             {
02615                 fprintf(stderr, "cannot create thread: %s\n", strerror(err));
02616                 exit(1);
02617             }
02618         }
02619         else
02620         {
02621             thread->thread = INVALID_THREAD;
02622         }
02623     }
02624 
02625     /* wait for threads and accumulate results */
02626     total_xacts = 0;
02627     INSTR_TIME_SET_ZERO(conn_total_time);
02628     for (i = 0; i < nthreads; i++)
02629     {
02630         void       *ret = NULL;
02631 
02632         if (threads[i].thread == INVALID_THREAD)
02633             ret = threadRun(&threads[i]);
02634         else
02635             pthread_join(threads[i].thread, &ret);
02636 
02637         if (ret != NULL)
02638         {
02639             TResult    *r = (TResult *) ret;
02640 
02641             total_xacts += r->xacts;
02642             INSTR_TIME_ADD(conn_total_time, r->conn_time);
02643             free(ret);
02644         }
02645     }
02646     disconnect_all(state, nclients);
02647 
02648     /* get end time */
02649     INSTR_TIME_SET_CURRENT(total_time);
02650     INSTR_TIME_SUBTRACT(total_time, start_time);
02651     printResults(ttype, total_xacts, nclients, threads, nthreads,
02652                  total_time, conn_total_time);
02653 
02654     return 0;
02655 }
02656 
02657 static void *
02658 threadRun(void *arg)
02659 {
02660     TState     *thread = (TState *) arg;
02661     CState     *state = thread->state;
02662     TResult    *result;
02663     FILE       *logfile = NULL; /* per-thread log file */
02664     instr_time  start,
02665                 end;
02666     int         nstate = thread->nstate;
02667     int         remains = nstate;       /* number of remaining clients */
02668     int         i;
02669 
02670     AggVals     aggs;
02671 
02672     result = pg_malloc(sizeof(TResult));
02673     
02674     INSTR_TIME_SET_ZERO(result->conn_time);
02675 
02676     /* open log file if requested */
02677     if (use_log)
02678     {
02679         char        logpath[64];
02680 
02681         if (thread->tid == 0)
02682             snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
02683         else
02684             snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
02685         logfile = fopen(logpath, "w");
02686 
02687         if (logfile == NULL)
02688         {
02689             fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
02690             goto done;
02691         }
02692     }
02693 
02694     if (!is_connect)
02695     {
02696         /* make connections to the database */
02697         for (i = 0; i < nstate; i++)
02698         {
02699             if ((state[i].con = doConnect()) == NULL)
02700                 goto done;
02701         }
02702     }
02703 
02704     /* time after thread and connections set up */
02705     INSTR_TIME_SET_CURRENT(result->conn_time);
02706     INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
02707 
02708     agg_vals_init(&aggs, thread->start_time);
02709     
02710     /* send start up queries in async manner */
02711     for (i = 0; i < nstate; i++)
02712     {
02713         CState     *st = &state[i];
02714         Command   **commands = sql_files[st->use_file];
02715         int         prev_ecnt = st->ecnt;
02716 
02717         st->use_file = getrand(thread, 0, num_files - 1);
02718         if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
02719             remains--;          /* I've aborted */
02720 
02721         if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
02722         {
02723             fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
02724             remains--;          /* I've aborted */
02725             PQfinish(st->con);
02726             st->con = NULL;
02727         }
02728     }
02729 
02730     while (remains > 0)
02731     {
02732         fd_set      input_mask;
02733         int         maxsock;    /* max socket number to be waited */
02734         int64       now_usec = 0;
02735         int64       min_usec;
02736 
02737         FD_ZERO(&input_mask);
02738 
02739         maxsock = -1;
02740         min_usec = INT64_MAX;
02741         for (i = 0; i < nstate; i++)
02742         {
02743             CState     *st = &state[i];
02744             Command   **commands = sql_files[st->use_file];
02745             int         sock;
02746 
02747             if (st->sleeping)
02748             {
02749                 int         this_usec;
02750 
02751                 if (min_usec == INT64_MAX)
02752                 {
02753                     instr_time  now;
02754 
02755                     INSTR_TIME_SET_CURRENT(now);
02756                     now_usec = INSTR_TIME_GET_MICROSEC(now);
02757                 }
02758 
02759                 this_usec = st->until - now_usec;
02760                 if (min_usec > this_usec)
02761                     min_usec = this_usec;
02762             }
02763             else if (st->con == NULL)
02764             {
02765                 continue;
02766             }
02767             else if (commands[st->state]->type == META_COMMAND)
02768             {
02769                 min_usec = 0;   /* the connection is ready to run */
02770                 break;
02771             }
02772 
02773             sock = PQsocket(st->con);
02774             if (sock < 0)
02775             {
02776                 fprintf(stderr, "bad socket: %s\n", strerror(errno));
02777                 goto done;
02778             }
02779 
02780             FD_SET(sock, &input_mask);
02781 
02782             if (maxsock < sock)
02783                 maxsock = sock;
02784         }
02785 
02786         if (min_usec > 0 && maxsock != -1)
02787         {
02788             int         nsocks; /* return from select(2) */
02789 
02790             if (min_usec != INT64_MAX)
02791             {
02792                 struct timeval timeout;
02793 
02794                 timeout.tv_sec = min_usec / 1000000;
02795                 timeout.tv_usec = min_usec % 1000000;
02796                 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
02797             }
02798             else
02799                 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
02800             if (nsocks < 0)
02801             {
02802                 if (errno == EINTR)
02803                     continue;
02804                 /* must be something wrong */
02805                 fprintf(stderr, "select failed: %s\n", strerror(errno));
02806                 goto done;
02807             }
02808         }
02809 
02810         /* ok, backend returns reply */
02811         for (i = 0; i < nstate; i++)
02812         {
02813             CState     *st = &state[i];
02814             Command   **commands = sql_files[st->use_file];
02815             int         prev_ecnt = st->ecnt;
02816 
02817             if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
02818                             || commands[st->state]->type == META_COMMAND))
02819             {
02820                 if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
02821                     remains--;  /* I've aborted */
02822             }
02823 
02824             if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
02825             {
02826                 fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
02827                 remains--;      /* I've aborted */
02828                 PQfinish(st->con);
02829                 st->con = NULL;
02830             }
02831         }
02832     }
02833 
02834 done:
02835     INSTR_TIME_SET_CURRENT(start);
02836     disconnect_all(state, nstate);
02837     result->xacts = 0;
02838     for (i = 0; i < nstate; i++)
02839         result->xacts += state[i].cnt;
02840     INSTR_TIME_SET_CURRENT(end);
02841     INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
02842     if (logfile)
02843         fclose(logfile);
02844     return result;
02845 }
02846 
02847 /*
02848  * Support for duration option: set timer_exceeded after so many seconds.
02849  */
02850 
02851 #ifndef WIN32
02852 
02853 static void
02854 handle_sig_alarm(SIGNAL_ARGS)
02855 {
02856     timer_exceeded = true;
02857 }
02858 
02859 static void
02860 setalarm(int seconds)
02861 {
02862     pqsignal(SIGALRM, handle_sig_alarm);
02863     alarm(seconds);
02864 }
02865 
02866 #ifndef ENABLE_THREAD_SAFETY
02867 
02868 /*
02869  * implements pthread using fork.
02870  */
02871 
/*
 * One emulated "thread": a forked child process plus the pipe the child
 * uses to hand its result back to the parent.
 */
typedef struct fork_pthread
{
    pid_t       pid;            /* child's pid, as returned by fork() */
    int         pipes[2];       /* [0]: read end, kept by the parent;
                                 * [1]: write end, kept by the child */
} fork_pthread;
02877 
02878 static int
02879 pthread_create(pthread_t *thread,
02880                pthread_attr_t *attr,
02881                void *(*start_routine) (void *),
02882                void *arg)
02883 {
02884     fork_pthread *th;
02885     void       *ret;
02886 
02887     th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
02888     if (pipe(th->pipes) < 0)
02889     {
02890         free(th);
02891         return errno;
02892     }
02893 
02894     th->pid = fork();
02895     if (th->pid == -1)          /* error */
02896     {
02897         free(th);
02898         return errno;
02899     }
02900     if (th->pid != 0)           /* in parent process */
02901     {
02902         close(th->pipes[1]);
02903         *thread = th;
02904         return 0;
02905     }
02906 
02907     /* in child process */
02908     close(th->pipes[0]);
02909 
02910     /* set alarm again because the child does not inherit timers */
02911     if (duration > 0)
02912         setalarm(duration);
02913 
02914     ret = start_routine(arg);
02915     write(th->pipes[1], ret, sizeof(TResult));
02916     close(th->pipes[1]);
02917     free(th);
02918     exit(0);
02919 }
02920 
02921 static int
02922 pthread_join(pthread_t th, void **thread_return)
02923 {
02924     int         status;
02925 
02926     while (waitpid(th->pid, &status, 0) != th->pid)
02927     {
02928         if (errno != EINTR)
02929             return errno;
02930     }
02931 
02932     if (thread_return != NULL)
02933     {
02934         /* assume result is TResult */
02935         *thread_return = pg_malloc(sizeof(TResult));
02936         if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
02937         {
02938             free(*thread_return);
02939             *thread_return = NULL;
02940         }
02941     }
02942     close(th->pipes[0]);
02943 
02944     free(th);
02945     return 0;
02946 }
02947 #endif
02948 #else                           /* WIN32 */
02949 
02950 static VOID CALLBACK
02951 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
02952 {
02953     timer_exceeded = true;
02954 }
02955 
02956 static void
02957 setalarm(int seconds)
02958 {
02959     HANDLE      queue;
02960     HANDLE      timer;
02961 
02962     /* This function will be called at most once, so we can cheat a bit. */
02963     queue = CreateTimerQueue();
02964     if (seconds > ((DWORD) -1) / 1000 ||
02965         !CreateTimerQueueTimer(&timer, queue,
02966                                win32_timer_callback, NULL, seconds * 1000, 0,
02967                                WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
02968     {
02969         fprintf(stderr, "Failed to set timer\n");
02970         exit(1);
02971     }
02972 }
02973 
02974 /* partial pthread implementation for Windows */
02975 
02976 typedef struct win32_pthread
02977 {
02978     HANDLE      handle;
02979     void       *(*routine) (void *);
02980     void       *arg;
02981     void       *result;
02982 } win32_pthread;
02983 
02984 static unsigned __stdcall
02985 win32_pthread_run(void *arg)
02986 {
02987     win32_pthread *th = (win32_pthread *) arg;
02988 
02989     th->result = th->routine(th->arg);
02990 
02991     return 0;
02992 }
02993 
02994 static int
02995 pthread_create(pthread_t *thread,
02996                pthread_attr_t *attr,
02997                void *(*start_routine) (void *),
02998                void *arg)
02999 {
03000     int         save_errno;
03001     win32_pthread *th;
03002 
03003     th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
03004     th->routine = start_routine;
03005     th->arg = arg;
03006     th->result = NULL;
03007 
03008     th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
03009     if (th->handle == NULL)
03010     {
03011         save_errno = errno;
03012         free(th);
03013         return save_errno;
03014     }
03015 
03016     *thread = th;
03017     return 0;
03018 }
03019 
03020 static int
03021 pthread_join(pthread_t th, void **thread_return)
03022 {
03023     if (th == NULL || th->handle == NULL)
03024         return errno = EINVAL;
03025 
03026     if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
03027     {
03028         _dosmaperr(GetLastError());
03029         return errno;
03030     }
03031 
03032     if (thread_return)
03033         *thread_return = th->result;
03034 
03035     CloseHandle(th->handle);
03036     free(th);
03037     return 0;
03038 }
03039 
03040 #endif   /* WIN32 */