#include "postgres.h"
#include "executor/execdebug.h"
#include "executor/nodeMergeAppend.h"
#include "lib/binaryheap.h"
Go to the source code of this file.
Typedefs | |
typedef int32 | SlotNumber |
Functions | |
static int | heap_compare_slots (Datum a, Datum b, void *arg) |
MergeAppendState * | ExecInitMergeAppend (MergeAppend *node, EState *estate, int eflags) |
TupleTableSlot * | ExecMergeAppend (MergeAppendState *node) |
void | ExecEndMergeAppend (MergeAppendState *node) |
void | ExecReScanMergeAppend (MergeAppendState *node) |
typedef int32 SlotNumber |
Definition at line 51 of file nodeMergeAppend.c.
void ExecEndMergeAppend | ( | MergeAppendState * | node | ) |
Definition at line 258 of file nodeMergeAppend.c.
References ExecEndNode(), i, MergeAppendState::mergeplans, and MergeAppendState::ms_nplans.
Referenced by ExecEndNode().
{ PlanState **mergeplans; int nplans; int i; /* * get information from the node */ mergeplans = node->mergeplans; nplans = node->ms_nplans; /* * shut down each of the subscans */ for (i = 0; i < nplans; i++) ExecEndNode(mergeplans[i]); }
MergeAppendState* ExecInitMergeAppend | ( | MergeAppend * | node, | |
EState * | estate, | |||
int | eflags | |||
) |
Definition at line 63 of file nodeMergeAppend.c.
References Assert, binaryheap_allocate(), MergeAppend::collations, CurrentMemoryContext, EXEC_FLAG_BACKWARD, EXEC_FLAG_MARK, ExecAssignResultTypeFromTL(), ExecInitNode(), ExecInitResultTupleSlot(), heap_compare_slots(), i, lfirst, list_length(), makeNode, MergeAppendState::mergeplans, MergeAppend::mergeplans, MergeAppendState::ms_heap, MergeAppendState::ms_initialized, MergeAppendState::ms_nkeys, MergeAppendState::ms_nplans, MergeAppendState::ms_slots, MergeAppendState::ms_sortkeys, MergeAppend::nullsFirst, MergeAppend::numCols, palloc0(), PlanState::plan, PrepareSortSupportFromOrderingOp(), MergeAppendState::ps, PlanState::ps_ProjInfo, MergeAppend::sortColIdx, MergeAppend::sortOperators, SortSupportData::ssup_attno, SortSupportData::ssup_collation, SortSupportData::ssup_cxt, SortSupportData::ssup_nulls_first, and PlanState::state.
Referenced by ExecInitNode().
{ MergeAppendState *mergestate = makeNode(MergeAppendState); PlanState **mergeplanstates; int nplans; int i; ListCell *lc; /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); /* * Set up empty vector of subplan states */ nplans = list_length(node->mergeplans); mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *)); /* * create new MergeAppendState for our node */ mergestate->ps.plan = (Plan *) node; mergestate->ps.state = estate; mergestate->mergeplans = mergeplanstates; mergestate->ms_nplans = nplans; mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans); mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots, mergestate); /* * Miscellaneous initialization * * MergeAppend plans don't have expression contexts because they never * call ExecQual or ExecProject. */ /* * MergeAppend nodes do have Result slots, which hold pointers to tuples, * so we have to initialize them. */ ExecInitResultTupleSlot(estate, &mergestate->ps); /* * call ExecInitNode on each of the plans to be executed and save the * results into the array "mergeplans". */ i = 0; foreach(lc, node->mergeplans) { Plan *initNode = (Plan *) lfirst(lc); mergeplanstates[i] = ExecInitNode(initNode, estate, eflags); i++; } /* * initialize output tuple type */ ExecAssignResultTypeFromTL(&mergestate->ps); mergestate->ps.ps_ProjInfo = NULL; /* * initialize sort-key information */ mergestate->ms_nkeys = node->numCols; mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols); for (i = 0; i < node->numCols; i++) { SortSupport sortKey = mergestate->ms_sortkeys + i; sortKey->ssup_cxt = CurrentMemoryContext; sortKey->ssup_collation = node->collations[i]; sortKey->ssup_nulls_first = node->nullsFirst[i]; sortKey->ssup_attno = node->sortColIdx[i]; PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey); } /* * initialize to show we have not run the subplans yet */ mergestate->ms_initialized = false; return mergestate; }
TupleTableSlot* ExecMergeAppend | ( | MergeAppendState * | node | ) |
Definition at line 158 of file nodeMergeAppend.c.
References binaryheap_add_unordered(), binaryheap_build(), binaryheap_empty, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), DatumGetInt32, ExecClearTuple(), ExecProcNode(), i, Int32GetDatum, MergeAppendState::mergeplans, MergeAppendState::ms_heap, MergeAppendState::ms_initialized, MergeAppendState::ms_nplans, MergeAppendState::ms_slots, MergeAppendState::ps, PlanState::ps_ResultTupleSlot, and TupIsNull.
Referenced by ExecProcNode().
{ TupleTableSlot *result; SlotNumber i; if (!node->ms_initialized) { /* * First time through: pull the first tuple from each subplan, and set * up the heap. */ for (i = 0; i < node->ms_nplans; i++) { node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); if (!TupIsNull(node->ms_slots[i])) binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i)); } binaryheap_build(node->ms_heap); node->ms_initialized = true; } else { /* * Otherwise, pull the next tuple from whichever subplan we returned * from last time, and reinsert the subplan index into the heap, * because it might now compare differently against the existing * elements of the heap. (We could perhaps simplify the logic a bit * by doing this before returning from the prior call, but it's better * to not pull tuples until necessary.) */ i = DatumGetInt32(binaryheap_first(node->ms_heap)); node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); if (!TupIsNull(node->ms_slots[i])) binaryheap_replace_first(node->ms_heap, Int32GetDatum(i)); else (void) binaryheap_remove_first(node->ms_heap); } if (binaryheap_empty(node->ms_heap)) { /* All the subplans are exhausted, and so is the heap */ result = ExecClearTuple(node->ps.ps_ResultTupleSlot); } else { i = DatumGetInt32(binaryheap_first(node->ms_heap)); result = node->ms_slots[i]; } return result; }
void ExecReScanMergeAppend | ( | MergeAppendState * | node | ) |
Definition at line 278 of file nodeMergeAppend.c.
References PlanState::chgParam, ExecReScan(), i, MergeAppendState::mergeplans, MergeAppendState::ms_initialized, MergeAppendState::ms_nplans, NULL, MergeAppendState::ps, and UpdateChangedParamSet().
Referenced by ExecReScan().
{ int i; for (i = 0; i < node->ms_nplans; i++) { PlanState *subnode = node->mergeplans[i]; /* * ExecReScan doesn't know about my subplans, so I have to do * changed-parameter signaling myself. */ if (node->ps.chgParam != NULL) UpdateChangedParamSet(subnode, node->ps.chgParam); /* * If chgParam of subnode is not null then plan will be re-scanned by * first ExecProcNode. */ if (subnode->chgParam == NULL) ExecReScan(subnode); } node->ms_initialized = false; }
Definition at line 214 of file nodeMergeAppend.c.
References ApplySortComparator(), Assert, DatumGetInt32, MergeAppendState::ms_nkeys, MergeAppendState::ms_slots, MergeAppendState::ms_sortkeys, s1, s2, slot_getattr(), SortSupportData::ssup_attno, and TupIsNull.
Referenced by ExecInitMergeAppend().
{ MergeAppendState *node = (MergeAppendState *) arg; SlotNumber slot1 = DatumGetInt32(a); SlotNumber slot2 = DatumGetInt32(b); TupleTableSlot *s1 = node->ms_slots[slot1]; TupleTableSlot *s2 = node->ms_slots[slot2]; int nkey; Assert(!TupIsNull(s1)); Assert(!TupIsNull(s2)); for (nkey = 0; nkey < node->ms_nkeys; nkey++) { SortSupport sortKey = node->ms_sortkeys + nkey; AttrNumber attno = sortKey->ssup_attno; Datum datum1, datum2; bool isNull1, isNull2; int compare; datum1 = slot_getattr(s1, attno, &isNull1); datum2 = slot_getattr(s2, attno, &isNull2); compare = ApplySortComparator(datum1, isNull1, datum2, isNull2, sortKey); if (compare != 0) return -compare; } return 0; }