Header And Logo

PostgreSQL
| The world's most advanced open source database.

isolationtester.c

Go to the documentation of this file.
00001 /*
00002  * src/test/isolation/isolationtester.c
00003  *
00004  * isolationtester.c
00005  *      Runs an isolation test specified by a spec file.
00006  */
00007 
00008 #include "postgres_fe.h"
00009 
00010 #ifdef WIN32
00011 #include <windows.h>
00012 #endif
00013 
00014 #ifndef WIN32
00015 #include <sys/time.h>
00016 #include <unistd.h>
00017 
00018 #ifdef HAVE_GETOPT_H
00019 #include <getopt.h>
00020 #endif
00021 #else
00022 int         getopt(int argc, char *const argv[], const char *optstring);
00023 #endif   /* ! WIN32 */
00024 
00025 #ifdef HAVE_SYS_SELECT_H
00026 #include <sys/select.h>
00027 #endif
00028 
00029 #include "libpq-fe.h"
00030 #include "pqexpbuffer.h"
00031 
00032 #include "isolationtester.h"
00033 
00034 extern int  optind;
00035 
00036 #define PREP_WAITING "isolationtester_waiting"
00037 
00038 /*
00039  * conns[0] is the global setup, teardown, and watchdog connection.  Additional
00040  * connections represent spec-defined sessions.
00041  */
00042 static PGconn **conns = NULL;
00043 static const char **backend_pids = NULL;
00044 static int  nconns = 0;
00045 
00046 /* In dry run only output permutations to be run by the tester. */
00047 static int  dry_run = false;
00048 
00049 static void run_testspec(TestSpec * testspec);
00050 static void run_all_permutations(TestSpec * testspec);
00051 static void run_all_permutations_recurse(TestSpec * testspec, int nsteps,
00052                              Step ** steps);
00053 static void run_named_permutations(TestSpec * testspec);
00054 static void run_permutation(TestSpec * testspec, int nsteps, Step ** steps);
00055 
00056 #define STEP_NONBLOCK   0x1     /* return 0 as soon as cmd waits for a lock */
00057 #define STEP_RETRY      0x2     /* this is a retry of a previously-waiting cmd */
00058 static bool try_complete_step(Step * step, int flags);
00059 
00060 static int  step_qsort_cmp(const void *a, const void *b);
00061 static int  step_bsearch_cmp(const void *a, const void *b);
00062 
00063 static void printResultSet(PGresult *res);
00064 
00065 /* close all connections and exit */
00066 static void
00067 exit_nicely(void)
00068 {
00069     int         i;
00070 
00071     for (i = 0; i < nconns; i++)
00072         PQfinish(conns[i]);
00073     fflush(stderr);
00074     fflush(stdout);
00075     exit(1);
00076 }
00077 
00078 int
00079 main(int argc, char **argv)
00080 {
00081     const char *conninfo;
00082     TestSpec   *testspec;
00083     int         i;
00084     PGresult   *res;
00085     PQExpBufferData wait_query;
00086     int         opt;
00087 
00088     while ((opt = getopt(argc, argv, "n")) != -1)
00089     {
00090         switch (opt)
00091         {
00092             case 'n':
00093                 dry_run = true;
00094                 break;
00095             default:
00096                 fprintf(stderr, "Usage: isolationtester [-n] [CONNINFO]\n");
00097                 return EXIT_FAILURE;
00098         }
00099     }
00100 
00101     /*
00102      * If the user supplies a non-option parameter on the command line, use it
00103      * as the conninfo string; otherwise default to setting dbname=postgres
00104      * and using environment variables or defaults for all other connection
00105      * parameters.
00106      */
00107     if (argc > optind)
00108         conninfo = argv[optind];
00109     else
00110         conninfo = "dbname = postgres";
00111 
00112     /* Read the test spec from stdin */
00113     spec_yyparse();
00114     testspec = &parseresult;
00115 
00116     /*
00117      * In dry-run mode, just print the permutations that would be run, and
00118      * exit.
00119      */
00120     if (dry_run)
00121     {
00122         run_testspec(testspec);
00123         return 0;
00124     }
00125 
00126     printf("Parsed test spec with %d sessions\n", testspec->nsessions);
00127 
00128     /*
00129      * Establish connections to the database, one for each session and an
00130      * extra for lock wait detection and global work.
00131      */
00132     nconns = 1 + testspec->nsessions;
00133     conns = calloc(nconns, sizeof(PGconn *));
00134     backend_pids = calloc(nconns, sizeof(*backend_pids));
00135     for (i = 0; i < nconns; i++)
00136     {
00137         conns[i] = PQconnectdb(conninfo);
00138         if (PQstatus(conns[i]) != CONNECTION_OK)
00139         {
00140             fprintf(stderr, "Connection %d to database failed: %s",
00141                     i, PQerrorMessage(conns[i]));
00142             exit_nicely();
00143         }
00144 
00145         /*
00146          * Suppress NOTIFY messages, which otherwise pop into results at odd
00147          * places.
00148          */
00149         res = PQexec(conns[i], "SET client_min_messages = warning;");
00150         if (PQresultStatus(res) != PGRES_COMMAND_OK)
00151         {
00152             fprintf(stderr, "message level setup failed: %s", PQerrorMessage(conns[i]));
00153             exit_nicely();
00154         }
00155         PQclear(res);
00156 
00157         /* Get the backend pid for lock wait checking. */
00158         res = PQexec(conns[i], "SELECT pg_backend_pid()");
00159         if (PQresultStatus(res) == PGRES_TUPLES_OK)
00160         {
00161             if (PQntuples(res) == 1 && PQnfields(res) == 1)
00162                 backend_pids[i] = strdup(PQgetvalue(res, 0, 0));
00163             else
00164             {
00165                 fprintf(stderr, "backend pid query returned %d rows and %d columns, expected 1 row and 1 column",
00166                         PQntuples(res), PQnfields(res));
00167                 exit_nicely();
00168             }
00169         }
00170         else
00171         {
00172             fprintf(stderr, "backend pid query failed: %s",
00173                     PQerrorMessage(conns[i]));
00174             exit_nicely();
00175         }
00176         PQclear(res);
00177     }
00178 
00179     /* Set the session index fields in steps. */
00180     for (i = 0; i < testspec->nsessions; i++)
00181     {
00182         Session    *session = testspec->sessions[i];
00183         int         stepindex;
00184 
00185         for (stepindex = 0; stepindex < session->nsteps; stepindex++)
00186             session->steps[stepindex]->session = i;
00187     }
00188 
00189     /*
00190      * Build the query we'll use to detect lock contention among sessions in
00191      * the test specification.  Most of the time, we could get away with
00192      * simply checking whether a session is waiting for *any* lock: we don't
00193      * exactly expect concurrent use of test tables.  However, autovacuum will
00194      * occasionally take AccessExclusiveLock to truncate a table, and we must
00195      * ignore that transient wait.
00196      */
00197     initPQExpBuffer(&wait_query);
00198     appendPQExpBufferStr(&wait_query,
00199                          "SELECT 1 FROM pg_locks holder, pg_locks waiter "
00200                          "WHERE NOT waiter.granted AND waiter.pid = $1 "
00201                          "AND holder.granted "
00202                          "AND holder.pid <> $1 AND holder.pid IN (");
00203     /* The spec syntax requires at least one session; assume that here. */
00204     appendPQExpBuffer(&wait_query, "%s", backend_pids[1]);
00205     for (i = 2; i < nconns; i++)
00206         appendPQExpBuffer(&wait_query, ", %s", backend_pids[i]);
00207     appendPQExpBufferStr(&wait_query,
00208                          ") "
00209 
00210                          "AND holder.mode = ANY (CASE waiter.mode "
00211                          "WHEN 'AccessShareLock' THEN ARRAY["
00212                          "'AccessExclusiveLock'] "
00213                          "WHEN 'RowShareLock' THEN ARRAY["
00214                          "'ExclusiveLock',"
00215                          "'AccessExclusiveLock'] "
00216                          "WHEN 'RowExclusiveLock' THEN ARRAY["
00217                          "'ShareLock',"
00218                          "'ShareRowExclusiveLock',"
00219                          "'ExclusiveLock',"
00220                          "'AccessExclusiveLock'] "
00221                          "WHEN 'ShareUpdateExclusiveLock' THEN ARRAY["
00222                          "'ShareUpdateExclusiveLock',"
00223                          "'ShareLock',"
00224                          "'ShareRowExclusiveLock',"
00225                          "'ExclusiveLock',"
00226                          "'AccessExclusiveLock'] "
00227                          "WHEN 'ShareLock' THEN ARRAY["
00228                          "'RowExclusiveLock',"
00229                          "'ShareUpdateExclusiveLock',"
00230                          "'ShareRowExclusiveLock',"
00231                          "'ExclusiveLock',"
00232                          "'AccessExclusiveLock'] "
00233                          "WHEN 'ShareRowExclusiveLock' THEN ARRAY["
00234                          "'RowExclusiveLock',"
00235                          "'ShareUpdateExclusiveLock',"
00236                          "'ShareLock',"
00237                          "'ShareRowExclusiveLock',"
00238                          "'ExclusiveLock',"
00239                          "'AccessExclusiveLock'] "
00240                          "WHEN 'ExclusiveLock' THEN ARRAY["
00241                          "'RowShareLock',"
00242                          "'RowExclusiveLock',"
00243                          "'ShareUpdateExclusiveLock',"
00244                          "'ShareLock',"
00245                          "'ShareRowExclusiveLock',"
00246                          "'ExclusiveLock',"
00247                          "'AccessExclusiveLock'] "
00248                          "WHEN 'AccessExclusiveLock' THEN ARRAY["
00249                          "'AccessShareLock',"
00250                          "'RowShareLock',"
00251                          "'RowExclusiveLock',"
00252                          "'ShareUpdateExclusiveLock',"
00253                          "'ShareLock',"
00254                          "'ShareRowExclusiveLock',"
00255                          "'ExclusiveLock',"
00256                          "'AccessExclusiveLock'] END) "
00257 
00258                   "AND holder.locktype IS NOT DISTINCT FROM waiter.locktype "
00259                   "AND holder.database IS NOT DISTINCT FROM waiter.database "
00260                   "AND holder.relation IS NOT DISTINCT FROM waiter.relation "
00261                          "AND holder.page IS NOT DISTINCT FROM waiter.page "
00262                          "AND holder.tuple IS NOT DISTINCT FROM waiter.tuple "
00263               "AND holder.virtualxid IS NOT DISTINCT FROM waiter.virtualxid "
00264         "AND holder.transactionid IS NOT DISTINCT FROM waiter.transactionid "
00265                     "AND holder.classid IS NOT DISTINCT FROM waiter.classid "
00266                          "AND holder.objid IS NOT DISTINCT FROM waiter.objid "
00267                 "AND holder.objsubid IS NOT DISTINCT FROM waiter.objsubid ");
00268 
00269     res = PQprepare(conns[0], PREP_WAITING, wait_query.data, 0, NULL);
00270     if (PQresultStatus(res) != PGRES_COMMAND_OK)
00271     {
00272         fprintf(stderr, "prepare of lock wait query failed: %s",
00273                 PQerrorMessage(conns[0]));
00274         exit_nicely();
00275     }
00276     PQclear(res);
00277     termPQExpBuffer(&wait_query);
00278 
00279     /*
00280      * Run the permutations specified in the spec, or all if none were
00281      * explicitly specified.
00282      */
00283     run_testspec(testspec);
00284 
00285     /* Clean up and exit */
00286     for (i = 0; i < nconns; i++)
00287         PQfinish(conns[i]);
00288     fflush(stderr);
00289     fflush(stdout);
00290     return 0;
00291 }
00292 
/*
 * Per-session counters used while enumerating all permutations: piles[i] is
 * the number of steps already picked from session i.
 */
static int *piles = NULL;
00294 
00295 /*
00296  * Run the permutations specified in the spec, or all if none were
00297  * explicitly specified.
00298  */
00299 static void
00300 run_testspec(TestSpec * testspec)
00301 {
00302     if (testspec->permutations)
00303         run_named_permutations(testspec);
00304     else
00305         run_all_permutations(testspec);
00306 }
00307 
00308 /*
00309  * Run all permutations of the steps and sessions.
00310  */
00311 static void
00312 run_all_permutations(TestSpec * testspec)
00313 {
00314     int         nsteps;
00315     int         i;
00316     Step      **steps;
00317 
00318     /* Count the total number of steps in all sessions */
00319     nsteps = 0;
00320     for (i = 0; i < testspec->nsessions; i++)
00321         nsteps += testspec->sessions[i]->nsteps;
00322 
00323     steps = malloc(sizeof(Step *) * nsteps);
00324 
00325     /*
00326      * To generate the permutations, we conceptually put the steps of each
00327      * session on a pile. To generate a permutation, we pick steps from the
00328      * piles until all piles are empty. By picking steps from piles in
00329      * different order, we get different permutations.
00330      *
00331      * A pile is actually just an integer which tells how many steps we've
00332      * already picked from this pile.
00333      */
00334     piles = malloc(sizeof(int) * testspec->nsessions);
00335     for (i = 0; i < testspec->nsessions; i++)
00336         piles[i] = 0;
00337 
00338     run_all_permutations_recurse(testspec, 0, steps);
00339 }
00340 
00341 static void
00342 run_all_permutations_recurse(TestSpec * testspec, int nsteps, Step ** steps)
00343 {
00344     int         i;
00345     int         found = 0;
00346 
00347     for (i = 0; i < testspec->nsessions; i++)
00348     {
00349         /* If there's any more steps in this pile, pick it and recurse */
00350         if (piles[i] < testspec->sessions[i]->nsteps)
00351         {
00352             steps[nsteps] = testspec->sessions[i]->steps[piles[i]];
00353             piles[i]++;
00354 
00355             run_all_permutations_recurse(testspec, nsteps + 1, steps);
00356 
00357             piles[i]--;
00358 
00359             found = 1;
00360         }
00361     }
00362 
00363     /* If all the piles were empty, this permutation is completed. Run it */
00364     if (!found)
00365         run_permutation(testspec, nsteps, steps);
00366 }
00367 
00368 /*
00369  * Run permutations given in the test spec
00370  */
00371 static void
00372 run_named_permutations(TestSpec * testspec)
00373 {
00374     int         i,
00375                 j;
00376     int         n;
00377     int         nallsteps;
00378     Step      **allsteps;
00379 
00380     /* First create a lookup table of all steps */
00381     nallsteps = 0;
00382     for (i = 0; i < testspec->nsessions; i++)
00383         nallsteps += testspec->sessions[i]->nsteps;
00384 
00385     allsteps = malloc(nallsteps * sizeof(Step *));
00386 
00387     n = 0;
00388     for (i = 0; i < testspec->nsessions; i++)
00389     {
00390         for (j = 0; j < testspec->sessions[i]->nsteps; j++)
00391             allsteps[n++] = testspec->sessions[i]->steps[j];
00392     }
00393 
00394     qsort(allsteps, nallsteps, sizeof(Step *), &step_qsort_cmp);
00395 
00396     for (i = 0; i < testspec->npermutations; i++)
00397     {
00398         Permutation *p = testspec->permutations[i];
00399         Step      **steps;
00400 
00401         steps = malloc(p->nsteps * sizeof(Step *));
00402 
00403         /* Find all the named steps using the lookup table */
00404         for (j = 0; j < p->nsteps; j++)
00405         {
00406             Step      **this = (Step **) bsearch(p->stepnames[j], allsteps,
00407                                                  nallsteps, sizeof(Step *),
00408                                                  &step_bsearch_cmp);
00409 
00410             if (this == NULL)
00411             {
00412                 fprintf(stderr, "undefined step \"%s\" specified in permutation\n",
00413                         p->stepnames[j]);
00414                 exit_nicely();
00415             }
00416             steps[j] = *this;
00417         }
00418 
00419         /* And run them */
00420         run_permutation(testspec, p->nsteps, steps);
00421 
00422         free(steps);
00423     }
00424 }
00425 
00426 static int
00427 step_qsort_cmp(const void *a, const void *b)
00428 {
00429     Step       *stepa = *((Step **) a);
00430     Step       *stepb = *((Step **) b);
00431 
00432     return strcmp(stepa->name, stepb->name);
00433 }
00434 
00435 static int
00436 step_bsearch_cmp(const void *a, const void *b)
00437 {
00438     char       *stepname = (char *) a;
00439     Step       *step = *((Step **) b);
00440 
00441     return strcmp(stepname, step->name);
00442 }
00443 
00444 /*
00445  * If a step caused an error to be reported, print it out and clear it.
00446  */
00447 static void
00448 report_error_message(Step * step)
00449 {
00450     if (step->errormsg)
00451     {
00452         fprintf(stdout, "%s\n", step->errormsg);
00453         free(step->errormsg);
00454         step->errormsg = NULL;
00455     }
00456 }
00457 
00458 /*
00459  * As above, but reports messages possibly emitted by two steps.  This is
00460  * useful when we have a blocked command awakened by another one; we want to
00461  * report both messages identically, for the case where we don't care which
00462  * one fails due to a timeout such as deadlock timeout.
00463  */
00464 static void
00465 report_two_error_messages(Step * step1, Step * step2)
00466 {
00467     char       *prefix;
00468 
00469     prefix = malloc(strlen(step1->name) + strlen(step2->name) + 2);
00470     sprintf(prefix, "%s %s", step1->name, step2->name);
00471 
00472     if (step1->errormsg)
00473     {
00474         fprintf(stdout, "error in steps %s: %s\n", prefix,
00475                 step1->errormsg);
00476         free(step1->errormsg);
00477         step1->errormsg = NULL;
00478     }
00479     if (step2->errormsg)
00480     {
00481         fprintf(stdout, "error in steps %s: %s\n", prefix,
00482                 step2->errormsg);
00483         free(step2->errormsg);
00484         step2->errormsg = NULL;
00485     }
00486 
00487     free(prefix);
00488 }
00489 
00490 /*
00491  * Run one permutation
00492  */
00493 static void
00494 run_permutation(TestSpec * testspec, int nsteps, Step ** steps)
00495 {
00496     PGresult   *res;
00497     int         i;
00498     Step       *waiting = NULL;
00499 
00500     /*
00501      * In dry run mode, just display the permutation in the same format used
00502      * by spec files, and return.
00503      */
00504     if (dry_run)
00505     {
00506         printf("permutation");
00507         for (i = 0; i < nsteps; i++)
00508             printf(" \"%s\"", steps[i]->name);
00509         printf("\n");
00510         return;
00511     }
00512 
00513     printf("\nstarting permutation:");
00514     for (i = 0; i < nsteps; i++)
00515         printf(" %s", steps[i]->name);
00516     printf("\n");
00517 
00518     /* Perform setup */
00519     for (i = 0; i < testspec->nsetupsqls; i++)
00520     {
00521         res = PQexec(conns[0], testspec->setupsqls[i]);
00522         if (PQresultStatus(res) != PGRES_COMMAND_OK)
00523         {
00524             fprintf(stderr, "setup failed: %s", PQerrorMessage(conns[0]));
00525             exit_nicely();
00526         }
00527         PQclear(res);
00528     }
00529 
00530     /* Perform per-session setup */
00531     for (i = 0; i < testspec->nsessions; i++)
00532     {
00533         if (testspec->sessions[i]->setupsql)
00534         {
00535             res = PQexec(conns[i + 1], testspec->sessions[i]->setupsql);
00536             if (PQresultStatus(res) == PGRES_TUPLES_OK)
00537             {
00538                 printResultSet(res);
00539             }
00540             else if (PQresultStatus(res) != PGRES_COMMAND_OK)
00541             {
00542                 fprintf(stderr, "setup of session %s failed: %s",
00543                         testspec->sessions[i]->name,
00544                         PQerrorMessage(conns[i + 1]));
00545                 exit_nicely();
00546             }
00547             PQclear(res);
00548         }
00549     }
00550 
00551     /* Perform steps */
00552     for (i = 0; i < nsteps; i++)
00553     {
00554         Step       *step = steps[i];
00555         PGconn     *conn = conns[1 + step->session];
00556 
00557         if (waiting != NULL && step->session == waiting->session)
00558         {
00559             PGcancel   *cancel;
00560             PGresult   *res;
00561             int         j;
00562 
00563             /*
00564              * This permutation is invalid: it can never happen in real life.
00565              *
00566              * A session is blocked on an earlier step (waiting) and no
00567              * further steps from this session can run until it is unblocked,
00568              * but it can only be unblocked by running steps from other
00569              * sessions.
00570              */
00571             fflush(stdout);
00572             fprintf(stderr, "invalid permutation detected\n");
00573             fflush(stderr);
00574 
00575             /* Cancel the waiting statement from this session. */
00576             cancel = PQgetCancel(conn);
00577             if (cancel != NULL)
00578             {
00579                 char        buf[256];
00580 
00581                 if (!PQcancel(cancel, buf, sizeof(buf)))
00582                     fprintf(stderr, "PQcancel failed: %s\n", buf);
00583 
00584                 /* Be sure to consume the error message. */
00585                 while ((res = PQgetResult(conn)) != NULL)
00586                     PQclear(res);
00587 
00588                 PQfreeCancel(cancel);
00589             }
00590 
00591             /*
00592              * Now we really have to complete all the running transactions to
00593              * make sure teardown doesn't block.
00594              */
00595             for (j = 1; j < nconns; j++)
00596             {
00597                 res = PQexec(conns[j], "ROLLBACK");
00598                 if (res != NULL)
00599                     PQclear(res);
00600             }
00601 
00602             goto teardown;
00603         }
00604 
00605         if (!PQsendQuery(conn, step->sql))
00606         {
00607             fprintf(stdout, "failed to send query for step %s: %s\n",
00608                     step->name, PQerrorMessage(conns[1 + step->session]));
00609             exit_nicely();
00610         }
00611 
00612         if (waiting != NULL)
00613         {
00614             /* Some other step is already waiting: just block. */
00615             try_complete_step(step, 0);
00616 
00617             /*
00618              * See if this step unblocked the waiting step; report both error
00619              * messages together if so.
00620              */
00621             if (!try_complete_step(waiting, STEP_NONBLOCK | STEP_RETRY))
00622             {
00623                 report_two_error_messages(step, waiting);
00624                 waiting = NULL;
00625             }
00626             else
00627                 report_error_message(step);
00628         }
00629         else
00630         {
00631             if (try_complete_step(step, STEP_NONBLOCK))
00632                 waiting = step;
00633             report_error_message(step);
00634         }
00635     }
00636 
00637     /* Finish any waiting query. */
00638     if (waiting != NULL)
00639     {
00640         try_complete_step(waiting, STEP_RETRY);
00641         report_error_message(waiting);
00642     }
00643 
00644 teardown:
00645     /* Perform per-session teardown */
00646     for (i = 0; i < testspec->nsessions; i++)
00647     {
00648         if (testspec->sessions[i]->teardownsql)
00649         {
00650             res = PQexec(conns[i + 1], testspec->sessions[i]->teardownsql);
00651             if (PQresultStatus(res) != PGRES_COMMAND_OK)
00652             {
00653                 fprintf(stderr, "teardown of session %s failed: %s",
00654                         testspec->sessions[i]->name,
00655                         PQerrorMessage(conns[i + 1]));
00656                 /* don't exit on teardown failure */
00657                 fflush(stderr);
00658             }
00659             PQclear(res);
00660         }
00661     }
00662 
00663     /* Perform teardown */
00664     if (testspec->teardownsql)
00665     {
00666         res = PQexec(conns[0], testspec->teardownsql);
00667         if (PQresultStatus(res) == PGRES_TUPLES_OK)
00668         {
00669             printResultSet(res);
00670         }
00671         else if (PQresultStatus(res) != PGRES_COMMAND_OK)
00672         {
00673             fprintf(stderr, "teardown failed: %s",
00674                     PQerrorMessage(conns[0]));
00675             /* don't exit on teardown failure */
00676             fflush(stderr);
00677         }
00678         PQclear(res);
00679     }
00680 }
00681 
00682 /*
00683  * Our caller already sent the query associated with this step.  Wait for it
00684  * to either complete or (if given the STEP_NONBLOCK flag) to block while
00685  * waiting for a lock.  We assume that any lock wait will persist until we
00686  * have executed additional steps in the permutation.
00687  *
00688  * When calling this function on behalf of a given step for a second or later
00689  * time, pass the STEP_RETRY flag.  This only affects the messages printed.
00690  *
00691  * If the connection returns an error, the message is saved in step->errormsg.
00692  * Caller should call report_error_message shortly after this, to have it
00693  * printed and cleared.
00694  *
00695  * If the STEP_NONBLOCK flag was specified and the query is waiting to acquire
00696  * a lock, returns true.  Otherwise, returns false.
00697  */
00698 static bool
00699 try_complete_step(Step * step, int flags)
00700 {
00701     PGconn     *conn = conns[1 + step->session];
00702     fd_set      read_set;
00703     struct timeval timeout;
00704     int         sock = PQsocket(conn);
00705     int         ret;
00706     PGresult   *res;
00707 
00708     FD_ZERO(&read_set);
00709 
00710     while ((flags & STEP_NONBLOCK) && PQisBusy(conn))
00711     {
00712         FD_SET(sock, &read_set);
00713         timeout.tv_sec = 0;
00714         timeout.tv_usec = 10000;    /* Check for lock waits every 10ms. */
00715 
00716         ret = select(sock + 1, &read_set, NULL, NULL, &timeout);
00717         if (ret < 0)            /* error in select() */
00718         {
00719             if (errno == EINTR)
00720                 continue;
00721             fprintf(stderr, "select failed: %s\n", strerror(errno));
00722             exit_nicely();
00723         }
00724         else if (ret == 0)      /* select() timeout: check for lock wait */
00725         {
00726             int         ntuples;
00727 
00728             res = PQexecPrepared(conns[0], PREP_WAITING, 1,
00729                                  &backend_pids[step->session + 1],
00730                                  NULL, NULL, 0);
00731             if (PQresultStatus(res) != PGRES_TUPLES_OK)
00732             {
00733                 fprintf(stderr, "lock wait query failed: %s",
00734                         PQerrorMessage(conn));
00735                 exit_nicely();
00736             }
00737             ntuples = PQntuples(res);
00738             PQclear(res);
00739 
00740             if (ntuples >= 1)   /* waiting to acquire a lock */
00741             {
00742                 if (!(flags & STEP_RETRY))
00743                     printf("step %s: %s <waiting ...>\n",
00744                            step->name, step->sql);
00745                 return true;
00746             }
00747             /* else, not waiting: give it more time */
00748         }
00749         else if (!PQconsumeInput(conn)) /* select(): data available */
00750         {
00751             fprintf(stderr, "PQconsumeInput failed: %s\n",
00752                     PQerrorMessage(conn));
00753             exit_nicely();
00754         }
00755     }
00756 
00757     if (flags & STEP_RETRY)
00758         printf("step %s: <... completed>\n", step->name);
00759     else
00760         printf("step %s: %s\n", step->name, step->sql);
00761 
00762     while ((res = PQgetResult(conn)))
00763     {
00764         switch (PQresultStatus(res))
00765         {
00766             case PGRES_COMMAND_OK:
00767                 break;
00768             case PGRES_TUPLES_OK:
00769                 printResultSet(res);
00770                 break;
00771             case PGRES_FATAL_ERROR:
00772                 if (step->errormsg != NULL)
00773                 {
00774                     printf("WARNING: this step had a leftover error message\n");
00775                     printf("%s\n", step->errormsg);
00776                 }
00777 
00778                 /*
00779                  * Detail may contain XID values, so we want to just show
00780                  * primary.  Beware however that libpq-generated error results
00781                  * may not contain subfields, only an old-style message.
00782                  */
00783                 {
00784                     const char *sev = PQresultErrorField(res,
00785                                                          PG_DIAG_SEVERITY);
00786                     const char *msg = PQresultErrorField(res,
00787                                                     PG_DIAG_MESSAGE_PRIMARY);
00788 
00789                     if (sev && msg)
00790                     {
00791                         step->errormsg = malloc(5 + strlen(sev) + strlen(msg));
00792                         sprintf(step->errormsg, "%s:  %s", sev, msg);
00793                     }
00794                     else
00795                         step->errormsg = strdup(PQresultErrorMessage(res));
00796                 }
00797                 break;
00798             default:
00799                 printf("unexpected result status: %s\n",
00800                        PQresStatus(PQresultStatus(res)));
00801         }
00802         PQclear(res);
00803     }
00804 
00805     return false;
00806 }
00807 
00808 static void
00809 printResultSet(PGresult *res)
00810 {
00811     int         nFields;
00812     int         i,
00813                 j;
00814 
00815     /* first, print out the attribute names */
00816     nFields = PQnfields(res);
00817     for (i = 0; i < nFields; i++)
00818         printf("%-15s", PQfname(res, i));
00819     printf("\n\n");
00820 
00821     /* next, print out the rows */
00822     for (i = 0; i < PQntuples(res); i++)
00823     {
00824         for (j = 0; j < nFields; j++)
00825             printf("%-15s", PQgetvalue(res, i, j));
00826         printf("\n");
00827     }
00828 }