Header And Logo

PostgreSQL
| The world's most advanced open source database.

nodeWindowAgg.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * nodeWindowAgg.c
00004  *    routines to handle WindowAgg nodes.
00005  *
00006  * A WindowAgg node evaluates "window functions" across suitable partitions
00007  * of the input tuple set.  Any one WindowAgg works for just a single window
00008  * specification, though it can evaluate multiple window functions sharing
00009  * identical window specifications.  The input tuples are required to be
00010  * delivered in sorted order, with the PARTITION BY columns (if any) as
00011  * major sort keys and the ORDER BY columns (if any) as minor sort keys.
00012  * (The planner generates a stack of WindowAggs with intervening Sort nodes
00013  * as needed, if a query involves more than one window specification.)
00014  *
00015  * Since window functions can require access to any or all of the rows in
00016  * the current partition, we accumulate rows of the partition into a
00017  * tuplestore.  The window functions are called using the WindowObject API
00018  * so that they can access those rows as needed.
00019  *
00020  * We also support using plain aggregate functions as window functions.
00021  * For these, the regular Agg-node environment is emulated for each partition.
00022  * As required by the SQL spec, the output represents the value of the
00023  * aggregate function over all rows in the current row's window frame.
00024  *
00025  *
00026  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00027  * Portions Copyright (c) 1994, Regents of the University of California
00028  *
00029  * IDENTIFICATION
00030  *    src/backend/executor/nodeWindowAgg.c
00031  *
00032  *-------------------------------------------------------------------------
00033  */
00034 #include "postgres.h"
00035 
00036 #include "access/htup_details.h"
00037 #include "catalog/objectaccess.h"
00038 #include "catalog/pg_aggregate.h"
00039 #include "catalog/pg_proc.h"
00040 #include "catalog/pg_type.h"
00041 #include "executor/executor.h"
00042 #include "executor/nodeWindowAgg.h"
00043 #include "miscadmin.h"
00044 #include "nodes/nodeFuncs.h"
00045 #include "optimizer/clauses.h"
00046 #include "parser/parse_agg.h"
00047 #include "parser/parse_coerce.h"
00048 #include "utils/acl.h"
00049 #include "utils/builtins.h"
00050 #include "utils/datum.h"
00051 #include "utils/lsyscache.h"
00052 #include "utils/memutils.h"
00053 #include "utils/syscache.h"
00054 #include "windowapi.h"
00055 
/*
 * All the window function APIs are called with this object, which is passed
 * to window functions as fcinfo->context.
 *
 * Each real window function has its own object (perfuncstate->winobj),
 * while rows for plain aggregates are fetched through winstate->agg_winobj.
 * The tuplestore pointers below are allocated afresh for every partition by
 * begin_partition(), which also resets markpos and seekpos to -1.
 */
typedef struct WindowObjectData
{
    NodeTag     type;
    WindowAggState *winstate;   /* parent WindowAggState */
    List       *argstates;      /* ExprState trees for fn's arguments */
    void       *localmem;       /* WinGetPartitionLocalMemory's chunk */
    int         markptr;        /* tuplestore mark pointer for this fn; for
                                 * agg_winobj it is only allocated when the
                                 * frame head can move, and users test
                                 * markptr >= 0 before touching it */
    int         readptr;        /* tuplestore read pointer for this fn */
    int64       markpos;        /* row that markptr is positioned on */
    int64       seekpos;        /* row that readptr is positioned on */
} WindowObjectData;
00071 
/*
 * We have one WindowStatePerFunc struct for each window function and
 * window aggregate handled by this node.
 */
typedef struct WindowStatePerFuncData
{
    /* Links to WindowFunc expr and state nodes this working state is for */
    WindowFuncExprState *wfuncstate;
    WindowFunc *wfunc;

    int         numArguments;   /* number of arguments */

    FmgrInfo    flinfo;         /* fmgr lookup data for window function */

    Oid         winCollation;   /* collation derived for window function;
                                 * also passed as the call collation to a
                                 * plain aggregate's transition and final
                                 * functions */

    /*
     * We need the len and byval info for the result of each function in order
     * to know how to copy/delete values.
     */
    int16       resulttypeLen;
    bool        resulttypeByVal;

    bool        plain_agg;      /* is it just a plain aggregate function? */
    int         aggno;          /* if so, index of its PerAggData */

    WindowObject winobj;        /* object used in window function API; for a
                                 * real window function (!plain_agg) it is
                                 * passed as fcinfo->context and receives its
                                 * own tuplestore pointers in
                                 * begin_partition() */
} WindowStatePerFuncData;
00100 
/*
 * For plain aggregate window functions, we also have one of these.
 */
typedef struct WindowStatePerAggData
{
    /* Oids of transition functions */
    Oid         transfn_oid;
    Oid         finalfn_oid;    /* may be InvalidOid */

    /*
     * fmgr lookup data for transition functions --- only valid when
     * corresponding oid is not InvalidOid.  Note in particular that fn_strict
     * flags are kept here.
     */
    FmgrInfo    transfn;
    FmgrInfo    finalfn;

    /*
     * initial value from pg_aggregate entry; when non-null it is copied into
     * winstate->aggcontext as the starting transValue every time the
     * aggregate is (re)initialized
     */
    Datum       initValue;
    bool        initValueIsNull;

    /*
     * cached value for current frame boundaries, reused for later rows that
     * share the same frame; pass-by-ref values are copied into
     * winstate->aggcontext
     */
    Datum       resultValue;
    bool        resultValueIsNull;

    /*
     * We need the len and byval info for the agg's input, result, and
     * transition data types in order to know how to copy/delete values.
     */
    int16       inputtypeLen,
                resulttypeLen,
                transtypeLen;
    bool        inputtypeByVal,
                resulttypeByVal,
                transtypeByVal;

    int         wfuncno;        /* index of associated PerFuncData */

    /*
     * Current transition value; pass-by-ref values are kept in
     * winstate->aggcontext
     */
    Datum       transValue;     /* current transition value */
    bool        transValueIsNull;

    bool        noTransValue;   /* true if transValue not set yet; a strict
                                 * transfn then adopts the first non-null
                                 * input as its state */
} WindowStatePerAggData;
00149 
00150 static void initialize_windowaggregate(WindowAggState *winstate,
00151                            WindowStatePerFunc perfuncstate,
00152                            WindowStatePerAgg peraggstate);
00153 static void advance_windowaggregate(WindowAggState *winstate,
00154                         WindowStatePerFunc perfuncstate,
00155                         WindowStatePerAgg peraggstate);
00156 static void finalize_windowaggregate(WindowAggState *winstate,
00157                          WindowStatePerFunc perfuncstate,
00158                          WindowStatePerAgg peraggstate,
00159                          Datum *result, bool *isnull);
00160 
00161 static void eval_windowaggregates(WindowAggState *winstate);
00162 static void eval_windowfunction(WindowAggState *winstate,
00163                     WindowStatePerFunc perfuncstate,
00164                     Datum *result, bool *isnull);
00165 
00166 static void begin_partition(WindowAggState *winstate);
00167 static void spool_tuples(WindowAggState *winstate, int64 pos);
00168 static void release_partition(WindowAggState *winstate);
00169 
00170 static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
00171                 TupleTableSlot *slot);
00172 static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot);
00173 static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
00174 
00175 static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
00176                   WindowFunc *wfunc,
00177                   WindowStatePerAgg peraggstate);
00178 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
00179 
00180 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
00181           TupleTableSlot *slot2);
00182 static bool window_gettupleslot(WindowObject winobj, int64 pos,
00183                     TupleTableSlot *slot);
00184 
00185 
00186 /*
00187  * initialize_windowaggregate
00188  * parallel to initialize_aggregates in nodeAgg.c
00189  */
00190 static void
00191 initialize_windowaggregate(WindowAggState *winstate,
00192                            WindowStatePerFunc perfuncstate,
00193                            WindowStatePerAgg peraggstate)
00194 {
00195     MemoryContext oldContext;
00196 
00197     if (peraggstate->initValueIsNull)
00198         peraggstate->transValue = peraggstate->initValue;
00199     else
00200     {
00201         oldContext = MemoryContextSwitchTo(winstate->aggcontext);
00202         peraggstate->transValue = datumCopy(peraggstate->initValue,
00203                                             peraggstate->transtypeByVal,
00204                                             peraggstate->transtypeLen);
00205         MemoryContextSwitchTo(oldContext);
00206     }
00207     peraggstate->transValueIsNull = peraggstate->initValueIsNull;
00208     peraggstate->noTransValue = peraggstate->initValueIsNull;
00209     peraggstate->resultValueIsNull = true;
00210 }
00211 
/*
 * advance_windowaggregate
 * parallel to advance_aggregates in nodeAgg.c
 *
 * Feed one input row into a single aggregate's transition state.  The row
 * is whatever the caller installed as winstate->tmpcontext's outer tuple
 * (eval_windowaggregates sets ecxt_outertuple before calling us); the
 * aggregate's argument expressions are evaluated against it.
 *
 * On return, peraggstate->transValue/transValueIsNull hold the updated
 * state.  A pass-by-reference transValue is kept in winstate->aggcontext.
 */
static void
advance_windowaggregate(WindowAggState *winstate,
                        WindowStatePerFunc perfuncstate,
                        WindowStatePerAgg peraggstate)
{
    WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
    int         numArguments = perfuncstate->numArguments;
    FunctionCallInfoData fcinfodata;
    FunctionCallInfo fcinfo = &fcinfodata;
    Datum       newVal;
    ListCell   *arg;
    int         i;
    MemoryContext oldContext;
    ExprContext *econtext = winstate->tmpcontext;

    /*
     * Evaluate arguments (and run the transfn) in the per-tuple memory of
     * tmpcontext; the caller resets that context after each row.
     */
    oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

    /* We start from 1, since the 0th arg will be the transition value */
    i = 1;
    foreach(arg, wfuncstate->args)
    {
        ExprState  *argstate = (ExprState *) lfirst(arg);

        fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
                                      &fcinfo->argnull[i], NULL);
        i++;
    }

    if (peraggstate->transfn.fn_strict)
    {
        /*
         * For a strict transfn, nothing happens when there's a NULL input; we
         * just keep the prior transValue.
         */
        for (i = 1; i <= numArguments; i++)
        {
            if (fcinfo->argnull[i])
            {
                MemoryContextSwitchTo(oldContext);
                return;
            }
        }
        if (peraggstate->noTransValue)
        {
            /*
             * transValue has not been initialized. This is the first non-NULL
             * input value. We use it as the initial value for transValue. (We
             * already checked that the agg's input type is binary-compatible
             * with its transtype, so straight copy here is OK.)
             *
             * We must copy the datum into aggcontext if it is pass-by-ref. We
             * do not need to pfree the old transValue, since it's NULL.
             */
            MemoryContextSwitchTo(winstate->aggcontext);
            peraggstate->transValue = datumCopy(fcinfo->arg[1],
                                                peraggstate->transtypeByVal,
                                                peraggstate->transtypeLen);
            peraggstate->transValueIsNull = false;
            peraggstate->noTransValue = false;
            MemoryContextSwitchTo(oldContext);
            return;
        }
        if (peraggstate->transValueIsNull)
        {
            /*
             * Don't call a strict function with NULL inputs.  Note it is
             * possible to get here despite the above tests, if the transfn is
             * strict *and* returned a NULL on a prior cycle. If that happens
             * we will propagate the NULL all the way to the end.
             */
            MemoryContextSwitchTo(oldContext);
            return;
        }
    }

    /*
     * OK to call the transition function
     *
     * NOTE(review): fcinfo->arg[1..] and argnull[1..] were filled in above,
     * before this initialization, so this relies on InitFunctionCallInfoData
     * leaving the arg/argnull arrays untouched -- confirm against fmgr.h.
     */
    InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
                             numArguments + 1,
                             perfuncstate->winCollation,
                             (void *) winstate, NULL);
    fcinfo->arg[0] = peraggstate->transValue;
    fcinfo->argnull[0] = peraggstate->transValueIsNull;
    newVal = FunctionCallInvoke(fcinfo);

    /*
     * If pass-by-ref datatype, must copy the new value into aggcontext and
     * pfree the prior transValue.  But if transfn returned a pointer to its
     * first input, we don't need to do anything.
     */
    if (!peraggstate->transtypeByVal &&
        DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
    {
        if (!fcinfo->isnull)
        {
            MemoryContextSwitchTo(winstate->aggcontext);
            newVal = datumCopy(newVal,
                               peraggstate->transtypeByVal,
                               peraggstate->transtypeLen);
        }
        if (!peraggstate->transValueIsNull)
            pfree(DatumGetPointer(peraggstate->transValue));
    }

    MemoryContextSwitchTo(oldContext);
    peraggstate->transValue = newVal;
    peraggstate->transValueIsNull = fcinfo->isnull;
}
00325 
00326 /*
00327  * finalize_windowaggregate
00328  * parallel to finalize_aggregate in nodeAgg.c
00329  */
00330 static void
00331 finalize_windowaggregate(WindowAggState *winstate,
00332                          WindowStatePerFunc perfuncstate,
00333                          WindowStatePerAgg peraggstate,
00334                          Datum *result, bool *isnull)
00335 {
00336     MemoryContext oldContext;
00337 
00338     oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
00339 
00340     /*
00341      * Apply the agg's finalfn if one is provided, else return transValue.
00342      */
00343     if (OidIsValid(peraggstate->finalfn_oid))
00344     {
00345         FunctionCallInfoData fcinfo;
00346 
00347         InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn), 1,
00348                                  perfuncstate->winCollation,
00349                                  (void *) winstate, NULL);
00350         fcinfo.arg[0] = peraggstate->transValue;
00351         fcinfo.argnull[0] = peraggstate->transValueIsNull;
00352         if (fcinfo.flinfo->fn_strict && peraggstate->transValueIsNull)
00353         {
00354             /* don't call a strict function with NULL inputs */
00355             *result = (Datum) 0;
00356             *isnull = true;
00357         }
00358         else
00359         {
00360             *result = FunctionCallInvoke(&fcinfo);
00361             *isnull = fcinfo.isnull;
00362         }
00363     }
00364     else
00365     {
00366         *result = peraggstate->transValue;
00367         *isnull = peraggstate->transValueIsNull;
00368     }
00369 
00370     /*
00371      * If result is pass-by-ref, make sure it is in the right context.
00372      */
00373     if (!peraggstate->resulttypeByVal && !*isnull &&
00374         !MemoryContextContains(CurrentMemoryContext,
00375                                DatumGetPointer(*result)))
00376         *result = datumCopy(*result,
00377                             peraggstate->resulttypeByVal,
00378                             peraggstate->resulttypeLen);
00379     MemoryContextSwitchTo(oldContext);
00380 }
00381 
/*
 * eval_windowaggregates
 * evaluate plain aggregates being used as window functions
 *
 * Much of this is duplicated from nodeAgg.c.  But NOTE that we expect to be
 * able to call aggregate final functions repeatedly after aggregating more
 * data onto the same transition value.  This is not a behavior required by
 * nodeAgg.c.
 *
 * For the current row (winstate->currentpos), the result of every plain
 * aggregate is stored into the output ExprContext's ecxt_aggvalues[] and
 * ecxt_aggnulls[] arrays, indexed by the aggregate's wfuncno.  A copy is
 * also cached in peraggstate->resultValue (pass-by-ref copies live in
 * winstate->aggcontext) so later rows sharing the same frame can reuse it.
 */
static void
eval_windowaggregates(WindowAggState *winstate)
{
    WindowStatePerAgg peraggstate;
    int         wfuncno,
                numaggs;
    int         i;
    MemoryContext oldContext;
    ExprContext *econtext;
    WindowObject agg_winobj;
    TupleTableSlot *agg_row_slot;

    numaggs = winstate->numaggs;
    if (numaggs == 0)
        return;                 /* nothing to do */

    /* final output execution is in ps_ExprContext */
    econtext = winstate->ss.ps.ps_ExprContext;
    agg_winobj = winstate->agg_winobj;
    agg_row_slot = winstate->agg_row_slot;

    /*
     * Currently, we support only a subset of the SQL-standard window framing
     * rules.
     *
     * If the frame start is UNBOUNDED_PRECEDING, the window frame consists of
     * a contiguous group of rows extending forward from the start of the
     * partition, and rows only enter the frame, never exit it, as the current
     * row advances forward.  This makes it possible to use an incremental
     * strategy for evaluating aggregates: we run the transition function for
     * each row added to the frame, and run the final function whenever we
     * need the current aggregate value.  This is considerably more efficient
     * than the naive approach of re-running the entire aggregate calculation
     * for each current row.  It does assume that the final function doesn't
     * damage the running transition value, but we have the same assumption in
     * nodeAgg.c too (when it rescans an existing hash table).
     *
     * For other frame start rules, we discard the aggregate state and re-run
     * the aggregates whenever the frame head row moves.  We can still
     * optimize as above whenever successive rows share the same frame head.
     *
     * In many common cases, multiple rows share the same frame and hence the
     * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
     * window, then all rows are peers and so they all have window frame equal
     * to the whole partition.)  We optimize such cases by calculating the
     * aggregate value once when we reach the first row of a peer group, and
     * then returning the saved value for all subsequent rows.
     *
     * 'aggregatedupto' keeps track of the first row that has not yet been
     * accumulated into the aggregate transition values.  Whenever we start a
     * new peer group, we accumulate forward to the end of the peer group.
     *
     * TODO: Rerunning aggregates from the frame start can be pretty slow. For
     * some aggregates like SUM and COUNT we could avoid that by implementing
     * a "negative transition function" that would be called for each row as
     * it exits the frame.  We'd have to think about avoiding recalculation of
     * volatile arguments of aggregate functions, too.
     */

    /*
     * First, update the frame head position.
     */
    update_frameheadpos(agg_winobj, winstate->temp_slot_1);

    /*
     * Initialize aggregates on first call for partition, or if the frame head
     * position moved since last time.
     */
    if (winstate->currentpos == 0 ||
        winstate->frameheadpos != winstate->aggregatedbase)
    {
        /*
         * Discard transient aggregate values.  This frees every pass-by-ref
         * transValue/resultValue; initialize_windowaggregate then resets the
         * null flags so those dangling pointers are never dereferenced.
         */
        MemoryContextResetAndDeleteChildren(winstate->aggcontext);

        for (i = 0; i < numaggs; i++)
        {
            peraggstate = &winstate->peragg[i];
            wfuncno = peraggstate->wfuncno;
            initialize_windowaggregate(winstate,
                                       &winstate->perfunc[wfuncno],
                                       peraggstate);
        }

        /*
         * If we created a mark pointer for aggregates, keep it pushed up to
         * frame head, so that tuplestore can discard unnecessary rows.
         */
        if (agg_winobj->markptr >= 0)
            WinSetMarkPosition(agg_winobj, winstate->frameheadpos);

        /*
         * Initialize for loop below
         */
        ExecClearTuple(agg_row_slot);
        winstate->aggregatedbase = winstate->frameheadpos;
        winstate->aggregatedupto = winstate->frameheadpos;
    }

    /*
     * In UNBOUNDED_FOLLOWING mode, we don't have to recalculate aggregates
     * except when the frame head moves.  In END_CURRENT_ROW mode, we only
     * have to recalculate when the frame head moves or currentpos has
     * advanced past the place we'd aggregated up to.  Check for these cases
     * and if so, reuse the saved result values.
     */
    if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
                                   FRAMEOPTION_END_CURRENT_ROW)) &&
        winstate->aggregatedbase <= winstate->currentpos &&
        winstate->aggregatedupto > winstate->currentpos)
    {
        for (i = 0; i < numaggs; i++)
        {
            peraggstate = &winstate->peragg[i];
            wfuncno = peraggstate->wfuncno;
            econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
            econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
        }
        return;
    }

    /*
     * Advance until we reach a row not in frame (or end of partition).
     *
     * Note the loop invariant: agg_row_slot is either empty or holds the row
     * at position aggregatedupto.  We advance aggregatedupto after processing
     * a row.
     */
    for (;;)
    {
        /* Fetch next row if we didn't already */
        if (TupIsNull(agg_row_slot))
        {
            if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
                                     agg_row_slot))
                break;          /* must be end of partition */
        }

        /* Exit loop (for now) if not in frame */
        if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
            break;

        /* Set tuple context for evaluation of aggregate arguments */
        winstate->tmpcontext->ecxt_outertuple = agg_row_slot;

        /* Accumulate row into the aggregates */
        for (i = 0; i < numaggs; i++)
        {
            peraggstate = &winstate->peragg[i];
            wfuncno = peraggstate->wfuncno;
            advance_windowaggregate(winstate,
                                    &winstate->perfunc[wfuncno],
                                    peraggstate);
        }

        /* Reset per-input-tuple context after each tuple */
        ResetExprContext(winstate->tmpcontext);

        /* And advance the aggregated-row state */
        winstate->aggregatedupto++;
        ExecClearTuple(agg_row_slot);
    }

    /*
     * finalize aggregates and fill result/isnull fields.
     */
    for (i = 0; i < numaggs; i++)
    {
        Datum      *result;
        bool       *isnull;

        peraggstate = &winstate->peragg[i];
        wfuncno = peraggstate->wfuncno;
        result = &econtext->ecxt_aggvalues[wfuncno];
        isnull = &econtext->ecxt_aggnulls[wfuncno];
        finalize_windowaggregate(winstate,
                                 &winstate->perfunc[wfuncno],
                                 peraggstate,
                                 result, isnull);

        /*
         * save the result in case next row shares the same frame.
         *
         * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
         * advance that the next row can't possibly share the same frame. Is
         * it worth detecting that and skipping this code?
         */
        if (!peraggstate->resulttypeByVal)
        {
            /*
             * clear old resultValue in order not to leak memory.  (Note: the
             * new result can't possibly be the same datum as old resultValue,
             * because we never passed it to the trans function.)
             */
            if (!peraggstate->resultValueIsNull)
                pfree(DatumGetPointer(peraggstate->resultValue));

            /*
             * If pass-by-ref, copy it into our aggregate context.  (When the
             * new result is NULL, resultValue keeps the freed pointer, but
             * resultValueIsNull below marks it as unused.)
             */
            if (!*isnull)
            {
                oldContext = MemoryContextSwitchTo(winstate->aggcontext);
                peraggstate->resultValue =
                    datumCopy(*result,
                              peraggstate->resulttypeByVal,
                              peraggstate->resulttypeLen);
                MemoryContextSwitchTo(oldContext);
            }
        }
        else
        {
            peraggstate->resultValue = *result;
        }
        peraggstate->resultValueIsNull = *isnull;
    }
}
00609 
00610 /*
00611  * eval_windowfunction
00612  *
00613  * Arguments of window functions are not evaluated here, because a window
00614  * function can need random access to arbitrary rows in the partition.
00615  * The window function uses the special WinGetFuncArgInPartition and
00616  * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
00617  * it wants.
00618  */
00619 static void
00620 eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
00621                     Datum *result, bool *isnull)
00622 {
00623     FunctionCallInfoData fcinfo;
00624     MemoryContext oldContext;
00625 
00626     oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
00627 
00628     /*
00629      * We don't pass any normal arguments to a window function, but we do pass
00630      * it the number of arguments, in order to permit window function
00631      * implementations to support varying numbers of arguments.  The real info
00632      * goes through the WindowObject, which is passed via fcinfo->context.
00633      */
00634     InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
00635                              perfuncstate->numArguments,
00636                              perfuncstate->winCollation,
00637                              (void *) perfuncstate->winobj, NULL);
00638     /* Just in case, make all the regular argument slots be null */
00639     memset(fcinfo.argnull, true, perfuncstate->numArguments);
00640 
00641     *result = FunctionCallInvoke(&fcinfo);
00642     *isnull = fcinfo.isnull;
00643 
00644     /*
00645      * Make sure pass-by-ref data is allocated in the appropriate context. (We
00646      * need this in case the function returns a pointer into some short-lived
00647      * tuple, as is entirely possible.)
00648      */
00649     if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
00650         !MemoryContextContains(CurrentMemoryContext,
00651                                DatumGetPointer(*result)))
00652         *result = datumCopy(*result,
00653                             perfuncstate->resulttypeByVal,
00654                             perfuncstate->resulttypeLen);
00655 
00656     MemoryContextSwitchTo(oldContext);
00657 }
00658 
00659 /*
00660  * begin_partition
00661  * Start buffering rows of the next partition.
00662  */
00663 static void
00664 begin_partition(WindowAggState *winstate)
00665 {
00666     PlanState  *outerPlan = outerPlanState(winstate);
00667     int         numfuncs = winstate->numfuncs;
00668     int         i;
00669 
00670     winstate->partition_spooled = false;
00671     winstate->framehead_valid = false;
00672     winstate->frametail_valid = false;
00673     winstate->spooled_rows = 0;
00674     winstate->currentpos = 0;
00675     winstate->frameheadpos = 0;
00676     winstate->frametailpos = -1;
00677     ExecClearTuple(winstate->agg_row_slot);
00678 
00679     /*
00680      * If this is the very first partition, we need to fetch the first input
00681      * row to store in first_part_slot.
00682      */
00683     if (TupIsNull(winstate->first_part_slot))
00684     {
00685         TupleTableSlot *outerslot = ExecProcNode(outerPlan);
00686 
00687         if (!TupIsNull(outerslot))
00688             ExecCopySlot(winstate->first_part_slot, outerslot);
00689         else
00690         {
00691             /* outer plan is empty, so we have nothing to do */
00692             winstate->partition_spooled = true;
00693             winstate->more_partitions = false;
00694             return;
00695         }
00696     }
00697 
00698     /* Create new tuplestore for this partition */
00699     winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
00700 
00701     /*
00702      * Set up read pointers for the tuplestore.  The current pointer doesn't
00703      * need BACKWARD capability, but the per-window-function read pointers do,
00704      * and the aggregate pointer does if frame start is movable.
00705      */
00706     winstate->current_ptr = 0;  /* read pointer 0 is pre-allocated */
00707 
00708     /* reset default REWIND capability bit for current ptr */
00709     tuplestore_set_eflags(winstate->buffer, 0);
00710 
00711     /* create read pointers for aggregates, if needed */
00712     if (winstate->numaggs > 0)
00713     {
00714         WindowObject agg_winobj = winstate->agg_winobj;
00715         int         readptr_flags = 0;
00716 
00717         /* If the frame head is potentially movable ... */
00718         if (!(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
00719         {
00720             /* ... create a mark pointer to track the frame head */
00721             agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
00722             /* and the read pointer will need BACKWARD capability */
00723             readptr_flags |= EXEC_FLAG_BACKWARD;
00724         }
00725 
00726         agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
00727                                                             readptr_flags);
00728         agg_winobj->markpos = -1;
00729         agg_winobj->seekpos = -1;
00730 
00731         /* Also reset the row counters for aggregates */
00732         winstate->aggregatedbase = 0;
00733         winstate->aggregatedupto = 0;
00734     }
00735 
00736     /* create mark and read pointers for each real window function */
00737     for (i = 0; i < numfuncs; i++)
00738     {
00739         WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
00740 
00741         if (!perfuncstate->plain_agg)
00742         {
00743             WindowObject winobj = perfuncstate->winobj;
00744 
00745             winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
00746                                                             0);
00747             winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
00748                                                          EXEC_FLAG_BACKWARD);
00749             winobj->markpos = -1;
00750             winobj->seekpos = -1;
00751         }
00752     }
00753 
00754     /*
00755      * Store the first tuple into the tuplestore (it's always available now;
00756      * we either read it above, or saved it at the end of previous partition)
00757      */
00758     tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
00759     winstate->spooled_rows++;
00760 }
00761 
00762 /*
00763  * Read tuples from the outer node, up to and including position 'pos', and
00764  * store them into the tuplestore. If pos is -1, reads the whole partition.
00765  */
00766 static void
00767 spool_tuples(WindowAggState *winstate, int64 pos)
00768 {
00769     WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
00770     PlanState  *outerPlan;
00771     TupleTableSlot *outerslot;
00772     MemoryContext oldcontext;
00773 
00774     if (!winstate->buffer)
00775         return;                 /* just a safety check */
00776     if (winstate->partition_spooled)
00777         return;                 /* whole partition done already */
00778 
00779     /*
00780      * If the tuplestore has spilled to disk, alternate reading and writing
00781      * becomes quite expensive due to frequent buffer flushes.  It's cheaper
00782      * to force the entire partition to get spooled in one go.
00783      *
00784      * XXX this is a horrid kluge --- it'd be better to fix the performance
00785      * problem inside tuplestore.  FIXME
00786      */
00787     if (!tuplestore_in_memory(winstate->buffer))
00788         pos = -1;
00789 
00790     outerPlan = outerPlanState(winstate);
00791 
00792     /* Must be in query context to call outerplan */
00793     oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
00794 
00795     while (winstate->spooled_rows <= pos || pos == -1)
00796     {
00797         outerslot = ExecProcNode(outerPlan);
00798         if (TupIsNull(outerslot))
00799         {
00800             /* reached the end of the last partition */
00801             winstate->partition_spooled = true;
00802             winstate->more_partitions = false;
00803             break;
00804         }
00805 
00806         if (node->partNumCols > 0)
00807         {
00808             /* Check if this tuple still belongs to the current partition */
00809             if (!execTuplesMatch(winstate->first_part_slot,
00810                                  outerslot,
00811                                  node->partNumCols, node->partColIdx,
00812                                  winstate->partEqfunctions,
00813                                  winstate->tmpcontext->ecxt_per_tuple_memory))
00814             {
00815                 /*
00816                  * end of partition; copy the tuple for the next cycle.
00817                  */
00818                 ExecCopySlot(winstate->first_part_slot, outerslot);
00819                 winstate->partition_spooled = true;
00820                 winstate->more_partitions = true;
00821                 break;
00822             }
00823         }
00824 
00825         /* Still in partition, so save it into the tuplestore */
00826         tuplestore_puttupleslot(winstate->buffer, outerslot);
00827         winstate->spooled_rows++;
00828     }
00829 
00830     MemoryContextSwitchTo(oldcontext);
00831 }
00832 
00833 /*
00834  * release_partition
00835  * clear information kept within a partition, including
00836  * tuplestore and aggregate results.
00837  */
00838 static void
00839 release_partition(WindowAggState *winstate)
00840 {
00841     int         i;
00842 
00843     for (i = 0; i < winstate->numfuncs; i++)
00844     {
00845         WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
00846 
00847         /* Release any partition-local state of this window function */
00848         if (perfuncstate->winobj)
00849             perfuncstate->winobj->localmem = NULL;
00850     }
00851 
00852     /*
00853      * Release all partition-local memory (in particular, any partition-local
00854      * state that we might have trashed our pointers to in the above loop, and
00855      * any aggregate temp data).  We don't rely on retail pfree because some
00856      * aggregates might have allocated data we don't have direct pointers to.
00857      */
00858     MemoryContextResetAndDeleteChildren(winstate->partcontext);
00859     MemoryContextResetAndDeleteChildren(winstate->aggcontext);
00860 
00861     if (winstate->buffer)
00862         tuplestore_end(winstate->buffer);
00863     winstate->buffer = NULL;
00864     winstate->partition_spooled = false;
00865 }
00866 
00867 /*
00868  * row_is_in_frame
00869  * Determine whether a row is in the current row's window frame according
00870  * to our window framing rule
00871  *
00872  * The caller must have already determined that the row is in the partition
00873  * and fetched it into a slot.  This function just encapsulates the framing
00874  * rules.
00875  */
00876 static bool
00877 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
00878 {
00879     int         frameOptions = winstate->frameOptions;
00880 
00881     Assert(pos >= 0);           /* else caller error */
00882 
00883     /* First, check frame starting conditions */
00884     if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
00885     {
00886         if (frameOptions & FRAMEOPTION_ROWS)
00887         {
00888             /* rows before current row are out of frame */
00889             if (pos < winstate->currentpos)
00890                 return false;
00891         }
00892         else if (frameOptions & FRAMEOPTION_RANGE)
00893         {
00894             /* preceding row that is not peer is out of frame */
00895             if (pos < winstate->currentpos &&
00896                 !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
00897                 return false;
00898         }
00899         else
00900             Assert(false);
00901     }
00902     else if (frameOptions & FRAMEOPTION_START_VALUE)
00903     {
00904         if (frameOptions & FRAMEOPTION_ROWS)
00905         {
00906             int64       offset = DatumGetInt64(winstate->startOffsetValue);
00907 
00908             /* rows before current row + offset are out of frame */
00909             if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
00910                 offset = -offset;
00911 
00912             if (pos < winstate->currentpos + offset)
00913                 return false;
00914         }
00915         else if (frameOptions & FRAMEOPTION_RANGE)
00916         {
00917             /* parser should have rejected this */
00918             elog(ERROR, "window frame with value offset is not implemented");
00919         }
00920         else
00921             Assert(false);
00922     }
00923 
00924     /* Okay so far, now check frame ending conditions */
00925     if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
00926     {
00927         if (frameOptions & FRAMEOPTION_ROWS)
00928         {
00929             /* rows after current row are out of frame */
00930             if (pos > winstate->currentpos)
00931                 return false;
00932         }
00933         else if (frameOptions & FRAMEOPTION_RANGE)
00934         {
00935             /* following row that is not peer is out of frame */
00936             if (pos > winstate->currentpos &&
00937                 !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
00938                 return false;
00939         }
00940         else
00941             Assert(false);
00942     }
00943     else if (frameOptions & FRAMEOPTION_END_VALUE)
00944     {
00945         if (frameOptions & FRAMEOPTION_ROWS)
00946         {
00947             int64       offset = DatumGetInt64(winstate->endOffsetValue);
00948 
00949             /* rows after current row + offset are out of frame */
00950             if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
00951                 offset = -offset;
00952 
00953             if (pos > winstate->currentpos + offset)
00954                 return false;
00955         }
00956         else if (frameOptions & FRAMEOPTION_RANGE)
00957         {
00958             /* parser should have rejected this */
00959             elog(ERROR, "window frame with value offset is not implemented");
00960         }
00961         else
00962             Assert(false);
00963     }
00964 
00965     /* If we get here, it's in frame */
00966     return true;
00967 }
00968 
00969 /*
00970  * update_frameheadpos
00971  * make frameheadpos valid for the current row
00972  *
00973  * Uses the winobj's read pointer for any required fetches; hence, if the
00974  * frame mode is one that requires row comparisons, the winobj's mark must
00975  * not be past the currently known frame head.  Also uses the specified slot
00976  * for any required fetches.
00977  */
00978 static void
00979 update_frameheadpos(WindowObject winobj, TupleTableSlot *slot)
00980 {
00981     WindowAggState *winstate = winobj->winstate;
00982     WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
00983     int         frameOptions = winstate->frameOptions;
00984 
00985     if (winstate->framehead_valid)
00986         return;                 /* already known for current row */
00987 
00988     if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
00989     {
00990         /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
00991         winstate->frameheadpos = 0;
00992         winstate->framehead_valid = true;
00993     }
00994     else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
00995     {
00996         if (frameOptions & FRAMEOPTION_ROWS)
00997         {
00998             /* In ROWS mode, frame head is the same as current */
00999             winstate->frameheadpos = winstate->currentpos;
01000             winstate->framehead_valid = true;
01001         }
01002         else if (frameOptions & FRAMEOPTION_RANGE)
01003         {
01004             int64       fhprev;
01005 
01006             /* If no ORDER BY, all rows are peers with each other */
01007             if (node->ordNumCols == 0)
01008             {
01009                 winstate->frameheadpos = 0;
01010                 winstate->framehead_valid = true;
01011                 return;
01012             }
01013 
01014             /*
01015              * In RANGE START_CURRENT mode, frame head is the first row that
01016              * is a peer of current row.  We search backwards from current,
01017              * which could be a bit inefficient if peer sets are large. Might
01018              * be better to have a separate read pointer that moves forward
01019              * tracking the frame head.
01020              */
01021             fhprev = winstate->currentpos - 1;
01022             for (;;)
01023             {
01024                 /* assume the frame head can't go backwards */
01025                 if (fhprev < winstate->frameheadpos)
01026                     break;
01027                 if (!window_gettupleslot(winobj, fhprev, slot))
01028                     break;      /* start of partition */
01029                 if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
01030                     break;      /* not peer of current row */
01031                 fhprev--;
01032             }
01033             winstate->frameheadpos = fhprev + 1;
01034             winstate->framehead_valid = true;
01035         }
01036         else
01037             Assert(false);
01038     }
01039     else if (frameOptions & FRAMEOPTION_START_VALUE)
01040     {
01041         if (frameOptions & FRAMEOPTION_ROWS)
01042         {
01043             /* In ROWS mode, bound is physically n before/after current */
01044             int64       offset = DatumGetInt64(winstate->startOffsetValue);
01045 
01046             if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
01047                 offset = -offset;
01048 
01049             winstate->frameheadpos = winstate->currentpos + offset;
01050             /* frame head can't go before first row */
01051             if (winstate->frameheadpos < 0)
01052                 winstate->frameheadpos = 0;
01053             else if (winstate->frameheadpos > winstate->currentpos)
01054             {
01055                 /* make sure frameheadpos is not past end of partition */
01056                 spool_tuples(winstate, winstate->frameheadpos - 1);
01057                 if (winstate->frameheadpos > winstate->spooled_rows)
01058                     winstate->frameheadpos = winstate->spooled_rows;
01059             }
01060             winstate->framehead_valid = true;
01061         }
01062         else if (frameOptions & FRAMEOPTION_RANGE)
01063         {
01064             /* parser should have rejected this */
01065             elog(ERROR, "window frame with value offset is not implemented");
01066         }
01067         else
01068             Assert(false);
01069     }
01070     else
01071         Assert(false);
01072 }
01073 
01074 /*
01075  * update_frametailpos
01076  * make frametailpos valid for the current row
01077  *
01078  * Uses the winobj's read pointer for any required fetches; hence, if the
01079  * frame mode is one that requires row comparisons, the winobj's mark must
01080  * not be past the currently known frame tail.  Also uses the specified slot
01081  * for any required fetches.
01082  */
01083 static void
01084 update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
01085 {
01086     WindowAggState *winstate = winobj->winstate;
01087     WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
01088     int         frameOptions = winstate->frameOptions;
01089 
01090     if (winstate->frametail_valid)
01091         return;                 /* already known for current row */
01092 
01093     if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
01094     {
01095         /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
01096         spool_tuples(winstate, -1);
01097         winstate->frametailpos = winstate->spooled_rows - 1;
01098         winstate->frametail_valid = true;
01099     }
01100     else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
01101     {
01102         if (frameOptions & FRAMEOPTION_ROWS)
01103         {
01104             /* In ROWS mode, exactly the rows up to current are in frame */
01105             winstate->frametailpos = winstate->currentpos;
01106             winstate->frametail_valid = true;
01107         }
01108         else if (frameOptions & FRAMEOPTION_RANGE)
01109         {
01110             int64       ftnext;
01111 
01112             /* If no ORDER BY, all rows are peers with each other */
01113             if (node->ordNumCols == 0)
01114             {
01115                 spool_tuples(winstate, -1);
01116                 winstate->frametailpos = winstate->spooled_rows - 1;
01117                 winstate->frametail_valid = true;
01118                 return;
01119             }
01120 
01121             /*
01122              * Else we have to search for the first non-peer of the current
01123              * row.  We assume the current value of frametailpos is a lower
01124              * bound on the possible frame tail location, ie, frame tail never
01125              * goes backward, and that currentpos is also a lower bound, ie,
01126              * frame end always >= current row.
01127              */
01128             ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
01129             for (;;)
01130             {
01131                 if (!window_gettupleslot(winobj, ftnext, slot))
01132                     break;      /* end of partition */
01133                 if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
01134                     break;      /* not peer of current row */
01135                 ftnext++;
01136             }
01137             winstate->frametailpos = ftnext - 1;
01138             winstate->frametail_valid = true;
01139         }
01140         else
01141             Assert(false);
01142     }
01143     else if (frameOptions & FRAMEOPTION_END_VALUE)
01144     {
01145         if (frameOptions & FRAMEOPTION_ROWS)
01146         {
01147             /* In ROWS mode, bound is physically n before/after current */
01148             int64       offset = DatumGetInt64(winstate->endOffsetValue);
01149 
01150             if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
01151                 offset = -offset;
01152 
01153             winstate->frametailpos = winstate->currentpos + offset;
01154             /* smallest allowable value of frametailpos is -1 */
01155             if (winstate->frametailpos < 0)
01156                 winstate->frametailpos = -1;
01157             else if (winstate->frametailpos > winstate->currentpos)
01158             {
01159                 /* make sure frametailpos is not past last row of partition */
01160                 spool_tuples(winstate, winstate->frametailpos);
01161                 if (winstate->frametailpos >= winstate->spooled_rows)
01162                     winstate->frametailpos = winstate->spooled_rows - 1;
01163             }
01164             winstate->frametail_valid = true;
01165         }
01166         else if (frameOptions & FRAMEOPTION_RANGE)
01167         {
01168             /* parser should have rejected this */
01169             elog(ERROR, "window frame with value offset is not implemented");
01170         }
01171         else
01172             Assert(false);
01173     }
01174     else
01175         Assert(false);
01176 }
01177 
01178 
01179 /* -----------------
01180  * ExecWindowAgg
01181  *
01182  *  ExecWindowAgg receives tuples from its outer subplan and
01183  *  stores them into a tuplestore, then processes window functions.
01184  *  This node doesn't reduce nor qualify any row so the number of
01185  *  returned rows is exactly the same as its outer subplan's result
01186  *  (ignoring the case of SRFs in the targetlist, that is).
01187  * -----------------
01188  */
01189 TupleTableSlot *
01190 ExecWindowAgg(WindowAggState *winstate)
01191 {
01192     TupleTableSlot *result;
01193     ExprDoneCond isDone;
01194     ExprContext *econtext;
01195     int         i;
01196     int         numfuncs;
01197 
01198     if (winstate->all_done)
01199         return NULL;
01200 
01201     /*
01202      * Check to see if we're still projecting out tuples from a previous
01203      * output tuple (because there is a function-returning-set in the
01204      * projection expressions).  If so, try to project another one.
01205      */
01206     if (winstate->ss.ps.ps_TupFromTlist)
01207     {
01208         TupleTableSlot *result;
01209         ExprDoneCond isDone;
01210 
01211         result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
01212         if (isDone == ExprMultipleResult)
01213             return result;
01214         /* Done with that source tuple... */
01215         winstate->ss.ps.ps_TupFromTlist = false;
01216     }
01217 
01218     /*
01219      * Compute frame offset values, if any, during first call.
01220      */
01221     if (winstate->all_first)
01222     {
01223         int         frameOptions = winstate->frameOptions;
01224         ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
01225         Datum       value;
01226         bool        isnull;
01227         int16       len;
01228         bool        byval;
01229 
01230         if (frameOptions & FRAMEOPTION_START_VALUE)
01231         {
01232             Assert(winstate->startOffset != NULL);
01233             value = ExecEvalExprSwitchContext(winstate->startOffset,
01234                                               econtext,
01235                                               &isnull,
01236                                               NULL);
01237             if (isnull)
01238                 ereport(ERROR,
01239                         (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
01240                          errmsg("frame starting offset must not be null")));
01241             /* copy value into query-lifespan context */
01242             get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
01243                             &len, &byval);
01244             winstate->startOffsetValue = datumCopy(value, byval, len);
01245             if (frameOptions & FRAMEOPTION_ROWS)
01246             {
01247                 /* value is known to be int8 */
01248                 int64       offset = DatumGetInt64(value);
01249 
01250                 if (offset < 0)
01251                     ereport(ERROR,
01252                             (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
01253                       errmsg("frame starting offset must not be negative")));
01254             }
01255         }
01256         if (frameOptions & FRAMEOPTION_END_VALUE)
01257         {
01258             Assert(winstate->endOffset != NULL);
01259             value = ExecEvalExprSwitchContext(winstate->endOffset,
01260                                               econtext,
01261                                               &isnull,
01262                                               NULL);
01263             if (isnull)
01264                 ereport(ERROR,
01265                         (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
01266                          errmsg("frame ending offset must not be null")));
01267             /* copy value into query-lifespan context */
01268             get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
01269                             &len, &byval);
01270             winstate->endOffsetValue = datumCopy(value, byval, len);
01271             if (frameOptions & FRAMEOPTION_ROWS)
01272             {
01273                 /* value is known to be int8 */
01274                 int64       offset = DatumGetInt64(value);
01275 
01276                 if (offset < 0)
01277                     ereport(ERROR,
01278                             (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
01279                         errmsg("frame ending offset must not be negative")));
01280             }
01281         }
01282         winstate->all_first = false;
01283     }
01284 
01285 restart:
01286     if (winstate->buffer == NULL)
01287     {
01288         /* Initialize for first partition and set current row = 0 */
01289         begin_partition(winstate);
01290         /* If there are no input rows, we'll detect that and exit below */
01291     }
01292     else
01293     {
01294         /* Advance current row within partition */
01295         winstate->currentpos++;
01296         /* This might mean that the frame moves, too */
01297         winstate->framehead_valid = false;
01298         winstate->frametail_valid = false;
01299     }
01300 
01301     /*
01302      * Spool all tuples up to and including the current row, if we haven't
01303      * already
01304      */
01305     spool_tuples(winstate, winstate->currentpos);
01306 
01307     /* Move to the next partition if we reached the end of this partition */
01308     if (winstate->partition_spooled &&
01309         winstate->currentpos >= winstate->spooled_rows)
01310     {
01311         release_partition(winstate);
01312 
01313         if (winstate->more_partitions)
01314         {
01315             begin_partition(winstate);
01316             Assert(winstate->spooled_rows > 0);
01317         }
01318         else
01319         {
01320             winstate->all_done = true;
01321             return NULL;
01322         }
01323     }
01324 
01325     /* final output execution is in ps_ExprContext */
01326     econtext = winstate->ss.ps.ps_ExprContext;
01327 
01328     /* Clear the per-output-tuple context for current row */
01329     ResetExprContext(econtext);
01330 
01331     /*
01332      * Read the current row from the tuplestore, and save in ScanTupleSlot.
01333      * (We can't rely on the outerplan's output slot because we may have to
01334      * read beyond the current row.  Also, we have to actually copy the row
01335      * out of the tuplestore, since window function evaluation might cause the
01336      * tuplestore to dump its state to disk.)
01337      *
01338      * Current row must be in the tuplestore, since we spooled it above.
01339      */
01340     tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
01341     if (!tuplestore_gettupleslot(winstate->buffer, true, true,
01342                                  winstate->ss.ss_ScanTupleSlot))
01343         elog(ERROR, "unexpected end of tuplestore");
01344 
01345     /*
01346      * Evaluate true window functions
01347      */
01348     numfuncs = winstate->numfuncs;
01349     for (i = 0; i < numfuncs; i++)
01350     {
01351         WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
01352 
01353         if (perfuncstate->plain_agg)
01354             continue;
01355         eval_windowfunction(winstate, perfuncstate,
01356               &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
01357               &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
01358     }
01359 
01360     /*
01361      * Evaluate aggregates
01362      */
01363     if (winstate->numaggs > 0)
01364         eval_windowaggregates(winstate);
01365 
01366     /*
01367      * Truncate any no-longer-needed rows from the tuplestore.
01368      */
01369     tuplestore_trim(winstate->buffer);
01370 
01371     /*
01372      * Form and return a projection tuple using the windowfunc results and the
01373      * current row.  Setting ecxt_outertuple arranges that any Vars will be
01374      * evaluated with respect to that row.
01375      */
01376     econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
01377     result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
01378 
01379     if (isDone == ExprEndResult)
01380     {
01381         /* SRF in tlist returned no rows, so advance to next input tuple */
01382         goto restart;
01383     }
01384 
01385     winstate->ss.ps.ps_TupFromTlist =
01386         (isDone == ExprMultipleResult);
01387     return result;
01388 }
01389 
01390 /* -----------------
01391  * ExecInitWindowAgg
01392  *
01393  *  Creates the run-time information for the WindowAgg node produced by the
01394  *  planner and initializes its outer subtree
01395  * -----------------
01396  */
01397 WindowAggState *
01398 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
01399 {
01400     WindowAggState *winstate;
01401     Plan       *outerPlan;
01402     ExprContext *econtext;
01403     ExprContext *tmpcontext;
01404     WindowStatePerFunc perfunc;
01405     WindowStatePerAgg peragg;
01406     int         numfuncs,
01407                 wfuncno,
01408                 numaggs,
01409                 aggno;
01410     ListCell   *l;
01411 
01412     /* check for unsupported flags */
01413     Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
01414 
01415     /*
01416      * create state structure
01417      */
01418     winstate = makeNode(WindowAggState);
01419     winstate->ss.ps.plan = (Plan *) node;
01420     winstate->ss.ps.state = estate;
01421 
01422     /*
01423      * Create expression contexts.  We need two, one for per-input-tuple
01424      * processing and one for per-output-tuple processing.  We cheat a little
01425      * by using ExecAssignExprContext() to build both.
01426      */
01427     ExecAssignExprContext(estate, &winstate->ss.ps);
01428     tmpcontext = winstate->ss.ps.ps_ExprContext;
01429     winstate->tmpcontext = tmpcontext;
01430     ExecAssignExprContext(estate, &winstate->ss.ps);
01431 
01432     /* Create long-lived context for storage of partition-local memory etc */
01433     winstate->partcontext =
01434         AllocSetContextCreate(CurrentMemoryContext,
01435                               "WindowAgg_Partition",
01436                               ALLOCSET_DEFAULT_MINSIZE,
01437                               ALLOCSET_DEFAULT_INITSIZE,
01438                               ALLOCSET_DEFAULT_MAXSIZE);
01439 
01440     /* Create mid-lived context for aggregate trans values etc */
01441     winstate->aggcontext =
01442         AllocSetContextCreate(CurrentMemoryContext,
01443                               "WindowAgg_Aggregates",
01444                               ALLOCSET_DEFAULT_MINSIZE,
01445                               ALLOCSET_DEFAULT_INITSIZE,
01446                               ALLOCSET_DEFAULT_MAXSIZE);
01447 
01448     /*
01449      * tuple table initialization
01450      */
01451     ExecInitScanTupleSlot(estate, &winstate->ss);
01452     ExecInitResultTupleSlot(estate, &winstate->ss.ps);
01453     winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
01454     winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
01455     winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
01456     winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
01457 
01458     winstate->ss.ps.targetlist = (List *)
01459         ExecInitExpr((Expr *) node->plan.targetlist,
01460                      (PlanState *) winstate);
01461 
01462     /*
01463      * WindowAgg nodes never have quals, since they can only occur at the
01464      * logical top level of a query (ie, after any WHERE or HAVING filters)
01465      */
01466     Assert(node->plan.qual == NIL);
01467     winstate->ss.ps.qual = NIL;
01468 
01469     /*
01470      * initialize child nodes
01471      */
01472     outerPlan = outerPlan(node);
01473     outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
01474 
01475     /*
01476      * initialize source tuple type (which is also the tuple type that we'll
01477      * store in the tuplestore and use in all our working slots).
01478      */
01479     ExecAssignScanTypeFromOuterPlan(&winstate->ss);
01480 
01481     ExecSetSlotDescriptor(winstate->first_part_slot,
01482                           winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
01483     ExecSetSlotDescriptor(winstate->agg_row_slot,
01484                           winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
01485     ExecSetSlotDescriptor(winstate->temp_slot_1,
01486                           winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
01487     ExecSetSlotDescriptor(winstate->temp_slot_2,
01488                           winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
01489 
01490     /*
01491      * Initialize result tuple type and projection info.
01492      */
01493     ExecAssignResultTypeFromTL(&winstate->ss.ps);
01494     ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
01495 
01496     winstate->ss.ps.ps_TupFromTlist = false;
01497 
01498     /* Set up data for comparing tuples */
01499     if (node->partNumCols > 0)
01500         winstate->partEqfunctions = execTuplesMatchPrepare(node->partNumCols,
01501                                                         node->partOperators);
01502     if (node->ordNumCols > 0)
01503         winstate->ordEqfunctions = execTuplesMatchPrepare(node->ordNumCols,
01504                                                           node->ordOperators);
01505 
01506     /*
01507      * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
01508      */
01509     numfuncs = winstate->numfuncs;
01510     numaggs = winstate->numaggs;
01511     econtext = winstate->ss.ps.ps_ExprContext;
01512     econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
01513     econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
01514 
01515     /*
01516      * allocate per-wfunc/per-agg state information.
01517      */
01518     perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
01519     peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
01520     winstate->perfunc = perfunc;
01521     winstate->peragg = peragg;
01522 
01523     wfuncno = -1;
01524     aggno = -1;
01525     foreach(l, winstate->funcs)
01526     {
01527         WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
01528         WindowFunc *wfunc = (WindowFunc *) wfuncstate->xprstate.expr;
01529         WindowStatePerFunc perfuncstate;
01530         AclResult   aclresult;
01531         int         i;
01532 
01533         if (wfunc->winref != node->winref)      /* planner screwed up? */
01534             elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
01535                  wfunc->winref, node->winref);
01536 
01537         /* Look for a previous duplicate window function */
01538         for (i = 0; i <= wfuncno; i++)
01539         {
01540             if (equal(wfunc, perfunc[i].wfunc) &&
01541                 !contain_volatile_functions((Node *) wfunc))
01542                 break;
01543         }
01544         if (i <= wfuncno)
01545         {
01546             /* Found a match to an existing entry, so just mark it */
01547             wfuncstate->wfuncno = i;
01548             continue;
01549         }
01550 
01551         /* Nope, so assign a new PerAgg record */
01552         perfuncstate = &perfunc[++wfuncno];
01553 
01554         /* Mark WindowFunc state node with assigned index in the result array */
01555         wfuncstate->wfuncno = wfuncno;
01556 
01557         /* Check permission to call window function */
01558         aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
01559                                      ACL_EXECUTE);
01560         if (aclresult != ACLCHECK_OK)
01561             aclcheck_error(aclresult, ACL_KIND_PROC,
01562                            get_func_name(wfunc->winfnoid));
01563         InvokeFunctionExecuteHook(wfunc->winfnoid);
01564 
01565         /* Fill in the perfuncstate data */
01566         perfuncstate->wfuncstate = wfuncstate;
01567         perfuncstate->wfunc = wfunc;
01568         perfuncstate->numArguments = list_length(wfuncstate->args);
01569 
01570         fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
01571                       econtext->ecxt_per_query_memory);
01572         fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
01573 
01574         perfuncstate->winCollation = wfunc->inputcollid;
01575 
01576         get_typlenbyval(wfunc->wintype,
01577                         &perfuncstate->resulttypeLen,
01578                         &perfuncstate->resulttypeByVal);
01579 
01580         /*
01581          * If it's really just a plain aggregate function, we'll emulate the
01582          * Agg environment for it.
01583          */
01584         perfuncstate->plain_agg = wfunc->winagg;
01585         if (wfunc->winagg)
01586         {
01587             WindowStatePerAgg peraggstate;
01588 
01589             perfuncstate->aggno = ++aggno;
01590             peraggstate = &winstate->peragg[aggno];
01591             initialize_peragg(winstate, wfunc, peraggstate);
01592             peraggstate->wfuncno = wfuncno;
01593         }
01594         else
01595         {
01596             WindowObject winobj = makeNode(WindowObjectData);
01597 
01598             winobj->winstate = winstate;
01599             winobj->argstates = wfuncstate->args;
01600             winobj->localmem = NULL;
01601             perfuncstate->winobj = winobj;
01602         }
01603     }
01604 
01605     /* Update numfuncs, numaggs to match number of unique functions found */
01606     winstate->numfuncs = wfuncno + 1;
01607     winstate->numaggs = aggno + 1;
01608 
01609     /* Set up WindowObject for aggregates, if needed */
01610     if (winstate->numaggs > 0)
01611     {
01612         WindowObject agg_winobj = makeNode(WindowObjectData);
01613 
01614         agg_winobj->winstate = winstate;
01615         agg_winobj->argstates = NIL;
01616         agg_winobj->localmem = NULL;
01617         /* make sure markptr = -1 to invalidate. It may not get used */
01618         agg_winobj->markptr = -1;
01619         agg_winobj->readptr = -1;
01620         winstate->agg_winobj = agg_winobj;
01621     }
01622 
01623     /* copy frame options to state node for easy access */
01624     winstate->frameOptions = node->frameOptions;
01625 
01626     /* initialize frame bound offset expressions */
01627     winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
01628                                          (PlanState *) winstate);
01629     winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
01630                                        (PlanState *) winstate);
01631 
01632     winstate->all_first = true;
01633     winstate->partition_spooled = false;
01634     winstate->more_partitions = false;
01635 
01636     return winstate;
01637 }
01638 
01639 /* -----------------
01640  * ExecEndWindowAgg
01641  * -----------------
01642  */
01643 void
01644 ExecEndWindowAgg(WindowAggState *node)
01645 {
01646     PlanState  *outerPlan;
01647 
01648     release_partition(node);
01649 
01650     pfree(node->perfunc);
01651     pfree(node->peragg);
01652 
01653     ExecClearTuple(node->ss.ss_ScanTupleSlot);
01654     ExecClearTuple(node->first_part_slot);
01655     ExecClearTuple(node->agg_row_slot);
01656     ExecClearTuple(node->temp_slot_1);
01657     ExecClearTuple(node->temp_slot_2);
01658 
01659     /*
01660      * Free both the expr contexts.
01661      */
01662     ExecFreeExprContext(&node->ss.ps);
01663     node->ss.ps.ps_ExprContext = node->tmpcontext;
01664     ExecFreeExprContext(&node->ss.ps);
01665 
01666     MemoryContextDelete(node->partcontext);
01667     MemoryContextDelete(node->aggcontext);
01668 
01669     outerPlan = outerPlanState(node);
01670     ExecEndNode(outerPlan);
01671 }
01672 
01673 /* -----------------
01674  * ExecReScanWindowAgg
01675  * -----------------
01676  */
01677 void
01678 ExecReScanWindowAgg(WindowAggState *node)
01679 {
01680     ExprContext *econtext = node->ss.ps.ps_ExprContext;
01681 
01682     node->all_done = false;
01683 
01684     node->ss.ps.ps_TupFromTlist = false;
01685     node->all_first = true;
01686 
01687     /* release tuplestore et al */
01688     release_partition(node);
01689 
01690     /* release all temp tuples, but especially first_part_slot */
01691     ExecClearTuple(node->ss.ss_ScanTupleSlot);
01692     ExecClearTuple(node->first_part_slot);
01693     ExecClearTuple(node->agg_row_slot);
01694     ExecClearTuple(node->temp_slot_1);
01695     ExecClearTuple(node->temp_slot_2);
01696 
01697     /* Forget current wfunc values */
01698     MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
01699     MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
01700 
01701     /*
01702      * if chgParam of subnode is not null then plan will be re-scanned by
01703      * first ExecProcNode.
01704      */
01705     if (node->ss.ps.lefttree->chgParam == NULL)
01706         ExecReScan(node->ss.ps.lefttree);
01707 }
01708 
01709 /*
01710  * initialize_peragg
01711  *
01712  * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
01713  */
01714 static WindowStatePerAggData *
01715 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
01716                   WindowStatePerAgg peraggstate)
01717 {
01718     Oid         inputTypes[FUNC_MAX_ARGS];
01719     int         numArguments;
01720     HeapTuple   aggTuple;
01721     Form_pg_aggregate aggform;
01722     Oid         aggtranstype;
01723     AclResult   aclresult;
01724     Oid         transfn_oid,
01725                 finalfn_oid;
01726     Expr       *transfnexpr,
01727                *finalfnexpr;
01728     Datum       textInitVal;
01729     int         i;
01730     ListCell   *lc;
01731 
01732     numArguments = list_length(wfunc->args);
01733 
01734     i = 0;
01735     foreach(lc, wfunc->args)
01736     {
01737         inputTypes[i++] = exprType((Node *) lfirst(lc));
01738     }
01739 
01740     aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
01741     if (!HeapTupleIsValid(aggTuple))
01742         elog(ERROR, "cache lookup failed for aggregate %u",
01743              wfunc->winfnoid);
01744     aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
01745 
01746     /*
01747      * ExecInitWindowAgg already checked permission to call aggregate function
01748      * ... but we still need to check the component functions
01749      */
01750 
01751     peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
01752     peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
01753 
01754     /* Check that aggregate owner has permission to call component fns */
01755     {
01756         HeapTuple   procTuple;
01757         Oid         aggOwner;
01758 
01759         procTuple = SearchSysCache1(PROCOID,
01760                                     ObjectIdGetDatum(wfunc->winfnoid));
01761         if (!HeapTupleIsValid(procTuple))
01762             elog(ERROR, "cache lookup failed for function %u",
01763                  wfunc->winfnoid);
01764         aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
01765         ReleaseSysCache(procTuple);
01766 
01767         aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
01768                                      ACL_EXECUTE);
01769         if (aclresult != ACLCHECK_OK)
01770             aclcheck_error(aclresult, ACL_KIND_PROC,
01771                            get_func_name(transfn_oid));
01772         InvokeFunctionExecuteHook(transfn_oid);
01773         if (OidIsValid(finalfn_oid))
01774         {
01775             aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
01776                                          ACL_EXECUTE);
01777             if (aclresult != ACLCHECK_OK)
01778                 aclcheck_error(aclresult, ACL_KIND_PROC,
01779                                get_func_name(finalfn_oid));
01780             InvokeFunctionExecuteHook(finalfn_oid);
01781         }
01782     }
01783 
01784     /* resolve actual type of transition state, if polymorphic */
01785     aggtranstype = aggform->aggtranstype;
01786     if (IsPolymorphicType(aggtranstype))
01787     {
01788         /* have to fetch the agg's declared input types... */
01789         Oid        *declaredArgTypes;
01790         int         agg_nargs;
01791 
01792         get_func_signature(wfunc->winfnoid,
01793                            &declaredArgTypes, &agg_nargs);
01794         Assert(agg_nargs == numArguments);
01795         aggtranstype = enforce_generic_type_consistency(inputTypes,
01796                                                         declaredArgTypes,
01797                                                         agg_nargs,
01798                                                         aggtranstype,
01799                                                         false);
01800         pfree(declaredArgTypes);
01801     }
01802 
01803     /* build expression trees using actual argument & result types */
01804     build_aggregate_fnexprs(inputTypes,
01805                             numArguments,
01806                             aggtranstype,
01807                             wfunc->wintype,
01808                             wfunc->inputcollid,
01809                             transfn_oid,
01810                             finalfn_oid,
01811                             &transfnexpr,
01812                             &finalfnexpr);
01813 
01814     fmgr_info(transfn_oid, &peraggstate->transfn);
01815     fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
01816 
01817     if (OidIsValid(finalfn_oid))
01818     {
01819         fmgr_info(finalfn_oid, &peraggstate->finalfn);
01820         fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
01821     }
01822 
01823     get_typlenbyval(wfunc->wintype,
01824                     &peraggstate->resulttypeLen,
01825                     &peraggstate->resulttypeByVal);
01826     get_typlenbyval(aggtranstype,
01827                     &peraggstate->transtypeLen,
01828                     &peraggstate->transtypeByVal);
01829 
01830     /*
01831      * initval is potentially null, so don't try to access it as a struct
01832      * field. Must do it the hard way with SysCacheGetAttr.
01833      */
01834     textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
01835                                   Anum_pg_aggregate_agginitval,
01836                                   &peraggstate->initValueIsNull);
01837 
01838     if (peraggstate->initValueIsNull)
01839         peraggstate->initValue = (Datum) 0;
01840     else
01841         peraggstate->initValue = GetAggInitVal(textInitVal,
01842                                                aggtranstype);
01843 
01844     /*
01845      * If the transfn is strict and the initval is NULL, make sure input type
01846      * and transtype are the same (or at least binary-compatible), so that
01847      * it's OK to use the first input value as the initial transValue.  This
01848      * should have been checked at agg definition time, but just in case...
01849      */
01850     if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
01851     {
01852         if (numArguments < 1 ||
01853             !IsBinaryCoercible(inputTypes[0], aggtranstype))
01854             ereport(ERROR,
01855                     (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
01856                      errmsg("aggregate %u needs to have compatible input type and transition type",
01857                             wfunc->winfnoid)));
01858     }
01859 
01860     ReleaseSysCache(aggTuple);
01861 
01862     return peraggstate;
01863 }
01864 
01865 static Datum
01866 GetAggInitVal(Datum textInitVal, Oid transtype)
01867 {
01868     Oid         typinput,
01869                 typioparam;
01870     char       *strInitVal;
01871     Datum       initVal;
01872 
01873     getTypeInputInfo(transtype, &typinput, &typioparam);
01874     strInitVal = TextDatumGetCString(textInitVal);
01875     initVal = OidInputFunctionCall(typinput, strInitVal,
01876                                    typioparam, -1);
01877     pfree(strInitVal);
01878     return initVal;
01879 }
01880 
01881 /*
01882  * are_peers
01883  * compare two rows to see if they are equal according to the ORDER BY clause
01884  *
01885  * NB: this does not consider the window frame mode.
01886  */
01887 static bool
01888 are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
01889           TupleTableSlot *slot2)
01890 {
01891     WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
01892 
01893     /* If no ORDER BY, all rows are peers with each other */
01894     if (node->ordNumCols == 0)
01895         return true;
01896 
01897     return execTuplesMatch(slot1, slot2,
01898                            node->ordNumCols, node->ordColIdx,
01899                            winstate->ordEqfunctions,
01900                            winstate->tmpcontext->ecxt_per_tuple_memory);
01901 }
01902 
01903 /*
01904  * window_gettupleslot
01905  *  Fetch the pos'th tuple of the current partition into the slot,
01906  *  using the winobj's read pointer
01907  *
01908  * Returns true if successful, false if no such row
01909  */
01910 static bool
01911 window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
01912 {
01913     WindowAggState *winstate = winobj->winstate;
01914     MemoryContext oldcontext;
01915 
01916     /* Don't allow passing -1 to spool_tuples here */
01917     if (pos < 0)
01918         return false;
01919 
01920     /* If necessary, fetch the tuple into the spool */
01921     spool_tuples(winstate, pos);
01922 
01923     if (pos >= winstate->spooled_rows)
01924         return false;
01925 
01926     if (pos < winobj->markpos)
01927         elog(ERROR, "cannot fetch row before WindowObject's mark position");
01928 
01929     oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
01930 
01931     tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
01932 
01933     /*
01934      * There's no API to refetch the tuple at the current position. We have to
01935      * move one tuple forward, and then one backward.  (We don't do it the
01936      * other way because we might try to fetch the row before our mark, which
01937      * isn't allowed.)  XXX this case could stand to be optimized.
01938      */
01939     if (winobj->seekpos == pos)
01940     {
01941         tuplestore_advance(winstate->buffer, true);
01942         winobj->seekpos++;
01943     }
01944 
01945     while (winobj->seekpos > pos)
01946     {
01947         if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
01948             elog(ERROR, "unexpected end of tuplestore");
01949         winobj->seekpos--;
01950     }
01951 
01952     while (winobj->seekpos < pos)
01953     {
01954         if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
01955             elog(ERROR, "unexpected end of tuplestore");
01956         winobj->seekpos++;
01957     }
01958 
01959     MemoryContextSwitchTo(oldcontext);
01960 
01961     return true;
01962 }
01963 
01964 
01965 /***********************************************************************
01966  * API exposed to window functions
01967  ***********************************************************************/
01968 
01969 
01970 /*
01971  * WinGetPartitionLocalMemory
01972  *      Get working memory that lives till end of partition processing
01973  *
01974  * On first call within a given partition, this allocates and zeroes the
01975  * requested amount of space.  Subsequent calls just return the same chunk.
01976  *
01977  * Memory obtained this way is normally used to hold state that should be
01978  * automatically reset for each new partition.  If a window function wants
01979  * to hold state across the whole query, fcinfo->fn_extra can be used in the
01980  * usual way for that.
01981  */
01982 void *
01983 WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
01984 {
01985     Assert(WindowObjectIsValid(winobj));
01986     if (winobj->localmem == NULL)
01987         winobj->localmem =
01988             MemoryContextAllocZero(winobj->winstate->partcontext, sz);
01989     return winobj->localmem;
01990 }
01991 
01992 /*
01993  * WinGetCurrentPosition
01994  *      Return the current row's position (counting from 0) within the current
01995  *      partition.
01996  */
01997 int64
01998 WinGetCurrentPosition(WindowObject winobj)
01999 {
02000     Assert(WindowObjectIsValid(winobj));
02001     return winobj->winstate->currentpos;
02002 }
02003 
02004 /*
02005  * WinGetPartitionRowCount
02006  *      Return total number of rows contained in the current partition.
02007  *
02008  * Note: this is a relatively expensive operation because it forces the
02009  * whole partition to be "spooled" into the tuplestore at once.  Once
02010  * executed, however, additional calls within the same partition are cheap.
02011  */
02012 int64
02013 WinGetPartitionRowCount(WindowObject winobj)
02014 {
02015     Assert(WindowObjectIsValid(winobj));
02016     spool_tuples(winobj->winstate, -1);
02017     return winobj->winstate->spooled_rows;
02018 }
02019 
02020 /*
02021  * WinSetMarkPosition
02022  *      Set the "mark" position for the window object, which is the oldest row
02023  *      number (counting from 0) it is allowed to fetch during all subsequent
02024  *      operations within the current partition.
02025  *
02026  * Window functions do not have to call this, but are encouraged to move the
02027  * mark forward when possible to keep the tuplestore size down and prevent
02028  * having to spill rows to disk.
02029  */
02030 void
02031 WinSetMarkPosition(WindowObject winobj, int64 markpos)
02032 {
02033     WindowAggState *winstate;
02034 
02035     Assert(WindowObjectIsValid(winobj));
02036     winstate = winobj->winstate;
02037 
02038     if (markpos < winobj->markpos)
02039         elog(ERROR, "cannot move WindowObject's mark position backward");
02040     tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
02041     while (markpos > winobj->markpos)
02042     {
02043         tuplestore_advance(winstate->buffer, true);
02044         winobj->markpos++;
02045     }
02046     tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
02047     while (markpos > winobj->seekpos)
02048     {
02049         tuplestore_advance(winstate->buffer, true);
02050         winobj->seekpos++;
02051     }
02052 }
02053 
02054 /*
02055  * WinRowsArePeers
02056  *      Compare two rows (specified by absolute position in window) to see
02057  *      if they are equal according to the ORDER BY clause.
02058  *
02059  * NB: this does not consider the window frame mode.
02060  */
02061 bool
02062 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
02063 {
02064     WindowAggState *winstate;
02065     WindowAgg  *node;
02066     TupleTableSlot *slot1;
02067     TupleTableSlot *slot2;
02068     bool        res;
02069 
02070     Assert(WindowObjectIsValid(winobj));
02071     winstate = winobj->winstate;
02072     node = (WindowAgg *) winstate->ss.ps.plan;
02073 
02074     /* If no ORDER BY, all rows are peers; don't bother to fetch them */
02075     if (node->ordNumCols == 0)
02076         return true;
02077 
02078     slot1 = winstate->temp_slot_1;
02079     slot2 = winstate->temp_slot_2;
02080 
02081     if (!window_gettupleslot(winobj, pos1, slot1))
02082         elog(ERROR, "specified position is out of window: " INT64_FORMAT,
02083              pos1);
02084     if (!window_gettupleslot(winobj, pos2, slot2))
02085         elog(ERROR, "specified position is out of window: " INT64_FORMAT,
02086              pos2);
02087 
02088     res = are_peers(winstate, slot1, slot2);
02089 
02090     ExecClearTuple(slot1);
02091     ExecClearTuple(slot2);
02092 
02093     return res;
02094 }
02095 
02096 /*
02097  * WinGetFuncArgInPartition
02098  *      Evaluate a window function's argument expression on a specified
02099  *      row of the partition.  The row is identified in lseek(2) style,
02100  *      i.e. relative to the current, first, or last row.
02101  *
02102  * argno: argument number to evaluate (counted from 0)
02103  * relpos: signed rowcount offset from the seek position
02104  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
02105  * set_mark: If the row is found and set_mark is true, the mark is moved to
02106  *      the row as a side-effect.
02107  * isnull: output argument, receives isnull status of result
02108  * isout: output argument, set to indicate whether target row position
02109  *      is out of partition (can pass NULL if caller doesn't care about this)
02110  *
02111  * Specifying a nonexistent row is not an error, it just causes a null result
02112  * (plus setting *isout true, if isout isn't NULL).
02113  */
02114 Datum
02115 WinGetFuncArgInPartition(WindowObject winobj, int argno,
02116                          int relpos, int seektype, bool set_mark,
02117                          bool *isnull, bool *isout)
02118 {
02119     WindowAggState *winstate;
02120     ExprContext *econtext;
02121     TupleTableSlot *slot;
02122     bool        gottuple;
02123     int64       abs_pos;
02124 
02125     Assert(WindowObjectIsValid(winobj));
02126     winstate = winobj->winstate;
02127     econtext = winstate->ss.ps.ps_ExprContext;
02128     slot = winstate->temp_slot_1;
02129 
02130     switch (seektype)
02131     {
02132         case WINDOW_SEEK_CURRENT:
02133             abs_pos = winstate->currentpos + relpos;
02134             break;
02135         case WINDOW_SEEK_HEAD:
02136             abs_pos = relpos;
02137             break;
02138         case WINDOW_SEEK_TAIL:
02139             spool_tuples(winstate, -1);
02140             abs_pos = winstate->spooled_rows - 1 + relpos;
02141             break;
02142         default:
02143             elog(ERROR, "unrecognized window seek type: %d", seektype);
02144             abs_pos = 0;        /* keep compiler quiet */
02145             break;
02146     }
02147 
02148     gottuple = window_gettupleslot(winobj, abs_pos, slot);
02149 
02150     if (!gottuple)
02151     {
02152         if (isout)
02153             *isout = true;
02154         *isnull = true;
02155         return (Datum) 0;
02156     }
02157     else
02158     {
02159         if (isout)
02160             *isout = false;
02161         if (set_mark)
02162         {
02163             int         frameOptions = winstate->frameOptions;
02164             int64       mark_pos = abs_pos;
02165 
02166             /*
02167              * In RANGE mode with a moving frame head, we must not let the
02168              * mark advance past frameheadpos, since that row has to be
02169              * fetchable during future update_frameheadpos calls.
02170              *
02171              * XXX it is very ugly to pollute window functions' marks with
02172              * this consideration; it could for instance mask a logic bug that
02173              * lets a window function fetch rows before what it had claimed
02174              * was its mark.  Perhaps use a separate mark for frame head
02175              * probes?
02176              */
02177             if ((frameOptions & FRAMEOPTION_RANGE) &&
02178                 !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
02179             {
02180                 update_frameheadpos(winobj, winstate->temp_slot_2);
02181                 if (mark_pos > winstate->frameheadpos)
02182                     mark_pos = winstate->frameheadpos;
02183             }
02184             WinSetMarkPosition(winobj, mark_pos);
02185         }
02186         econtext->ecxt_outertuple = slot;
02187         return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
02188                             econtext, isnull, NULL);
02189     }
02190 }
02191 
02192 /*
02193  * WinGetFuncArgInFrame
02194  *      Evaluate a window function's argument expression on a specified
02195  *      row of the window frame.  The row is identified in lseek(2) style,
02196  *      i.e. relative to the current, first, or last row.
02197  *
02198  * argno: argument number to evaluate (counted from 0)
02199  * relpos: signed rowcount offset from the seek position
02200  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
02201  * set_mark: If the row is found and set_mark is true, the mark is moved to
02202  *      the row as a side-effect.
02203  * isnull: output argument, receives isnull status of result
02204  * isout: output argument, set to indicate whether target row position
02205  *      is out of frame (can pass NULL if caller doesn't care about this)
02206  *
02207  * Specifying a nonexistent row is not an error, it just causes a null result
02208  * (plus setting *isout true, if isout isn't NULL).
02209  */
02210 Datum
02211 WinGetFuncArgInFrame(WindowObject winobj, int argno,
02212                      int relpos, int seektype, bool set_mark,
02213                      bool *isnull, bool *isout)
02214 {
02215     WindowAggState *winstate;
02216     ExprContext *econtext;
02217     TupleTableSlot *slot;
02218     bool        gottuple;
02219     int64       abs_pos;
02220 
02221     Assert(WindowObjectIsValid(winobj));
02222     winstate = winobj->winstate;
02223     econtext = winstate->ss.ps.ps_ExprContext;
02224     slot = winstate->temp_slot_1;
02225 
02226     switch (seektype)
02227     {
02228         case WINDOW_SEEK_CURRENT:
02229             abs_pos = winstate->currentpos + relpos;
02230             break;
02231         case WINDOW_SEEK_HEAD:
02232             update_frameheadpos(winobj, slot);
02233             abs_pos = winstate->frameheadpos + relpos;
02234             break;
02235         case WINDOW_SEEK_TAIL:
02236             update_frametailpos(winobj, slot);
02237             abs_pos = winstate->frametailpos + relpos;
02238             break;
02239         default:
02240             elog(ERROR, "unrecognized window seek type: %d", seektype);
02241             abs_pos = 0;        /* keep compiler quiet */
02242             break;
02243     }
02244 
02245     gottuple = window_gettupleslot(winobj, abs_pos, slot);
02246     if (gottuple)
02247         gottuple = row_is_in_frame(winstate, abs_pos, slot);
02248 
02249     if (!gottuple)
02250     {
02251         if (isout)
02252             *isout = true;
02253         *isnull = true;
02254         return (Datum) 0;
02255     }
02256     else
02257     {
02258         if (isout)
02259             *isout = false;
02260         if (set_mark)
02261         {
02262             int         frameOptions = winstate->frameOptions;
02263             int64       mark_pos = abs_pos;
02264 
02265             /*
02266              * In RANGE mode with a moving frame head, we must not let the
02267              * mark advance past frameheadpos, since that row has to be
02268              * fetchable during future update_frameheadpos calls.
02269              *
02270              * XXX it is very ugly to pollute window functions' marks with
02271              * this consideration; it could for instance mask a logic bug that
02272              * lets a window function fetch rows before what it had claimed
02273              * was its mark.  Perhaps use a separate mark for frame head
02274              * probes?
02275              */
02276             if ((frameOptions & FRAMEOPTION_RANGE) &&
02277                 !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
02278             {
02279                 update_frameheadpos(winobj, winstate->temp_slot_2);
02280                 if (mark_pos > winstate->frameheadpos)
02281                     mark_pos = winstate->frameheadpos;
02282             }
02283             WinSetMarkPosition(winobj, mark_pos);
02284         }
02285         econtext->ecxt_outertuple = slot;
02286         return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
02287                             econtext, isnull, NULL);
02288     }
02289 }
02290 
02291 /*
02292  * WinGetFuncArgCurrent
02293  *      Evaluate a window function's argument expression on the current row.
02294  *
02295  * argno: argument number to evaluate (counted from 0)
02296  * isnull: output argument, receives isnull status of result
02297  *
02298  * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
02299  * WinGetFuncArgInFrame targeting the current row, because it will succeed
02300  * even if the WindowObject's mark has been set beyond the current row.
02301  * This should generally be used for "ordinary" arguments of a window
02302  * function, such as the offset argument of lead() or lag().
02303  */
02304 Datum
02305 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
02306 {
02307     WindowAggState *winstate;
02308     ExprContext *econtext;
02309 
02310     Assert(WindowObjectIsValid(winobj));
02311     winstate = winobj->winstate;
02312 
02313     econtext = winstate->ss.ps.ps_ExprContext;
02314 
02315     econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
02316     return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
02317                         econtext, isnull, NULL);
02318 }