00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "postgres.h"
00017
00018 #include "access/xact.h"
00019 #include "commands/prepare.h"
00020 #include "executor/tstoreReceiver.h"
00021 #include "miscadmin.h"
00022 #include "pg_trace.h"
00023 #include "tcop/pquery.h"
00024 #include "tcop/utility.h"
00025 #include "utils/memutils.h"
00026 #include "utils/snapmgr.h"
00027
00028
00029
00030
00031
00032
00033 Portal ActivePortal = NULL;
00034
00035
00036 static void ProcessQuery(PlannedStmt *plan,
00037 const char *sourceText,
00038 ParamListInfo params,
00039 DestReceiver *dest,
00040 char *completionTag);
00041 static void FillPortalStore(Portal portal, bool isTopLevel);
00042 static uint32 RunFromStore(Portal portal, ScanDirection direction, long count,
00043 DestReceiver *dest);
00044 static long PortalRunSelect(Portal portal, bool forward, long count,
00045 DestReceiver *dest);
00046 static void PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel,
00047 DestReceiver *dest, char *completionTag);
00048 static void PortalRunMulti(Portal portal, bool isTopLevel,
00049 DestReceiver *dest, DestReceiver *altdest,
00050 char *completionTag);
00051 static long DoPortalRunFetch(Portal portal,
00052 FetchDirection fdirection,
00053 long count,
00054 DestReceiver *dest);
00055 static void DoPortalRewind(Portal portal);
00056
00057
00058
00059
00060
00061 QueryDesc *
00062 CreateQueryDesc(PlannedStmt *plannedstmt,
00063 const char *sourceText,
00064 Snapshot snapshot,
00065 Snapshot crosscheck_snapshot,
00066 DestReceiver *dest,
00067 ParamListInfo params,
00068 int instrument_options)
00069 {
00070 QueryDesc *qd = (QueryDesc *) palloc(sizeof(QueryDesc));
00071
00072 qd->operation = plannedstmt->commandType;
00073 qd->plannedstmt = plannedstmt;
00074 qd->utilitystmt = plannedstmt->utilityStmt;
00075 qd->sourceText = sourceText;
00076 qd->snapshot = RegisterSnapshot(snapshot);
00077
00078 qd->crosscheck_snapshot = RegisterSnapshot(crosscheck_snapshot);
00079 qd->dest = dest;
00080 qd->params = params;
00081 qd->instrument_options = instrument_options;
00082
00083
00084
00085 qd->tupDesc = NULL;
00086 qd->estate = NULL;
00087 qd->planstate = NULL;
00088 qd->totaltime = NULL;
00089
00090 return qd;
00091 }
00092
00093
00094
00095
00096 QueryDesc *
00097 CreateUtilityQueryDesc(Node *utilitystmt,
00098 const char *sourceText,
00099 Snapshot snapshot,
00100 DestReceiver *dest,
00101 ParamListInfo params)
00102 {
00103 QueryDesc *qd = (QueryDesc *) palloc(sizeof(QueryDesc));
00104
00105 qd->operation = CMD_UTILITY;
00106 qd->plannedstmt = NULL;
00107 qd->utilitystmt = utilitystmt;
00108 qd->sourceText = sourceText;
00109 qd->snapshot = RegisterSnapshot(snapshot);
00110 qd->crosscheck_snapshot = InvalidSnapshot;
00111 qd->dest = dest;
00112 qd->params = params;
00113 qd->instrument_options = false;
00114
00115
00116 qd->tupDesc = NULL;
00117 qd->estate = NULL;
00118 qd->planstate = NULL;
00119 qd->totaltime = NULL;
00120
00121 return qd;
00122 }
00123
00124
00125
00126
00127 void
00128 FreeQueryDesc(QueryDesc *qdesc)
00129 {
00130
00131 Assert(qdesc->estate == NULL);
00132
00133
00134 UnregisterSnapshot(qdesc->snapshot);
00135 UnregisterSnapshot(qdesc->crosscheck_snapshot);
00136
00137
00138 pfree(qdesc);
00139 }
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159 static void
00160 ProcessQuery(PlannedStmt *plan,
00161 const char *sourceText,
00162 ParamListInfo params,
00163 DestReceiver *dest,
00164 char *completionTag)
00165 {
00166 QueryDesc *queryDesc;
00167
00168 elog(DEBUG3, "ProcessQuery");
00169
00170
00171
00172
00173 queryDesc = CreateQueryDesc(plan, sourceText,
00174 GetActiveSnapshot(), InvalidSnapshot,
00175 dest, params, 0);
00176
00177
00178
00179
00180 ExecutorStart(queryDesc, 0);
00181
00182
00183
00184
00185 ExecutorRun(queryDesc, ForwardScanDirection, 0L);
00186
00187
00188
00189
00190 if (completionTag)
00191 {
00192 Oid lastOid;
00193
00194 switch (queryDesc->operation)
00195 {
00196 case CMD_SELECT:
00197 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
00198 "SELECT %u", queryDesc->estate->es_processed);
00199 break;
00200 case CMD_INSERT:
00201 if (queryDesc->estate->es_processed == 1)
00202 lastOid = queryDesc->estate->es_lastoid;
00203 else
00204 lastOid = InvalidOid;
00205 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
00206 "INSERT %u %u", lastOid, queryDesc->estate->es_processed);
00207 break;
00208 case CMD_UPDATE:
00209 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
00210 "UPDATE %u", queryDesc->estate->es_processed);
00211 break;
00212 case CMD_DELETE:
00213 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
00214 "DELETE %u", queryDesc->estate->es_processed);
00215 break;
00216 default:
00217 strcpy(completionTag, "???");
00218 break;
00219 }
00220 }
00221
00222
00223
00224
00225 ExecutorFinish(queryDesc);
00226 ExecutorEnd(queryDesc);
00227
00228 FreeQueryDesc(queryDesc);
00229 }
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240 PortalStrategy
00241 ChoosePortalStrategy(List *stmts)
00242 {
00243 int nSetTag;
00244 ListCell *lc;
00245
00246
00247
00248
00249
00250
00251
00252 if (list_length(stmts) == 1)
00253 {
00254 Node *stmt = (Node *) linitial(stmts);
00255
00256 if (IsA(stmt, Query))
00257 {
00258 Query *query = (Query *) stmt;
00259
00260 if (query->canSetTag)
00261 {
00262 if (query->commandType == CMD_SELECT &&
00263 query->utilityStmt == NULL)
00264 {
00265 if (query->hasModifyingCTE)
00266 return PORTAL_ONE_MOD_WITH;
00267 else
00268 return PORTAL_ONE_SELECT;
00269 }
00270 if (query->commandType == CMD_UTILITY &&
00271 query->utilityStmt != NULL)
00272 {
00273 if (UtilityReturnsTuples(query->utilityStmt))
00274 return PORTAL_UTIL_SELECT;
00275
00276 return PORTAL_MULTI_QUERY;
00277 }
00278 }
00279 }
00280 else if (IsA(stmt, PlannedStmt))
00281 {
00282 PlannedStmt *pstmt = (PlannedStmt *) stmt;
00283
00284 if (pstmt->canSetTag)
00285 {
00286 if (pstmt->commandType == CMD_SELECT &&
00287 pstmt->utilityStmt == NULL)
00288 {
00289 if (pstmt->hasModifyingCTE)
00290 return PORTAL_ONE_MOD_WITH;
00291 else
00292 return PORTAL_ONE_SELECT;
00293 }
00294 }
00295 }
00296 else
00297 {
00298
00299 if (UtilityReturnsTuples(stmt))
00300 return PORTAL_UTIL_SELECT;
00301
00302 return PORTAL_MULTI_QUERY;
00303 }
00304 }
00305
00306
00307
00308
00309
00310
00311 nSetTag = 0;
00312 foreach(lc, stmts)
00313 {
00314 Node *stmt = (Node *) lfirst(lc);
00315
00316 if (IsA(stmt, Query))
00317 {
00318 Query *query = (Query *) stmt;
00319
00320 if (query->canSetTag)
00321 {
00322 if (++nSetTag > 1)
00323 return PORTAL_MULTI_QUERY;
00324 if (query->returningList == NIL)
00325 return PORTAL_MULTI_QUERY;
00326 }
00327 }
00328 else if (IsA(stmt, PlannedStmt))
00329 {
00330 PlannedStmt *pstmt = (PlannedStmt *) stmt;
00331
00332 if (pstmt->canSetTag)
00333 {
00334 if (++nSetTag > 1)
00335 return PORTAL_MULTI_QUERY;
00336 if (!pstmt->hasReturning)
00337 return PORTAL_MULTI_QUERY;
00338 }
00339 }
00340
00341 }
00342 if (nSetTag == 1)
00343 return PORTAL_ONE_RETURNING;
00344
00345
00346 return PORTAL_MULTI_QUERY;
00347 }
00348
00349
00350
00351
00352
00353
00354
00355
00356 List *
00357 FetchPortalTargetList(Portal portal)
00358 {
00359
00360 if (portal->strategy == PORTAL_MULTI_QUERY)
00361 return NIL;
00362
00363 return FetchStatementTargetList(PortalGetPrimaryStmt(portal));
00364 }
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378 List *
00379 FetchStatementTargetList(Node *stmt)
00380 {
00381 if (stmt == NULL)
00382 return NIL;
00383 if (IsA(stmt, Query))
00384 {
00385 Query *query = (Query *) stmt;
00386
00387 if (query->commandType == CMD_UTILITY &&
00388 query->utilityStmt != NULL)
00389 {
00390
00391 stmt = query->utilityStmt;
00392 }
00393 else
00394 {
00395 if (query->commandType == CMD_SELECT &&
00396 query->utilityStmt == NULL)
00397 return query->targetList;
00398 if (query->returningList)
00399 return query->returningList;
00400 return NIL;
00401 }
00402 }
00403 if (IsA(stmt, PlannedStmt))
00404 {
00405 PlannedStmt *pstmt = (PlannedStmt *) stmt;
00406
00407 if (pstmt->commandType == CMD_SELECT &&
00408 pstmt->utilityStmt == NULL)
00409 return pstmt->planTree->targetlist;
00410 if (pstmt->hasReturning)
00411 return pstmt->planTree->targetlist;
00412 return NIL;
00413 }
00414 if (IsA(stmt, FetchStmt))
00415 {
00416 FetchStmt *fstmt = (FetchStmt *) stmt;
00417 Portal subportal;
00418
00419 Assert(!fstmt->ismove);
00420 subportal = GetPortalByName(fstmt->portalname);
00421 Assert(PortalIsValid(subportal));
00422 return FetchPortalTargetList(subportal);
00423 }
00424 if (IsA(stmt, ExecuteStmt))
00425 {
00426 ExecuteStmt *estmt = (ExecuteStmt *) stmt;
00427 PreparedStatement *entry;
00428
00429 entry = FetchPreparedStatement(estmt->name, true);
00430 return FetchPreparedStatementTargetList(entry);
00431 }
00432 return NIL;
00433 }
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458 void
00459 PortalStart(Portal portal, ParamListInfo params,
00460 int eflags, Snapshot snapshot)
00461 {
00462 Portal saveActivePortal;
00463 ResourceOwner saveResourceOwner;
00464 MemoryContext savePortalContext;
00465 MemoryContext oldContext;
00466 QueryDesc *queryDesc;
00467 int myeflags;
00468
00469 AssertArg(PortalIsValid(portal));
00470 AssertState(portal->status == PORTAL_DEFINED);
00471
00472
00473
00474
00475 saveActivePortal = ActivePortal;
00476 saveResourceOwner = CurrentResourceOwner;
00477 savePortalContext = PortalContext;
00478 PG_TRY();
00479 {
00480 ActivePortal = portal;
00481 CurrentResourceOwner = portal->resowner;
00482 PortalContext = PortalGetHeapMemory(portal);
00483
00484 oldContext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
00485
00486
00487 portal->portalParams = params;
00488
00489
00490
00491
00492 portal->strategy = ChoosePortalStrategy(portal->stmts);
00493
00494
00495
00496
00497 switch (portal->strategy)
00498 {
00499 case PORTAL_ONE_SELECT:
00500
00501
00502 if (snapshot)
00503 PushActiveSnapshot(snapshot);
00504 else
00505 PushActiveSnapshot(GetTransactionSnapshot());
00506
00507
00508
00509
00510
00511 queryDesc = CreateQueryDesc((PlannedStmt *) linitial(portal->stmts),
00512 portal->sourceText,
00513 GetActiveSnapshot(),
00514 InvalidSnapshot,
00515 None_Receiver,
00516 params,
00517 0);
00518
00519
00520
00521
00522
00523
00524 if (portal->cursorOptions & CURSOR_OPT_SCROLL)
00525 myeflags = eflags | EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD;
00526 else
00527 myeflags = eflags;
00528
00529
00530
00531
00532 ExecutorStart(queryDesc, myeflags);
00533
00534
00535
00536
00537 portal->queryDesc = queryDesc;
00538
00539
00540
00541
00542 portal->tupDesc = queryDesc->tupDesc;
00543
00544
00545
00546
00547 portal->atStart = true;
00548 portal->atEnd = false;
00549 portal->portalPos = 0;
00550 portal->posOverflow = false;
00551
00552 PopActiveSnapshot();
00553 break;
00554
00555 case PORTAL_ONE_RETURNING:
00556 case PORTAL_ONE_MOD_WITH:
00557
00558
00559
00560
00561
00562 {
00563 PlannedStmt *pstmt;
00564
00565 pstmt = (PlannedStmt *) PortalGetPrimaryStmt(portal);
00566 Assert(IsA(pstmt, PlannedStmt));
00567 portal->tupDesc =
00568 ExecCleanTypeFromTL(pstmt->planTree->targetlist,
00569 false);
00570 }
00571
00572
00573
00574
00575 portal->atStart = true;
00576 portal->atEnd = false;
00577 portal->portalPos = 0;
00578 portal->posOverflow = false;
00579 break;
00580
00581 case PORTAL_UTIL_SELECT:
00582
00583
00584
00585
00586
00587 {
00588 Node *ustmt = PortalGetPrimaryStmt(portal);
00589
00590 Assert(!IsA(ustmt, PlannedStmt));
00591 portal->tupDesc = UtilityTupleDescriptor(ustmt);
00592 }
00593
00594
00595
00596
00597 portal->atStart = true;
00598 portal->atEnd = false;
00599 portal->portalPos = 0;
00600 portal->posOverflow = false;
00601 break;
00602
00603 case PORTAL_MULTI_QUERY:
00604
00605 portal->tupDesc = NULL;
00606 break;
00607 }
00608 }
00609 PG_CATCH();
00610 {
00611
00612 MarkPortalFailed(portal);
00613
00614
00615 ActivePortal = saveActivePortal;
00616 CurrentResourceOwner = saveResourceOwner;
00617 PortalContext = savePortalContext;
00618
00619 PG_RE_THROW();
00620 }
00621 PG_END_TRY();
00622
00623 MemoryContextSwitchTo(oldContext);
00624
00625 ActivePortal = saveActivePortal;
00626 CurrentResourceOwner = saveResourceOwner;
00627 PortalContext = savePortalContext;
00628
00629 portal->status = PORTAL_READY;
00630 }
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642 void
00643 PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
00644 {
00645 int natts;
00646 int i;
00647
00648
00649 if (portal->tupDesc == NULL)
00650 return;
00651 natts = portal->tupDesc->natts;
00652 portal->formats = (int16 *)
00653 MemoryContextAlloc(PortalGetHeapMemory(portal),
00654 natts * sizeof(int16));
00655 if (nFormats > 1)
00656 {
00657
00658 if (nFormats != natts)
00659 ereport(ERROR,
00660 (errcode(ERRCODE_PROTOCOL_VIOLATION),
00661 errmsg("bind message has %d result formats but query has %d columns",
00662 nFormats, natts)));
00663 memcpy(portal->formats, formats, natts * sizeof(int16));
00664 }
00665 else if (nFormats > 0)
00666 {
00667
00668 int16 format1 = formats[0];
00669
00670 for (i = 0; i < natts; i++)
00671 portal->formats[i] = format1;
00672 }
00673 else
00674 {
00675
00676 for (i = 0; i < natts; i++)
00677 portal->formats[i] = 0;
00678 }
00679 }
00680
00681
00682
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692
00693
00694
00695
00696
00697
00698
00699
00700
00701
00702
00703
00704 bool
00705 PortalRun(Portal portal, long count, bool isTopLevel,
00706 DestReceiver *dest, DestReceiver *altdest,
00707 char *completionTag)
00708 {
00709 bool result;
00710 uint32 nprocessed;
00711 ResourceOwner saveTopTransactionResourceOwner;
00712 MemoryContext saveTopTransactionContext;
00713 Portal saveActivePortal;
00714 ResourceOwner saveResourceOwner;
00715 MemoryContext savePortalContext;
00716 MemoryContext saveMemoryContext;
00717
00718 AssertArg(PortalIsValid(portal));
00719
00720 TRACE_POSTGRESQL_QUERY_EXECUTE_START();
00721
00722
00723 if (completionTag)
00724 completionTag[0] = '\0';
00725
00726 if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
00727 {
00728 elog(DEBUG3, "PortalRun");
00729
00730 ResetUsage();
00731 }
00732
00733
00734
00735
00736 if (portal->status != PORTAL_READY)
00737 ereport(ERROR,
00738 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
00739 errmsg("portal \"%s\" cannot be run", portal->name)));
00740 portal->status = PORTAL_ACTIVE;
00741
00742
00743
00744
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756 saveTopTransactionResourceOwner = TopTransactionResourceOwner;
00757 saveTopTransactionContext = TopTransactionContext;
00758 saveActivePortal = ActivePortal;
00759 saveResourceOwner = CurrentResourceOwner;
00760 savePortalContext = PortalContext;
00761 saveMemoryContext = CurrentMemoryContext;
00762 PG_TRY();
00763 {
00764 ActivePortal = portal;
00765 CurrentResourceOwner = portal->resowner;
00766 PortalContext = PortalGetHeapMemory(portal);
00767
00768 MemoryContextSwitchTo(PortalContext);
00769
00770 switch (portal->strategy)
00771 {
00772 case PORTAL_ONE_SELECT:
00773 case PORTAL_ONE_RETURNING:
00774 case PORTAL_ONE_MOD_WITH:
00775 case PORTAL_UTIL_SELECT:
00776
00777
00778
00779
00780
00781
00782 if (portal->strategy != PORTAL_ONE_SELECT && !portal->holdStore)
00783 FillPortalStore(portal, isTopLevel);
00784
00785
00786
00787
00788 nprocessed = PortalRunSelect(portal, true, count, dest);
00789
00790
00791
00792
00793
00794
00795 if (completionTag && portal->commandTag)
00796 {
00797 if (strcmp(portal->commandTag, "SELECT") == 0)
00798 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
00799 "SELECT %u", nprocessed);
00800 else
00801 strcpy(completionTag, portal->commandTag);
00802 }
00803
00804
00805 portal->status = PORTAL_READY;
00806
00807
00808
00809
00810 result = portal->atEnd;
00811 break;
00812
00813 case PORTAL_MULTI_QUERY:
00814 PortalRunMulti(portal, isTopLevel,
00815 dest, altdest, completionTag);
00816
00817
00818 MarkPortalDone(portal);
00819
00820
00821 result = true;
00822 break;
00823
00824 default:
00825 elog(ERROR, "unrecognized portal strategy: %d",
00826 (int) portal->strategy);
00827 result = false;
00828 break;
00829 }
00830 }
00831 PG_CATCH();
00832 {
00833
00834 MarkPortalFailed(portal);
00835
00836
00837 if (saveMemoryContext == saveTopTransactionContext)
00838 MemoryContextSwitchTo(TopTransactionContext);
00839 else
00840 MemoryContextSwitchTo(saveMemoryContext);
00841 ActivePortal = saveActivePortal;
00842 if (saveResourceOwner == saveTopTransactionResourceOwner)
00843 CurrentResourceOwner = TopTransactionResourceOwner;
00844 else
00845 CurrentResourceOwner = saveResourceOwner;
00846 PortalContext = savePortalContext;
00847
00848 PG_RE_THROW();
00849 }
00850 PG_END_TRY();
00851
00852 if (saveMemoryContext == saveTopTransactionContext)
00853 MemoryContextSwitchTo(TopTransactionContext);
00854 else
00855 MemoryContextSwitchTo(saveMemoryContext);
00856 ActivePortal = saveActivePortal;
00857 if (saveResourceOwner == saveTopTransactionResourceOwner)
00858 CurrentResourceOwner = TopTransactionResourceOwner;
00859 else
00860 CurrentResourceOwner = saveResourceOwner;
00861 PortalContext = savePortalContext;
00862
00863 if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
00864 ShowUsage("EXECUTOR STATISTICS");
00865
00866 TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
00867
00868 return result;
00869 }
00870
00871
00872
00873
00874
00875
00876
00877
00878
00879
00880
00881
00882
00883
00884
00885
00886
00887
00888
00889 static long
00890 PortalRunSelect(Portal portal,
00891 bool forward,
00892 long count,
00893 DestReceiver *dest)
00894 {
00895 QueryDesc *queryDesc;
00896 ScanDirection direction;
00897 uint32 nprocessed;
00898
00899
00900
00901
00902
00903 queryDesc = PortalGetQueryDesc(portal);
00904
00905
00906 Assert(queryDesc || portal->holdStore);
00907
00908
00909
00910
00911
00912
00913
00914 if (queryDesc)
00915 queryDesc->dest = dest;
00916
00917
00918
00919
00920
00921
00922
00923
00924
00925
00926
00927
00928 if (forward)
00929 {
00930 if (portal->atEnd || count <= 0)
00931 direction = NoMovementScanDirection;
00932 else
00933 direction = ForwardScanDirection;
00934
00935
00936 if (count == FETCH_ALL)
00937 count = 0;
00938
00939 if (portal->holdStore)
00940 nprocessed = RunFromStore(portal, direction, count, dest);
00941 else
00942 {
00943 PushActiveSnapshot(queryDesc->snapshot);
00944 ExecutorRun(queryDesc, direction, count);
00945 nprocessed = queryDesc->estate->es_processed;
00946 PopActiveSnapshot();
00947 }
00948
00949 if (!ScanDirectionIsNoMovement(direction))
00950 {
00951 long oldPos;
00952
00953 if (nprocessed > 0)
00954 portal->atStart = false;
00955 if (count == 0 ||
00956 (unsigned long) nprocessed < (unsigned long) count)
00957 portal->atEnd = true;
00958 oldPos = portal->portalPos;
00959 portal->portalPos += nprocessed;
00960
00961 if (portal->portalPos < oldPos)
00962 portal->posOverflow = true;
00963 }
00964 }
00965 else
00966 {
00967 if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
00968 ereport(ERROR,
00969 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
00970 errmsg("cursor can only scan forward"),
00971 errhint("Declare it with SCROLL option to enable backward scan.")));
00972
00973 if (portal->atStart || count <= 0)
00974 direction = NoMovementScanDirection;
00975 else
00976 direction = BackwardScanDirection;
00977
00978
00979 if (count == FETCH_ALL)
00980 count = 0;
00981
00982 if (portal->holdStore)
00983 nprocessed = RunFromStore(portal, direction, count, dest);
00984 else
00985 {
00986 PushActiveSnapshot(queryDesc->snapshot);
00987 ExecutorRun(queryDesc, direction, count);
00988 nprocessed = queryDesc->estate->es_processed;
00989 PopActiveSnapshot();
00990 }
00991
00992 if (!ScanDirectionIsNoMovement(direction))
00993 {
00994 if (nprocessed > 0 && portal->atEnd)
00995 {
00996 portal->atEnd = false;
00997 portal->portalPos++;
00998 }
00999 if (count == 0 ||
01000 (unsigned long) nprocessed < (unsigned long) count)
01001 {
01002 portal->atStart = true;
01003 portal->portalPos = 0;
01004 portal->posOverflow = false;
01005 }
01006 else
01007 {
01008 long oldPos;
01009
01010 oldPos = portal->portalPos;
01011 portal->portalPos -= nprocessed;
01012 if (portal->portalPos > oldPos ||
01013 portal->portalPos <= 0)
01014 portal->posOverflow = true;
01015 }
01016 }
01017 }
01018
01019 return nprocessed;
01020 }
01021
01022
01023
01024
01025
01026
01027
01028
01029 static void
01030 FillPortalStore(Portal portal, bool isTopLevel)
01031 {
01032 DestReceiver *treceiver;
01033 char completionTag[COMPLETION_TAG_BUFSIZE];
01034
01035 PortalCreateHoldStore(portal);
01036 treceiver = CreateDestReceiver(DestTuplestore);
01037 SetTuplestoreDestReceiverParams(treceiver,
01038 portal->holdStore,
01039 portal->holdContext,
01040 false);
01041
01042 completionTag[0] = '\0';
01043
01044 switch (portal->strategy)
01045 {
01046 case PORTAL_ONE_RETURNING:
01047 case PORTAL_ONE_MOD_WITH:
01048
01049
01050
01051
01052
01053
01054 PortalRunMulti(portal, isTopLevel,
01055 treceiver, None_Receiver, completionTag);
01056 break;
01057
01058 case PORTAL_UTIL_SELECT:
01059 PortalRunUtility(portal, (Node *) linitial(portal->stmts),
01060 isTopLevel, treceiver, completionTag);
01061 break;
01062
01063 default:
01064 elog(ERROR, "unsupported portal strategy: %d",
01065 (int) portal->strategy);
01066 break;
01067 }
01068
01069
01070 if (completionTag[0] != '\0')
01071 portal->commandTag = pstrdup(completionTag);
01072
01073 (*treceiver->rDestroy) (treceiver);
01074 }
01075
01076
01077
01078
01079
01080
01081
01082
01083
01084
01085
01086
01087
01088 static uint32
01089 RunFromStore(Portal portal, ScanDirection direction, long count,
01090 DestReceiver *dest)
01091 {
01092 long current_tuple_count = 0;
01093 TupleTableSlot *slot;
01094
01095 slot = MakeSingleTupleTableSlot(portal->tupDesc);
01096
01097 (*dest->rStartup) (dest, CMD_SELECT, portal->tupDesc);
01098
01099 if (ScanDirectionIsNoMovement(direction))
01100 {
01101
01102 }
01103 else
01104 {
01105 bool forward = ScanDirectionIsForward(direction);
01106
01107 for (;;)
01108 {
01109 MemoryContext oldcontext;
01110 bool ok;
01111
01112 oldcontext = MemoryContextSwitchTo(portal->holdContext);
01113
01114 ok = tuplestore_gettupleslot(portal->holdStore, forward, false,
01115 slot);
01116
01117 MemoryContextSwitchTo(oldcontext);
01118
01119 if (!ok)
01120 break;
01121
01122 (*dest->receiveSlot) (slot, dest);
01123
01124 ExecClearTuple(slot);
01125
01126
01127
01128
01129
01130
01131 current_tuple_count++;
01132 if (count && count == current_tuple_count)
01133 break;
01134 }
01135 }
01136
01137 (*dest->rShutdown) (dest);
01138
01139 ExecDropSingleTupleTableSlot(slot);
01140
01141 return (uint32) current_tuple_count;
01142 }
01143
01144
01145
01146
01147
01148 static void
01149 PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel,
01150 DestReceiver *dest, char *completionTag)
01151 {
01152 bool active_snapshot_set;
01153
01154 elog(DEBUG3, "ProcessUtility");
01155
01156
01157
01158
01159
01160
01161
01162
01163
01164
01165
01166
01167 if (!(IsA(utilityStmt, TransactionStmt) ||
01168 IsA(utilityStmt, LockStmt) ||
01169 IsA(utilityStmt, VariableSetStmt) ||
01170 IsA(utilityStmt, VariableShowStmt) ||
01171 IsA(utilityStmt, ConstraintsSetStmt) ||
01172
01173 IsA(utilityStmt, FetchStmt) ||
01174 IsA(utilityStmt, ListenStmt) ||
01175 IsA(utilityStmt, NotifyStmt) ||
01176 IsA(utilityStmt, UnlistenStmt) ||
01177 IsA(utilityStmt, CheckPointStmt)))
01178 {
01179 PushActiveSnapshot(GetTransactionSnapshot());
01180 active_snapshot_set = true;
01181 }
01182 else
01183 active_snapshot_set = false;
01184
01185 ProcessUtility(utilityStmt,
01186 portal->sourceText,
01187 isTopLevel ? PROCESS_UTILITY_TOPLEVEL : PROCESS_UTILITY_QUERY,
01188 portal->portalParams,
01189 dest,
01190 completionTag);
01191
01192
01193 MemoryContextSwitchTo(PortalGetHeapMemory(portal));
01194
01195
01196
01197
01198
01199
01200
01201
01202 if (active_snapshot_set && ActiveSnapshotSet())
01203 PopActiveSnapshot();
01204 }
01205
01206
01207
01208
01209
01210
01211 static void
01212 PortalRunMulti(Portal portal, bool isTopLevel,
01213 DestReceiver *dest, DestReceiver *altdest,
01214 char *completionTag)
01215 {
01216 bool active_snapshot_set = false;
01217 ListCell *stmtlist_item;
01218
01219
01220
01221
01222
01223
01224
01225
01226
01227
01228
01229 if (dest->mydest == DestRemoteExecute)
01230 dest = None_Receiver;
01231 if (altdest->mydest == DestRemoteExecute)
01232 altdest = None_Receiver;
01233
01234
01235
01236
01237
01238 foreach(stmtlist_item, portal->stmts)
01239 {
01240 Node *stmt = (Node *) lfirst(stmtlist_item);
01241
01242
01243
01244
01245 CHECK_FOR_INTERRUPTS();
01246
01247 if (IsA(stmt, PlannedStmt) &&
01248 ((PlannedStmt *) stmt)->utilityStmt == NULL)
01249 {
01250
01251
01252
01253 PlannedStmt *pstmt = (PlannedStmt *) stmt;
01254
01255 TRACE_POSTGRESQL_QUERY_EXECUTE_START();
01256
01257 if (log_executor_stats)
01258 ResetUsage();
01259
01260
01261
01262
01263
01264
01265
01266 if (!active_snapshot_set)
01267 {
01268 PushActiveSnapshot(GetTransactionSnapshot());
01269 active_snapshot_set = true;
01270 }
01271 else
01272 UpdateActiveSnapshotCommandId();
01273
01274 if (pstmt->canSetTag)
01275 {
01276
01277 ProcessQuery(pstmt,
01278 portal->sourceText,
01279 portal->portalParams,
01280 dest, completionTag);
01281 }
01282 else
01283 {
01284
01285 ProcessQuery(pstmt,
01286 portal->sourceText,
01287 portal->portalParams,
01288 altdest, NULL);
01289 }
01290
01291 if (log_executor_stats)
01292 ShowUsage("EXECUTOR STATISTICS");
01293
01294 TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
01295 }
01296 else
01297 {
01298
01299
01300
01301
01302
01303
01304
01305
01306
01307
01308
01309
01310
01311
01312 if (list_length(portal->stmts) == 1)
01313 {
01314 Assert(!active_snapshot_set);
01315
01316 PortalRunUtility(portal, stmt, isTopLevel,
01317 dest, completionTag);
01318 }
01319 else
01320 {
01321 Assert(IsA(stmt, NotifyStmt));
01322
01323 PortalRunUtility(portal, stmt, isTopLevel,
01324 altdest, NULL);
01325 }
01326 }
01327
01328
01329
01330
01331
01332 if (lnext(stmtlist_item) != NULL)
01333 CommandCounterIncrement();
01334
01335
01336
01337
01338 Assert(PortalGetHeapMemory(portal) == CurrentMemoryContext);
01339
01340 MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
01341 }
01342
01343
01344 if (active_snapshot_set)
01345 PopActiveSnapshot();
01346
01347
01348
01349
01350
01351
01352
01353
01354
01355
01356
01357
01358
01359 if (completionTag && completionTag[0] == '\0')
01360 {
01361 if (portal->commandTag)
01362 strcpy(completionTag, portal->commandTag);
01363 if (strcmp(completionTag, "SELECT") == 0)
01364 sprintf(completionTag, "SELECT 0 0");
01365 else if (strcmp(completionTag, "INSERT") == 0)
01366 strcpy(completionTag, "INSERT 0 0");
01367 else if (strcmp(completionTag, "UPDATE") == 0)
01368 strcpy(completionTag, "UPDATE 0");
01369 else if (strcmp(completionTag, "DELETE") == 0)
01370 strcpy(completionTag, "DELETE 0");
01371 }
01372 }
01373
01374
01375
01376
01377
01378
01379
01380
01381
01382 long
01383 PortalRunFetch(Portal portal,
01384 FetchDirection fdirection,
01385 long count,
01386 DestReceiver *dest)
01387 {
01388 long result;
01389 Portal saveActivePortal;
01390 ResourceOwner saveResourceOwner;
01391 MemoryContext savePortalContext;
01392 MemoryContext oldContext;
01393
01394 AssertArg(PortalIsValid(portal));
01395
01396
01397
01398
01399 if (portal->status != PORTAL_READY)
01400 ereport(ERROR,
01401 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
01402 errmsg("portal \"%s\" cannot be run", portal->name)));
01403 portal->status = PORTAL_ACTIVE;
01404
01405
01406
01407
01408 saveActivePortal = ActivePortal;
01409 saveResourceOwner = CurrentResourceOwner;
01410 savePortalContext = PortalContext;
01411 PG_TRY();
01412 {
01413 ActivePortal = portal;
01414 CurrentResourceOwner = portal->resowner;
01415 PortalContext = PortalGetHeapMemory(portal);
01416
01417 oldContext = MemoryContextSwitchTo(PortalContext);
01418
01419 switch (portal->strategy)
01420 {
01421 case PORTAL_ONE_SELECT:
01422 result = DoPortalRunFetch(portal, fdirection, count, dest);
01423 break;
01424
01425 case PORTAL_ONE_RETURNING:
01426 case PORTAL_ONE_MOD_WITH:
01427 case PORTAL_UTIL_SELECT:
01428
01429
01430
01431
01432
01433 if (!portal->holdStore)
01434 FillPortalStore(portal, false );
01435
01436
01437
01438
01439 result = DoPortalRunFetch(portal, fdirection, count, dest);
01440 break;
01441
01442 default:
01443 elog(ERROR, "unsupported portal strategy");
01444 result = 0;
01445 break;
01446 }
01447 }
01448 PG_CATCH();
01449 {
01450
01451 MarkPortalFailed(portal);
01452
01453
01454 ActivePortal = saveActivePortal;
01455 CurrentResourceOwner = saveResourceOwner;
01456 PortalContext = savePortalContext;
01457
01458 PG_RE_THROW();
01459 }
01460 PG_END_TRY();
01461
01462 MemoryContextSwitchTo(oldContext);
01463
01464
01465 portal->status = PORTAL_READY;
01466
01467 ActivePortal = saveActivePortal;
01468 CurrentResourceOwner = saveResourceOwner;
01469 PortalContext = savePortalContext;
01470
01471 return result;
01472 }
01473
01474
01475
01476
01477
01478
01479
01480 static long
01481 DoPortalRunFetch(Portal portal,
01482 FetchDirection fdirection,
01483 long count,
01484 DestReceiver *dest)
01485 {
01486 bool forward;
01487
01488 Assert(portal->strategy == PORTAL_ONE_SELECT ||
01489 portal->strategy == PORTAL_ONE_RETURNING ||
01490 portal->strategy == PORTAL_ONE_MOD_WITH ||
01491 portal->strategy == PORTAL_UTIL_SELECT);
01492
01493 switch (fdirection)
01494 {
01495 case FETCH_FORWARD:
01496 if (count < 0)
01497 {
01498 fdirection = FETCH_BACKWARD;
01499 count = -count;
01500 }
01501
01502 break;
01503 case FETCH_BACKWARD:
01504 if (count < 0)
01505 {
01506 fdirection = FETCH_FORWARD;
01507 count = -count;
01508 }
01509
01510 break;
01511 case FETCH_ABSOLUTE:
01512 if (count > 0)
01513 {
01514
01515
01516
01517
01518
01519
01520
01521 if (portal->posOverflow || portal->portalPos == LONG_MAX ||
01522 count - 1 <= portal->portalPos / 2)
01523 {
01524 DoPortalRewind(portal);
01525 if (count > 1)
01526 PortalRunSelect(portal, true, count - 1,
01527 None_Receiver);
01528 }
01529 else
01530 {
01531 long pos = portal->portalPos;
01532
01533 if (portal->atEnd)
01534 pos++;
01535 if (count <= pos)
01536 PortalRunSelect(portal, false, pos - count + 1,
01537 None_Receiver);
01538 else if (count > pos + 1)
01539 PortalRunSelect(portal, true, count - pos - 1,
01540 None_Receiver);
01541 }
01542 return PortalRunSelect(portal, true, 1L, dest);
01543 }
01544 else if (count < 0)
01545 {
01546
01547
01548
01549
01550
01551
01552
01553 PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
01554 if (count < -1)
01555 PortalRunSelect(portal, false, -count - 1, None_Receiver);
01556 return PortalRunSelect(portal, false, 1L, dest);
01557 }
01558 else
01559 {
01560
01561
01562 DoPortalRewind(portal);
01563 return PortalRunSelect(portal, true, 0L, dest);
01564 }
01565 break;
01566 case FETCH_RELATIVE:
01567 if (count > 0)
01568 {
01569
01570
01571
01572 if (count > 1)
01573 PortalRunSelect(portal, true, count - 1, None_Receiver);
01574 return PortalRunSelect(portal, true, 1L, dest);
01575 }
01576 else if (count < 0)
01577 {
01578
01579
01580
01581
01582 if (count < -1)
01583 PortalRunSelect(portal, false, -count - 1, None_Receiver);
01584 return PortalRunSelect(portal, false, 1L, dest);
01585 }
01586 else
01587 {
01588
01589
01590 fdirection = FETCH_FORWARD;
01591 }
01592 break;
01593 default:
01594 elog(ERROR, "bogus direction");
01595 break;
01596 }
01597
01598
01599
01600
01601
01602 forward = (fdirection == FETCH_FORWARD);
01603
01604
01605
01606
01607 if (count == 0)
01608 {
01609 bool on_row;
01610
01611
01612 on_row = (!portal->atStart && !portal->atEnd);
01613
01614 if (dest->mydest == DestNone)
01615 {
01616
01617 return on_row ? 1L : 0L;
01618 }
01619 else
01620 {
01621
01622
01623
01624
01625
01626
01627
01628 if (on_row)
01629 {
01630 PortalRunSelect(portal, false, 1L, None_Receiver);
01631
01632 count = 1;
01633 forward = true;
01634 }
01635 }
01636 }
01637
01638
01639
01640
01641 if (!forward && count == FETCH_ALL && dest->mydest == DestNone)
01642 {
01643 long result = portal->portalPos;
01644
01645 if (result > 0 && !portal->atEnd)
01646 result--;
01647 DoPortalRewind(portal);
01648
01649 return result;
01650 }
01651
01652 return PortalRunSelect(portal, forward, count, dest);
01653 }
01654
01655
01656
01657
01658 static void
01659 DoPortalRewind(Portal portal)
01660 {
01661 if (portal->holdStore)
01662 {
01663 MemoryContext oldcontext;
01664
01665 oldcontext = MemoryContextSwitchTo(portal->holdContext);
01666 tuplestore_rescan(portal->holdStore);
01667 MemoryContextSwitchTo(oldcontext);
01668 }
01669 if (PortalGetQueryDesc(portal))
01670 ExecutorRewind(PortalGetQueryDesc(portal));
01671
01672 portal->atStart = true;
01673 portal->atEnd = false;
01674 portal->portalPos = 0;
01675 portal->posOverflow = false;
01676 }