00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #ifdef WIN32
00031 #define FD_SETSIZE 1024
00032 #endif
00033
00034 #include "postgres_fe.h"
00035
00036 #include "getopt_long.h"
00037 #include "libpq-fe.h"
00038 #include "portability/instr_time.h"
00039
00040 #include <ctype.h>
00041 #include <math.h>
00042 #include <signal.h>
00043
00044 #ifndef WIN32
00045 #include <sys/time.h>
00046 #include <unistd.h>
00047 #endif
00048
00049 #ifdef HAVE_SYS_SELECT_H
00050 #include <sys/select.h>
00051 #endif
00052
00053 #ifdef HAVE_SYS_RESOURCE_H
00054 #include <sys/resource.h>
00055 #endif
00056
00057 #ifndef INT64_MAX
00058 #define INT64_MAX INT64CONST(0x7FFFFFFFFFFFFFFF)
00059 #endif
00060
00061
00062
00063
00064
/*
 * Thread abstraction.  Three build variants are selected here:
 *   - WIN32: pthread_t is an opaque pointer to struct win32_pthread and the
 *     two pthread entry points used by this file are declared as file-local
 *     functions (defined elsewhere in the file).
 *   - ENABLE_THREAD_SAFETY: the platform's real <pthread.h> is used.
 *   - otherwise: the pthread names are redirected to pg_pthread_* and backed
 *     by struct fork_pthread (presumably a fork()-based emulation, given the
 *     <sys/wait.h> include -- confirm against the definitions).
 */
#ifdef WIN32

typedef struct win32_pthread *pthread_t;
typedef int pthread_attr_t;

static int	pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int	pthread_join(pthread_t th, void **thread_return);
#elif defined(ENABLE_THREAD_SAFETY)

#include <pthread.h>
#else


#include <sys/wait.h>

/* Rename the pthread identifiers so they cannot clash with system ones. */
#define pthread_t pg_pthread_t
#define pthread_attr_t pg_pthread_attr_t
#define pthread_create pg_pthread_create
#define pthread_join pg_pthread_join

typedef struct fork_pthread *pthread_t;
typedef int pthread_attr_t;

static int	pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int	pthread_join(pthread_t th, void **thread_return);
#endif
00091
/* Option-parsing state exported by getopt()/getopt_long(). */
extern char *optarg;
extern int	optind;
00094
00095
00096
00097
00098
00099
/*
 * Upper limit on the number of clients.  When FD_SETSIZE is known we stay
 * 10 descriptors below it; otherwise 1024 is assumed.
 */
#ifdef FD_SETSIZE
#define MAXCLIENTS	(FD_SETSIZE - 10)
#else
#define MAXCLIENTS	1024
#endif

/* NOTE(review): presumably the "-q" progress interval mentioned in usage() */
#define LOG_STEP_SECONDS	5
/* matches the "-t" default advertised by usage() */
#define DEFAULT_NXACTS	10
00108
/*
 * Run limits.  A client stops once its transaction count reaches nxacts,
 * but only when duration <= 0 (see doCustom()).
 */
int			nxacts = 0;			/* transactions per client; presumably set by -t */
int			duration = 0;		/* run time limit; presumably seconds, set by -T */





/* scaling factor; substituted as :scale in the built-in scripts (TODO confirm) */
int			scale = 1;





/* fill factor applied to tables created by init() */
int			fillfactor = 100;




/* NOTE(review): presumably set by --foreign-keys; not used in the visible code */
int			foreign_keys = 0;




/* NOTE(review): presumably set by --unlogged-tables; not used in the visible code */
int			unlogged_tables = 0;




/*
 * Fraction of transactions written to the log; 0.0 means log every
 * transaction (see doCustom()).
 */
double		sample_rate = 0.0;




/* tablespace used by init() for tables; NULL means none was requested */
char	   *tablespace = NULL;
/* NOTE(review): presumably the tablespace for indexes (--index-tablespace) */
char	   *index_tablespace = NULL;
00144
00145
00146
00147
00148
/*
 * Per-scale-unit row counts.  These are stringified into the built-in
 * scripts (via CppAsString2) and multiplied there by :scale.
 */
#define nbranches	1

#define ntellers	10
#define naccounts	100000








/*
 * Scale at or above which init() declares the account-id columns as bigint
 * instead of int.
 */
#define SCALE_32BIT_THRESHOLD 20000
00162
bool		use_log;			/* NOTE(review): presumably set by -l (transaction log) */
bool		use_quiet;			/* NOTE(review): presumably set by -q (quiet logging) */
int			agg_interval;		/* > 0 enables aggregated logging in doCustom() */
bool		is_connect;			/* close the connection after every transaction */
bool		is_latencies;		/* accumulate per-command elapsed time and counts */
int			main_pid;			/* NOTE(review): presumably the pid of the main process */

/* Connection parameters handed to PQconnectdbParams() by doConnect(). */
char	   *pghost = "";
char	   *pgport = "";
char	   *login = NULL;
char	   *dbName;
const char *progname;			/* also used as fallback_application_name */

/*
 * Checked by doCustom() to stop clients; set elsewhere (presumably by the
 * timer armed through setalarm()).
 */
volatile bool timer_exceeded = false;
00177
00178
/*
 * One script variable.  Both strings are heap copies made by putVariable();
 * a client's variables are kept sorted by name so bsearch() can find them.
 */
typedef struct
{
	char	   *name;			/* variable name */
	char	   *value;			/* its value, stored in string form */
} Variable;
00184
#define MAX_FILES		128		/* size of sql_files[] and CState.prepared[] */
#define SHELL_COMMAND_SIZE	256 /* buffer size for an assembled shell command */





/*
 * Per-client state: one database connection plus its position in the
 * script being run.
 */
typedef struct
{
	PGconn	   *con;			/* connection handle, NULL when not connected */
	int			id;				/* client number, used in messages and the log */
	int			state;			/* index of the current command in the script */
	int			cnt;			/* number of completed script executions */
	int			ecnt;			/* error count */
	int			listen;			/* set to 1 once a command has been issued */

	int			sleeping;		/* 1 while a \sleep is in progress */
	int64		until;			/* wake-up time for \sleep, in microseconds */
	Variable   *variables;		/* array of variables, sorted by name */
	int			nvariables;		/* number of entries in variables[] */
	instr_time	txn_begin;		/* taken when state 0 is started (if logging) */
	instr_time	stmt_begin;		/* taken before each command (if is_latencies) */
	int			use_file;		/* index into sql_files[] of the current script */
	bool		prepared[MAX_FILES];	/* per-script: statements already prepared? */
} CState;
00210
00211
00212
00213
/*
 * Per-thread state.
 */
typedef struct
{
	int			tid;			/* NOTE(review): presumably the thread number */
	pthread_t	thread;			/* thread handle */
	CState	   *state;			/* NOTE(review): presumably the clients run by this thread */
	int			nstate;			/* NOTE(review): presumably the length of state[] */
	instr_time	start_time;		/* NOTE(review): presumably when the thread started */
	instr_time *exec_elapsed;	/* elapsed time per command, indexed by command_num */
	int		   *exec_count;		/* executions per command, indexed by command_num */
	unsigned short random_state[3];		/* generator state passed to pg_erand48() */
} TState;

#define INVALID_THREAD	((pthread_t) 0)
00227
/*
 * NOTE(review): presumably the per-thread result handed back by threadRun();
 * nothing in the visible code reads or writes it -- confirm against the caller.
 */
typedef struct
{
	instr_time	conn_time;		/* presumably time spent establishing connections */
	int			xacts;			/* presumably the number of transactions executed */
} TResult;
00233
00234
00235
00236
/* Values for Command.type, and the capacity of Command.argv[]. */
#define SQL_COMMAND		1
#define META_COMMAND	2
#define MAX_ARGS		10

/* How doCustom() submits SQL commands to the server. */
typedef enum QueryMode
{
	QUERY_SIMPLE,				/* PQsendQuery with variables substituted as text */
	QUERY_EXTENDED,				/* PQsendQueryParams */
	QUERY_PREPARED,				/* PQprepare once, then PQsendQueryPrepared */
	NUM_QUERYMODE
} QueryMode;

static QueryMode querymode = QUERY_SIMPLE;
/* Names in enum order, matching the -M choices listed by usage(). */
static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
00251
/*
 * One parsed script command.
 */
typedef struct
{
	char	   *line;			/* NOTE(review): presumably the original script text */
	int			command_num;	/* index into the per-thread exec_elapsed/exec_count arrays */
	int			type;			/* SQL_COMMAND or META_COMMAND */
	int			argc;			/* number of entries used in argv[] */
	char	   *argv[MAX_ARGS]; /* SQL: argv[0] is the statement and argv[1..]
								 * the variable names used as parameters;
								 * meta: argv[0] is the command name */
} Command;
00260
/*
 * Accumulator for aggregated logging (agg_interval > 0); see doCustom().
 */
typedef struct
{

	long		start_time;		/* start of the current interval */
	int			cnt;			/* transactions recorded in the interval */
	double		min_duration;	/* shortest recorded duration, in microseconds */
	double		max_duration;	/* longest recorded duration, in microseconds */
	double		sum;			/* sum of the durations */
	double		sum2;			/* sum of the squared durations */

} AggVals;
00272
/* Each entry is a NULL-terminated array of commands making up one script. */
static Command **sql_files[MAX_FILES];
static int	num_files;			/* number of scripts in sql_files[] */
static int	num_commands = 0;	/* NOTE(review): presumably the total number of commands */
static int	debug = 0;			/* non-zero enables the debug traces on stderr */
00277
00278
/*
 * Built-in script: update accounts, tellers and branches and insert a
 * history row, all inside one BEGIN/END block.
 */
static char *tpc_b = {
	"\\set nbranches " CppAsString2(nbranches) " * :scale\n"
	"\\set ntellers " CppAsString2(ntellers) " * :scale\n"
	"\\set naccounts " CppAsString2(naccounts) " * :scale\n"
	"\\setrandom aid 1 :naccounts\n"
	"\\setrandom bid 1 :nbranches\n"
	"\\setrandom tid 1 :ntellers\n"
	"\\setrandom delta -5000 5000\n"
	"BEGIN;\n"
	"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
	"UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
	"UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
	"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
	"END;\n"
};
00295
00296
/*
 * Built-in script: same as tpc_b but without the updates of
 * pgbench_tellers and pgbench_branches.
 */
static char *simple_update = {
	"\\set nbranches " CppAsString2(nbranches) " * :scale\n"
	"\\set ntellers " CppAsString2(ntellers) " * :scale\n"
	"\\set naccounts " CppAsString2(naccounts) " * :scale\n"
	"\\setrandom aid 1 :naccounts\n"
	"\\setrandom bid 1 :nbranches\n"
	"\\setrandom tid 1 :ntellers\n"
	"\\setrandom delta -5000 5000\n"
	"BEGIN;\n"
	"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
	"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
	"END;\n"
};
00311
00312
/* Built-in script: a single SELECT of one random account. */
static char *select_only = {
	"\\set naccounts " CppAsString2(naccounts) " * :scale\n"
	"\\setrandom aid 1 :naccounts\n"
	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
};
00318
00319
/* Forward declarations of functions defined later in the file. */
static void setalarm(int seconds);
static void *threadRun(void *arg);
00322
00323 static void
00324 usage(void)
00325 {
00326 printf("%s is a benchmarking tool for PostgreSQL.\n\n"
00327 "Usage:\n"
00328 " %s [OPTION]... [DBNAME]\n"
00329 "\nInitialization options:\n"
00330 " -i invokes initialization mode\n"
00331 " -n do not run VACUUM after initialization\n"
00332 " -F NUM fill factor\n"
00333 " -s NUM scaling factor\n"
00334 " -q quiet logging (one message each 5 seconds)\n"
00335 " --foreign-keys\n"
00336 " create foreign key constraints between tables\n"
00337 " --index-tablespace=TABLESPACE\n"
00338 " create indexes in the specified tablespace\n"
00339 " --tablespace=TABLESPACE\n"
00340 " create tables in the specified tablespace\n"
00341 " --unlogged-tables\n"
00342 " create tables as unlogged tables\n"
00343 "\nBenchmarking options:\n"
00344 " -c NUM number of concurrent database clients (default: 1)\n"
00345 " -C establish new connection for each transaction\n"
00346 " -D VARNAME=VALUE\n"
00347 " define variable for use by custom script\n"
00348 " -f FILENAME read transaction script from FILENAME\n"
00349 " -j NUM number of threads (default: 1)\n"
00350 " -l write transaction times to log file\n"
00351 " --sampling-rate NUM\n"
00352 " fraction of transactions to log (e.g. 0.01 for 1%% sample)\n"
00353 " --aggregate-interval NUM\n"
00354 " aggregate data over NUM seconds\n"
00355 " -M simple|extended|prepared\n"
00356 " protocol for submitting queries to server (default: simple)\n"
00357 " -n do not run VACUUM before tests\n"
00358 " -N do not update tables \"pgbench_tellers\" and \"pgbench_branches\"\n"
00359 " -r report average latency per command\n"
00360 " -s NUM report this scale factor in output\n"
00361 " -S perform SELECT-only transactions\n"
00362 " -t NUM number of transactions each client runs (default: 10)\n"
00363 " -T NUM duration of benchmark test in seconds\n"
00364 " -v vacuum all four standard tables before tests\n"
00365 "\nCommon options:\n"
00366 " -d print debugging output\n"
00367 " -h HOSTNAME database server host or socket directory\n"
00368 " -p PORT database server port number\n"
00369 " -U USERNAME connect as specified database user\n"
00370 " -V, --version output version information, then exit\n"
00371 " -?, --help show this help, then exit\n"
00372 "\n"
00373 "Report bugs to <[email protected]>.\n",
00374 progname, progname);
00375 }
00376
00377
00378
00379
00380
00381
00382
/*
 * strtoint64 -- convert a decimal string to a 64-bit integer
 *
 * Accepts optional leading/trailing whitespace and an optional sign.
 * Problems are reported on stderr but do not abort: the function always
 * returns a value.  A magnitude too large for int64 is reported once and
 * saturates (INT64_MAX, or -INT64_MAX for negative input) instead of
 * relying on signed overflow, which is undefined behaviour in C.
 */
static int64
strtoint64(const char *str)
{
	const char *ptr = str;
	int64		result = 0;
	int			sign = 1;
	int			overflowed = 0;

	/* skip leading whitespace */
	while (*ptr && isspace((unsigned char) *ptr))
		ptr++;

	/* handle sign */
	if (*ptr == '-')
	{
		ptr++;

		/*
		 * The magnitude of the most negative value cannot be represented as
		 * a positive int64, so recognise that one literal directly.
		 */
		if (strncmp(ptr, "9223372036854775808", 19) == 0)
		{
			result = -INT64_MAX - 1;
			ptr += 19;
			goto gotdigits;
		}
		sign = -1;
	}
	else if (*ptr == '+')
		ptr++;

	/* require at least one digit */
	if (!isdigit((unsigned char) *ptr))
		fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);

	/* process digits */
	while (*ptr && isdigit((unsigned char) *ptr))
	{
		int			digit = *ptr++ - '0';

		/* would result * 10 + digit exceed INT64_MAX?  test before computing */
		if (result > (INT64_MAX - digit) / 10)
		{
			if (!overflowed)
				fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
			overflowed = 1;
			result = INT64_MAX;
			continue;			/* keep consuming the remaining digits */
		}
		result = result * 10 + digit;
	}

gotdigits:

	/* allow trailing whitespace, but not other trailing characters */
	while (*ptr != '\0' && isspace((unsigned char) *ptr))
		ptr++;

	if (*ptr != '\0')
		fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);

	return ((sign < 0) ? -result : result);
}
00444
00445
00446 static int64
00447 getrand(TState *thread, int64 min, int64 max)
00448 {
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458 return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
00459 }
00460
00461
00462 static void
00463 executeStatement(PGconn *con, const char *sql)
00464 {
00465 PGresult *res;
00466
00467 res = PQexec(con, sql);
00468 if (PQresultStatus(res) != PGRES_COMMAND_OK)
00469 {
00470 fprintf(stderr, "%s", PQerrorMessage(con));
00471 exit(1);
00472 }
00473 PQclear(res);
00474 }
00475
00476
00477 static PGconn *
00478 doConnect(void)
00479 {
00480 PGconn *conn;
00481 static char *password = NULL;
00482 bool new_pass;
00483
00484
00485
00486
00487
00488 do
00489 {
00490 #define PARAMS_ARRAY_SIZE 7
00491
00492 const char *keywords[PARAMS_ARRAY_SIZE];
00493 const char *values[PARAMS_ARRAY_SIZE];
00494
00495 keywords[0] = "host";
00496 values[0] = pghost;
00497 keywords[1] = "port";
00498 values[1] = pgport;
00499 keywords[2] = "user";
00500 values[2] = login;
00501 keywords[3] = "password";
00502 values[3] = password;
00503 keywords[4] = "dbname";
00504 values[4] = dbName;
00505 keywords[5] = "fallback_application_name";
00506 values[5] = progname;
00507 keywords[6] = NULL;
00508 values[6] = NULL;
00509
00510 new_pass = false;
00511
00512 conn = PQconnectdbParams(keywords, values, true);
00513
00514 if (!conn)
00515 {
00516 fprintf(stderr, "Connection to database \"%s\" failed\n",
00517 dbName);
00518 return NULL;
00519 }
00520
00521 if (PQstatus(conn) == CONNECTION_BAD &&
00522 PQconnectionNeedsPassword(conn) &&
00523 password == NULL)
00524 {
00525 PQfinish(conn);
00526 password = simple_prompt("Password: ", 100, false);
00527 new_pass = true;
00528 }
00529 } while (new_pass);
00530
00531
00532 if (PQstatus(conn) == CONNECTION_BAD)
00533 {
00534 fprintf(stderr, "Connection to database \"%s\" failed:\n%s",
00535 dbName, PQerrorMessage(conn));
00536 PQfinish(conn);
00537 return NULL;
00538 }
00539
00540 return conn;
00541 }
00542
00543
00544 static void
00545 discard_response(CState *state)
00546 {
00547 PGresult *res;
00548
00549 do
00550 {
00551 res = PQgetResult(state->con);
00552 if (res)
00553 PQclear(res);
00554 } while (res);
00555 }
00556
00557 static int
00558 compareVariables(const void *v1, const void *v2)
00559 {
00560 return strcmp(((const Variable *) v1)->name,
00561 ((const Variable *) v2)->name);
00562 }
00563
00564 static char *
00565 getVariable(CState *st, char *name)
00566 {
00567 Variable key,
00568 *var;
00569
00570
00571 if (st->nvariables <= 0)
00572 return NULL;
00573
00574 key.name = name;
00575 var = (Variable *) bsearch((void *) &key,
00576 (void *) st->variables,
00577 st->nvariables,
00578 sizeof(Variable),
00579 compareVariables);
00580 if (var != NULL)
00581 return var->value;
00582 else
00583 return NULL;
00584 }
00585
00586
/*
 * Check that a variable name consists solely of alphanumeric characters
 * and underscores.  An empty name passes the check.
 */
static bool
isLegalVariableName(const char *name)
{
	const char *ch;

	for (ch = name; *ch != '\0'; ch++)
	{
		if (*ch == '_' || isalnum((unsigned char) *ch))
			continue;
		return false;
	}

	return true;
}
00600
00601 static int
00602 putVariable(CState *st, const char *context, char *name, char *value)
00603 {
00604 Variable key,
00605 *var;
00606
00607 key.name = name;
00608
00609 if (st->nvariables > 0)
00610 var = (Variable *) bsearch((void *) &key,
00611 (void *) st->variables,
00612 st->nvariables,
00613 sizeof(Variable),
00614 compareVariables);
00615 else
00616 var = NULL;
00617
00618 if (var == NULL)
00619 {
00620 Variable *newvars;
00621
00622
00623
00624
00625
00626 if (!isLegalVariableName(name))
00627 {
00628 fprintf(stderr, "%s: invalid variable name '%s'\n", context, name);
00629 return false;
00630 }
00631
00632 if (st->variables)
00633 newvars = (Variable *) pg_realloc(st->variables,
00634 (st->nvariables + 1) * sizeof(Variable));
00635 else
00636 newvars = (Variable *) pg_malloc(sizeof(Variable));
00637
00638 st->variables = newvars;
00639
00640 var = &newvars[st->nvariables];
00641
00642 var->name = pg_strdup(name);
00643 var->value = pg_strdup(value);
00644
00645 st->nvariables++;
00646
00647 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
00648 compareVariables);
00649 }
00650 else
00651 {
00652 char *val;
00653
00654
00655 val = pg_strdup(value);
00656
00657 free(var->value);
00658 var->value = val;
00659 }
00660
00661 return true;
00662 }
00663
/*
 * Extract the variable name that follows the marker character at sql[0].
 *
 * The name is the longest run of alphanumerics/underscores starting at
 * sql[1].  Returns a freshly allocated copy and stores in *eaten the number
 * of characters covered (marker included); returns NULL, leaving *eaten
 * untouched, when no name follows the marker.
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	int			span = 1;		/* sql[0] is the marker itself */
	char	   *name;

	while (isalnum((unsigned char) sql[span]) || sql[span] == '_')
		span++;

	if (span == 1)
		return NULL;

	/* span - 1 name characters plus the terminator fit in span bytes */
	name = pg_malloc(span);
	memcpy(name, sql + 1, span - 1);
	name[span - 1] = '\0';

	*eaten = span;
	return name;
}
00684
/*
 * Replace the len characters at param (which points into *sql) with value.
 *
 * *sql is enlarged with pg_realloc() when the value is longer than the text
 * it replaces, so the caller's pointer may change.  Returns a pointer to
 * the first character after the inserted value.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			vlen = strlen(value);

	if (vlen > len)
	{
		/* remember the position: the buffer may move when it grows */
		size_t		where = param - *sql;

		*sql = pg_realloc(*sql, strlen(*sql) - len + vlen + 1);
		param = *sql + where;
	}

	/* shift the tail (including its terminator) to its new position */
	if (vlen != len)
		memmove(param + vlen, param + len, strlen(param + len) + 1);

	/* drop the value in place, without a terminator */
	memcpy(param, value, vlen);

	return param + vlen;
}
00704
00705 static char *
00706 assignVariables(CState *st, char *sql)
00707 {
00708 char *p,
00709 *name,
00710 *val;
00711
00712 p = sql;
00713 while ((p = strchr(p, ':')) != NULL)
00714 {
00715 int eaten;
00716
00717 name = parseVariable(p, &eaten);
00718 if (name == NULL)
00719 {
00720 while (*p == ':')
00721 {
00722 p++;
00723 }
00724 continue;
00725 }
00726
00727 val = getVariable(st, name);
00728 free(name);
00729 if (val == NULL)
00730 {
00731 p++;
00732 continue;
00733 }
00734
00735 p = replaceVariable(&sql, p, eaten, val);
00736 }
00737
00738 return sql;
00739 }
00740
00741 static void
00742 getQueryParams(CState *st, const Command *command, const char **params)
00743 {
00744 int i;
00745
00746 for (i = 0; i < command->argc - 1; i++)
00747 params[i] = getVariable(st, command->argv[i + 1]);
00748 }
00749
00750
00751
00752
00753
00754 static bool
00755 runShellCommand(CState *st, char *variable, char **argv, int argc)
00756 {
00757 char command[SHELL_COMMAND_SIZE];
00758 int i,
00759 len = 0;
00760 FILE *fp;
00761 char res[64];
00762 char *endptr;
00763 int retval;
00764
00765
00766
00767
00768
00769
00770
00771
00772
00773 for (i = 0; i < argc; i++)
00774 {
00775 char *arg;
00776 int arglen;
00777
00778 if (argv[i][0] != ':')
00779 {
00780 arg = argv[i];
00781 }
00782 else if (argv[i][1] == ':')
00783 {
00784 arg = argv[i] + 1;
00785 }
00786 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
00787 {
00788 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[i]);
00789 return false;
00790 }
00791
00792 arglen = strlen(arg);
00793 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
00794 {
00795 fprintf(stderr, "%s: too long shell command\n", argv[0]);
00796 return false;
00797 }
00798
00799 if (i > 0)
00800 command[len++] = ' ';
00801 memcpy(command + len, arg, arglen);
00802 len += arglen;
00803 }
00804
00805 command[len] = '\0';
00806
00807
00808 if (variable == NULL)
00809 {
00810 if (system(command))
00811 {
00812 if (!timer_exceeded)
00813 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
00814 return false;
00815 }
00816 return true;
00817 }
00818
00819
00820 if ((fp = popen(command, "r")) == NULL)
00821 {
00822 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
00823 return false;
00824 }
00825 if (fgets(res, sizeof(res), fp) == NULL)
00826 {
00827 if (!timer_exceeded)
00828 fprintf(stderr, "%s: cannot read the result\n", argv[0]);
00829 return false;
00830 }
00831 if (pclose(fp) < 0)
00832 {
00833 fprintf(stderr, "%s: cannot close shell command\n", argv[0]);
00834 return false;
00835 }
00836
00837
00838 retval = (int) strtol(res, &endptr, 10);
00839 while (*endptr != '\0' && isspace((unsigned char) *endptr))
00840 endptr++;
00841 if (*res == '\0' || *endptr != '\0')
00842 {
00843 fprintf(stderr, "%s: must return an integer ('%s' returned)\n", argv[0], res);
00844 return false;
00845 }
00846 snprintf(res, sizeof(res), "%d", retval);
00847 if (!putVariable(st, "setshell", variable, res))
00848 return false;
00849
00850 #ifdef DEBUG
00851 printf("shell parameter name: %s, value: %s\n", argv[1], res);
00852 #endif
00853 return true;
00854 }
00855
#define MAX_PREPARE_NAME		32

/*
 * Build the name of the prepared statement for command number "state" of
 * script number "file", in the form "P<file>_<state>".
 *
 * buffer must provide at least MAX_PREPARE_NAME bytes; the write is bounded
 * by that size, so the result is always NUL-terminated.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
00862
00863 static bool
00864 clientDone(CState *st, bool ok)
00865 {
00866 (void) ok;
00867
00868 if (st->con != NULL)
00869 {
00870 PQfinish(st->con);
00871 st->con = NULL;
00872 }
00873 return false;
00874 }
00875
00876 static
00877 void agg_vals_init(AggVals * aggs, instr_time start)
00878 {
00879
00880 aggs->cnt = 0;
00881 aggs->sum = 0;
00882 aggs->sum2 = 0;
00883
00884
00885 aggs->min_duration = 0;
00886 aggs->max_duration = 0;
00887
00888
00889 aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
00890 }
00891
00892
/*
 * Advance one client's script as far as possible without blocking.
 *
 * thread    - owning thread (random state and per-command latency arrays)
 * st        - the client to advance
 * conn_time - accumulator for time spent establishing connections
 * logfile   - transaction log, or NULL when logging is disabled
 * agg       - accumulator used when agg_interval > 0
 *
 * Returns false (through clientDone) once the client has finished or has
 * been aborted, and true in every other case.
 */
static bool
doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals * agg)
{
	PGresult   *res;
	Command   **commands;

top:
	commands = sql_files[st->use_file];

	/* A \sleep is in progress: stay idle until the wake-up time has passed. */
	if (st->sleeping)
	{
		instr_time	now;

		INSTR_TIME_SET_CURRENT(now);
		if (st->until <= INSTR_TIME_GET_MICROSEC(now))
			st->sleeping = 0;
		else
			return true;
	}

	/* A command has been issued earlier: handle its completion. */
	if (st->listen)
	{
		if (commands[st->state]->type == SQL_COMMAND)
		{
			if (debug)
				fprintf(stderr, "client %d receiving\n", st->id);
			if (!PQconsumeInput(st->con))
			{
				fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
				return clientDone(st, false);
			}
			/* result not complete yet: come back later */
			if (PQisBusy(st->con))
				return true;
		}

		/* Command finished: accumulate its elapsed time and count. */
		if (is_latencies)
		{
			instr_time	now;
			int			cnum = commands[st->state]->command_num;

			INSTR_TIME_SET_CURRENT(now);
			INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
								  now, st->stmt_begin);
			thread->exec_count[cnum]++;
		}

		/* Last command of the script completed: write the transaction log. */
		if (logfile && commands[st->state + 1] == NULL)
		{
			instr_time	now;
			instr_time	diff;
			double		usec;

			/*
			 * sample_rate == 0.0 logs everything; otherwise the transaction
			 * is logged only when the random draw does not exceed the rate.
			 */
			if (sample_rate == 0.0 ||
				pg_erand48(thread->random_state) <= sample_rate)
			{
				INSTR_TIME_SET_CURRENT(now);
				diff = now;
				INSTR_TIME_SUBTRACT(diff, st->txn_begin);
				usec = (double) INSTR_TIME_GET_MICROSEC(diff);

				/* aggregated logging? */
				if (agg_interval > 0)
				{
					/* still inside the current interval: just accumulate */
					if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(now))
					{
						agg->cnt += 1;
						agg->sum += usec;
						agg->sum2 += usec * usec;

						/* the first transaction of an interval sets both extremes */
						if ((agg->cnt == 1) || (usec < agg->min_duration))
							agg->min_duration = usec;

						if ((agg->cnt == 1) || (usec > agg->max_duration))
							agg->max_duration = usec;
					}
					else
					{
						/*
						 * The interval is over.  Emit one line per elapsed
						 * interval (empty ones included) until the interval
						 * containing "now" is reached.
						 */
						while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(now))
						{
							/* start cnt sum sum-of-squares min max */
							fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f\n",
									agg->start_time, agg->cnt, agg->sum, agg->sum2,
									agg->min_duration, agg->max_duration);

							/* move on to the next interval */
							agg->start_time = agg->start_time + agg_interval;

							/* and reset the accumulated values */
							agg->cnt = 0;
							agg->min_duration = 0;
							agg->max_duration = 0;
							agg->sum = 0;
							agg->sum2 = 0;
						}

						/* this transaction opens the new interval */
						agg->cnt = 1;
						agg->min_duration = usec;
						agg->max_duration = usec;
						agg->sum = usec;
						agg->sum2 = usec * usec;
					}
				}
				else
				{
					/* no aggregation: one log line per transaction */
#ifndef WIN32
					/* instr_time is accessed as a struct timeval here */
					fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
							st->id, st->cnt, usec, st->use_file,
							(long) now.tv_sec, (long) now.tv_usec);
#else
					/* on Windows the two timestamp columns are written as 0 */
					fprintf(logfile, "%d %d %.0f %d 0 0\n",
							st->id, st->cnt, usec, st->use_file);
#endif
				}
			}
		}

		if (commands[st->state]->type == SQL_COMMAND)
		{
			/*
			 * Fetch the first result and check its status; anything but
			 * COMMAND_OK / TUPLES_OK aborts the client.
			 */
			res = PQgetResult(st->con);
			switch (PQresultStatus(res))
			{
				case PGRES_COMMAND_OK:
				case PGRES_TUPLES_OK:
					break;		/* OK */
				default:
					fprintf(stderr, "Client %d aborted in state %d: %s",
							st->id, st->state, PQerrorMessage(st->con));
					PQclear(res);
					return clientDone(st, false);
			}
			PQclear(res);
			/* drain whatever other results are still pending */
			discard_response(st);
		}

		/* End of the script reached: count the transaction. */
		if (commands[st->state + 1] == NULL)
		{
			if (is_connect)
			{
				PQfinish(st->con);
				st->con = NULL;
			}

			++st->cnt;
			if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
				return clientDone(st, true);
		}

		/* Advance to the next command; wrap around and pick a script at random. */
		st->state++;
		if (commands[st->state] == NULL)
		{
			st->state = 0;
			st->use_file = (int) getrand(thread, 0, num_files - 1);
			commands = sql_files[st->use_file];
		}
	}

	/* (Re)connect if necessary, charging the time to *conn_time. */
	if (st->con == NULL)
	{
		instr_time	start,
					end;

		INSTR_TIME_SET_CURRENT(start);
		if ((st->con = doConnect()) == NULL)
		{
			fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
			return clientDone(st, false);
		}
		INSTR_TIME_SET_CURRENT(end);
		INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
	}

	/* Record the transaction start time, needed only for the log. */
	if (logfile && st->state == 0)
		INSTR_TIME_SET_CURRENT(st->txn_begin);

	/* Record the statement start time for per-command latencies. */
	if (is_latencies)
		INSTR_TIME_SET_CURRENT(st->stmt_begin);

	if (commands[st->state]->type == SQL_COMMAND)
	{
		const Command *command = commands[st->state];
		int			r;

		if (querymode == QUERY_SIMPLE)
		{
			char	   *sql;

			/* substitute variables textually in a private copy */
			sql = pg_strdup(command->argv[0]);
			sql = assignVariables(st, sql);

			if (debug)
				fprintf(stderr, "client %d sending %s\n", st->id, sql);
			r = PQsendQuery(st->con, sql);
			free(sql);
		}
		else if (querymode == QUERY_EXTENDED)
		{
			const char *sql = command->argv[0];
			const char *params[MAX_ARGS];

			getQueryParams(st, command, params);

			if (debug)
				fprintf(stderr, "client %d sending %s\n", st->id, sql);
			r = PQsendQueryParams(st->con, sql, command->argc - 1,
								  NULL, params, NULL, NULL, 0);
		}
		else if (querymode == QUERY_PREPARED)
		{
			char		name[MAX_PREPARE_NAME];
			const char *params[MAX_ARGS];

			/* first use of this script on this client: prepare all its SQL */
			if (!st->prepared[st->use_file])
			{
				int			j;

				for (j = 0; commands[j] != NULL; j++)
				{
					PGresult   *res;
					char		name[MAX_PREPARE_NAME];

					if (commands[j]->type != SQL_COMMAND)
						continue;
					preparedStatementName(name, st->use_file, j);
					res = PQprepare(st->con, name,
						  commands[j]->argv[0], commands[j]->argc - 1, NULL);
					/* a failed PQprepare is reported but does not abort */
					if (PQresultStatus(res) != PGRES_COMMAND_OK)
						fprintf(stderr, "%s", PQerrorMessage(st->con));
					PQclear(res);
				}
				st->prepared[st->use_file] = true;
			}

			getQueryParams(st, command, params);
			preparedStatementName(name, st->use_file, st->state);

			if (debug)
				fprintf(stderr, "client %d sending %s\n", st->id, name);
			r = PQsendQueryPrepared(st->con, name, command->argc - 1,
									params, NULL, NULL, 0);
		}
		else	/* unknown query mode */
			r = 0;

		if (r == 0)
		{
			/* the command could not be sent: count it as an error */
			if (debug)
				fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
			st->ecnt++;
		}
		else
			st->listen = 1;		/* flag that a command has been issued */
	}
	else if (commands[st->state]->type == META_COMMAND)
	{
		int			argc = commands[st->state]->argc,
					i;
		char	  **argv = commands[st->state]->argv;

		if (debug)
		{
			fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
			for (i = 1; i < argc; i++)
				fprintf(stderr, " %s", argv[i]);
			fprintf(stderr, "\n");
		}

		/* \setrandom var min max -- each bound is a literal or a :variable */
		if (pg_strcasecmp(argv[0], "setrandom") == 0)
		{
			char	   *var;
			int64		min,
						max;
			char		res[64];

			if (*argv[2] == ':')
			{
				if ((var = getVariable(st, argv[2] + 1)) == NULL)
				{
					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
					st->ecnt++;
					return true;
				}
				min = strtoint64(var);
			}
			else
				min = strtoint64(argv[2]);

#ifdef NOT_USED
			if (min < 0)
			{
				fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
				st->ecnt++;
				return;
			}
#endif

			if (*argv[3] == ':')
			{
				if ((var = getVariable(st, argv[3] + 1)) == NULL)
				{
					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
					st->ecnt++;
					return true;
				}
				max = strtoint64(var);
			}
			else
				max = strtoint64(argv[3]);

			if (max < min)
			{
				fprintf(stderr, "%s: maximum is less than minimum\n", argv[0]);
				st->ecnt++;
				return true;
			}

			/*
			 * getrand() computes max - min + 1; reject ranges for which
			 * that computation does not yield a non-negative value.
			 */
			if (max - min < 0 || (max - min) + 1 < 0)
			{
				fprintf(stderr, "%s: range too large\n", argv[0]);
				st->ecnt++;
				return true;
			}

#ifdef DEBUG
			printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
#endif
			snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));

			if (!putVariable(st, argv[0], argv[1], res))
			{
				st->ecnt++;
				return true;
			}

			st->listen = 1;
		}
		/* \set var operand1 [operator operand2] */
		else if (pg_strcasecmp(argv[0], "set") == 0)
		{
			char	   *var;
			int64		ope1,
						ope2;
			char		res[64];

			if (*argv[2] == ':')
			{
				if ((var = getVariable(st, argv[2] + 1)) == NULL)
				{
					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
					st->ecnt++;
					return true;
				}
				ope1 = strtoint64(var);
			}
			else
				ope1 = strtoint64(argv[2]);

			/* fewer than five words: plain assignment without an operator */
			if (argc < 5)
				snprintf(res, sizeof(res), INT64_FORMAT, ope1);
			else
			{
				if (*argv[4] == ':')
				{
					if ((var = getVariable(st, argv[4] + 1)) == NULL)
					{
						fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
						st->ecnt++;
						return true;
					}
					ope2 = strtoint64(var);
				}
				else
					ope2 = strtoint64(argv[4]);

				if (strcmp(argv[3], "+") == 0)
					snprintf(res, sizeof(res), INT64_FORMAT, ope1 + ope2);
				else if (strcmp(argv[3], "-") == 0)
					snprintf(res, sizeof(res), INT64_FORMAT, ope1 - ope2);
				else if (strcmp(argv[3], "*") == 0)
					snprintf(res, sizeof(res), INT64_FORMAT, ope1 * ope2);
				else if (strcmp(argv[3], "/") == 0)
				{
					if (ope2 == 0)
					{
						fprintf(stderr, "%s: division by zero\n", argv[0]);
						st->ecnt++;
						return true;
					}
					snprintf(res, sizeof(res), INT64_FORMAT, ope1 / ope2);
				}
				else
				{
					fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
					st->ecnt++;
					return true;
				}
			}

			if (!putVariable(st, argv[0], argv[1], res))
			{
				st->ecnt++;
				return true;
			}

			st->listen = 1;
		}
		/* \sleep amount [us|ms|s] */
		else if (pg_strcasecmp(argv[0], "sleep") == 0)
		{
			char	   *var;
			int			usec;
			instr_time	now;

			if (*argv[1] == ':')
			{
				if ((var = getVariable(st, argv[1] + 1)) == NULL)
				{
					fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
					st->ecnt++;
					return true;
				}
				usec = atoi(var);
			}
			else
				usec = atoi(argv[1]);

			/* convert to microseconds; any other unit leaves the value as is */
			if (argc > 2)
			{
				if (pg_strcasecmp(argv[2], "ms") == 0)
					usec *= 1000;
				else if (pg_strcasecmp(argv[2], "s") == 0)
					usec *= 1000000;
			}
			else
				usec *= 1000000;	/* no unit given: seconds */

			INSTR_TIME_SET_CURRENT(now);
			st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
			st->sleeping = 1;

			st->listen = 1;
		}
		/* \setshell var command... : store the command's output in var */
		else if (pg_strcasecmp(argv[0], "setshell") == 0)
		{
			bool		ret = runShellCommand(st, argv[1], argv + 2, argc - 2);

			if (timer_exceeded) /* the timer ran out while the command ran */
				return clientDone(st, true);
			else if (!ret)		/* the command failed */
			{
				st->ecnt++;
				return true;
			}
			else	/* succeeded */
				st->listen = 1;
		}
		/* \shell command... : run the command, ignoring its output */
		else if (pg_strcasecmp(argv[0], "shell") == 0)
		{
			bool		ret = runShellCommand(st, NULL, argv + 1, argc - 1);

			if (timer_exceeded) /* the timer ran out while the command ran */
				return clientDone(st, true);
			else if (!ret)		/* the command failed */
			{
				st->ecnt++;
				return true;
			}
			else	/* succeeded */
				st->listen = 1;
		}
		/* meta commands complete immediately: go process the next command */
		goto top;
	}

	return true;
}
01400
01401
01402 static void
01403 disconnect_all(CState *state, int length)
01404 {
01405 int i;
01406
01407 for (i = 0; i < length; i++)
01408 {
01409 if (state[i].con)
01410 {
01411 PQfinish(state[i].con);
01412 state[i].con = NULL;
01413 }
01414 }
01415 }
01416
01417
01418 static void
01419 init(bool is_no_vacuum)
01420 {
01421
01422
01423
01424
01425
01426
01427
01428 #define SCALE_32BIT_THRESHOLD 20000
01429
01430
01431
01432
01433
01434
01435
01436
01437
01438
01439 struct ddlinfo
01440 {
01441 char *table;
01442 char *cols;
01443 int declare_fillfactor;
01444 };
01445 struct ddlinfo DDLs[] = {
01446 {
01447 "pgbench_history",
01448 scale >= SCALE_32BIT_THRESHOLD
01449 ? "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)"
01450 : "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
01451 0
01452 },
01453 {
01454 "pgbench_tellers",
01455 "tid int not null,bid int,tbalance int,filler char(84)",
01456 1
01457 },
01458 {
01459 "pgbench_accounts",
01460 scale >= SCALE_32BIT_THRESHOLD
01461 ? "aid bigint not null,bid int,abalance int,filler char(84)"
01462 : "aid int not null,bid int,abalance int,filler char(84)",
01463 1
01464 },
01465 {
01466 "pgbench_branches",
01467 "bid int not null,bbalance int,filler char(88)",
01468 1
01469 }
01470 };
01471 static char *DDLAFTERs[] = {
01472 "alter table pgbench_branches add primary key (bid)",
01473 "alter table pgbench_tellers add primary key (tid)",
01474 "alter table pgbench_accounts add primary key (aid)"
01475 };
01476 static char *DDLKEYs[] = {
01477 "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
01478 "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
01479 "alter table pgbench_history add foreign key (bid) references pgbench_branches",
01480 "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
01481 "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
01482 };
01483
01484 PGconn *con;
01485 PGresult *res;
01486 char sql[256];
01487 int i;
01488 int64 k;
01489
01490
01491 instr_time start, diff;
01492 double elapsed_sec, remaining_sec;
01493 int log_interval = 1;
01494
01495 if ((con = doConnect()) == NULL)
01496 exit(1);
01497
01498 for (i = 0; i < lengthof(DDLs); i++)
01499 {
01500 char opts[256];
01501 char buffer[256];
01502 struct ddlinfo *ddl = &DDLs[i];
01503
01504
01505 snprintf(buffer, 256, "drop table if exists %s", ddl->table);
01506 executeStatement(con, buffer);
01507
01508
01509 opts[0] = '\0';
01510 if (ddl->declare_fillfactor)
01511 snprintf(opts + strlen(opts), 256 - strlen(opts),
01512 " with (fillfactor=%d)", fillfactor);
01513 if (tablespace != NULL)
01514 {
01515 char *escape_tablespace;
01516
01517 escape_tablespace = PQescapeIdentifier(con, tablespace,
01518 strlen(tablespace));
01519 snprintf(opts + strlen(opts), 256 - strlen(opts),
01520 " tablespace %s", escape_tablespace);
01521 PQfreemem(escape_tablespace);
01522 }
01523 snprintf(buffer, 256, "create%s table %s(%s)%s",
01524 unlogged_tables ? " unlogged" : "",
01525 ddl->table, ddl->cols, opts);
01526
01527 executeStatement(con, buffer);
01528 }
01529
01530 executeStatement(con, "begin");
01531
01532 for (i = 0; i < nbranches * scale; i++)
01533 {
01534 snprintf(sql, 256, "insert into pgbench_branches(bid,bbalance) values(%d,0)", i + 1);
01535 executeStatement(con, sql);
01536 }
01537
01538 for (i = 0; i < ntellers * scale; i++)
01539 {
01540 snprintf(sql, 256, "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
01541 i + 1, i / ntellers + 1);
01542 executeStatement(con, sql);
01543 }
01544
01545 executeStatement(con, "commit");
01546
01547
01548
01549
01550 fprintf(stderr, "creating tables...\n");
01551
01552 executeStatement(con, "begin");
01553 executeStatement(con, "truncate pgbench_accounts");
01554
01555 res = PQexec(con, "copy pgbench_accounts from stdin");
01556 if (PQresultStatus(res) != PGRES_COPY_IN)
01557 {
01558 fprintf(stderr, "%s", PQerrorMessage(con));
01559 exit(1);
01560 }
01561 PQclear(res);
01562
01563 INSTR_TIME_SET_CURRENT(start);
01564
01565 for (k = 0; k < (int64) naccounts * scale; k++)
01566 {
01567 int64 j = k + 1;
01568
01569 snprintf(sql, 256, INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n", j, k / naccounts + 1, 0);
01570 if (PQputline(con, sql))
01571 {
01572 fprintf(stderr, "PQputline failed\n");
01573 exit(1);
01574 }
01575
01576
01577
01578 if ((! use_quiet) && (j % 100000 == 0))
01579 {
01580 INSTR_TIME_SET_CURRENT(diff);
01581 INSTR_TIME_SUBTRACT(diff, start);
01582
01583 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
01584 remaining_sec = (scale * naccounts - j) * elapsed_sec / j;
01585
01586 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
01587 j, (int64)naccounts * scale,
01588 (int) (((int64) j * 100) / (naccounts * scale)),
01589 elapsed_sec, remaining_sec);
01590 }
01591
01592 else if (use_quiet && (j % 100 == 0))
01593 {
01594 INSTR_TIME_SET_CURRENT(diff);
01595 INSTR_TIME_SUBTRACT(diff, start);
01596
01597 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
01598 remaining_sec = (scale * naccounts - j) * elapsed_sec / j;
01599
01600
01601 if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS)) {
01602
01603 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
01604 j, (int64)naccounts * scale,
01605 (int) (((int64) j * 100) / (naccounts * scale)), elapsed_sec, remaining_sec);
01606
01607
01608 log_interval = (int)ceil(elapsed_sec/LOG_STEP_SECONDS);
01609 }
01610 }
01611
01612 }
01613 if (PQputline(con, "\\.\n"))
01614 {
01615 fprintf(stderr, "very last PQputline failed\n");
01616 exit(1);
01617 }
01618 if (PQendcopy(con))
01619 {
01620 fprintf(stderr, "PQendcopy failed\n");
01621 exit(1);
01622 }
01623 executeStatement(con, "commit");
01624
01625
01626 if (!is_no_vacuum)
01627 {
01628 fprintf(stderr, "vacuum...\n");
01629 executeStatement(con, "vacuum analyze pgbench_branches");
01630 executeStatement(con, "vacuum analyze pgbench_tellers");
01631 executeStatement(con, "vacuum analyze pgbench_accounts");
01632 executeStatement(con, "vacuum analyze pgbench_history");
01633 }
01634
01635
01636
01637
01638 fprintf(stderr, "set primary keys...\n");
01639 for (i = 0; i < lengthof(DDLAFTERs); i++)
01640 {
01641 char buffer[256];
01642
01643 strncpy(buffer, DDLAFTERs[i], 256);
01644
01645 if (index_tablespace != NULL)
01646 {
01647 char *escape_tablespace;
01648
01649 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
01650 strlen(index_tablespace));
01651 snprintf(buffer + strlen(buffer), 256 - strlen(buffer),
01652 " using index tablespace %s", escape_tablespace);
01653 PQfreemem(escape_tablespace);
01654 }
01655
01656 executeStatement(con, buffer);
01657 }
01658
01659
01660
01661
01662 if (foreign_keys)
01663 {
01664 fprintf(stderr, "set foreign keys...\n");
01665 for (i = 0; i < lengthof(DDLKEYs); i++)
01666 {
01667 executeStatement(con, DDLKEYs[i]);
01668 }
01669 }
01670
01671
01672 fprintf(stderr, "done.\n");
01673 PQfinish(con);
01674 }
01675
01676
01677
01678
01679 static bool
01680 parseQuery(Command *cmd, const char *raw_sql)
01681 {
01682 char *sql,
01683 *p;
01684
01685 sql = pg_strdup(raw_sql);
01686 cmd->argc = 1;
01687
01688 p = sql;
01689 while ((p = strchr(p, ':')) != NULL)
01690 {
01691 char var[12];
01692 char *name;
01693 int eaten;
01694
01695 name = parseVariable(p, &eaten);
01696 if (name == NULL)
01697 {
01698 while (*p == ':')
01699 {
01700 p++;
01701 }
01702 continue;
01703 }
01704
01705 if (cmd->argc >= MAX_ARGS)
01706 {
01707 fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
01708 return false;
01709 }
01710
01711 sprintf(var, "$%d", cmd->argc);
01712 p = replaceVariable(&sql, p, eaten, var);
01713
01714 cmd->argv[cmd->argc] = name;
01715 cmd->argc++;
01716 }
01717
01718 cmd->argv[0] = sql;
01719 return true;
01720 }
01721
01722
01723 static Command *
01724 process_commands(char *buf)
01725 {
01726 const char delim[] = " \f\n\r\t\v";
01727
01728 Command *my_commands;
01729 int j;
01730 char *p,
01731 *tok;
01732
01733
01734 if ((p = strchr(buf, '\n')) != NULL)
01735 *p = '\0';
01736
01737
01738 p = buf;
01739 while (isspace((unsigned char) *p))
01740 p++;
01741
01742
01743 if (*p == '\0' || strncmp(p, "--", 2) == 0)
01744 return NULL;
01745
01746
01747 my_commands = (Command *) pg_malloc(sizeof(Command));
01748 my_commands->line = pg_strdup(buf);
01749 my_commands->command_num = num_commands++;
01750 my_commands->type = 0;
01751 my_commands->argc = 0;
01752
01753 if (*p == '\\')
01754 {
01755 my_commands->type = META_COMMAND;
01756
01757 j = 0;
01758 tok = strtok(++p, delim);
01759
01760 while (tok != NULL)
01761 {
01762 my_commands->argv[j++] = pg_strdup(tok);
01763 my_commands->argc++;
01764 tok = strtok(NULL, delim);
01765 }
01766
01767 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
01768 {
01769 if (my_commands->argc < 4)
01770 {
01771 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
01772 exit(1);
01773 }
01774
01775 for (j = 4; j < my_commands->argc; j++)
01776 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
01777 my_commands->argv[0], my_commands->argv[j]);
01778 }
01779 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
01780 {
01781 if (my_commands->argc < 3)
01782 {
01783 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
01784 exit(1);
01785 }
01786
01787 for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++)
01788 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
01789 my_commands->argv[0], my_commands->argv[j]);
01790 }
01791 else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
01792 {
01793 if (my_commands->argc < 2)
01794 {
01795 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
01796 exit(1);
01797 }
01798
01799
01800
01801
01802
01803
01804
01805 if (my_commands->argv[1][0] != ':')
01806 {
01807 char *c = my_commands->argv[1];
01808
01809 while (isdigit((unsigned char) *c))
01810 c++;
01811 if (*c)
01812 {
01813 my_commands->argv[2] = c;
01814 if (my_commands->argc < 3)
01815 my_commands->argc = 3;
01816 }
01817 }
01818
01819 if (my_commands->argc >= 3)
01820 {
01821 if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
01822 pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
01823 pg_strcasecmp(my_commands->argv[2], "s") != 0)
01824 {
01825 fprintf(stderr, "%s: unknown time unit '%s' - must be us, ms or s\n",
01826 my_commands->argv[0], my_commands->argv[2]);
01827 exit(1);
01828 }
01829 }
01830
01831 for (j = 3; j < my_commands->argc; j++)
01832 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
01833 my_commands->argv[0], my_commands->argv[j]);
01834 }
01835 else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
01836 {
01837 if (my_commands->argc < 3)
01838 {
01839 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
01840 exit(1);
01841 }
01842 }
01843 else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
01844 {
01845 if (my_commands->argc < 1)
01846 {
01847 fprintf(stderr, "%s: missing command\n", my_commands->argv[0]);
01848 exit(1);
01849 }
01850 }
01851 else
01852 {
01853 fprintf(stderr, "Invalid command %s\n", my_commands->argv[0]);
01854 exit(1);
01855 }
01856 }
01857 else
01858 {
01859 my_commands->type = SQL_COMMAND;
01860
01861 switch (querymode)
01862 {
01863 case QUERY_SIMPLE:
01864 my_commands->argv[0] = pg_strdup(p);
01865 my_commands->argc++;
01866 break;
01867 case QUERY_EXTENDED:
01868 case QUERY_PREPARED:
01869 if (!parseQuery(my_commands, p))
01870 exit(1);
01871 break;
01872 default:
01873 exit(1);
01874 }
01875 }
01876
01877 return my_commands;
01878 }
01879
01880 static int
01881 process_file(char *filename)
01882 {
01883 #define COMMANDS_ALLOC_NUM 128
01884
01885 Command **my_commands;
01886 FILE *fd;
01887 int lineno;
01888 char buf[BUFSIZ];
01889 int alloc_num;
01890
01891 if (num_files >= MAX_FILES)
01892 {
01893 fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
01894 exit(1);
01895 }
01896
01897 alloc_num = COMMANDS_ALLOC_NUM;
01898 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
01899
01900 if (strcmp(filename, "-") == 0)
01901 fd = stdin;
01902 else if ((fd = fopen(filename, "r")) == NULL)
01903 {
01904 fprintf(stderr, "%s: %s\n", filename, strerror(errno));
01905 return false;
01906 }
01907
01908 lineno = 0;
01909
01910 while (fgets(buf, sizeof(buf), fd) != NULL)
01911 {
01912 Command *command;
01913
01914 command = process_commands(buf);
01915 if (command == NULL)
01916 continue;
01917
01918 my_commands[lineno] = command;
01919 lineno++;
01920
01921 if (lineno >= alloc_num)
01922 {
01923 alloc_num += COMMANDS_ALLOC_NUM;
01924 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
01925 }
01926 }
01927 fclose(fd);
01928
01929 my_commands[lineno] = NULL;
01930
01931 sql_files[num_files++] = my_commands;
01932
01933 return true;
01934 }
01935
01936 static Command **
01937 process_builtin(char *tb)
01938 {
01939 #define COMMANDS_ALLOC_NUM 128
01940
01941 Command **my_commands;
01942 int lineno;
01943 char buf[BUFSIZ];
01944 int alloc_num;
01945
01946 alloc_num = COMMANDS_ALLOC_NUM;
01947 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
01948
01949 lineno = 0;
01950
01951 for (;;)
01952 {
01953 char *p;
01954 Command *command;
01955
01956 p = buf;
01957 while (*tb && *tb != '\n')
01958 *p++ = *tb++;
01959
01960 if (*tb == '\0')
01961 break;
01962
01963 if (*tb == '\n')
01964 tb++;
01965
01966 *p = '\0';
01967
01968 command = process_commands(buf);
01969 if (command == NULL)
01970 continue;
01971
01972 my_commands[lineno] = command;
01973 lineno++;
01974
01975 if (lineno >= alloc_num)
01976 {
01977 alloc_num += COMMANDS_ALLOC_NUM;
01978 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
01979 }
01980 }
01981
01982 my_commands[lineno] = NULL;
01983
01984 return my_commands;
01985 }
01986
01987
01988 static void
01989 printResults(int ttype, int normal_xacts, int nclients,
01990 TState *threads, int nthreads,
01991 instr_time total_time, instr_time conn_total_time)
01992 {
01993 double time_include,
01994 tps_include,
01995 tps_exclude;
01996 char *s;
01997
01998 time_include = INSTR_TIME_GET_DOUBLE(total_time);
01999 tps_include = normal_xacts / time_include;
02000 tps_exclude = normal_xacts / (time_include -
02001 (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
02002
02003 if (ttype == 0)
02004 s = "TPC-B (sort of)";
02005 else if (ttype == 2)
02006 s = "Update only pgbench_accounts";
02007 else if (ttype == 1)
02008 s = "SELECT only";
02009 else
02010 s = "Custom query";
02011
02012 printf("transaction type: %s\n", s);
02013 printf("scaling factor: %d\n", scale);
02014 printf("query mode: %s\n", QUERYMODE[querymode]);
02015 printf("number of clients: %d\n", nclients);
02016 printf("number of threads: %d\n", nthreads);
02017 if (duration <= 0)
02018 {
02019 printf("number of transactions per client: %d\n", nxacts);
02020 printf("number of transactions actually processed: %d/%d\n",
02021 normal_xacts, nxacts * nclients);
02022 }
02023 else
02024 {
02025 printf("duration: %d s\n", duration);
02026 printf("number of transactions actually processed: %d\n",
02027 normal_xacts);
02028 }
02029 printf("tps = %f (including connections establishing)\n", tps_include);
02030 printf("tps = %f (excluding connections establishing)\n", tps_exclude);
02031
02032
02033 if (is_latencies)
02034 {
02035 int i;
02036
02037 for (i = 0; i < num_files; i++)
02038 {
02039 Command **commands;
02040
02041 if (num_files > 1)
02042 printf("statement latencies in milliseconds, file %d:\n", i + 1);
02043 else
02044 printf("statement latencies in milliseconds:\n");
02045
02046 for (commands = sql_files[i]; *commands != NULL; commands++)
02047 {
02048 Command *command = *commands;
02049 int cnum = command->command_num;
02050 double total_time;
02051 instr_time total_exec_elapsed;
02052 int total_exec_count;
02053 int t;
02054
02055
02056 INSTR_TIME_SET_ZERO(total_exec_elapsed);
02057 total_exec_count = 0;
02058 for (t = 0; t < nthreads; t++)
02059 {
02060 TState *thread = &threads[t];
02061
02062 INSTR_TIME_ADD(total_exec_elapsed,
02063 thread->exec_elapsed[cnum]);
02064 total_exec_count += thread->exec_count[cnum];
02065 }
02066
02067 if (total_exec_count > 0)
02068 total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
02069 else
02070 total_time = 0.0;
02071
02072 printf("\t%f\t%s\n", total_time, command->line);
02073 }
02074 }
02075 }
02076 }
02077
02078
02079 int
02080 main(int argc, char **argv)
02081 {
02082 static struct option long_options[] = {
02083 {"foreign-keys", no_argument, &foreign_keys, 1},
02084 {"index-tablespace", required_argument, NULL, 3},
02085 {"tablespace", required_argument, NULL, 2},
02086 {"unlogged-tables", no_argument, &unlogged_tables, 1},
02087 {"sampling-rate", required_argument, NULL, 4},
02088 {"aggregate-interval", required_argument, NULL, 5},
02089 {NULL, 0, NULL, 0}
02090 };
02091
02092 int c;
02093 int nclients = 1;
02094 int nthreads = 1;
02095 int is_init_mode = 0;
02096 int is_no_vacuum = 0;
02097 int do_vacuum_accounts = 0;
02098 int ttype = 0;
02099
02100 int optindex;
02101 char *filename = NULL;
02102 bool scale_given = false;
02103
02104 CState *state;
02105 TState *threads;
02106
02107 instr_time start_time;
02108 instr_time total_time;
02109 instr_time conn_total_time;
02110 int total_xacts;
02111
02112 int i;
02113
02114 #ifdef HAVE_GETRLIMIT
02115 struct rlimit rlim;
02116 #endif
02117
02118 PGconn *con;
02119 PGresult *res;
02120 char *env;
02121
02122 char val[64];
02123
02124 progname = get_progname(argv[0]);
02125
02126 if (argc > 1)
02127 {
02128 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
02129 {
02130 usage();
02131 exit(0);
02132 }
02133 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
02134 {
02135 puts("pgbench (PostgreSQL) " PG_VERSION);
02136 exit(0);
02137 }
02138 }
02139
02140 #ifdef WIN32
02141
02142 setvbuf(stderr, NULL, _IONBF, 0);
02143 #endif
02144
02145 if ((env = getenv("PGHOST")) != NULL && *env != '\0')
02146 pghost = env;
02147 if ((env = getenv("PGPORT")) != NULL && *env != '\0')
02148 pgport = env;
02149 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
02150 login = env;
02151
02152 state = (CState *) pg_malloc(sizeof(CState));
02153 memset(state, 0, sizeof(CState));
02154
02155 while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
02156 {
02157 switch (c)
02158 {
02159 case 'i':
02160 is_init_mode++;
02161 break;
02162 case 'h':
02163 pghost = pg_strdup(optarg);
02164 break;
02165 case 'n':
02166 is_no_vacuum++;
02167 break;
02168 case 'v':
02169 do_vacuum_accounts++;
02170 break;
02171 case 'p':
02172 pgport = pg_strdup(optarg);
02173 break;
02174 case 'd':
02175 debug++;
02176 break;
02177 case 'S':
02178 ttype = 1;
02179 break;
02180 case 'N':
02181 ttype = 2;
02182 break;
02183 case 'c':
02184 nclients = atoi(optarg);
02185 if (nclients <= 0 || nclients > MAXCLIENTS)
02186 {
02187 fprintf(stderr, "invalid number of clients: %d\n", nclients);
02188 exit(1);
02189 }
02190 #ifdef HAVE_GETRLIMIT
02191 #ifdef RLIMIT_NOFILE
02192 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
02193 #else
02194 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
02195 #endif
02196 {
02197 fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
02198 exit(1);
02199 }
02200 if (rlim.rlim_cur <= (nclients + 2))
02201 {
02202 fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
02203 fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n");
02204 exit(1);
02205 }
02206 #endif
02207 break;
02208 case 'j':
02209 nthreads = atoi(optarg);
02210 if (nthreads <= 0)
02211 {
02212 fprintf(stderr, "invalid number of threads: %d\n", nthreads);
02213 exit(1);
02214 }
02215 break;
02216 case 'C':
02217 is_connect = true;
02218 break;
02219 case 'r':
02220 is_latencies = true;
02221 break;
02222 case 's':
02223 scale_given = true;
02224 scale = atoi(optarg);
02225 if (scale <= 0)
02226 {
02227 fprintf(stderr, "invalid scaling factor: %d\n", scale);
02228 exit(1);
02229 }
02230 break;
02231 case 't':
02232 if (duration > 0)
02233 {
02234 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
02235 exit(1);
02236 }
02237 nxacts = atoi(optarg);
02238 if (nxacts <= 0)
02239 {
02240 fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
02241 exit(1);
02242 }
02243 break;
02244 case 'T':
02245 if (nxacts > 0)
02246 {
02247 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
02248 exit(1);
02249 }
02250 duration = atoi(optarg);
02251 if (duration <= 0)
02252 {
02253 fprintf(stderr, "invalid duration: %d\n", duration);
02254 exit(1);
02255 }
02256 break;
02257 case 'U':
02258 login = pg_strdup(optarg);
02259 break;
02260 case 'l':
02261 use_log = true;
02262 break;
02263 case 'q':
02264 use_quiet = true;
02265 break;
02266 case 'f':
02267 ttype = 3;
02268 filename = pg_strdup(optarg);
02269 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
02270 exit(1);
02271 break;
02272 case 'D':
02273 {
02274 char *p;
02275
02276 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
02277 {
02278 fprintf(stderr, "invalid variable definition: %s\n", optarg);
02279 exit(1);
02280 }
02281
02282 *p++ = '\0';
02283 if (!putVariable(&state[0], "option", optarg, p))
02284 exit(1);
02285 }
02286 break;
02287 case 'F':
02288 fillfactor = atoi(optarg);
02289 if ((fillfactor < 10) || (fillfactor > 100))
02290 {
02291 fprintf(stderr, "invalid fillfactor: %d\n", fillfactor);
02292 exit(1);
02293 }
02294 break;
02295 case 'M':
02296 if (num_files > 0)
02297 {
02298 fprintf(stderr, "query mode (-M) should be specifiled before transaction scripts (-f)\n");
02299 exit(1);
02300 }
02301 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
02302 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
02303 break;
02304 if (querymode >= NUM_QUERYMODE)
02305 {
02306 fprintf(stderr, "invalid query mode (-M): %s\n", optarg);
02307 exit(1);
02308 }
02309 break;
02310 case 0:
02311
02312 break;
02313 case 2:
02314 tablespace = pg_strdup(optarg);
02315 break;
02316 case 3:
02317 index_tablespace = pg_strdup(optarg);
02318 break;
02319 case 4:
02320 sample_rate = atof(optarg);
02321 if (sample_rate <= 0.0 || sample_rate > 1.0)
02322 {
02323 fprintf(stderr, "invalid sampling rate: %f\n", sample_rate);
02324 exit(1);
02325 }
02326 break;
02327 case 5:
02328 #ifdef WIN32
02329 fprintf(stderr, "--aggregate-interval is not currently supported on Windows");
02330 exit(1);
02331 #else
02332 agg_interval = atoi(optarg);
02333 if (agg_interval <= 0)
02334 {
02335 fprintf(stderr, "invalid number of seconds for aggregation: %d\n", agg_interval);
02336 exit(1);
02337 }
02338 #endif
02339 break;
02340 default:
02341 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
02342 exit(1);
02343 break;
02344 }
02345 }
02346
02347 if (argc > optind)
02348 dbName = argv[optind];
02349 else
02350 {
02351 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
02352 dbName = env;
02353 else if (login != NULL && *login != '\0')
02354 dbName = login;
02355 else
02356 dbName = "";
02357 }
02358
02359 if (is_init_mode)
02360 {
02361 init(is_no_vacuum);
02362 exit(0);
02363 }
02364
02365
02366 if (nxacts <= 0 && duration <= 0)
02367 nxacts = DEFAULT_NXACTS;
02368
02369 if (nclients % nthreads != 0)
02370 {
02371 fprintf(stderr, "number of clients (%d) must be a multiple of number of threads (%d)\n", nclients, nthreads);
02372 exit(1);
02373 }
02374
02375
02376 if (sample_rate > 0.0 && !use_log)
02377 {
02378 fprintf(stderr, "log sampling rate is allowed only when logging transactions (-l) \n");
02379 exit(1);
02380 }
02381
02382
02383 if (use_quiet && !is_init_mode)
02384 {
02385 fprintf(stderr, "quiet-logging is allowed only in initialization mode (-i)\n");
02386 exit(1);
02387 }
02388
02389
02390 if (sample_rate > 0.0 && agg_interval > 0)
02391 {
02392 fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) can't be used at the same time\n");
02393 exit(1);
02394 }
02395
02396 if (agg_interval > 0 && (! use_log)) {
02397 fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
02398 exit(1);
02399 }
02400
02401 if ((duration > 0) && (agg_interval > duration)) {
02402 fprintf(stderr, "number of seconds for aggregation (%d) must not be higher that test duration (%d)\n", agg_interval, duration);
02403 exit(1);
02404 }
02405
02406 if ((duration > 0) && (agg_interval > 0) && (duration % agg_interval != 0)) {
02407 fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
02408 exit(1);
02409 }
02410
02411
02412
02413
02414
02415
02416
02417
02418
02419 #ifndef ENABLE_THREAD_SAFETY
02420 if (is_latencies && nthreads > 1)
02421 {
02422 fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
02423 exit(1);
02424 }
02425 #endif
02426
02427
02428
02429
02430
02431 main_pid = (int) getpid();
02432
02433 if (nclients > 1)
02434 {
02435 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
02436 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
02437
02438
02439 for (i = 1; i < nclients; i++)
02440 {
02441 int j;
02442
02443 state[i].id = i;
02444 for (j = 0; j < state[0].nvariables; j++)
02445 {
02446 if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
02447 exit(1);
02448 }
02449 }
02450 }
02451
02452 if (debug)
02453 {
02454 if (duration <= 0)
02455 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
02456 pghost, pgport, nclients, nxacts, dbName);
02457 else
02458 printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
02459 pghost, pgport, nclients, duration, dbName);
02460 }
02461
02462
02463 con = doConnect();
02464 if (con == NULL)
02465 exit(1);
02466
02467 if (PQstatus(con) == CONNECTION_BAD)
02468 {
02469 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
02470 fprintf(stderr, "%s", PQerrorMessage(con));
02471 exit(1);
02472 }
02473
02474 if (ttype != 3)
02475 {
02476
02477
02478
02479
02480 res = PQexec(con, "select count(*) from pgbench_branches");
02481 if (PQresultStatus(res) != PGRES_TUPLES_OK)
02482 {
02483 fprintf(stderr, "%s", PQerrorMessage(con));
02484 exit(1);
02485 }
02486 scale = atoi(PQgetvalue(res, 0, 0));
02487 if (scale < 0)
02488 {
02489 fprintf(stderr, "count(*) from pgbench_branches invalid (%d)\n", scale);
02490 exit(1);
02491 }
02492 PQclear(res);
02493
02494
02495 if (scale_given)
02496 fprintf(stderr,
02497 "Scale option ignored, using pgbench_branches table count = %d\n",
02498 scale);
02499 }
02500
02501
02502
02503
02504
02505 if (getVariable(&state[0], "scale") == NULL)
02506 {
02507 snprintf(val, sizeof(val), "%d", scale);
02508 for (i = 0; i < nclients; i++)
02509 {
02510 if (!putVariable(&state[i], "startup", "scale", val))
02511 exit(1);
02512 }
02513 }
02514
02515 if (!is_no_vacuum)
02516 {
02517 fprintf(stderr, "starting vacuum...");
02518 executeStatement(con, "vacuum pgbench_branches");
02519 executeStatement(con, "vacuum pgbench_tellers");
02520 executeStatement(con, "truncate pgbench_history");
02521 fprintf(stderr, "end.\n");
02522
02523 if (do_vacuum_accounts)
02524 {
02525 fprintf(stderr, "starting vacuum pgbench_accounts...");
02526 executeStatement(con, "vacuum analyze pgbench_accounts");
02527 fprintf(stderr, "end.\n");
02528 }
02529 }
02530 PQfinish(con);
02531
02532
02533 INSTR_TIME_SET_CURRENT(start_time);
02534 srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
02535
02536
02537 switch (ttype)
02538 {
02539 case 0:
02540 sql_files[0] = process_builtin(tpc_b);
02541 num_files = 1;
02542 break;
02543
02544 case 1:
02545 sql_files[0] = process_builtin(select_only);
02546 num_files = 1;
02547 break;
02548
02549 case 2:
02550 sql_files[0] = process_builtin(simple_update);
02551 num_files = 1;
02552 break;
02553
02554 default:
02555 break;
02556 }
02557
02558
02559 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
02560 for (i = 0; i < nthreads; i++)
02561 {
02562 TState *thread = &threads[i];
02563
02564 thread->tid = i;
02565 thread->state = &state[nclients / nthreads * i];
02566 thread->nstate = nclients / nthreads;
02567 thread->random_state[0] = random();
02568 thread->random_state[1] = random();
02569 thread->random_state[2] = random();
02570
02571 if (is_latencies)
02572 {
02573
02574 int t;
02575
02576 thread->exec_elapsed = (instr_time *)
02577 pg_malloc(sizeof(instr_time) * num_commands);
02578 thread->exec_count = (int *)
02579 pg_malloc(sizeof(int) * num_commands);
02580
02581 for (t = 0; t < num_commands; t++)
02582 {
02583 INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
02584 thread->exec_count[t] = 0;
02585 }
02586 }
02587 else
02588 {
02589 thread->exec_elapsed = NULL;
02590 thread->exec_count = NULL;
02591 }
02592 }
02593
02594
02595 INSTR_TIME_SET_CURRENT(start_time);
02596
02597
02598 if (duration > 0)
02599 setalarm(duration);
02600
02601
02602 for (i = 0; i < nthreads; i++)
02603 {
02604 TState *thread = &threads[i];
02605
02606 INSTR_TIME_SET_CURRENT(thread->start_time);
02607
02608
02609 if (i > 0)
02610 {
02611 int err = pthread_create(&thread->thread, NULL, threadRun, thread);
02612
02613 if (err != 0 || thread->thread == INVALID_THREAD)
02614 {
02615 fprintf(stderr, "cannot create thread: %s\n", strerror(err));
02616 exit(1);
02617 }
02618 }
02619 else
02620 {
02621 thread->thread = INVALID_THREAD;
02622 }
02623 }
02624
02625
02626 total_xacts = 0;
02627 INSTR_TIME_SET_ZERO(conn_total_time);
02628 for (i = 0; i < nthreads; i++)
02629 {
02630 void *ret = NULL;
02631
02632 if (threads[i].thread == INVALID_THREAD)
02633 ret = threadRun(&threads[i]);
02634 else
02635 pthread_join(threads[i].thread, &ret);
02636
02637 if (ret != NULL)
02638 {
02639 TResult *r = (TResult *) ret;
02640
02641 total_xacts += r->xacts;
02642 INSTR_TIME_ADD(conn_total_time, r->conn_time);
02643 free(ret);
02644 }
02645 }
02646 disconnect_all(state, nclients);
02647
02648
02649 INSTR_TIME_SET_CURRENT(total_time);
02650 INSTR_TIME_SUBTRACT(total_time, start_time);
02651 printResults(ttype, total_xacts, nclients, threads, nthreads,
02652 total_time, conn_total_time);
02653
02654 return 0;
02655 }
02656
02657 static void *
02658 threadRun(void *arg)
02659 {
02660 TState *thread = (TState *) arg;
02661 CState *state = thread->state;
02662 TResult *result;
02663 FILE *logfile = NULL;
02664 instr_time start,
02665 end;
02666 int nstate = thread->nstate;
02667 int remains = nstate;
02668 int i;
02669
02670 AggVals aggs;
02671
02672 result = pg_malloc(sizeof(TResult));
02673
02674 INSTR_TIME_SET_ZERO(result->conn_time);
02675
02676
02677 if (use_log)
02678 {
02679 char logpath[64];
02680
02681 if (thread->tid == 0)
02682 snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
02683 else
02684 snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
02685 logfile = fopen(logpath, "w");
02686
02687 if (logfile == NULL)
02688 {
02689 fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
02690 goto done;
02691 }
02692 }
02693
02694 if (!is_connect)
02695 {
02696
02697 for (i = 0; i < nstate; i++)
02698 {
02699 if ((state[i].con = doConnect()) == NULL)
02700 goto done;
02701 }
02702 }
02703
02704
02705 INSTR_TIME_SET_CURRENT(result->conn_time);
02706 INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
02707
02708 agg_vals_init(&aggs, thread->start_time);
02709
02710
02711 for (i = 0; i < nstate; i++)
02712 {
02713 CState *st = &state[i];
02714 Command **commands = sql_files[st->use_file];
02715 int prev_ecnt = st->ecnt;
02716
02717 st->use_file = getrand(thread, 0, num_files - 1);
02718 if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
02719 remains--;
02720
02721 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
02722 {
02723 fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
02724 remains--;
02725 PQfinish(st->con);
02726 st->con = NULL;
02727 }
02728 }
02729
02730 while (remains > 0)
02731 {
02732 fd_set input_mask;
02733 int maxsock;
02734 int64 now_usec = 0;
02735 int64 min_usec;
02736
02737 FD_ZERO(&input_mask);
02738
02739 maxsock = -1;
02740 min_usec = INT64_MAX;
02741 for (i = 0; i < nstate; i++)
02742 {
02743 CState *st = &state[i];
02744 Command **commands = sql_files[st->use_file];
02745 int sock;
02746
02747 if (st->sleeping)
02748 {
02749 int this_usec;
02750
02751 if (min_usec == INT64_MAX)
02752 {
02753 instr_time now;
02754
02755 INSTR_TIME_SET_CURRENT(now);
02756 now_usec = INSTR_TIME_GET_MICROSEC(now);
02757 }
02758
02759 this_usec = st->until - now_usec;
02760 if (min_usec > this_usec)
02761 min_usec = this_usec;
02762 }
02763 else if (st->con == NULL)
02764 {
02765 continue;
02766 }
02767 else if (commands[st->state]->type == META_COMMAND)
02768 {
02769 min_usec = 0;
02770 break;
02771 }
02772
02773 sock = PQsocket(st->con);
02774 if (sock < 0)
02775 {
02776 fprintf(stderr, "bad socket: %s\n", strerror(errno));
02777 goto done;
02778 }
02779
02780 FD_SET(sock, &input_mask);
02781
02782 if (maxsock < sock)
02783 maxsock = sock;
02784 }
02785
02786 if (min_usec > 0 && maxsock != -1)
02787 {
02788 int nsocks;
02789
02790 if (min_usec != INT64_MAX)
02791 {
02792 struct timeval timeout;
02793
02794 timeout.tv_sec = min_usec / 1000000;
02795 timeout.tv_usec = min_usec % 1000000;
02796 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
02797 }
02798 else
02799 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
02800 if (nsocks < 0)
02801 {
02802 if (errno == EINTR)
02803 continue;
02804
02805 fprintf(stderr, "select failed: %s\n", strerror(errno));
02806 goto done;
02807 }
02808 }
02809
02810
02811 for (i = 0; i < nstate; i++)
02812 {
02813 CState *st = &state[i];
02814 Command **commands = sql_files[st->use_file];
02815 int prev_ecnt = st->ecnt;
02816
02817 if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
02818 || commands[st->state]->type == META_COMMAND))
02819 {
02820 if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
02821 remains--;
02822 }
02823
02824 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
02825 {
02826 fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
02827 remains--;
02828 PQfinish(st->con);
02829 st->con = NULL;
02830 }
02831 }
02832 }
02833
02834 done:
02835 INSTR_TIME_SET_CURRENT(start);
02836 disconnect_all(state, nstate);
02837 result->xacts = 0;
02838 for (i = 0; i < nstate; i++)
02839 result->xacts += state[i].cnt;
02840 INSTR_TIME_SET_CURRENT(end);
02841 INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
02842 if (logfile)
02843 fclose(logfile);
02844 return result;
02845 }
02846
02847
02848
02849
02850
02851 #ifndef WIN32
02852
02853 static void
02854 handle_sig_alarm(SIGNAL_ARGS)
02855 {
02856 timer_exceeded = true;
02857 }
02858
02859 static void
02860 setalarm(int seconds)
02861 {
02862 pqsignal(SIGALRM, handle_sig_alarm);
02863 alarm(seconds);
02864 }
02865
02866 #ifndef ENABLE_THREAD_SAFETY
02867
02868
02869
02870
02871
/*
 * Handle for a "thread" that is emulated with a forked child process.
 */
typedef struct fork_pthread
{
	pid_t		pid;			/* value returned by fork() */
	int			pipes[2];		/* pipe() pair: child writes its result to
								 * pipes[1], parent reads it from pipes[0] */
} fork_pthread;
02877
02878 static int
02879 pthread_create(pthread_t *thread,
02880 pthread_attr_t *attr,
02881 void *(*start_routine) (void *),
02882 void *arg)
02883 {
02884 fork_pthread *th;
02885 void *ret;
02886
02887 th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
02888 if (pipe(th->pipes) < 0)
02889 {
02890 free(th);
02891 return errno;
02892 }
02893
02894 th->pid = fork();
02895 if (th->pid == -1)
02896 {
02897 free(th);
02898 return errno;
02899 }
02900 if (th->pid != 0)
02901 {
02902 close(th->pipes[1]);
02903 *thread = th;
02904 return 0;
02905 }
02906
02907
02908 close(th->pipes[0]);
02909
02910
02911 if (duration > 0)
02912 setalarm(duration);
02913
02914 ret = start_routine(arg);
02915 write(th->pipes[1], ret, sizeof(TResult));
02916 close(th->pipes[1]);
02917 free(th);
02918 exit(0);
02919 }
02920
02921 static int
02922 pthread_join(pthread_t th, void **thread_return)
02923 {
02924 int status;
02925
02926 while (waitpid(th->pid, &status, 0) != th->pid)
02927 {
02928 if (errno != EINTR)
02929 return errno;
02930 }
02931
02932 if (thread_return != NULL)
02933 {
02934
02935 *thread_return = pg_malloc(sizeof(TResult));
02936 if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
02937 {
02938 free(*thread_return);
02939 *thread_return = NULL;
02940 }
02941 }
02942 close(th->pipes[0]);
02943
02944 free(th);
02945 return 0;
02946 }
02947 #endif
02948 #else
02949
02950 static VOID CALLBACK
02951 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
02952 {
02953 timer_exceeded = true;
02954 }
02955
02956 static void
02957 setalarm(int seconds)
02958 {
02959 HANDLE queue;
02960 HANDLE timer;
02961
02962
02963 queue = CreateTimerQueue();
02964 if (seconds > ((DWORD) -1) / 1000 ||
02965 !CreateTimerQueueTimer(&timer, queue,
02966 win32_timer_callback, NULL, seconds * 1000, 0,
02967 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
02968 {
02969 fprintf(stderr, "Failed to set timer\n");
02970 exit(1);
02971 }
02972 }
02973
02974
02975
02976 typedef struct win32_pthread
02977 {
02978 HANDLE handle;
02979 void *(*routine) (void *);
02980 void *arg;
02981 void *result;
02982 } win32_pthread;
02983
02984 static unsigned __stdcall
02985 win32_pthread_run(void *arg)
02986 {
02987 win32_pthread *th = (win32_pthread *) arg;
02988
02989 th->result = th->routine(th->arg);
02990
02991 return 0;
02992 }
02993
02994 static int
02995 pthread_create(pthread_t *thread,
02996 pthread_attr_t *attr,
02997 void *(*start_routine) (void *),
02998 void *arg)
02999 {
03000 int save_errno;
03001 win32_pthread *th;
03002
03003 th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
03004 th->routine = start_routine;
03005 th->arg = arg;
03006 th->result = NULL;
03007
03008 th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
03009 if (th->handle == NULL)
03010 {
03011 save_errno = errno;
03012 free(th);
03013 return save_errno;
03014 }
03015
03016 *thread = th;
03017 return 0;
03018 }
03019
03020 static int
03021 pthread_join(pthread_t th, void **thread_return)
03022 {
03023 if (th == NULL || th->handle == NULL)
03024 return errno = EINVAL;
03025
03026 if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
03027 {
03028 _dosmaperr(GetLastError());
03029 return errno;
03030 }
03031
03032 if (thread_return)
03033 *thread_return = th->result;
03034
03035 CloseHandle(th->handle);
03036 free(th);
03037 return 0;
03038 }
03039
03040 #endif