00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 #include "postgres.h"
00035
00036 #include "access/htup_details.h"
00037 #include "catalog/objectaccess.h"
00038 #include "catalog/pg_aggregate.h"
00039 #include "catalog/pg_proc.h"
00040 #include "catalog/pg_type.h"
00041 #include "executor/executor.h"
00042 #include "executor/nodeWindowAgg.h"
00043 #include "miscadmin.h"
00044 #include "nodes/nodeFuncs.h"
00045 #include "optimizer/clauses.h"
00046 #include "parser/parse_agg.h"
00047 #include "parser/parse_coerce.h"
00048 #include "utils/acl.h"
00049 #include "utils/builtins.h"
00050 #include "utils/datum.h"
00051 #include "utils/lsyscache.h"
00052 #include "utils/memutils.h"
00053 #include "utils/syscache.h"
00054 #include "windowapi.h"
00055
00056
00057
00058
00059
00060 typedef struct WindowObjectData
00061 {
00062 NodeTag type;
00063 WindowAggState *winstate;
00064 List *argstates;
00065 void *localmem;
00066 int markptr;
00067 int readptr;
00068 int64 markpos;
00069 int64 seekpos;
00070 } WindowObjectData;
00071
00072
00073
00074
00075
00076 typedef struct WindowStatePerFuncData
00077 {
00078
00079 WindowFuncExprState *wfuncstate;
00080 WindowFunc *wfunc;
00081
00082 int numArguments;
00083
00084 FmgrInfo flinfo;
00085
00086 Oid winCollation;
00087
00088
00089
00090
00091
00092 int16 resulttypeLen;
00093 bool resulttypeByVal;
00094
00095 bool plain_agg;
00096 int aggno;
00097
00098 WindowObject winobj;
00099 } WindowStatePerFuncData;
00100
00101
00102
00103
00104 typedef struct WindowStatePerAggData
00105 {
00106
00107 Oid transfn_oid;
00108 Oid finalfn_oid;
00109
00110
00111
00112
00113
00114
00115 FmgrInfo transfn;
00116 FmgrInfo finalfn;
00117
00118
00119
00120
00121 Datum initValue;
00122 bool initValueIsNull;
00123
00124
00125
00126
00127 Datum resultValue;
00128 bool resultValueIsNull;
00129
00130
00131
00132
00133
00134 int16 inputtypeLen,
00135 resulttypeLen,
00136 transtypeLen;
00137 bool inputtypeByVal,
00138 resulttypeByVal,
00139 transtypeByVal;
00140
00141 int wfuncno;
00142
00143
00144 Datum transValue;
00145 bool transValueIsNull;
00146
00147 bool noTransValue;
00148 } WindowStatePerAggData;
00149
00150 static void initialize_windowaggregate(WindowAggState *winstate,
00151 WindowStatePerFunc perfuncstate,
00152 WindowStatePerAgg peraggstate);
00153 static void advance_windowaggregate(WindowAggState *winstate,
00154 WindowStatePerFunc perfuncstate,
00155 WindowStatePerAgg peraggstate);
00156 static void finalize_windowaggregate(WindowAggState *winstate,
00157 WindowStatePerFunc perfuncstate,
00158 WindowStatePerAgg peraggstate,
00159 Datum *result, bool *isnull);
00160
00161 static void eval_windowaggregates(WindowAggState *winstate);
00162 static void eval_windowfunction(WindowAggState *winstate,
00163 WindowStatePerFunc perfuncstate,
00164 Datum *result, bool *isnull);
00165
00166 static void begin_partition(WindowAggState *winstate);
00167 static void spool_tuples(WindowAggState *winstate, int64 pos);
00168 static void release_partition(WindowAggState *winstate);
00169
00170 static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
00171 TupleTableSlot *slot);
00172 static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot);
00173 static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
00174
00175 static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
00176 WindowFunc *wfunc,
00177 WindowStatePerAgg peraggstate);
00178 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
00179
00180 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
00181 TupleTableSlot *slot2);
00182 static bool window_gettupleslot(WindowObject winobj, int64 pos,
00183 TupleTableSlot *slot);
00184
00185
00186
00187
00188
00189
00190 static void
00191 initialize_windowaggregate(WindowAggState *winstate,
00192 WindowStatePerFunc perfuncstate,
00193 WindowStatePerAgg peraggstate)
00194 {
00195 MemoryContext oldContext;
00196
00197 if (peraggstate->initValueIsNull)
00198 peraggstate->transValue = peraggstate->initValue;
00199 else
00200 {
00201 oldContext = MemoryContextSwitchTo(winstate->aggcontext);
00202 peraggstate->transValue = datumCopy(peraggstate->initValue,
00203 peraggstate->transtypeByVal,
00204 peraggstate->transtypeLen);
00205 MemoryContextSwitchTo(oldContext);
00206 }
00207 peraggstate->transValueIsNull = peraggstate->initValueIsNull;
00208 peraggstate->noTransValue = peraggstate->initValueIsNull;
00209 peraggstate->resultValueIsNull = true;
00210 }
00211
00212
00213
00214
00215
00216 static void
00217 advance_windowaggregate(WindowAggState *winstate,
00218 WindowStatePerFunc perfuncstate,
00219 WindowStatePerAgg peraggstate)
00220 {
00221 WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
00222 int numArguments = perfuncstate->numArguments;
00223 FunctionCallInfoData fcinfodata;
00224 FunctionCallInfo fcinfo = &fcinfodata;
00225 Datum newVal;
00226 ListCell *arg;
00227 int i;
00228 MemoryContext oldContext;
00229 ExprContext *econtext = winstate->tmpcontext;
00230
00231 oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
00232
00233
00234 i = 1;
00235 foreach(arg, wfuncstate->args)
00236 {
00237 ExprState *argstate = (ExprState *) lfirst(arg);
00238
00239 fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
00240 &fcinfo->argnull[i], NULL);
00241 i++;
00242 }
00243
00244 if (peraggstate->transfn.fn_strict)
00245 {
00246
00247
00248
00249
00250 for (i = 1; i <= numArguments; i++)
00251 {
00252 if (fcinfo->argnull[i])
00253 {
00254 MemoryContextSwitchTo(oldContext);
00255 return;
00256 }
00257 }
00258 if (peraggstate->noTransValue)
00259 {
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269 MemoryContextSwitchTo(winstate->aggcontext);
00270 peraggstate->transValue = datumCopy(fcinfo->arg[1],
00271 peraggstate->transtypeByVal,
00272 peraggstate->transtypeLen);
00273 peraggstate->transValueIsNull = false;
00274 peraggstate->noTransValue = false;
00275 MemoryContextSwitchTo(oldContext);
00276 return;
00277 }
00278 if (peraggstate->transValueIsNull)
00279 {
00280
00281
00282
00283
00284
00285
00286 MemoryContextSwitchTo(oldContext);
00287 return;
00288 }
00289 }
00290
00291
00292
00293
00294 InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
00295 numArguments + 1,
00296 perfuncstate->winCollation,
00297 (void *) winstate, NULL);
00298 fcinfo->arg[0] = peraggstate->transValue;
00299 fcinfo->argnull[0] = peraggstate->transValueIsNull;
00300 newVal = FunctionCallInvoke(fcinfo);
00301
00302
00303
00304
00305
00306
00307 if (!peraggstate->transtypeByVal &&
00308 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
00309 {
00310 if (!fcinfo->isnull)
00311 {
00312 MemoryContextSwitchTo(winstate->aggcontext);
00313 newVal = datumCopy(newVal,
00314 peraggstate->transtypeByVal,
00315 peraggstate->transtypeLen);
00316 }
00317 if (!peraggstate->transValueIsNull)
00318 pfree(DatumGetPointer(peraggstate->transValue));
00319 }
00320
00321 MemoryContextSwitchTo(oldContext);
00322 peraggstate->transValue = newVal;
00323 peraggstate->transValueIsNull = fcinfo->isnull;
00324 }
00325
00326
00327
00328
00329
00330 static void
00331 finalize_windowaggregate(WindowAggState *winstate,
00332 WindowStatePerFunc perfuncstate,
00333 WindowStatePerAgg peraggstate,
00334 Datum *result, bool *isnull)
00335 {
00336 MemoryContext oldContext;
00337
00338 oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
00339
00340
00341
00342
00343 if (OidIsValid(peraggstate->finalfn_oid))
00344 {
00345 FunctionCallInfoData fcinfo;
00346
00347 InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn), 1,
00348 perfuncstate->winCollation,
00349 (void *) winstate, NULL);
00350 fcinfo.arg[0] = peraggstate->transValue;
00351 fcinfo.argnull[0] = peraggstate->transValueIsNull;
00352 if (fcinfo.flinfo->fn_strict && peraggstate->transValueIsNull)
00353 {
00354
00355 *result = (Datum) 0;
00356 *isnull = true;
00357 }
00358 else
00359 {
00360 *result = FunctionCallInvoke(&fcinfo);
00361 *isnull = fcinfo.isnull;
00362 }
00363 }
00364 else
00365 {
00366 *result = peraggstate->transValue;
00367 *isnull = peraggstate->transValueIsNull;
00368 }
00369
00370
00371
00372
00373 if (!peraggstate->resulttypeByVal && !*isnull &&
00374 !MemoryContextContains(CurrentMemoryContext,
00375 DatumGetPointer(*result)))
00376 *result = datumCopy(*result,
00377 peraggstate->resulttypeByVal,
00378 peraggstate->resulttypeLen);
00379 MemoryContextSwitchTo(oldContext);
00380 }
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391 static void
00392 eval_windowaggregates(WindowAggState *winstate)
00393 {
00394 WindowStatePerAgg peraggstate;
00395 int wfuncno,
00396 numaggs;
00397 int i;
00398 MemoryContext oldContext;
00399 ExprContext *econtext;
00400 WindowObject agg_winobj;
00401 TupleTableSlot *agg_row_slot;
00402
00403 numaggs = winstate->numaggs;
00404 if (numaggs == 0)
00405 return;
00406
00407
00408 econtext = winstate->ss.ps.ps_ExprContext;
00409 agg_winobj = winstate->agg_winobj;
00410 agg_row_slot = winstate->agg_row_slot;
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453 update_frameheadpos(agg_winobj, winstate->temp_slot_1);
00454
00455
00456
00457
00458
00459 if (winstate->currentpos == 0 ||
00460 winstate->frameheadpos != winstate->aggregatedbase)
00461 {
00462
00463
00464
00465 MemoryContextResetAndDeleteChildren(winstate->aggcontext);
00466
00467 for (i = 0; i < numaggs; i++)
00468 {
00469 peraggstate = &winstate->peragg[i];
00470 wfuncno = peraggstate->wfuncno;
00471 initialize_windowaggregate(winstate,
00472 &winstate->perfunc[wfuncno],
00473 peraggstate);
00474 }
00475
00476
00477
00478
00479
00480 if (agg_winobj->markptr >= 0)
00481 WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
00482
00483
00484
00485
00486 ExecClearTuple(agg_row_slot);
00487 winstate->aggregatedbase = winstate->frameheadpos;
00488 winstate->aggregatedupto = winstate->frameheadpos;
00489 }
00490
00491
00492
00493
00494
00495
00496
00497
00498 if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
00499 FRAMEOPTION_END_CURRENT_ROW)) &&
00500 winstate->aggregatedbase <= winstate->currentpos &&
00501 winstate->aggregatedupto > winstate->currentpos)
00502 {
00503 for (i = 0; i < numaggs; i++)
00504 {
00505 peraggstate = &winstate->peragg[i];
00506 wfuncno = peraggstate->wfuncno;
00507 econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
00508 econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
00509 }
00510 return;
00511 }
00512
00513
00514
00515
00516
00517
00518
00519
00520 for (;;)
00521 {
00522
00523 if (TupIsNull(agg_row_slot))
00524 {
00525 if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
00526 agg_row_slot))
00527 break;
00528 }
00529
00530
00531 if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
00532 break;
00533
00534
00535 winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
00536
00537
00538 for (i = 0; i < numaggs; i++)
00539 {
00540 peraggstate = &winstate->peragg[i];
00541 wfuncno = peraggstate->wfuncno;
00542 advance_windowaggregate(winstate,
00543 &winstate->perfunc[wfuncno],
00544 peraggstate);
00545 }
00546
00547
00548 ResetExprContext(winstate->tmpcontext);
00549
00550
00551 winstate->aggregatedupto++;
00552 ExecClearTuple(agg_row_slot);
00553 }
00554
00555
00556
00557
00558 for (i = 0; i < numaggs; i++)
00559 {
00560 Datum *result;
00561 bool *isnull;
00562
00563 peraggstate = &winstate->peragg[i];
00564 wfuncno = peraggstate->wfuncno;
00565 result = &econtext->ecxt_aggvalues[wfuncno];
00566 isnull = &econtext->ecxt_aggnulls[wfuncno];
00567 finalize_windowaggregate(winstate,
00568 &winstate->perfunc[wfuncno],
00569 peraggstate,
00570 result, isnull);
00571
00572
00573
00574
00575
00576
00577
00578
00579 if (!peraggstate->resulttypeByVal)
00580 {
00581
00582
00583
00584
00585
00586 if (!peraggstate->resultValueIsNull)
00587 pfree(DatumGetPointer(peraggstate->resultValue));
00588
00589
00590
00591
00592 if (!*isnull)
00593 {
00594 oldContext = MemoryContextSwitchTo(winstate->aggcontext);
00595 peraggstate->resultValue =
00596 datumCopy(*result,
00597 peraggstate->resulttypeByVal,
00598 peraggstate->resulttypeLen);
00599 MemoryContextSwitchTo(oldContext);
00600 }
00601 }
00602 else
00603 {
00604 peraggstate->resultValue = *result;
00605 }
00606 peraggstate->resultValueIsNull = *isnull;
00607 }
00608 }
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619 static void
00620 eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
00621 Datum *result, bool *isnull)
00622 {
00623 FunctionCallInfoData fcinfo;
00624 MemoryContext oldContext;
00625
00626 oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
00627
00628
00629
00630
00631
00632
00633
00634 InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
00635 perfuncstate->numArguments,
00636 perfuncstate->winCollation,
00637 (void *) perfuncstate->winobj, NULL);
00638
00639 memset(fcinfo.argnull, true, perfuncstate->numArguments);
00640
00641 *result = FunctionCallInvoke(&fcinfo);
00642 *isnull = fcinfo.isnull;
00643
00644
00645
00646
00647
00648
00649 if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
00650 !MemoryContextContains(CurrentMemoryContext,
00651 DatumGetPointer(*result)))
00652 *result = datumCopy(*result,
00653 perfuncstate->resulttypeByVal,
00654 perfuncstate->resulttypeLen);
00655
00656 MemoryContextSwitchTo(oldContext);
00657 }
00658
00659
00660
00661
00662
00663 static void
00664 begin_partition(WindowAggState *winstate)
00665 {
00666 PlanState *outerPlan = outerPlanState(winstate);
00667 int numfuncs = winstate->numfuncs;
00668 int i;
00669
00670 winstate->partition_spooled = false;
00671 winstate->framehead_valid = false;
00672 winstate->frametail_valid = false;
00673 winstate->spooled_rows = 0;
00674 winstate->currentpos = 0;
00675 winstate->frameheadpos = 0;
00676 winstate->frametailpos = -1;
00677 ExecClearTuple(winstate->agg_row_slot);
00678
00679
00680
00681
00682
00683 if (TupIsNull(winstate->first_part_slot))
00684 {
00685 TupleTableSlot *outerslot = ExecProcNode(outerPlan);
00686
00687 if (!TupIsNull(outerslot))
00688 ExecCopySlot(winstate->first_part_slot, outerslot);
00689 else
00690 {
00691
00692 winstate->partition_spooled = true;
00693 winstate->more_partitions = false;
00694 return;
00695 }
00696 }
00697
00698
00699 winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
00700
00701
00702
00703
00704
00705
00706 winstate->current_ptr = 0;
00707
00708
00709 tuplestore_set_eflags(winstate->buffer, 0);
00710
00711
00712 if (winstate->numaggs > 0)
00713 {
00714 WindowObject agg_winobj = winstate->agg_winobj;
00715 int readptr_flags = 0;
00716
00717
00718 if (!(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
00719 {
00720
00721 agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
00722
00723 readptr_flags |= EXEC_FLAG_BACKWARD;
00724 }
00725
00726 agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
00727 readptr_flags);
00728 agg_winobj->markpos = -1;
00729 agg_winobj->seekpos = -1;
00730
00731
00732 winstate->aggregatedbase = 0;
00733 winstate->aggregatedupto = 0;
00734 }
00735
00736
00737 for (i = 0; i < numfuncs; i++)
00738 {
00739 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
00740
00741 if (!perfuncstate->plain_agg)
00742 {
00743 WindowObject winobj = perfuncstate->winobj;
00744
00745 winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
00746 0);
00747 winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
00748 EXEC_FLAG_BACKWARD);
00749 winobj->markpos = -1;
00750 winobj->seekpos = -1;
00751 }
00752 }
00753
00754
00755
00756
00757
00758 tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
00759 winstate->spooled_rows++;
00760 }
00761
00762
00763
00764
00765
00766 static void
00767 spool_tuples(WindowAggState *winstate, int64 pos)
00768 {
00769 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
00770 PlanState *outerPlan;
00771 TupleTableSlot *outerslot;
00772 MemoryContext oldcontext;
00773
00774 if (!winstate->buffer)
00775 return;
00776 if (winstate->partition_spooled)
00777 return;
00778
00779
00780
00781
00782
00783
00784
00785
00786
00787 if (!tuplestore_in_memory(winstate->buffer))
00788 pos = -1;
00789
00790 outerPlan = outerPlanState(winstate);
00791
00792
00793 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
00794
00795 while (winstate->spooled_rows <= pos || pos == -1)
00796 {
00797 outerslot = ExecProcNode(outerPlan);
00798 if (TupIsNull(outerslot))
00799 {
00800
00801 winstate->partition_spooled = true;
00802 winstate->more_partitions = false;
00803 break;
00804 }
00805
00806 if (node->partNumCols > 0)
00807 {
00808
00809 if (!execTuplesMatch(winstate->first_part_slot,
00810 outerslot,
00811 node->partNumCols, node->partColIdx,
00812 winstate->partEqfunctions,
00813 winstate->tmpcontext->ecxt_per_tuple_memory))
00814 {
00815
00816
00817
00818 ExecCopySlot(winstate->first_part_slot, outerslot);
00819 winstate->partition_spooled = true;
00820 winstate->more_partitions = true;
00821 break;
00822 }
00823 }
00824
00825
00826 tuplestore_puttupleslot(winstate->buffer, outerslot);
00827 winstate->spooled_rows++;
00828 }
00829
00830 MemoryContextSwitchTo(oldcontext);
00831 }
00832
00833
00834
00835
00836
00837
00838 static void
00839 release_partition(WindowAggState *winstate)
00840 {
00841 int i;
00842
00843 for (i = 0; i < winstate->numfuncs; i++)
00844 {
00845 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
00846
00847
00848 if (perfuncstate->winobj)
00849 perfuncstate->winobj->localmem = NULL;
00850 }
00851
00852
00853
00854
00855
00856
00857
00858 MemoryContextResetAndDeleteChildren(winstate->partcontext);
00859 MemoryContextResetAndDeleteChildren(winstate->aggcontext);
00860
00861 if (winstate->buffer)
00862 tuplestore_end(winstate->buffer);
00863 winstate->buffer = NULL;
00864 winstate->partition_spooled = false;
00865 }
00866
00867
00868
00869
00870
00871
00872
00873
00874
00875
00876 static bool
00877 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
00878 {
00879 int frameOptions = winstate->frameOptions;
00880
00881 Assert(pos >= 0);
00882
00883
00884 if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
00885 {
00886 if (frameOptions & FRAMEOPTION_ROWS)
00887 {
00888
00889 if (pos < winstate->currentpos)
00890 return false;
00891 }
00892 else if (frameOptions & FRAMEOPTION_RANGE)
00893 {
00894
00895 if (pos < winstate->currentpos &&
00896 !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
00897 return false;
00898 }
00899 else
00900 Assert(false);
00901 }
00902 else if (frameOptions & FRAMEOPTION_START_VALUE)
00903 {
00904 if (frameOptions & FRAMEOPTION_ROWS)
00905 {
00906 int64 offset = DatumGetInt64(winstate->startOffsetValue);
00907
00908
00909 if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
00910 offset = -offset;
00911
00912 if (pos < winstate->currentpos + offset)
00913 return false;
00914 }
00915 else if (frameOptions & FRAMEOPTION_RANGE)
00916 {
00917
00918 elog(ERROR, "window frame with value offset is not implemented");
00919 }
00920 else
00921 Assert(false);
00922 }
00923
00924
00925 if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
00926 {
00927 if (frameOptions & FRAMEOPTION_ROWS)
00928 {
00929
00930 if (pos > winstate->currentpos)
00931 return false;
00932 }
00933 else if (frameOptions & FRAMEOPTION_RANGE)
00934 {
00935
00936 if (pos > winstate->currentpos &&
00937 !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
00938 return false;
00939 }
00940 else
00941 Assert(false);
00942 }
00943 else if (frameOptions & FRAMEOPTION_END_VALUE)
00944 {
00945 if (frameOptions & FRAMEOPTION_ROWS)
00946 {
00947 int64 offset = DatumGetInt64(winstate->endOffsetValue);
00948
00949
00950 if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
00951 offset = -offset;
00952
00953 if (pos > winstate->currentpos + offset)
00954 return false;
00955 }
00956 else if (frameOptions & FRAMEOPTION_RANGE)
00957 {
00958
00959 elog(ERROR, "window frame with value offset is not implemented");
00960 }
00961 else
00962 Assert(false);
00963 }
00964
00965
00966 return true;
00967 }
00968
00969
00970
00971
00972
00973
00974
00975
00976
00977
00978 static void
00979 update_frameheadpos(WindowObject winobj, TupleTableSlot *slot)
00980 {
00981 WindowAggState *winstate = winobj->winstate;
00982 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
00983 int frameOptions = winstate->frameOptions;
00984
00985 if (winstate->framehead_valid)
00986 return;
00987
00988 if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
00989 {
00990
00991 winstate->frameheadpos = 0;
00992 winstate->framehead_valid = true;
00993 }
00994 else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
00995 {
00996 if (frameOptions & FRAMEOPTION_ROWS)
00997 {
00998
00999 winstate->frameheadpos = winstate->currentpos;
01000 winstate->framehead_valid = true;
01001 }
01002 else if (frameOptions & FRAMEOPTION_RANGE)
01003 {
01004 int64 fhprev;
01005
01006
01007 if (node->ordNumCols == 0)
01008 {
01009 winstate->frameheadpos = 0;
01010 winstate->framehead_valid = true;
01011 return;
01012 }
01013
01014
01015
01016
01017
01018
01019
01020
01021 fhprev = winstate->currentpos - 1;
01022 for (;;)
01023 {
01024
01025 if (fhprev < winstate->frameheadpos)
01026 break;
01027 if (!window_gettupleslot(winobj, fhprev, slot))
01028 break;
01029 if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
01030 break;
01031 fhprev--;
01032 }
01033 winstate->frameheadpos = fhprev + 1;
01034 winstate->framehead_valid = true;
01035 }
01036 else
01037 Assert(false);
01038 }
01039 else if (frameOptions & FRAMEOPTION_START_VALUE)
01040 {
01041 if (frameOptions & FRAMEOPTION_ROWS)
01042 {
01043
01044 int64 offset = DatumGetInt64(winstate->startOffsetValue);
01045
01046 if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
01047 offset = -offset;
01048
01049 winstate->frameheadpos = winstate->currentpos + offset;
01050
01051 if (winstate->frameheadpos < 0)
01052 winstate->frameheadpos = 0;
01053 else if (winstate->frameheadpos > winstate->currentpos)
01054 {
01055
01056 spool_tuples(winstate, winstate->frameheadpos - 1);
01057 if (winstate->frameheadpos > winstate->spooled_rows)
01058 winstate->frameheadpos = winstate->spooled_rows;
01059 }
01060 winstate->framehead_valid = true;
01061 }
01062 else if (frameOptions & FRAMEOPTION_RANGE)
01063 {
01064
01065 elog(ERROR, "window frame with value offset is not implemented");
01066 }
01067 else
01068 Assert(false);
01069 }
01070 else
01071 Assert(false);
01072 }
01073
01074
01075
01076
01077
01078
01079
01080
01081
01082
01083 static void
01084 update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
01085 {
01086 WindowAggState *winstate = winobj->winstate;
01087 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
01088 int frameOptions = winstate->frameOptions;
01089
01090 if (winstate->frametail_valid)
01091 return;
01092
01093 if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
01094 {
01095
01096 spool_tuples(winstate, -1);
01097 winstate->frametailpos = winstate->spooled_rows - 1;
01098 winstate->frametail_valid = true;
01099 }
01100 else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
01101 {
01102 if (frameOptions & FRAMEOPTION_ROWS)
01103 {
01104
01105 winstate->frametailpos = winstate->currentpos;
01106 winstate->frametail_valid = true;
01107 }
01108 else if (frameOptions & FRAMEOPTION_RANGE)
01109 {
01110 int64 ftnext;
01111
01112
01113 if (node->ordNumCols == 0)
01114 {
01115 spool_tuples(winstate, -1);
01116 winstate->frametailpos = winstate->spooled_rows - 1;
01117 winstate->frametail_valid = true;
01118 return;
01119 }
01120
01121
01122
01123
01124
01125
01126
01127
01128 ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
01129 for (;;)
01130 {
01131 if (!window_gettupleslot(winobj, ftnext, slot))
01132 break;
01133 if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
01134 break;
01135 ftnext++;
01136 }
01137 winstate->frametailpos = ftnext - 1;
01138 winstate->frametail_valid = true;
01139 }
01140 else
01141 Assert(false);
01142 }
01143 else if (frameOptions & FRAMEOPTION_END_VALUE)
01144 {
01145 if (frameOptions & FRAMEOPTION_ROWS)
01146 {
01147
01148 int64 offset = DatumGetInt64(winstate->endOffsetValue);
01149
01150 if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
01151 offset = -offset;
01152
01153 winstate->frametailpos = winstate->currentpos + offset;
01154
01155 if (winstate->frametailpos < 0)
01156 winstate->frametailpos = -1;
01157 else if (winstate->frametailpos > winstate->currentpos)
01158 {
01159
01160 spool_tuples(winstate, winstate->frametailpos);
01161 if (winstate->frametailpos >= winstate->spooled_rows)
01162 winstate->frametailpos = winstate->spooled_rows - 1;
01163 }
01164 winstate->frametail_valid = true;
01165 }
01166 else if (frameOptions & FRAMEOPTION_RANGE)
01167 {
01168
01169 elog(ERROR, "window frame with value offset is not implemented");
01170 }
01171 else
01172 Assert(false);
01173 }
01174 else
01175 Assert(false);
01176 }
01177
01178
01179
01180
01181
01182
01183
01184
01185
01186
01187
01188
01189 TupleTableSlot *
01190 ExecWindowAgg(WindowAggState *winstate)
01191 {
01192 TupleTableSlot *result;
01193 ExprDoneCond isDone;
01194 ExprContext *econtext;
01195 int i;
01196 int numfuncs;
01197
01198 if (winstate->all_done)
01199 return NULL;
01200
01201
01202
01203
01204
01205
01206 if (winstate->ss.ps.ps_TupFromTlist)
01207 {
01208 TupleTableSlot *result;
01209 ExprDoneCond isDone;
01210
01211 result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
01212 if (isDone == ExprMultipleResult)
01213 return result;
01214
01215 winstate->ss.ps.ps_TupFromTlist = false;
01216 }
01217
01218
01219
01220
01221 if (winstate->all_first)
01222 {
01223 int frameOptions = winstate->frameOptions;
01224 ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
01225 Datum value;
01226 bool isnull;
01227 int16 len;
01228 bool byval;
01229
01230 if (frameOptions & FRAMEOPTION_START_VALUE)
01231 {
01232 Assert(winstate->startOffset != NULL);
01233 value = ExecEvalExprSwitchContext(winstate->startOffset,
01234 econtext,
01235 &isnull,
01236 NULL);
01237 if (isnull)
01238 ereport(ERROR,
01239 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
01240 errmsg("frame starting offset must not be null")));
01241
01242 get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
01243 &len, &byval);
01244 winstate->startOffsetValue = datumCopy(value, byval, len);
01245 if (frameOptions & FRAMEOPTION_ROWS)
01246 {
01247
01248 int64 offset = DatumGetInt64(value);
01249
01250 if (offset < 0)
01251 ereport(ERROR,
01252 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
01253 errmsg("frame starting offset must not be negative")));
01254 }
01255 }
01256 if (frameOptions & FRAMEOPTION_END_VALUE)
01257 {
01258 Assert(winstate->endOffset != NULL);
01259 value = ExecEvalExprSwitchContext(winstate->endOffset,
01260 econtext,
01261 &isnull,
01262 NULL);
01263 if (isnull)
01264 ereport(ERROR,
01265 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
01266 errmsg("frame ending offset must not be null")));
01267
01268 get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
01269 &len, &byval);
01270 winstate->endOffsetValue = datumCopy(value, byval, len);
01271 if (frameOptions & FRAMEOPTION_ROWS)
01272 {
01273
01274 int64 offset = DatumGetInt64(value);
01275
01276 if (offset < 0)
01277 ereport(ERROR,
01278 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
01279 errmsg("frame ending offset must not be negative")));
01280 }
01281 }
01282 winstate->all_first = false;
01283 }
01284
01285 restart:
01286 if (winstate->buffer == NULL)
01287 {
01288
01289 begin_partition(winstate);
01290
01291 }
01292 else
01293 {
01294
01295 winstate->currentpos++;
01296
01297 winstate->framehead_valid = false;
01298 winstate->frametail_valid = false;
01299 }
01300
01301
01302
01303
01304
01305 spool_tuples(winstate, winstate->currentpos);
01306
01307
01308 if (winstate->partition_spooled &&
01309 winstate->currentpos >= winstate->spooled_rows)
01310 {
01311 release_partition(winstate);
01312
01313 if (winstate->more_partitions)
01314 {
01315 begin_partition(winstate);
01316 Assert(winstate->spooled_rows > 0);
01317 }
01318 else
01319 {
01320 winstate->all_done = true;
01321 return NULL;
01322 }
01323 }
01324
01325
01326 econtext = winstate->ss.ps.ps_ExprContext;
01327
01328
01329 ResetExprContext(econtext);
01330
01331
01332
01333
01334
01335
01336
01337
01338
01339
01340 tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
01341 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
01342 winstate->ss.ss_ScanTupleSlot))
01343 elog(ERROR, "unexpected end of tuplestore");
01344
01345
01346
01347
01348 numfuncs = winstate->numfuncs;
01349 for (i = 0; i < numfuncs; i++)
01350 {
01351 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
01352
01353 if (perfuncstate->plain_agg)
01354 continue;
01355 eval_windowfunction(winstate, perfuncstate,
01356 &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
01357 &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
01358 }
01359
01360
01361
01362
01363 if (winstate->numaggs > 0)
01364 eval_windowaggregates(winstate);
01365
01366
01367
01368
01369 tuplestore_trim(winstate->buffer);
01370
01371
01372
01373
01374
01375
01376 econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
01377 result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
01378
01379 if (isDone == ExprEndResult)
01380 {
01381
01382 goto restart;
01383 }
01384
01385 winstate->ss.ps.ps_TupFromTlist =
01386 (isDone == ExprMultipleResult);
01387 return result;
01388 }
01389
01390
01391
01392
01393
01394
01395
01396
01397 WindowAggState *
01398 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
01399 {
01400 WindowAggState *winstate;
01401 Plan *outerPlan;
01402 ExprContext *econtext;
01403 ExprContext *tmpcontext;
01404 WindowStatePerFunc perfunc;
01405 WindowStatePerAgg peragg;
01406 int numfuncs,
01407 wfuncno,
01408 numaggs,
01409 aggno;
01410 ListCell *l;
01411
01412
01413 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
01414
01415
01416
01417
01418 winstate = makeNode(WindowAggState);
01419 winstate->ss.ps.plan = (Plan *) node;
01420 winstate->ss.ps.state = estate;
01421
01422
01423
01424
01425
01426
01427 ExecAssignExprContext(estate, &winstate->ss.ps);
01428 tmpcontext = winstate->ss.ps.ps_ExprContext;
01429 winstate->tmpcontext = tmpcontext;
01430 ExecAssignExprContext(estate, &winstate->ss.ps);
01431
01432
01433 winstate->partcontext =
01434 AllocSetContextCreate(CurrentMemoryContext,
01435 "WindowAgg_Partition",
01436 ALLOCSET_DEFAULT_MINSIZE,
01437 ALLOCSET_DEFAULT_INITSIZE,
01438 ALLOCSET_DEFAULT_MAXSIZE);
01439
01440
01441 winstate->aggcontext =
01442 AllocSetContextCreate(CurrentMemoryContext,
01443 "WindowAgg_Aggregates",
01444 ALLOCSET_DEFAULT_MINSIZE,
01445 ALLOCSET_DEFAULT_INITSIZE,
01446 ALLOCSET_DEFAULT_MAXSIZE);
01447
01448
01449
01450
01451 ExecInitScanTupleSlot(estate, &winstate->ss);
01452 ExecInitResultTupleSlot(estate, &winstate->ss.ps);
01453 winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
01454 winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
01455 winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
01456 winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
01457
01458 winstate->ss.ps.targetlist = (List *)
01459 ExecInitExpr((Expr *) node->plan.targetlist,
01460 (PlanState *) winstate);
01461
01462
01463
01464
01465
01466 Assert(node->plan.qual == NIL);
01467 winstate->ss.ps.qual = NIL;
01468
01469
01470
01471
01472 outerPlan = outerPlan(node);
01473 outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
01474
01475
01476
01477
01478
01479 ExecAssignScanTypeFromOuterPlan(&winstate->ss);
01480
01481 ExecSetSlotDescriptor(winstate->first_part_slot,
01482 winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
01483 ExecSetSlotDescriptor(winstate->agg_row_slot,
01484 winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
01485 ExecSetSlotDescriptor(winstate->temp_slot_1,
01486 winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
01487 ExecSetSlotDescriptor(winstate->temp_slot_2,
01488 winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
01489
01490
01491
01492
01493 ExecAssignResultTypeFromTL(&winstate->ss.ps);
01494 ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
01495
01496 winstate->ss.ps.ps_TupFromTlist = false;
01497
01498
01499 if (node->partNumCols > 0)
01500 winstate->partEqfunctions = execTuplesMatchPrepare(node->partNumCols,
01501 node->partOperators);
01502 if (node->ordNumCols > 0)
01503 winstate->ordEqfunctions = execTuplesMatchPrepare(node->ordNumCols,
01504 node->ordOperators);
01505
01506
01507
01508
01509 numfuncs = winstate->numfuncs;
01510 numaggs = winstate->numaggs;
01511 econtext = winstate->ss.ps.ps_ExprContext;
01512 econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
01513 econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
01514
01515
01516
01517
01518 perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
01519 peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
01520 winstate->perfunc = perfunc;
01521 winstate->peragg = peragg;
01522
01523 wfuncno = -1;
01524 aggno = -1;
01525 foreach(l, winstate->funcs)
01526 {
01527 WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
01528 WindowFunc *wfunc = (WindowFunc *) wfuncstate->xprstate.expr;
01529 WindowStatePerFunc perfuncstate;
01530 AclResult aclresult;
01531 int i;
01532
01533 if (wfunc->winref != node->winref)
01534 elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
01535 wfunc->winref, node->winref);
01536
01537
01538 for (i = 0; i <= wfuncno; i++)
01539 {
01540 if (equal(wfunc, perfunc[i].wfunc) &&
01541 !contain_volatile_functions((Node *) wfunc))
01542 break;
01543 }
01544 if (i <= wfuncno)
01545 {
01546
01547 wfuncstate->wfuncno = i;
01548 continue;
01549 }
01550
01551
01552 perfuncstate = &perfunc[++wfuncno];
01553
01554
01555 wfuncstate->wfuncno = wfuncno;
01556
01557
01558 aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
01559 ACL_EXECUTE);
01560 if (aclresult != ACLCHECK_OK)
01561 aclcheck_error(aclresult, ACL_KIND_PROC,
01562 get_func_name(wfunc->winfnoid));
01563 InvokeFunctionExecuteHook(wfunc->winfnoid);
01564
01565
01566 perfuncstate->wfuncstate = wfuncstate;
01567 perfuncstate->wfunc = wfunc;
01568 perfuncstate->numArguments = list_length(wfuncstate->args);
01569
01570 fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
01571 econtext->ecxt_per_query_memory);
01572 fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
01573
01574 perfuncstate->winCollation = wfunc->inputcollid;
01575
01576 get_typlenbyval(wfunc->wintype,
01577 &perfuncstate->resulttypeLen,
01578 &perfuncstate->resulttypeByVal);
01579
01580
01581
01582
01583
01584 perfuncstate->plain_agg = wfunc->winagg;
01585 if (wfunc->winagg)
01586 {
01587 WindowStatePerAgg peraggstate;
01588
01589 perfuncstate->aggno = ++aggno;
01590 peraggstate = &winstate->peragg[aggno];
01591 initialize_peragg(winstate, wfunc, peraggstate);
01592 peraggstate->wfuncno = wfuncno;
01593 }
01594 else
01595 {
01596 WindowObject winobj = makeNode(WindowObjectData);
01597
01598 winobj->winstate = winstate;
01599 winobj->argstates = wfuncstate->args;
01600 winobj->localmem = NULL;
01601 perfuncstate->winobj = winobj;
01602 }
01603 }
01604
01605
01606 winstate->numfuncs = wfuncno + 1;
01607 winstate->numaggs = aggno + 1;
01608
01609
01610 if (winstate->numaggs > 0)
01611 {
01612 WindowObject agg_winobj = makeNode(WindowObjectData);
01613
01614 agg_winobj->winstate = winstate;
01615 agg_winobj->argstates = NIL;
01616 agg_winobj->localmem = NULL;
01617
01618 agg_winobj->markptr = -1;
01619 agg_winobj->readptr = -1;
01620 winstate->agg_winobj = agg_winobj;
01621 }
01622
01623
01624 winstate->frameOptions = node->frameOptions;
01625
01626
01627 winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
01628 (PlanState *) winstate);
01629 winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
01630 (PlanState *) winstate);
01631
01632 winstate->all_first = true;
01633 winstate->partition_spooled = false;
01634 winstate->more_partitions = false;
01635
01636 return winstate;
01637 }
01638
01639
01640
01641
01642
01643 void
01644 ExecEndWindowAgg(WindowAggState *node)
01645 {
01646 PlanState *outerPlan;
01647
01648 release_partition(node);
01649
01650 pfree(node->perfunc);
01651 pfree(node->peragg);
01652
01653 ExecClearTuple(node->ss.ss_ScanTupleSlot);
01654 ExecClearTuple(node->first_part_slot);
01655 ExecClearTuple(node->agg_row_slot);
01656 ExecClearTuple(node->temp_slot_1);
01657 ExecClearTuple(node->temp_slot_2);
01658
01659
01660
01661
01662 ExecFreeExprContext(&node->ss.ps);
01663 node->ss.ps.ps_ExprContext = node->tmpcontext;
01664 ExecFreeExprContext(&node->ss.ps);
01665
01666 MemoryContextDelete(node->partcontext);
01667 MemoryContextDelete(node->aggcontext);
01668
01669 outerPlan = outerPlanState(node);
01670 ExecEndNode(outerPlan);
01671 }
01672
01673
01674
01675
01676
01677 void
01678 ExecReScanWindowAgg(WindowAggState *node)
01679 {
01680 ExprContext *econtext = node->ss.ps.ps_ExprContext;
01681
01682 node->all_done = false;
01683
01684 node->ss.ps.ps_TupFromTlist = false;
01685 node->all_first = true;
01686
01687
01688 release_partition(node);
01689
01690
01691 ExecClearTuple(node->ss.ss_ScanTupleSlot);
01692 ExecClearTuple(node->first_part_slot);
01693 ExecClearTuple(node->agg_row_slot);
01694 ExecClearTuple(node->temp_slot_1);
01695 ExecClearTuple(node->temp_slot_2);
01696
01697
01698 MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
01699 MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
01700
01701
01702
01703
01704
01705 if (node->ss.ps.lefttree->chgParam == NULL)
01706 ExecReScan(node->ss.ps.lefttree);
01707 }
01708
01709
01710
01711
01712
01713
01714 static WindowStatePerAggData *
01715 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
01716 WindowStatePerAgg peraggstate)
01717 {
01718 Oid inputTypes[FUNC_MAX_ARGS];
01719 int numArguments;
01720 HeapTuple aggTuple;
01721 Form_pg_aggregate aggform;
01722 Oid aggtranstype;
01723 AclResult aclresult;
01724 Oid transfn_oid,
01725 finalfn_oid;
01726 Expr *transfnexpr,
01727 *finalfnexpr;
01728 Datum textInitVal;
01729 int i;
01730 ListCell *lc;
01731
01732 numArguments = list_length(wfunc->args);
01733
01734 i = 0;
01735 foreach(lc, wfunc->args)
01736 {
01737 inputTypes[i++] = exprType((Node *) lfirst(lc));
01738 }
01739
01740 aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
01741 if (!HeapTupleIsValid(aggTuple))
01742 elog(ERROR, "cache lookup failed for aggregate %u",
01743 wfunc->winfnoid);
01744 aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
01745
01746
01747
01748
01749
01750
01751 peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
01752 peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
01753
01754
01755 {
01756 HeapTuple procTuple;
01757 Oid aggOwner;
01758
01759 procTuple = SearchSysCache1(PROCOID,
01760 ObjectIdGetDatum(wfunc->winfnoid));
01761 if (!HeapTupleIsValid(procTuple))
01762 elog(ERROR, "cache lookup failed for function %u",
01763 wfunc->winfnoid);
01764 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
01765 ReleaseSysCache(procTuple);
01766
01767 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
01768 ACL_EXECUTE);
01769 if (aclresult != ACLCHECK_OK)
01770 aclcheck_error(aclresult, ACL_KIND_PROC,
01771 get_func_name(transfn_oid));
01772 InvokeFunctionExecuteHook(transfn_oid);
01773 if (OidIsValid(finalfn_oid))
01774 {
01775 aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
01776 ACL_EXECUTE);
01777 if (aclresult != ACLCHECK_OK)
01778 aclcheck_error(aclresult, ACL_KIND_PROC,
01779 get_func_name(finalfn_oid));
01780 InvokeFunctionExecuteHook(finalfn_oid);
01781 }
01782 }
01783
01784
01785 aggtranstype = aggform->aggtranstype;
01786 if (IsPolymorphicType(aggtranstype))
01787 {
01788
01789 Oid *declaredArgTypes;
01790 int agg_nargs;
01791
01792 get_func_signature(wfunc->winfnoid,
01793 &declaredArgTypes, &agg_nargs);
01794 Assert(agg_nargs == numArguments);
01795 aggtranstype = enforce_generic_type_consistency(inputTypes,
01796 declaredArgTypes,
01797 agg_nargs,
01798 aggtranstype,
01799 false);
01800 pfree(declaredArgTypes);
01801 }
01802
01803
01804 build_aggregate_fnexprs(inputTypes,
01805 numArguments,
01806 aggtranstype,
01807 wfunc->wintype,
01808 wfunc->inputcollid,
01809 transfn_oid,
01810 finalfn_oid,
01811 &transfnexpr,
01812 &finalfnexpr);
01813
01814 fmgr_info(transfn_oid, &peraggstate->transfn);
01815 fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
01816
01817 if (OidIsValid(finalfn_oid))
01818 {
01819 fmgr_info(finalfn_oid, &peraggstate->finalfn);
01820 fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
01821 }
01822
01823 get_typlenbyval(wfunc->wintype,
01824 &peraggstate->resulttypeLen,
01825 &peraggstate->resulttypeByVal);
01826 get_typlenbyval(aggtranstype,
01827 &peraggstate->transtypeLen,
01828 &peraggstate->transtypeByVal);
01829
01830
01831
01832
01833
01834 textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
01835 Anum_pg_aggregate_agginitval,
01836 &peraggstate->initValueIsNull);
01837
01838 if (peraggstate->initValueIsNull)
01839 peraggstate->initValue = (Datum) 0;
01840 else
01841 peraggstate->initValue = GetAggInitVal(textInitVal,
01842 aggtranstype);
01843
01844
01845
01846
01847
01848
01849
01850 if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
01851 {
01852 if (numArguments < 1 ||
01853 !IsBinaryCoercible(inputTypes[0], aggtranstype))
01854 ereport(ERROR,
01855 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
01856 errmsg("aggregate %u needs to have compatible input type and transition type",
01857 wfunc->winfnoid)));
01858 }
01859
01860 ReleaseSysCache(aggTuple);
01861
01862 return peraggstate;
01863 }
01864
01865 static Datum
01866 GetAggInitVal(Datum textInitVal, Oid transtype)
01867 {
01868 Oid typinput,
01869 typioparam;
01870 char *strInitVal;
01871 Datum initVal;
01872
01873 getTypeInputInfo(transtype, &typinput, &typioparam);
01874 strInitVal = TextDatumGetCString(textInitVal);
01875 initVal = OidInputFunctionCall(typinput, strInitVal,
01876 typioparam, -1);
01877 pfree(strInitVal);
01878 return initVal;
01879 }
01880
01881
01882
01883
01884
01885
01886
01887 static bool
01888 are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
01889 TupleTableSlot *slot2)
01890 {
01891 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
01892
01893
01894 if (node->ordNumCols == 0)
01895 return true;
01896
01897 return execTuplesMatch(slot1, slot2,
01898 node->ordNumCols, node->ordColIdx,
01899 winstate->ordEqfunctions,
01900 winstate->tmpcontext->ecxt_per_tuple_memory);
01901 }
01902
01903
01904
01905
01906
01907
01908
01909
01910 static bool
01911 window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
01912 {
01913 WindowAggState *winstate = winobj->winstate;
01914 MemoryContext oldcontext;
01915
01916
01917 if (pos < 0)
01918 return false;
01919
01920
01921 spool_tuples(winstate, pos);
01922
01923 if (pos >= winstate->spooled_rows)
01924 return false;
01925
01926 if (pos < winobj->markpos)
01927 elog(ERROR, "cannot fetch row before WindowObject's mark position");
01928
01929 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
01930
01931 tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
01932
01933
01934
01935
01936
01937
01938
01939 if (winobj->seekpos == pos)
01940 {
01941 tuplestore_advance(winstate->buffer, true);
01942 winobj->seekpos++;
01943 }
01944
01945 while (winobj->seekpos > pos)
01946 {
01947 if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
01948 elog(ERROR, "unexpected end of tuplestore");
01949 winobj->seekpos--;
01950 }
01951
01952 while (winobj->seekpos < pos)
01953 {
01954 if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
01955 elog(ERROR, "unexpected end of tuplestore");
01956 winobj->seekpos++;
01957 }
01958
01959 MemoryContextSwitchTo(oldcontext);
01960
01961 return true;
01962 }
01963
01964
01965
01966
01967
01968
01969
01970
01971
01972
01973
01974
01975
01976
01977
01978
01979
01980
01981
01982 void *
01983 WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
01984 {
01985 Assert(WindowObjectIsValid(winobj));
01986 if (winobj->localmem == NULL)
01987 winobj->localmem =
01988 MemoryContextAllocZero(winobj->winstate->partcontext, sz);
01989 return winobj->localmem;
01990 }
01991
01992
01993
01994
01995
01996
01997 int64
01998 WinGetCurrentPosition(WindowObject winobj)
01999 {
02000 Assert(WindowObjectIsValid(winobj));
02001 return winobj->winstate->currentpos;
02002 }
02003
02004
02005
02006
02007
02008
02009
02010
02011
02012 int64
02013 WinGetPartitionRowCount(WindowObject winobj)
02014 {
02015 Assert(WindowObjectIsValid(winobj));
02016 spool_tuples(winobj->winstate, -1);
02017 return winobj->winstate->spooled_rows;
02018 }
02019
02020
02021
02022
02023
02024
02025
02026
02027
02028
02029
02030 void
02031 WinSetMarkPosition(WindowObject winobj, int64 markpos)
02032 {
02033 WindowAggState *winstate;
02034
02035 Assert(WindowObjectIsValid(winobj));
02036 winstate = winobj->winstate;
02037
02038 if (markpos < winobj->markpos)
02039 elog(ERROR, "cannot move WindowObject's mark position backward");
02040 tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
02041 while (markpos > winobj->markpos)
02042 {
02043 tuplestore_advance(winstate->buffer, true);
02044 winobj->markpos++;
02045 }
02046 tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
02047 while (markpos > winobj->seekpos)
02048 {
02049 tuplestore_advance(winstate->buffer, true);
02050 winobj->seekpos++;
02051 }
02052 }
02053
02054
02055
02056
02057
02058
02059
02060
02061 bool
02062 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
02063 {
02064 WindowAggState *winstate;
02065 WindowAgg *node;
02066 TupleTableSlot *slot1;
02067 TupleTableSlot *slot2;
02068 bool res;
02069
02070 Assert(WindowObjectIsValid(winobj));
02071 winstate = winobj->winstate;
02072 node = (WindowAgg *) winstate->ss.ps.plan;
02073
02074
02075 if (node->ordNumCols == 0)
02076 return true;
02077
02078 slot1 = winstate->temp_slot_1;
02079 slot2 = winstate->temp_slot_2;
02080
02081 if (!window_gettupleslot(winobj, pos1, slot1))
02082 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
02083 pos1);
02084 if (!window_gettupleslot(winobj, pos2, slot2))
02085 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
02086 pos2);
02087
02088 res = are_peers(winstate, slot1, slot2);
02089
02090 ExecClearTuple(slot1);
02091 ExecClearTuple(slot2);
02092
02093 return res;
02094 }
02095
02096
02097
02098
02099
02100
02101
02102
02103
02104
02105
02106
02107
02108
02109
02110
02111
02112
02113
02114 Datum
02115 WinGetFuncArgInPartition(WindowObject winobj, int argno,
02116 int relpos, int seektype, bool set_mark,
02117 bool *isnull, bool *isout)
02118 {
02119 WindowAggState *winstate;
02120 ExprContext *econtext;
02121 TupleTableSlot *slot;
02122 bool gottuple;
02123 int64 abs_pos;
02124
02125 Assert(WindowObjectIsValid(winobj));
02126 winstate = winobj->winstate;
02127 econtext = winstate->ss.ps.ps_ExprContext;
02128 slot = winstate->temp_slot_1;
02129
02130 switch (seektype)
02131 {
02132 case WINDOW_SEEK_CURRENT:
02133 abs_pos = winstate->currentpos + relpos;
02134 break;
02135 case WINDOW_SEEK_HEAD:
02136 abs_pos = relpos;
02137 break;
02138 case WINDOW_SEEK_TAIL:
02139 spool_tuples(winstate, -1);
02140 abs_pos = winstate->spooled_rows - 1 + relpos;
02141 break;
02142 default:
02143 elog(ERROR, "unrecognized window seek type: %d", seektype);
02144 abs_pos = 0;
02145 break;
02146 }
02147
02148 gottuple = window_gettupleslot(winobj, abs_pos, slot);
02149
02150 if (!gottuple)
02151 {
02152 if (isout)
02153 *isout = true;
02154 *isnull = true;
02155 return (Datum) 0;
02156 }
02157 else
02158 {
02159 if (isout)
02160 *isout = false;
02161 if (set_mark)
02162 {
02163 int frameOptions = winstate->frameOptions;
02164 int64 mark_pos = abs_pos;
02165
02166
02167
02168
02169
02170
02171
02172
02173
02174
02175
02176
02177 if ((frameOptions & FRAMEOPTION_RANGE) &&
02178 !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
02179 {
02180 update_frameheadpos(winobj, winstate->temp_slot_2);
02181 if (mark_pos > winstate->frameheadpos)
02182 mark_pos = winstate->frameheadpos;
02183 }
02184 WinSetMarkPosition(winobj, mark_pos);
02185 }
02186 econtext->ecxt_outertuple = slot;
02187 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
02188 econtext, isnull, NULL);
02189 }
02190 }
02191
02192
02193
02194
02195
02196
02197
02198
02199
02200
02201
02202
02203
02204
02205
02206
02207
02208
02209
02210 Datum
02211 WinGetFuncArgInFrame(WindowObject winobj, int argno,
02212 int relpos, int seektype, bool set_mark,
02213 bool *isnull, bool *isout)
02214 {
02215 WindowAggState *winstate;
02216 ExprContext *econtext;
02217 TupleTableSlot *slot;
02218 bool gottuple;
02219 int64 abs_pos;
02220
02221 Assert(WindowObjectIsValid(winobj));
02222 winstate = winobj->winstate;
02223 econtext = winstate->ss.ps.ps_ExprContext;
02224 slot = winstate->temp_slot_1;
02225
02226 switch (seektype)
02227 {
02228 case WINDOW_SEEK_CURRENT:
02229 abs_pos = winstate->currentpos + relpos;
02230 break;
02231 case WINDOW_SEEK_HEAD:
02232 update_frameheadpos(winobj, slot);
02233 abs_pos = winstate->frameheadpos + relpos;
02234 break;
02235 case WINDOW_SEEK_TAIL:
02236 update_frametailpos(winobj, slot);
02237 abs_pos = winstate->frametailpos + relpos;
02238 break;
02239 default:
02240 elog(ERROR, "unrecognized window seek type: %d", seektype);
02241 abs_pos = 0;
02242 break;
02243 }
02244
02245 gottuple = window_gettupleslot(winobj, abs_pos, slot);
02246 if (gottuple)
02247 gottuple = row_is_in_frame(winstate, abs_pos, slot);
02248
02249 if (!gottuple)
02250 {
02251 if (isout)
02252 *isout = true;
02253 *isnull = true;
02254 return (Datum) 0;
02255 }
02256 else
02257 {
02258 if (isout)
02259 *isout = false;
02260 if (set_mark)
02261 {
02262 int frameOptions = winstate->frameOptions;
02263 int64 mark_pos = abs_pos;
02264
02265
02266
02267
02268
02269
02270
02271
02272
02273
02274
02275
02276 if ((frameOptions & FRAMEOPTION_RANGE) &&
02277 !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
02278 {
02279 update_frameheadpos(winobj, winstate->temp_slot_2);
02280 if (mark_pos > winstate->frameheadpos)
02281 mark_pos = winstate->frameheadpos;
02282 }
02283 WinSetMarkPosition(winobj, mark_pos);
02284 }
02285 econtext->ecxt_outertuple = slot;
02286 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
02287 econtext, isnull, NULL);
02288 }
02289 }
02290
02291
02292
02293
02294
02295
02296
02297
02298
02299
02300
02301
02302
02303
02304 Datum
02305 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
02306 {
02307 WindowAggState *winstate;
02308 ExprContext *econtext;
02309
02310 Assert(WindowObjectIsValid(winobj));
02311 winstate = winobj->winstate;
02312
02313 econtext = winstate->ss.ps.ps_ExprContext;
02314
02315 econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
02316 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
02317 econtext, isnull, NULL);
02318 }