Header And Logo

PostgreSQL
| The world's most advanced open source database.

nodeMergeAppend.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * nodeMergeAppend.c
00004  *    routines to handle MergeAppend nodes.
00005  *
00006  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00007  * Portions Copyright (c) 1994, Regents of the University of California
00008  *
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/executor/nodeMergeAppend.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 /* INTERFACE ROUTINES
00016  *      ExecInitMergeAppend     - initialize the MergeAppend node
00017  *      ExecMergeAppend         - retrieve the next tuple from the node
00018  *      ExecEndMergeAppend      - shut down the MergeAppend node
00019  *      ExecReScanMergeAppend   - rescan the MergeAppend node
00020  *
00021  *   NOTES
00022  *      A MergeAppend node contains a list of one or more subplans.
00023  *      These are each expected to deliver tuples that are sorted according
00024  *      to a common sort key.  The MergeAppend node merges these streams
00025  *      to produce output sorted the same way.
00026  *
00027  *      MergeAppend nodes don't make use of their left and right
00028  *      subtrees, rather they maintain a list of subplans so
00029  *      a typical MergeAppend node looks like this in the plan tree:
00030  *
00031  *                 ...
00032  *                 /
00033  *              MergeAppend---+------+------+--- nil
00034  *              /   \         |      |      |
00035  *            nil   nil      ...    ...    ...
00036  *                               subplans
00037  */
00038 
00039 #include "postgres.h"
00040 
00041 #include "executor/execdebug.h"
00042 #include "executor/nodeMergeAppend.h"
00043 
00044 #include "lib/binaryheap.h"
00045 
00046 /*
00047  * We have one slot for each item in the heap array.  We use SlotNumber
00048  * to store slot indexes.  This doesn't actually provide any formal
00049  * type-safety, but it makes the code more self-documenting.
00050  */
00051 typedef int32 SlotNumber;
00052 
00053 static int  heap_compare_slots(Datum a, Datum b, void *arg);
00054 
00055 
00056 /* ----------------------------------------------------------------
00057  *      ExecInitMergeAppend
00058  *
00059  *      Begin all of the subscans of the MergeAppend node.
00060  * ----------------------------------------------------------------
00061  */
00062 MergeAppendState *
00063 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
00064 {
00065     MergeAppendState *mergestate = makeNode(MergeAppendState);
00066     PlanState **mergeplanstates;
00067     int         nplans;
00068     int         i;
00069     ListCell   *lc;
00070 
00071     /* check for unsupported flags */
00072     Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
00073 
00074     /*
00075      * Set up empty vector of subplan states
00076      */
00077     nplans = list_length(node->mergeplans);
00078 
00079     mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
00080 
00081     /*
00082      * create new MergeAppendState for our node
00083      */
00084     mergestate->ps.plan = (Plan *) node;
00085     mergestate->ps.state = estate;
00086     mergestate->mergeplans = mergeplanstates;
00087     mergestate->ms_nplans = nplans;
00088 
00089     mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
00090     mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
00091                                               mergestate);
00092 
00093     /*
00094      * Miscellaneous initialization
00095      *
00096      * MergeAppend plans don't have expression contexts because they never
00097      * call ExecQual or ExecProject.
00098      */
00099 
00100     /*
00101      * MergeAppend nodes do have Result slots, which hold pointers to tuples,
00102      * so we have to initialize them.
00103      */
00104     ExecInitResultTupleSlot(estate, &mergestate->ps);
00105 
00106     /*
00107      * call ExecInitNode on each of the plans to be executed and save the
00108      * results into the array "mergeplans".
00109      */
00110     i = 0;
00111     foreach(lc, node->mergeplans)
00112     {
00113         Plan       *initNode = (Plan *) lfirst(lc);
00114 
00115         mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
00116         i++;
00117     }
00118 
00119     /*
00120      * initialize output tuple type
00121      */
00122     ExecAssignResultTypeFromTL(&mergestate->ps);
00123     mergestate->ps.ps_ProjInfo = NULL;
00124 
00125     /*
00126      * initialize sort-key information
00127      */
00128     mergestate->ms_nkeys = node->numCols;
00129     mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
00130 
00131     for (i = 0; i < node->numCols; i++)
00132     {
00133         SortSupport sortKey = mergestate->ms_sortkeys + i;
00134 
00135         sortKey->ssup_cxt = CurrentMemoryContext;
00136         sortKey->ssup_collation = node->collations[i];
00137         sortKey->ssup_nulls_first = node->nullsFirst[i];
00138         sortKey->ssup_attno = node->sortColIdx[i];
00139 
00140         PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
00141     }
00142 
00143     /*
00144      * initialize to show we have not run the subplans yet
00145      */
00146     mergestate->ms_initialized = false;
00147 
00148     return mergestate;
00149 }
00150 
00151 /* ----------------------------------------------------------------
00152  *     ExecMergeAppend
00153  *
00154  *      Handles iteration over multiple subplans.
00155  * ----------------------------------------------------------------
00156  */
00157 TupleTableSlot *
00158 ExecMergeAppend(MergeAppendState *node)
00159 {
00160     TupleTableSlot *result;
00161     SlotNumber  i;
00162 
00163     if (!node->ms_initialized)
00164     {
00165         /*
00166          * First time through: pull the first tuple from each subplan, and set
00167          * up the heap.
00168          */
00169         for (i = 0; i < node->ms_nplans; i++)
00170         {
00171             node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
00172             if (!TupIsNull(node->ms_slots[i]))
00173                 binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
00174         }
00175         binaryheap_build(node->ms_heap);
00176         node->ms_initialized = true;
00177     }
00178     else
00179     {
00180         /*
00181          * Otherwise, pull the next tuple from whichever subplan we returned
00182          * from last time, and reinsert the subplan index into the heap,
00183          * because it might now compare differently against the existing
00184          * elements of the heap.  (We could perhaps simplify the logic a bit
00185          * by doing this before returning from the prior call, but it's better
00186          * to not pull tuples until necessary.)
00187          */
00188         i = DatumGetInt32(binaryheap_first(node->ms_heap));
00189         node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
00190         if (!TupIsNull(node->ms_slots[i]))
00191             binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
00192         else
00193             (void) binaryheap_remove_first(node->ms_heap);
00194     }
00195 
00196     if (binaryheap_empty(node->ms_heap))
00197     {
00198         /* All the subplans are exhausted, and so is the heap */
00199         result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
00200     }
00201     else
00202     {
00203         i = DatumGetInt32(binaryheap_first(node->ms_heap));
00204         result = node->ms_slots[i];
00205     }
00206 
00207     return result;
00208 }
00209 
00210 /*
00211  * Compare the tuples in the two given slots.
00212  */
00213 static int32
00214 heap_compare_slots(Datum a, Datum b, void *arg)
00215 {
00216     MergeAppendState *node = (MergeAppendState *) arg;
00217     SlotNumber  slot1 = DatumGetInt32(a);
00218     SlotNumber  slot2 = DatumGetInt32(b);
00219 
00220     TupleTableSlot *s1 = node->ms_slots[slot1];
00221     TupleTableSlot *s2 = node->ms_slots[slot2];
00222     int         nkey;
00223 
00224     Assert(!TupIsNull(s1));
00225     Assert(!TupIsNull(s2));
00226 
00227     for (nkey = 0; nkey < node->ms_nkeys; nkey++)
00228     {
00229         SortSupport sortKey = node->ms_sortkeys + nkey;
00230         AttrNumber  attno = sortKey->ssup_attno;
00231         Datum       datum1,
00232                     datum2;
00233         bool        isNull1,
00234                     isNull2;
00235         int         compare;
00236 
00237         datum1 = slot_getattr(s1, attno, &isNull1);
00238         datum2 = slot_getattr(s2, attno, &isNull2);
00239 
00240         compare = ApplySortComparator(datum1, isNull1,
00241                                       datum2, isNull2,
00242                                       sortKey);
00243         if (compare != 0)
00244             return -compare;
00245     }
00246     return 0;
00247 }
00248 
00249 /* ----------------------------------------------------------------
00250  *      ExecEndMergeAppend
00251  *
00252  *      Shuts down the subscans of the MergeAppend node.
00253  *
00254  *      Returns nothing of interest.
00255  * ----------------------------------------------------------------
00256  */
00257 void
00258 ExecEndMergeAppend(MergeAppendState *node)
00259 {
00260     PlanState **mergeplans;
00261     int         nplans;
00262     int         i;
00263 
00264     /*
00265      * get information from the node
00266      */
00267     mergeplans = node->mergeplans;
00268     nplans = node->ms_nplans;
00269 
00270     /*
00271      * shut down each of the subscans
00272      */
00273     for (i = 0; i < nplans; i++)
00274         ExecEndNode(mergeplans[i]);
00275 }
00276 
00277 void
00278 ExecReScanMergeAppend(MergeAppendState *node)
00279 {
00280     int         i;
00281 
00282     for (i = 0; i < node->ms_nplans; i++)
00283     {
00284         PlanState  *subnode = node->mergeplans[i];
00285 
00286         /*
00287          * ExecReScan doesn't know about my subplans, so I have to do
00288          * changed-parameter signaling myself.
00289          */
00290         if (node->ps.chgParam != NULL)
00291             UpdateChangedParamSet(subnode, node->ps.chgParam);
00292 
00293         /*
00294          * If chgParam of subnode is not null then plan will be re-scanned by
00295          * first ExecProcNode.
00296          */
00297         if (subnode->chgParam == NULL)
00298             ExecReScan(subnode);
00299     }
00300     node->ms_initialized = false;
00301 }