Header And Logo

PostgreSQL
| The world's most advanced open source database.

pquery.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * pquery.c
00004  *    POSTGRES process query command code
00005  *
00006  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00007  * Portions Copyright (c) 1994, Regents of the University of California
00008  *
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/tcop/pquery.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 
00016 #include "postgres.h"
00017 
00018 #include "access/xact.h"
00019 #include "commands/prepare.h"
00020 #include "executor/tstoreReceiver.h"
00021 #include "miscadmin.h"
00022 #include "pg_trace.h"
00023 #include "tcop/pquery.h"
00024 #include "tcop/utility.h"
00025 #include "utils/memutils.h"
00026 #include "utils/snapmgr.h"
00027 
00028 
00029 /*
00030  * ActivePortal is the currently executing Portal (the most closely nested,
00031  * if there are several).
00032  */
00033 Portal      ActivePortal = NULL;
00034 
00035 
00036 static void ProcessQuery(PlannedStmt *plan,
00037              const char *sourceText,
00038              ParamListInfo params,
00039              DestReceiver *dest,
00040              char *completionTag);
00041 static void FillPortalStore(Portal portal, bool isTopLevel);
00042 static uint32 RunFromStore(Portal portal, ScanDirection direction, long count,
00043              DestReceiver *dest);
00044 static long PortalRunSelect(Portal portal, bool forward, long count,
00045                 DestReceiver *dest);
00046 static void PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel,
00047                  DestReceiver *dest, char *completionTag);
00048 static void PortalRunMulti(Portal portal, bool isTopLevel,
00049                DestReceiver *dest, DestReceiver *altdest,
00050                char *completionTag);
00051 static long DoPortalRunFetch(Portal portal,
00052                  FetchDirection fdirection,
00053                  long count,
00054                  DestReceiver *dest);
00055 static void DoPortalRewind(Portal portal);
00056 
00057 
00058 /*
00059  * CreateQueryDesc
00060  */
00061 QueryDesc *
00062 CreateQueryDesc(PlannedStmt *plannedstmt,
00063                 const char *sourceText,
00064                 Snapshot snapshot,
00065                 Snapshot crosscheck_snapshot,
00066                 DestReceiver *dest,
00067                 ParamListInfo params,
00068                 int instrument_options)
00069 {
00070     QueryDesc  *qd = (QueryDesc *) palloc(sizeof(QueryDesc));
00071 
00072     qd->operation = plannedstmt->commandType;   /* operation */
00073     qd->plannedstmt = plannedstmt;      /* plan */
00074     qd->utilitystmt = plannedstmt->utilityStmt; /* in case DECLARE CURSOR */
00075     qd->sourceText = sourceText;    /* query text */
00076     qd->snapshot = RegisterSnapshot(snapshot);  /* snapshot */
00077     /* RI check snapshot */
00078     qd->crosscheck_snapshot = RegisterSnapshot(crosscheck_snapshot);
00079     qd->dest = dest;            /* output dest */
00080     qd->params = params;        /* parameter values passed into query */
00081     qd->instrument_options = instrument_options;        /* instrumentation
00082                                                          * wanted? */
00083 
00084     /* null these fields until set by ExecutorStart */
00085     qd->tupDesc = NULL;
00086     qd->estate = NULL;
00087     qd->planstate = NULL;
00088     qd->totaltime = NULL;
00089 
00090     return qd;
00091 }
00092 
00093 /*
00094  * CreateUtilityQueryDesc
00095  */
00096 QueryDesc *
00097 CreateUtilityQueryDesc(Node *utilitystmt,
00098                        const char *sourceText,
00099                        Snapshot snapshot,
00100                        DestReceiver *dest,
00101                        ParamListInfo params)
00102 {
00103     QueryDesc  *qd = (QueryDesc *) palloc(sizeof(QueryDesc));
00104 
00105     qd->operation = CMD_UTILITY;    /* operation */
00106     qd->plannedstmt = NULL;
00107     qd->utilitystmt = utilitystmt;      /* utility command */
00108     qd->sourceText = sourceText;    /* query text */
00109     qd->snapshot = RegisterSnapshot(snapshot);  /* snapshot */
00110     qd->crosscheck_snapshot = InvalidSnapshot;  /* RI check snapshot */
00111     qd->dest = dest;            /* output dest */
00112     qd->params = params;        /* parameter values passed into query */
00113     qd->instrument_options = false;     /* uninteresting for utilities */
00114 
00115     /* null these fields until set by ExecutorStart */
00116     qd->tupDesc = NULL;
00117     qd->estate = NULL;
00118     qd->planstate = NULL;
00119     qd->totaltime = NULL;
00120 
00121     return qd;
00122 }
00123 
00124 /*
00125  * FreeQueryDesc
00126  */
00127 void
00128 FreeQueryDesc(QueryDesc *qdesc)
00129 {
00130     /* Can't be a live query */
00131     Assert(qdesc->estate == NULL);
00132 
00133     /* forget our snapshots */
00134     UnregisterSnapshot(qdesc->snapshot);
00135     UnregisterSnapshot(qdesc->crosscheck_snapshot);
00136 
00137     /* Only the QueryDesc itself need be freed */
00138     pfree(qdesc);
00139 }
00140 
00141 
00142 /*
00143  * ProcessQuery
00144  *      Execute a single plannable query within a PORTAL_MULTI_QUERY,
00145  *      PORTAL_ONE_RETURNING, or PORTAL_ONE_MOD_WITH portal
00146  *
00147  *  plan: the plan tree for the query
00148  *  sourceText: the source text of the query
00149  *  params: any parameters needed
00150  *  dest: where to send results
00151  *  completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
00152  *      in which to store a command completion status string.
00153  *
00154  * completionTag may be NULL if caller doesn't want a status string.
00155  *
00156  * Must be called in a memory context that will be reset or deleted on
00157  * error; otherwise the executor's memory usage will be leaked.
00158  */
00159 static void
00160 ProcessQuery(PlannedStmt *plan,
00161              const char *sourceText,
00162              ParamListInfo params,
00163              DestReceiver *dest,
00164              char *completionTag)
00165 {
00166     QueryDesc  *queryDesc;
00167 
00168     elog(DEBUG3, "ProcessQuery");
00169 
00170     /*
00171      * Create the QueryDesc object
00172      */
00173     queryDesc = CreateQueryDesc(plan, sourceText,
00174                                 GetActiveSnapshot(), InvalidSnapshot,
00175                                 dest, params, 0);
00176 
00177     /*
00178      * Call ExecutorStart to prepare the plan for execution
00179      */
00180     ExecutorStart(queryDesc, 0);
00181 
00182     /*
00183      * Run the plan to completion.
00184      */
00185     ExecutorRun(queryDesc, ForwardScanDirection, 0L);
00186 
00187     /*
00188      * Build command completion status string, if caller wants one.
00189      */
00190     if (completionTag)
00191     {
00192         Oid         lastOid;
00193 
00194         switch (queryDesc->operation)
00195         {
00196             case CMD_SELECT:
00197                 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
00198                          "SELECT %u", queryDesc->estate->es_processed);
00199                 break;
00200             case CMD_INSERT:
00201                 if (queryDesc->estate->es_processed == 1)
00202                     lastOid = queryDesc->estate->es_lastoid;
00203                 else
00204                     lastOid = InvalidOid;
00205                 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
00206                    "INSERT %u %u", lastOid, queryDesc->estate->es_processed);
00207                 break;
00208             case CMD_UPDATE:
00209                 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
00210                          "UPDATE %u", queryDesc->estate->es_processed);
00211                 break;
00212             case CMD_DELETE:
00213                 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
00214                          "DELETE %u", queryDesc->estate->es_processed);
00215                 break;
00216             default:
00217                 strcpy(completionTag, "???");
00218                 break;
00219         }
00220     }
00221 
00222     /*
00223      * Now, we close down all the scans and free allocated resources.
00224      */
00225     ExecutorFinish(queryDesc);
00226     ExecutorEnd(queryDesc);
00227 
00228     FreeQueryDesc(queryDesc);
00229 }
00230 
00231 /*
00232  * ChoosePortalStrategy
00233  *      Select portal execution strategy given the intended statement list.
00234  *
00235  * The list elements can be Querys, PlannedStmts, or utility statements.
00236  * That's more general than portals need, but plancache.c uses this too.
00237  *
00238  * See the comments in portal.h.
00239  */
00240 PortalStrategy
00241 ChoosePortalStrategy(List *stmts)
00242 {
00243     int         nSetTag;
00244     ListCell   *lc;
00245 
00246     /*
00247      * PORTAL_ONE_SELECT and PORTAL_UTIL_SELECT need only consider the
00248      * single-statement case, since there are no rewrite rules that can add
00249      * auxiliary queries to a SELECT or a utility command. PORTAL_ONE_MOD_WITH
00250      * likewise allows only one top-level statement.
00251      */
00252     if (list_length(stmts) == 1)
00253     {
00254         Node       *stmt = (Node *) linitial(stmts);
00255 
00256         if (IsA(stmt, Query))
00257         {
00258             Query      *query = (Query *) stmt;
00259 
00260             if (query->canSetTag)
00261             {
00262                 if (query->commandType == CMD_SELECT &&
00263                     query->utilityStmt == NULL)
00264                 {
00265                     if (query->hasModifyingCTE)
00266                         return PORTAL_ONE_MOD_WITH;
00267                     else
00268                         return PORTAL_ONE_SELECT;
00269                 }
00270                 if (query->commandType == CMD_UTILITY &&
00271                     query->utilityStmt != NULL)
00272                 {
00273                     if (UtilityReturnsTuples(query->utilityStmt))
00274                         return PORTAL_UTIL_SELECT;
00275                     /* it can't be ONE_RETURNING, so give up */
00276                     return PORTAL_MULTI_QUERY;
00277                 }
00278             }
00279         }
00280         else if (IsA(stmt, PlannedStmt))
00281         {
00282             PlannedStmt *pstmt = (PlannedStmt *) stmt;
00283 
00284             if (pstmt->canSetTag)
00285             {
00286                 if (pstmt->commandType == CMD_SELECT &&
00287                     pstmt->utilityStmt == NULL)
00288                 {
00289                     if (pstmt->hasModifyingCTE)
00290                         return PORTAL_ONE_MOD_WITH;
00291                     else
00292                         return PORTAL_ONE_SELECT;
00293                 }
00294             }
00295         }
00296         else
00297         {
00298             /* must be a utility command; assume it's canSetTag */
00299             if (UtilityReturnsTuples(stmt))
00300                 return PORTAL_UTIL_SELECT;
00301             /* it can't be ONE_RETURNING, so give up */
00302             return PORTAL_MULTI_QUERY;
00303         }
00304     }
00305 
00306     /*
00307      * PORTAL_ONE_RETURNING has to allow auxiliary queries added by rewrite.
00308      * Choose PORTAL_ONE_RETURNING if there is exactly one canSetTag query and
00309      * it has a RETURNING list.
00310      */
00311     nSetTag = 0;
00312     foreach(lc, stmts)
00313     {
00314         Node       *stmt = (Node *) lfirst(lc);
00315 
00316         if (IsA(stmt, Query))
00317         {
00318             Query      *query = (Query *) stmt;
00319 
00320             if (query->canSetTag)
00321             {
00322                 if (++nSetTag > 1)
00323                     return PORTAL_MULTI_QUERY;  /* no need to look further */
00324                 if (query->returningList == NIL)
00325                     return PORTAL_MULTI_QUERY;  /* no need to look further */
00326             }
00327         }
00328         else if (IsA(stmt, PlannedStmt))
00329         {
00330             PlannedStmt *pstmt = (PlannedStmt *) stmt;
00331 
00332             if (pstmt->canSetTag)
00333             {
00334                 if (++nSetTag > 1)
00335                     return PORTAL_MULTI_QUERY;  /* no need to look further */
00336                 if (!pstmt->hasReturning)
00337                     return PORTAL_MULTI_QUERY;  /* no need to look further */
00338             }
00339         }
00340         /* otherwise, utility command, assumed not canSetTag */
00341     }
00342     if (nSetTag == 1)
00343         return PORTAL_ONE_RETURNING;
00344 
00345     /* Else, it's the general case... */
00346     return PORTAL_MULTI_QUERY;
00347 }
00348 
00349 /*
00350  * FetchPortalTargetList
00351  *      Given a portal that returns tuples, extract the query targetlist.
00352  *      Returns NIL if the portal doesn't have a determinable targetlist.
00353  *
00354  * Note: do not modify the result.
00355  */
00356 List *
00357 FetchPortalTargetList(Portal portal)
00358 {
00359     /* no point in looking if we determined it doesn't return tuples */
00360     if (portal->strategy == PORTAL_MULTI_QUERY)
00361         return NIL;
00362     /* get the primary statement and find out what it returns */
00363     return FetchStatementTargetList(PortalGetPrimaryStmt(portal));
00364 }
00365 
00366 /*
00367  * FetchStatementTargetList
00368  *      Given a statement that returns tuples, extract the query targetlist.
00369  *      Returns NIL if the statement doesn't have a determinable targetlist.
00370  *
00371  * This can be applied to a Query, a PlannedStmt, or a utility statement.
00372  * That's more general than portals need, but plancache.c uses this too.
00373  *
00374  * Note: do not modify the result.
00375  *
00376  * XXX be careful to keep this in sync with UtilityReturnsTuples.
00377  */
00378 List *
00379 FetchStatementTargetList(Node *stmt)
00380 {
00381     if (stmt == NULL)
00382         return NIL;
00383     if (IsA(stmt, Query))
00384     {
00385         Query      *query = (Query *) stmt;
00386 
00387         if (query->commandType == CMD_UTILITY &&
00388             query->utilityStmt != NULL)
00389         {
00390             /* transfer attention to utility statement */
00391             stmt = query->utilityStmt;
00392         }
00393         else
00394         {
00395             if (query->commandType == CMD_SELECT &&
00396                 query->utilityStmt == NULL)
00397                 return query->targetList;
00398             if (query->returningList)
00399                 return query->returningList;
00400             return NIL;
00401         }
00402     }
00403     if (IsA(stmt, PlannedStmt))
00404     {
00405         PlannedStmt *pstmt = (PlannedStmt *) stmt;
00406 
00407         if (pstmt->commandType == CMD_SELECT &&
00408             pstmt->utilityStmt == NULL)
00409             return pstmt->planTree->targetlist;
00410         if (pstmt->hasReturning)
00411             return pstmt->planTree->targetlist;
00412         return NIL;
00413     }
00414     if (IsA(stmt, FetchStmt))
00415     {
00416         FetchStmt  *fstmt = (FetchStmt *) stmt;
00417         Portal      subportal;
00418 
00419         Assert(!fstmt->ismove);
00420         subportal = GetPortalByName(fstmt->portalname);
00421         Assert(PortalIsValid(subportal));
00422         return FetchPortalTargetList(subportal);
00423     }
00424     if (IsA(stmt, ExecuteStmt))
00425     {
00426         ExecuteStmt *estmt = (ExecuteStmt *) stmt;
00427         PreparedStatement *entry;
00428 
00429         entry = FetchPreparedStatement(estmt->name, true);
00430         return FetchPreparedStatementTargetList(entry);
00431     }
00432     return NIL;
00433 }
00434 
00435 /*
00436  * PortalStart
00437  *      Prepare a portal for execution.
00438  *
00439  * Caller must already have created the portal, done PortalDefineQuery(),
00440  * and adjusted portal options if needed.
00441  *
00442  * If parameters are needed by the query, they must be passed in "params"
00443  * (caller is responsible for giving them appropriate lifetime).
00444  *
00445  * The caller can also provide an initial set of "eflags" to be passed to
00446  * ExecutorStart (but note these can be modified internally, and they are
00447  * currently only honored for PORTAL_ONE_SELECT portals).  Most callers
00448  * should simply pass zero.
00449  *
00450  * The caller can optionally pass a snapshot to be used; pass InvalidSnapshot
00451  * for the normal behavior of setting a new snapshot.  This parameter is
00452  * presently ignored for non-PORTAL_ONE_SELECT portals (it's only intended
00453  * to be used for cursors).
00454  *
00455  * On return, portal is ready to accept PortalRun() calls, and the result
00456  * tupdesc (if any) is known.
00457  */
00458 void
00459 PortalStart(Portal portal, ParamListInfo params,
00460             int eflags, Snapshot snapshot)
00461 {
00462     Portal      saveActivePortal;
00463     ResourceOwner saveResourceOwner;
00464     MemoryContext savePortalContext;
00465     MemoryContext oldContext;
00466     QueryDesc  *queryDesc;
00467     int         myeflags;
00468 
00469     AssertArg(PortalIsValid(portal));
00470     AssertState(portal->status == PORTAL_DEFINED);
00471 
00472     /*
00473      * Set up global portal context pointers.
00474      */
00475     saveActivePortal = ActivePortal;
00476     saveResourceOwner = CurrentResourceOwner;
00477     savePortalContext = PortalContext;
00478     PG_TRY();
00479     {
00480         ActivePortal = portal;
00481         CurrentResourceOwner = portal->resowner;
00482         PortalContext = PortalGetHeapMemory(portal);
00483 
00484         oldContext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
00485 
00486         /* Must remember portal param list, if any */
00487         portal->portalParams = params;
00488 
00489         /*
00490          * Determine the portal execution strategy
00491          */
00492         portal->strategy = ChoosePortalStrategy(portal->stmts);
00493 
00494         /*
00495          * Fire her up according to the strategy
00496          */
00497         switch (portal->strategy)
00498         {
00499             case PORTAL_ONE_SELECT:
00500 
00501                 /* Must set snapshot before starting executor. */
00502                 if (snapshot)
00503                     PushActiveSnapshot(snapshot);
00504                 else
00505                     PushActiveSnapshot(GetTransactionSnapshot());
00506 
00507                 /*
00508                  * Create QueryDesc in portal's context; for the moment, set
00509                  * the destination to DestNone.
00510                  */
00511                 queryDesc = CreateQueryDesc((PlannedStmt *) linitial(portal->stmts),
00512                                             portal->sourceText,
00513                                             GetActiveSnapshot(),
00514                                             InvalidSnapshot,
00515                                             None_Receiver,
00516                                             params,
00517                                             0);
00518 
00519                 /*
00520                  * If it's a scrollable cursor, executor needs to support
00521                  * REWIND and backwards scan, as well as whatever the caller
00522                  * might've asked for.
00523                  */
00524                 if (portal->cursorOptions & CURSOR_OPT_SCROLL)
00525                     myeflags = eflags | EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD;
00526                 else
00527                     myeflags = eflags;
00528 
00529                 /*
00530                  * Call ExecutorStart to prepare the plan for execution
00531                  */
00532                 ExecutorStart(queryDesc, myeflags);
00533 
00534                 /*
00535                  * This tells PortalCleanup to shut down the executor
00536                  */
00537                 portal->queryDesc = queryDesc;
00538 
00539                 /*
00540                  * Remember tuple descriptor (computed by ExecutorStart)
00541                  */
00542                 portal->tupDesc = queryDesc->tupDesc;
00543 
00544                 /*
00545                  * Reset cursor position data to "start of query"
00546                  */
00547                 portal->atStart = true;
00548                 portal->atEnd = false;  /* allow fetches */
00549                 portal->portalPos = 0;
00550                 portal->posOverflow = false;
00551 
00552                 PopActiveSnapshot();
00553                 break;
00554 
00555             case PORTAL_ONE_RETURNING:
00556             case PORTAL_ONE_MOD_WITH:
00557 
00558                 /*
00559                  * We don't start the executor until we are told to run the
00560                  * portal.  We do need to set up the result tupdesc.
00561                  */
00562                 {
00563                     PlannedStmt *pstmt;
00564 
00565                     pstmt = (PlannedStmt *) PortalGetPrimaryStmt(portal);
00566                     Assert(IsA(pstmt, PlannedStmt));
00567                     portal->tupDesc =
00568                         ExecCleanTypeFromTL(pstmt->planTree->targetlist,
00569                                             false);
00570                 }
00571 
00572                 /*
00573                  * Reset cursor position data to "start of query"
00574                  */
00575                 portal->atStart = true;
00576                 portal->atEnd = false;  /* allow fetches */
00577                 portal->portalPos = 0;
00578                 portal->posOverflow = false;
00579                 break;
00580 
00581             case PORTAL_UTIL_SELECT:
00582 
00583                 /*
00584                  * We don't set snapshot here, because PortalRunUtility will
00585                  * take care of it if needed.
00586                  */
00587                 {
00588                     Node       *ustmt = PortalGetPrimaryStmt(portal);
00589 
00590                     Assert(!IsA(ustmt, PlannedStmt));
00591                     portal->tupDesc = UtilityTupleDescriptor(ustmt);
00592                 }
00593 
00594                 /*
00595                  * Reset cursor position data to "start of query"
00596                  */
00597                 portal->atStart = true;
00598                 portal->atEnd = false;  /* allow fetches */
00599                 portal->portalPos = 0;
00600                 portal->posOverflow = false;
00601                 break;
00602 
00603             case PORTAL_MULTI_QUERY:
00604                 /* Need do nothing now */
00605                 portal->tupDesc = NULL;
00606                 break;
00607         }
00608     }
00609     PG_CATCH();
00610     {
00611         /* Uncaught error while executing portal: mark it dead */
00612         MarkPortalFailed(portal);
00613 
00614         /* Restore global vars and propagate error */
00615         ActivePortal = saveActivePortal;
00616         CurrentResourceOwner = saveResourceOwner;
00617         PortalContext = savePortalContext;
00618 
00619         PG_RE_THROW();
00620     }
00621     PG_END_TRY();
00622 
00623     MemoryContextSwitchTo(oldContext);
00624 
00625     ActivePortal = saveActivePortal;
00626     CurrentResourceOwner = saveResourceOwner;
00627     PortalContext = savePortalContext;
00628 
00629     portal->status = PORTAL_READY;
00630 }
00631 
00632 /*
00633  * PortalSetResultFormat
00634  *      Select the format codes for a portal's output.
00635  *
00636  * This must be run after PortalStart for a portal that will be read by
00637  * a DestRemote or DestRemoteExecute destination.  It is not presently needed
00638  * for other destination types.
00639  *
00640  * formats[] is the client format request, as per Bind message conventions.
00641  */
00642 void
00643 PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
00644 {
00645     int         natts;
00646     int         i;
00647 
00648     /* Do nothing if portal won't return tuples */
00649     if (portal->tupDesc == NULL)
00650         return;
00651     natts = portal->tupDesc->natts;
00652     portal->formats = (int16 *)
00653         MemoryContextAlloc(PortalGetHeapMemory(portal),
00654                            natts * sizeof(int16));
00655     if (nFormats > 1)
00656     {
00657         /* format specified for each column */
00658         if (nFormats != natts)
00659             ereport(ERROR,
00660                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
00661                      errmsg("bind message has %d result formats but query has %d columns",
00662                             nFormats, natts)));
00663         memcpy(portal->formats, formats, natts * sizeof(int16));
00664     }
00665     else if (nFormats > 0)
00666     {
00667         /* single format specified, use for all columns */
00668         int16       format1 = formats[0];
00669 
00670         for (i = 0; i < natts; i++)
00671             portal->formats[i] = format1;
00672     }
00673     else
00674     {
00675         /* use default format for all columns */
00676         for (i = 0; i < natts; i++)
00677             portal->formats[i] = 0;
00678     }
00679 }
00680 
00681 /*
00682  * PortalRun
00683  *      Run a portal's query or queries.
00684  *
00685  * count <= 0 is interpreted as a no-op: the destination gets started up
00686  * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
00687  * interpreted as "all rows".  Note that count is ignored in multi-query
00688  * situations, where we always run the portal to completion.
00689  *
00690  * isTopLevel: true if query is being executed at backend "top level"
00691  * (that is, directly from a client command message)
00692  *
00693  * dest: where to send output of primary (canSetTag) query
00694  *
00695  * altdest: where to send output of non-primary queries
00696  *
00697  * completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
00698  *      in which to store a command completion status string.
00699  *      May be NULL if caller doesn't want a status string.
00700  *
00701  * Returns TRUE if the portal's execution is complete, FALSE if it was
00702  * suspended due to exhaustion of the count parameter.
00703  */
00704 bool
00705 PortalRun(Portal portal, long count, bool isTopLevel,
00706           DestReceiver *dest, DestReceiver *altdest,
00707           char *completionTag)
00708 {
00709     bool        result;
00710     uint32      nprocessed;
00711     ResourceOwner saveTopTransactionResourceOwner;
00712     MemoryContext saveTopTransactionContext;
00713     Portal      saveActivePortal;
00714     ResourceOwner saveResourceOwner;
00715     MemoryContext savePortalContext;
00716     MemoryContext saveMemoryContext;
00717 
00718     AssertArg(PortalIsValid(portal));
00719 
00720     TRACE_POSTGRESQL_QUERY_EXECUTE_START();
00721 
00722     /* Initialize completion tag to empty string */
00723     if (completionTag)
00724         completionTag[0] = '\0';
00725 
00726     if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
00727     {
00728         elog(DEBUG3, "PortalRun");
00729         /* PORTAL_MULTI_QUERY logs its own stats per query */
00730         ResetUsage();
00731     }
00732 
00733     /*
00734      * Check for improper portal use, and mark portal active.
00735      */
00736     if (portal->status != PORTAL_READY)
00737         ereport(ERROR,
00738                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
00739                  errmsg("portal \"%s\" cannot be run", portal->name)));
00740     portal->status = PORTAL_ACTIVE;
00741 
00742     /*
00743      * Set up global portal context pointers.
00744      *
00745      * We have to play a special game here to support utility commands like
00746      * VACUUM and CLUSTER, which internally start and commit transactions.
00747      * When we are called to execute such a command, CurrentResourceOwner will
00748      * be pointing to the TopTransactionResourceOwner --- which will be
00749      * destroyed and replaced in the course of the internal commit and
00750      * restart.  So we need to be prepared to restore it as pointing to the
00751      * exit-time TopTransactionResourceOwner.  (Ain't that ugly?  This idea of
00752      * internally starting whole new transactions is not good.)
00753      * CurrentMemoryContext has a similar problem, but the other pointers we
00754      * save here will be NULL or pointing to longer-lived objects.
00755      */
00756     saveTopTransactionResourceOwner = TopTransactionResourceOwner;
00757     saveTopTransactionContext = TopTransactionContext;
00758     saveActivePortal = ActivePortal;
00759     saveResourceOwner = CurrentResourceOwner;
00760     savePortalContext = PortalContext;
00761     saveMemoryContext = CurrentMemoryContext;
00762     PG_TRY();
00763     {
00764         ActivePortal = portal;
00765         CurrentResourceOwner = portal->resowner;
00766         PortalContext = PortalGetHeapMemory(portal);
00767 
00768         MemoryContextSwitchTo(PortalContext);
00769 
00770         switch (portal->strategy)
00771         {
00772             case PORTAL_ONE_SELECT:
00773             case PORTAL_ONE_RETURNING:
00774             case PORTAL_ONE_MOD_WITH:
00775             case PORTAL_UTIL_SELECT:
00776 
00777                 /*
00778                  * If we have not yet run the command, do so, storing its
00779                  * results in the portal's tuplestore.  But we don't do that
00780                  * for the PORTAL_ONE_SELECT case.
00781                  */
00782                 if (portal->strategy != PORTAL_ONE_SELECT && !portal->holdStore)
00783                     FillPortalStore(portal, isTopLevel);
00784 
00785                 /*
00786                  * Now fetch desired portion of results.
00787                  */
00788                 nprocessed = PortalRunSelect(portal, true, count, dest);
00789 
00790                 /*
00791                  * If the portal result contains a command tag and the caller
00792                  * gave us a pointer to store it, copy it. Patch the "SELECT"
00793                  * tag to also provide the rowcount.
00794                  */
00795                 if (completionTag && portal->commandTag)
00796                 {
00797                     if (strcmp(portal->commandTag, "SELECT") == 0)
00798                         snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
00799                                  "SELECT %u", nprocessed);
00800                     else
00801                         strcpy(completionTag, portal->commandTag);
00802                 }
00803 
00804                 /* Mark portal not active */
00805                 portal->status = PORTAL_READY;
00806 
00807                 /*
00808                  * Since it's a forward fetch, say DONE iff atEnd is now true.
00809                  */
00810                 result = portal->atEnd;
00811                 break;
00812 
00813             case PORTAL_MULTI_QUERY:
00814                 PortalRunMulti(portal, isTopLevel,
00815                                dest, altdest, completionTag);
00816 
00817                 /* Prevent portal's commands from being re-executed */
00818                 MarkPortalDone(portal);
00819 
00820                 /* Always complete at end of RunMulti */
00821                 result = true;
00822                 break;
00823 
00824             default:
00825                 elog(ERROR, "unrecognized portal strategy: %d",
00826                      (int) portal->strategy);
00827                 result = false; /* keep compiler quiet */
00828                 break;
00829         }
00830     }
00831     PG_CATCH();
00832     {
00833         /* Uncaught error while executing portal: mark it dead */
00834         MarkPortalFailed(portal);
00835 
00836         /* Restore global vars and propagate error */
00837         if (saveMemoryContext == saveTopTransactionContext)
00838             MemoryContextSwitchTo(TopTransactionContext);
00839         else
00840             MemoryContextSwitchTo(saveMemoryContext);
00841         ActivePortal = saveActivePortal;
00842         if (saveResourceOwner == saveTopTransactionResourceOwner)
00843             CurrentResourceOwner = TopTransactionResourceOwner;
00844         else
00845             CurrentResourceOwner = saveResourceOwner;
00846         PortalContext = savePortalContext;
00847 
00848         PG_RE_THROW();
00849     }
00850     PG_END_TRY();
00851 
00852     if (saveMemoryContext == saveTopTransactionContext)
00853         MemoryContextSwitchTo(TopTransactionContext);
00854     else
00855         MemoryContextSwitchTo(saveMemoryContext);
00856     ActivePortal = saveActivePortal;
00857     if (saveResourceOwner == saveTopTransactionResourceOwner)
00858         CurrentResourceOwner = TopTransactionResourceOwner;
00859     else
00860         CurrentResourceOwner = saveResourceOwner;
00861     PortalContext = savePortalContext;
00862 
00863     if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
00864         ShowUsage("EXECUTOR STATISTICS");
00865 
00866     TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
00867 
00868     return result;
00869 }
00870 
00871 /*
00872  * PortalRunSelect
00873  *      Execute a portal's query in PORTAL_ONE_SELECT mode, and also
00874  *      when fetching from a completed holdStore in PORTAL_ONE_RETURNING,
00875  *      PORTAL_ONE_MOD_WITH, and PORTAL_UTIL_SELECT cases.
00876  *
00877  * This handles simple N-rows-forward-or-backward cases.  For more complex
00878  * nonsequential access to a portal, see PortalRunFetch.
00879  *
00880  * count <= 0 is interpreted as a no-op: the destination gets started up
00881  * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
00882  * interpreted as "all rows".
00883  *
00884  * Caller must already have validated the Portal and done appropriate
00885  * setup (cf. PortalRun).
00886  *
00887  * Returns number of rows processed (suitable for use in result tag)
00888  */
00889 static long
00890 PortalRunSelect(Portal portal,
00891                 bool forward,
00892                 long count,
00893                 DestReceiver *dest)
00894 {
00895     QueryDesc  *queryDesc;
00896     ScanDirection direction;
00897     uint32      nprocessed;
00898 
00899     /*
00900      * NB: queryDesc will be NULL if we are fetching from a held cursor or a
00901      * completed utility query; can't use it in that path.
00902      */
00903     queryDesc = PortalGetQueryDesc(portal);
00904 
00905     /* Caller messed up if we have neither a ready query nor held data. */
00906     Assert(queryDesc || portal->holdStore);
00907 
00908     /*
00909      * Force the queryDesc destination to the right thing.  This supports
00910      * MOVE, for example, which will pass in dest = DestNone.  This is okay to
00911      * change as long as we do it on every fetch.  (The Executor must not
00912      * assume that dest never changes.)
00913      */
00914     if (queryDesc)
00915         queryDesc->dest = dest;
00916 
00917     /*
00918      * Determine which direction to go in, and check to see if we're already
00919      * at the end of the available tuples in that direction.  If so, set the
00920      * direction to NoMovement to avoid trying to fetch any tuples.  (This
00921      * check exists because not all plan node types are robust about being
00922      * called again if they've already returned NULL once.)  Then call the
00923      * executor (we must not skip this, because the destination needs to see a
00924      * setup and shutdown even if no tuples are available).  Finally, update
00925      * the portal position state depending on the number of tuples that were
00926      * retrieved.
00927      */
00928     if (forward)
00929     {
00930         if (portal->atEnd || count <= 0)
00931             direction = NoMovementScanDirection;
00932         else
00933             direction = ForwardScanDirection;
00934 
00935         /* In the executor, zero count processes all rows */
00936         if (count == FETCH_ALL)
00937             count = 0;
00938 
00939         if (portal->holdStore)
00940             nprocessed = RunFromStore(portal, direction, count, dest);
00941         else
00942         {
00943             PushActiveSnapshot(queryDesc->snapshot);
00944             ExecutorRun(queryDesc, direction, count);
00945             nprocessed = queryDesc->estate->es_processed;
00946             PopActiveSnapshot();
00947         }
00948 
00949         if (!ScanDirectionIsNoMovement(direction))
00950         {
00951             long        oldPos;
00952 
00953             if (nprocessed > 0)
00954                 portal->atStart = false;        /* OK to go backward now */
00955             if (count == 0 ||
00956                 (unsigned long) nprocessed < (unsigned long) count)
00957                 portal->atEnd = true;   /* we retrieved 'em all */
00958             oldPos = portal->portalPos;
00959             portal->portalPos += nprocessed;
00960             /* portalPos doesn't advance when we fall off the end */
00961             if (portal->portalPos < oldPos)
00962                 portal->posOverflow = true;
00963         }
00964     }
00965     else
00966     {
00967         if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
00968             ereport(ERROR,
00969                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
00970                      errmsg("cursor can only scan forward"),
00971                      errhint("Declare it with SCROLL option to enable backward scan.")));
00972 
00973         if (portal->atStart || count <= 0)
00974             direction = NoMovementScanDirection;
00975         else
00976             direction = BackwardScanDirection;
00977 
00978         /* In the executor, zero count processes all rows */
00979         if (count == FETCH_ALL)
00980             count = 0;
00981 
00982         if (portal->holdStore)
00983             nprocessed = RunFromStore(portal, direction, count, dest);
00984         else
00985         {
00986             PushActiveSnapshot(queryDesc->snapshot);
00987             ExecutorRun(queryDesc, direction, count);
00988             nprocessed = queryDesc->estate->es_processed;
00989             PopActiveSnapshot();
00990         }
00991 
00992         if (!ScanDirectionIsNoMovement(direction))
00993         {
00994             if (nprocessed > 0 && portal->atEnd)
00995             {
00996                 portal->atEnd = false;  /* OK to go forward now */
00997                 portal->portalPos++;    /* adjust for endpoint case */
00998             }
00999             if (count == 0 ||
01000                 (unsigned long) nprocessed < (unsigned long) count)
01001             {
01002                 portal->atStart = true; /* we retrieved 'em all */
01003                 portal->portalPos = 0;
01004                 portal->posOverflow = false;
01005             }
01006             else
01007             {
01008                 long        oldPos;
01009 
01010                 oldPos = portal->portalPos;
01011                 portal->portalPos -= nprocessed;
01012                 if (portal->portalPos > oldPos ||
01013                     portal->portalPos <= 0)
01014                     portal->posOverflow = true;
01015             }
01016         }
01017     }
01018 
01019     return nprocessed;
01020 }
01021 
01022 /*
01023  * FillPortalStore
01024  *      Run the query and load result tuples into the portal's tuple store.
01025  *
01026  * This is used for PORTAL_ONE_RETURNING, PORTAL_ONE_MOD_WITH, and
01027  * PORTAL_UTIL_SELECT cases only.
01028  */
01029 static void
01030 FillPortalStore(Portal portal, bool isTopLevel)
01031 {
01032     DestReceiver *treceiver;
01033     char        completionTag[COMPLETION_TAG_BUFSIZE];
01034 
01035     PortalCreateHoldStore(portal);
01036     treceiver = CreateDestReceiver(DestTuplestore);
01037     SetTuplestoreDestReceiverParams(treceiver,
01038                                     portal->holdStore,
01039                                     portal->holdContext,
01040                                     false);
01041 
01042     completionTag[0] = '\0';
01043 
01044     switch (portal->strategy)
01045     {
01046         case PORTAL_ONE_RETURNING:
01047         case PORTAL_ONE_MOD_WITH:
01048 
01049             /*
01050              * Run the portal to completion just as for the default
01051              * MULTI_QUERY case, but send the primary query's output to the
01052              * tuplestore. Auxiliary query outputs are discarded.
01053              */
01054             PortalRunMulti(portal, isTopLevel,
01055                            treceiver, None_Receiver, completionTag);
01056             break;
01057 
01058         case PORTAL_UTIL_SELECT:
01059             PortalRunUtility(portal, (Node *) linitial(portal->stmts),
01060                              isTopLevel, treceiver, completionTag);
01061             break;
01062 
01063         default:
01064             elog(ERROR, "unsupported portal strategy: %d",
01065                  (int) portal->strategy);
01066             break;
01067     }
01068 
01069     /* Override default completion tag with actual command result */
01070     if (completionTag[0] != '\0')
01071         portal->commandTag = pstrdup(completionTag);
01072 
01073     (*treceiver->rDestroy) (treceiver);
01074 }
01075 
01076 /*
01077  * RunFromStore
01078  *      Fetch tuples from the portal's tuple store.
01079  *
01080  * Calling conventions are similar to ExecutorRun, except that we
01081  * do not depend on having a queryDesc or estate.  Therefore we return the
01082  * number of tuples processed as the result, not in estate->es_processed.
01083  *
01084  * One difference from ExecutorRun is that the destination receiver functions
01085  * are run in the caller's memory context (since we have no estate).  Watch
01086  * out for memory leaks.
01087  */
01088 static uint32
01089 RunFromStore(Portal portal, ScanDirection direction, long count,
01090              DestReceiver *dest)
01091 {
01092     long        current_tuple_count = 0;
01093     TupleTableSlot *slot;
01094 
01095     slot = MakeSingleTupleTableSlot(portal->tupDesc);
01096 
01097     (*dest->rStartup) (dest, CMD_SELECT, portal->tupDesc);
01098 
01099     if (ScanDirectionIsNoMovement(direction))
01100     {
01101         /* do nothing except start/stop the destination */
01102     }
01103     else
01104     {
01105         bool        forward = ScanDirectionIsForward(direction);
01106 
01107         for (;;)
01108         {
01109             MemoryContext oldcontext;
01110             bool        ok;
01111 
01112             oldcontext = MemoryContextSwitchTo(portal->holdContext);
01113 
01114             ok = tuplestore_gettupleslot(portal->holdStore, forward, false,
01115                                          slot);
01116 
01117             MemoryContextSwitchTo(oldcontext);
01118 
01119             if (!ok)
01120                 break;
01121 
01122             (*dest->receiveSlot) (slot, dest);
01123 
01124             ExecClearTuple(slot);
01125 
01126             /*
01127              * check our tuple count.. if we've processed the proper number
01128              * then quit, else loop again and process more tuples. Zero count
01129              * means no limit.
01130              */
01131             current_tuple_count++;
01132             if (count && count == current_tuple_count)
01133                 break;
01134         }
01135     }
01136 
01137     (*dest->rShutdown) (dest);
01138 
01139     ExecDropSingleTupleTableSlot(slot);
01140 
01141     return (uint32) current_tuple_count;
01142 }
01143 
01144 /*
01145  * PortalRunUtility
01146  *      Execute a utility statement inside a portal.
01147  */
01148 static void
01149 PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel,
01150                  DestReceiver *dest, char *completionTag)
01151 {
01152     bool        active_snapshot_set;
01153 
01154     elog(DEBUG3, "ProcessUtility");
01155 
01156     /*
01157      * Set snapshot if utility stmt needs one.  Most reliable way to do this
01158      * seems to be to enumerate those that do not need one; this is a short
01159      * list.  Transaction control, LOCK, and SET must *not* set a snapshot
01160      * since they need to be executable at the start of a transaction-snapshot
01161      * mode transaction without freezing a snapshot.  By extension we allow
01162      * SHOW not to set a snapshot.  The other stmts listed are just efficiency
01163      * hacks.  Beware of listing anything that can modify the database --- if,
01164      * say, it has to update an index with expressions that invoke
01165      * user-defined functions, then it had better have a snapshot.
01166      */
01167     if (!(IsA(utilityStmt, TransactionStmt) ||
01168           IsA(utilityStmt, LockStmt) ||
01169           IsA(utilityStmt, VariableSetStmt) ||
01170           IsA(utilityStmt, VariableShowStmt) ||
01171           IsA(utilityStmt, ConstraintsSetStmt) ||
01172     /* efficiency hacks from here down */
01173           IsA(utilityStmt, FetchStmt) ||
01174           IsA(utilityStmt, ListenStmt) ||
01175           IsA(utilityStmt, NotifyStmt) ||
01176           IsA(utilityStmt, UnlistenStmt) ||
01177           IsA(utilityStmt, CheckPointStmt)))
01178     {
01179         PushActiveSnapshot(GetTransactionSnapshot());
01180         active_snapshot_set = true;
01181     }
01182     else
01183         active_snapshot_set = false;
01184 
01185     ProcessUtility(utilityStmt,
01186                    portal->sourceText,
01187                    isTopLevel ? PROCESS_UTILITY_TOPLEVEL : PROCESS_UTILITY_QUERY,
01188                    portal->portalParams,
01189                    dest,
01190                    completionTag);
01191 
01192     /* Some utility statements may change context on us */
01193     MemoryContextSwitchTo(PortalGetHeapMemory(portal));
01194 
01195     /*
01196      * Some utility commands may pop the ActiveSnapshot stack from under us,
01197      * so we only pop the stack if we actually see a snapshot set.  Note that
01198      * the set of utility commands that do this must be the same set
01199      * disallowed to run inside a transaction; otherwise, we could be popping
01200      * a snapshot that belongs to some other operation.
01201      */
01202     if (active_snapshot_set && ActiveSnapshotSet())
01203         PopActiveSnapshot();
01204 }
01205 
01206 /*
01207  * PortalRunMulti
01208  *      Execute a portal's queries in the general case (multi queries
01209  *      or non-SELECT-like queries)
01210  */
01211 static void
01212 PortalRunMulti(Portal portal, bool isTopLevel,
01213                DestReceiver *dest, DestReceiver *altdest,
01214                char *completionTag)
01215 {
01216     bool        active_snapshot_set = false;
01217     ListCell   *stmtlist_item;
01218 
01219     /*
01220      * If the destination is DestRemoteExecute, change to DestNone.  The
01221      * reason is that the client won't be expecting any tuples, and indeed has
01222      * no way to know what they are, since there is no provision for Describe
01223      * to send a RowDescription message when this portal execution strategy is
01224      * in effect.  This presently will only affect SELECT commands added to
01225      * non-SELECT queries by rewrite rules: such commands will be executed,
01226      * but the results will be discarded unless you use "simple Query"
01227      * protocol.
01228      */
01229     if (dest->mydest == DestRemoteExecute)
01230         dest = None_Receiver;
01231     if (altdest->mydest == DestRemoteExecute)
01232         altdest = None_Receiver;
01233 
01234     /*
01235      * Loop to handle the individual queries generated from a single parsetree
01236      * by analysis and rewrite.
01237      */
01238     foreach(stmtlist_item, portal->stmts)
01239     {
01240         Node       *stmt = (Node *) lfirst(stmtlist_item);
01241 
01242         /*
01243          * If we got a cancel signal in prior command, quit
01244          */
01245         CHECK_FOR_INTERRUPTS();
01246 
01247         if (IsA(stmt, PlannedStmt) &&
01248             ((PlannedStmt *) stmt)->utilityStmt == NULL)
01249         {
01250             /*
01251              * process a plannable query.
01252              */
01253             PlannedStmt *pstmt = (PlannedStmt *) stmt;
01254 
01255             TRACE_POSTGRESQL_QUERY_EXECUTE_START();
01256 
01257             if (log_executor_stats)
01258                 ResetUsage();
01259 
01260             /*
01261              * Must always have a snapshot for plannable queries.  First time
01262              * through, take a new snapshot; for subsequent queries in the
01263              * same portal, just update the snapshot's copy of the command
01264              * counter.
01265              */
01266             if (!active_snapshot_set)
01267             {
01268                 PushActiveSnapshot(GetTransactionSnapshot());
01269                 active_snapshot_set = true;
01270             }
01271             else
01272                 UpdateActiveSnapshotCommandId();
01273 
01274             if (pstmt->canSetTag)
01275             {
01276                 /* statement can set tag string */
01277                 ProcessQuery(pstmt,
01278                              portal->sourceText,
01279                              portal->portalParams,
01280                              dest, completionTag);
01281             }
01282             else
01283             {
01284                 /* stmt added by rewrite cannot set tag */
01285                 ProcessQuery(pstmt,
01286                              portal->sourceText,
01287                              portal->portalParams,
01288                              altdest, NULL);
01289             }
01290 
01291             if (log_executor_stats)
01292                 ShowUsage("EXECUTOR STATISTICS");
01293 
01294             TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
01295         }
01296         else
01297         {
01298             /*
01299              * process utility functions (create, destroy, etc..)
01300              *
01301              * These are assumed canSetTag if they're the only stmt in the
01302              * portal.
01303              *
01304              * We must not set a snapshot here for utility commands (if one is
01305              * needed, PortalRunUtility will do it).  If a utility command is
01306              * alone in a portal then everything's fine.  The only case where
01307              * a utility command can be part of a longer list is that rules
01308              * are allowed to include NotifyStmt.  NotifyStmt doesn't care
01309              * whether it has a snapshot or not, so we just leave the current
01310              * snapshot alone if we have one.
01311              */
01312             if (list_length(portal->stmts) == 1)
01313             {
01314                 Assert(!active_snapshot_set);
01315                 /* statement can set tag string */
01316                 PortalRunUtility(portal, stmt, isTopLevel,
01317                                  dest, completionTag);
01318             }
01319             else
01320             {
01321                 Assert(IsA(stmt, NotifyStmt));
01322                 /* stmt added by rewrite cannot set tag */
01323                 PortalRunUtility(portal, stmt, isTopLevel,
01324                                  altdest, NULL);
01325             }
01326         }
01327 
01328         /*
01329          * Increment command counter between queries, but not after the last
01330          * one.
01331          */
01332         if (lnext(stmtlist_item) != NULL)
01333             CommandCounterIncrement();
01334 
01335         /*
01336          * Clear subsidiary contexts to recover temporary memory.
01337          */
01338         Assert(PortalGetHeapMemory(portal) == CurrentMemoryContext);
01339 
01340         MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
01341     }
01342 
01343     /* Pop the snapshot if we pushed one. */
01344     if (active_snapshot_set)
01345         PopActiveSnapshot();
01346 
01347     /*
01348      * If a command completion tag was supplied, use it.  Otherwise use the
01349      * portal's commandTag as the default completion tag.
01350      *
01351      * Exception: Clients expect INSERT/UPDATE/DELETE tags to have counts, so
01352      * fake them with zeros.  This can happen with DO INSTEAD rules if there
01353      * is no replacement query of the same type as the original.  We print "0
01354      * 0" here because technically there is no query of the matching tag type,
01355      * and printing a non-zero count for a different query type seems wrong,
01356      * e.g.  an INSERT that does an UPDATE instead should not print "0 1" if
01357      * one row was updated.  See QueryRewrite(), step 3, for details.
01358      */
01359     if (completionTag && completionTag[0] == '\0')
01360     {
01361         if (portal->commandTag)
01362             strcpy(completionTag, portal->commandTag);
01363         if (strcmp(completionTag, "SELECT") == 0)
01364             sprintf(completionTag, "SELECT 0 0");
01365         else if (strcmp(completionTag, "INSERT") == 0)
01366             strcpy(completionTag, "INSERT 0 0");
01367         else if (strcmp(completionTag, "UPDATE") == 0)
01368             strcpy(completionTag, "UPDATE 0");
01369         else if (strcmp(completionTag, "DELETE") == 0)
01370             strcpy(completionTag, "DELETE 0");
01371     }
01372 }
01373 
01374 /*
01375  * PortalRunFetch
01376  *      Variant form of PortalRun that supports SQL FETCH directions.
01377  *
01378  * Note: we presently assume that no callers of this want isTopLevel = true.
01379  *
01380  * Returns number of rows processed (suitable for use in result tag)
01381  */
01382 long
01383 PortalRunFetch(Portal portal,
01384                FetchDirection fdirection,
01385                long count,
01386                DestReceiver *dest)
01387 {
01388     long        result;
01389     Portal      saveActivePortal;
01390     ResourceOwner saveResourceOwner;
01391     MemoryContext savePortalContext;
01392     MemoryContext oldContext;
01393 
01394     AssertArg(PortalIsValid(portal));
01395 
01396     /*
01397      * Check for improper portal use, and mark portal active.
01398      */
01399     if (portal->status != PORTAL_READY)
01400         ereport(ERROR,
01401                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
01402                  errmsg("portal \"%s\" cannot be run", portal->name)));
01403     portal->status = PORTAL_ACTIVE;
01404 
01405     /*
01406      * Set up global portal context pointers.
01407      */
01408     saveActivePortal = ActivePortal;
01409     saveResourceOwner = CurrentResourceOwner;
01410     savePortalContext = PortalContext;
01411     PG_TRY();
01412     {
01413         ActivePortal = portal;
01414         CurrentResourceOwner = portal->resowner;
01415         PortalContext = PortalGetHeapMemory(portal);
01416 
01417         oldContext = MemoryContextSwitchTo(PortalContext);
01418 
01419         switch (portal->strategy)
01420         {
01421             case PORTAL_ONE_SELECT:
01422                 result = DoPortalRunFetch(portal, fdirection, count, dest);
01423                 break;
01424 
01425             case PORTAL_ONE_RETURNING:
01426             case PORTAL_ONE_MOD_WITH:
01427             case PORTAL_UTIL_SELECT:
01428 
01429                 /*
01430                  * If we have not yet run the command, do so, storing its
01431                  * results in the portal's tuplestore.
01432                  */
01433                 if (!portal->holdStore)
01434                     FillPortalStore(portal, false /* isTopLevel */ );
01435 
01436                 /*
01437                  * Now fetch desired portion of results.
01438                  */
01439                 result = DoPortalRunFetch(portal, fdirection, count, dest);
01440                 break;
01441 
01442             default:
01443                 elog(ERROR, "unsupported portal strategy");
01444                 result = 0;     /* keep compiler quiet */
01445                 break;
01446         }
01447     }
01448     PG_CATCH();
01449     {
01450         /* Uncaught error while executing portal: mark it dead */
01451         MarkPortalFailed(portal);
01452 
01453         /* Restore global vars and propagate error */
01454         ActivePortal = saveActivePortal;
01455         CurrentResourceOwner = saveResourceOwner;
01456         PortalContext = savePortalContext;
01457 
01458         PG_RE_THROW();
01459     }
01460     PG_END_TRY();
01461 
01462     MemoryContextSwitchTo(oldContext);
01463 
01464     /* Mark portal not active */
01465     portal->status = PORTAL_READY;
01466 
01467     ActivePortal = saveActivePortal;
01468     CurrentResourceOwner = saveResourceOwner;
01469     PortalContext = savePortalContext;
01470 
01471     return result;
01472 }
01473 
01474 /*
01475  * DoPortalRunFetch
01476  *      Guts of PortalRunFetch --- the portal context is already set up
01477  *
01478  * Returns number of rows processed (suitable for use in result tag)
01479  */
01480 static long
01481 DoPortalRunFetch(Portal portal,
01482                  FetchDirection fdirection,
01483                  long count,
01484                  DestReceiver *dest)
01485 {
01486     bool        forward;
01487 
01488     Assert(portal->strategy == PORTAL_ONE_SELECT ||
01489            portal->strategy == PORTAL_ONE_RETURNING ||
01490            portal->strategy == PORTAL_ONE_MOD_WITH ||
01491            portal->strategy == PORTAL_UTIL_SELECT);
01492 
01493     switch (fdirection)
01494     {
01495         case FETCH_FORWARD:
01496             if (count < 0)
01497             {
01498                 fdirection = FETCH_BACKWARD;
01499                 count = -count;
01500             }
01501             /* fall out of switch to share code with FETCH_BACKWARD */
01502             break;
01503         case FETCH_BACKWARD:
01504             if (count < 0)
01505             {
01506                 fdirection = FETCH_FORWARD;
01507                 count = -count;
01508             }
01509             /* fall out of switch to share code with FETCH_FORWARD */
01510             break;
01511         case FETCH_ABSOLUTE:
01512             if (count > 0)
01513             {
01514                 /*
01515                  * Definition: Rewind to start, advance count-1 rows, return
01516                  * next row (if any).  In practice, if the goal is less than
01517                  * halfway back to the start, it's better to scan from where
01518                  * we are.  In any case, we arrange to fetch the target row
01519                  * going forwards.
01520                  */
01521                 if (portal->posOverflow || portal->portalPos == LONG_MAX ||
01522                     count - 1 <= portal->portalPos / 2)
01523                 {
01524                     DoPortalRewind(portal);
01525                     if (count > 1)
01526                         PortalRunSelect(portal, true, count - 1,
01527                                         None_Receiver);
01528                 }
01529                 else
01530                 {
01531                     long        pos = portal->portalPos;
01532 
01533                     if (portal->atEnd)
01534                         pos++;  /* need one extra fetch if off end */
01535                     if (count <= pos)
01536                         PortalRunSelect(portal, false, pos - count + 1,
01537                                         None_Receiver);
01538                     else if (count > pos + 1)
01539                         PortalRunSelect(portal, true, count - pos - 1,
01540                                         None_Receiver);
01541                 }
01542                 return PortalRunSelect(portal, true, 1L, dest);
01543             }
01544             else if (count < 0)
01545             {
01546                 /*
01547                  * Definition: Advance to end, back up abs(count)-1 rows,
01548                  * return prior row (if any).  We could optimize this if we
01549                  * knew in advance where the end was, but typically we won't.
01550                  * (Is it worth considering case where count > half of size of
01551                  * query?  We could rewind once we know the size ...)
01552                  */
01553                 PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
01554                 if (count < -1)
01555                     PortalRunSelect(portal, false, -count - 1, None_Receiver);
01556                 return PortalRunSelect(portal, false, 1L, dest);
01557             }
01558             else
01559             {
01560                 /* count == 0 */
01561                 /* Rewind to start, return zero rows */
01562                 DoPortalRewind(portal);
01563                 return PortalRunSelect(portal, true, 0L, dest);
01564             }
01565             break;
01566         case FETCH_RELATIVE:
01567             if (count > 0)
01568             {
01569                 /*
01570                  * Definition: advance count-1 rows, return next row (if any).
01571                  */
01572                 if (count > 1)
01573                     PortalRunSelect(portal, true, count - 1, None_Receiver);
01574                 return PortalRunSelect(portal, true, 1L, dest);
01575             }
01576             else if (count < 0)
01577             {
01578                 /*
01579                  * Definition: back up abs(count)-1 rows, return prior row (if
01580                  * any).
01581                  */
01582                 if (count < -1)
01583                     PortalRunSelect(portal, false, -count - 1, None_Receiver);
01584                 return PortalRunSelect(portal, false, 1L, dest);
01585             }
01586             else
01587             {
01588                 /* count == 0 */
01589                 /* Same as FETCH FORWARD 0, so fall out of switch */
01590                 fdirection = FETCH_FORWARD;
01591             }
01592             break;
01593         default:
01594             elog(ERROR, "bogus direction");
01595             break;
01596     }
01597 
01598     /*
01599      * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, and count
01600      * >= 0.
01601      */
01602     forward = (fdirection == FETCH_FORWARD);
01603 
01604     /*
01605      * Zero count means to re-fetch the current row, if any (per SQL)
01606      */
01607     if (count == 0)
01608     {
01609         bool        on_row;
01610 
01611         /* Are we sitting on a row? */
01612         on_row = (!portal->atStart && !portal->atEnd);
01613 
01614         if (dest->mydest == DestNone)
01615         {
01616             /* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */
01617             return on_row ? 1L : 0L;
01618         }
01619         else
01620         {
01621             /*
01622              * If we are sitting on a row, back up one so we can re-fetch it.
01623              * If we are not sitting on a row, we still have to start up and
01624              * shut down the executor so that the destination is initialized
01625              * and shut down correctly; so keep going.  To PortalRunSelect,
01626              * count == 0 means we will retrieve no row.
01627              */
01628             if (on_row)
01629             {
01630                 PortalRunSelect(portal, false, 1L, None_Receiver);
01631                 /* Set up to fetch one row forward */
01632                 count = 1;
01633                 forward = true;
01634             }
01635         }
01636     }
01637 
01638     /*
01639      * Optimize MOVE BACKWARD ALL into a Rewind.
01640      */
01641     if (!forward && count == FETCH_ALL && dest->mydest == DestNone)
01642     {
01643         long        result = portal->portalPos;
01644 
01645         if (result > 0 && !portal->atEnd)
01646             result--;
01647         DoPortalRewind(portal);
01648         /* result is bogus if pos had overflowed, but it's best we can do */
01649         return result;
01650     }
01651 
01652     return PortalRunSelect(portal, forward, count, dest);
01653 }
01654 
01655 /*
01656  * DoPortalRewind - rewind a Portal to starting point
01657  */
01658 static void
01659 DoPortalRewind(Portal portal)
01660 {
01661     if (portal->holdStore)
01662     {
01663         MemoryContext oldcontext;
01664 
01665         oldcontext = MemoryContextSwitchTo(portal->holdContext);
01666         tuplestore_rescan(portal->holdStore);
01667         MemoryContextSwitchTo(oldcontext);
01668     }
01669     if (PortalGetQueryDesc(portal))
01670         ExecutorRewind(PortalGetQueryDesc(portal));
01671 
01672     portal->atStart = true;
01673     portal->atEnd = false;
01674     portal->portalPos = 0;
01675     portal->posOverflow = false;
01676 }