Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "postgres.h"
00016
00017 #include "executor/execdebug.h"
00018 #include "executor/nodeRecursiveunion.h"
00019 #include "miscadmin.h"
00020 #include "utils/memutils.h"
00021
00022
00023
00024
00025
00026
00027 typedef struct RUHashEntryData *RUHashEntry;
00028
00029 typedef struct RUHashEntryData
00030 {
00031 TupleHashEntryData shared;
00032 } RUHashEntryData;
00033
00034
00035
00036
00037
00038 static void
00039 build_hash_table(RecursiveUnionState *rustate)
00040 {
00041 RecursiveUnion *node = (RecursiveUnion *) rustate->ps.plan;
00042
00043 Assert(node->numCols > 0);
00044 Assert(node->numGroups > 0);
00045
00046 rustate->hashtable = BuildTupleHashTable(node->numCols,
00047 node->dupColIdx,
00048 rustate->eqfunctions,
00049 rustate->hashfunctions,
00050 node->numGroups,
00051 sizeof(RUHashEntryData),
00052 rustate->tableContext,
00053 rustate->tempContext);
00054 }
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075 TupleTableSlot *
00076 ExecRecursiveUnion(RecursiveUnionState *node)
00077 {
00078 PlanState *outerPlan = outerPlanState(node);
00079 PlanState *innerPlan = innerPlanState(node);
00080 RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
00081 TupleTableSlot *slot;
00082 bool isnew;
00083
00084
00085 if (!node->recursing)
00086 {
00087 for (;;)
00088 {
00089 slot = ExecProcNode(outerPlan);
00090 if (TupIsNull(slot))
00091 break;
00092 if (plan->numCols > 0)
00093 {
00094
00095 LookupTupleHashEntry(node->hashtable, slot, &isnew);
00096
00097 MemoryContextReset(node->tempContext);
00098
00099 if (!isnew)
00100 continue;
00101 }
00102
00103 tuplestore_puttupleslot(node->working_table, slot);
00104
00105 return slot;
00106 }
00107 node->recursing = true;
00108 }
00109
00110
00111 for (;;)
00112 {
00113 slot = ExecProcNode(innerPlan);
00114 if (TupIsNull(slot))
00115 {
00116
00117 if (node->intermediate_empty)
00118 break;
00119
00120
00121 tuplestore_end(node->working_table);
00122
00123
00124 node->working_table = node->intermediate_table;
00125
00126
00127 node->intermediate_table = tuplestore_begin_heap(false, false,
00128 work_mem);
00129 node->intermediate_empty = true;
00130
00131
00132 innerPlan->chgParam = bms_add_member(innerPlan->chgParam,
00133 plan->wtParam);
00134
00135
00136 continue;
00137 }
00138
00139 if (plan->numCols > 0)
00140 {
00141
00142 LookupTupleHashEntry(node->hashtable, slot, &isnew);
00143
00144 MemoryContextReset(node->tempContext);
00145
00146 if (!isnew)
00147 continue;
00148 }
00149
00150
00151 node->intermediate_empty = false;
00152 tuplestore_puttupleslot(node->intermediate_table, slot);
00153
00154 return slot;
00155 }
00156
00157 return NULL;
00158 }
00159
00160
00161
00162
00163
00164 RecursiveUnionState *
00165 ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags)
00166 {
00167 RecursiveUnionState *rustate;
00168 ParamExecData *prmdata;
00169
00170
00171 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
00172
00173
00174
00175
00176 rustate = makeNode(RecursiveUnionState);
00177 rustate->ps.plan = (Plan *) node;
00178 rustate->ps.state = estate;
00179
00180 rustate->eqfunctions = NULL;
00181 rustate->hashfunctions = NULL;
00182 rustate->hashtable = NULL;
00183 rustate->tempContext = NULL;
00184 rustate->tableContext = NULL;
00185
00186
00187 rustate->recursing = false;
00188 rustate->intermediate_empty = true;
00189 rustate->working_table = tuplestore_begin_heap(false, false, work_mem);
00190 rustate->intermediate_table = tuplestore_begin_heap(false, false, work_mem);
00191
00192
00193
00194
00195
00196
00197
00198 if (node->numCols > 0)
00199 {
00200 rustate->tempContext =
00201 AllocSetContextCreate(CurrentMemoryContext,
00202 "RecursiveUnion",
00203 ALLOCSET_DEFAULT_MINSIZE,
00204 ALLOCSET_DEFAULT_INITSIZE,
00205 ALLOCSET_DEFAULT_MAXSIZE);
00206 rustate->tableContext =
00207 AllocSetContextCreate(CurrentMemoryContext,
00208 "RecursiveUnion hash table",
00209 ALLOCSET_DEFAULT_MINSIZE,
00210 ALLOCSET_DEFAULT_INITSIZE,
00211 ALLOCSET_DEFAULT_MAXSIZE);
00212 }
00213
00214
00215
00216
00217
00218 prmdata = &(estate->es_param_exec_vals[node->wtParam]);
00219 Assert(prmdata->execPlan == NULL);
00220 prmdata->value = PointerGetDatum(rustate);
00221 prmdata->isnull = false;
00222
00223
00224
00225
00226
00227
00228
00229 Assert(node->plan.qual == NIL);
00230
00231
00232
00233
00234
00235 ExecInitResultTupleSlot(estate, &rustate->ps);
00236
00237
00238
00239
00240
00241
00242 ExecAssignResultTypeFromTL(&rustate->ps);
00243 rustate->ps.ps_ProjInfo = NULL;
00244
00245
00246
00247
00248 outerPlanState(rustate) = ExecInitNode(outerPlan(node), estate, eflags);
00249 innerPlanState(rustate) = ExecInitNode(innerPlan(node), estate, eflags);
00250
00251
00252
00253
00254
00255 if (node->numCols > 0)
00256 {
00257 execTuplesHashPrepare(node->numCols,
00258 node->dupOperators,
00259 &rustate->eqfunctions,
00260 &rustate->hashfunctions);
00261 build_hash_table(rustate);
00262 }
00263
00264 return rustate;
00265 }
00266
00267
00268
00269
00270
00271
00272
00273 void
00274 ExecEndRecursiveUnion(RecursiveUnionState *node)
00275 {
00276
00277 tuplestore_end(node->working_table);
00278 tuplestore_end(node->intermediate_table);
00279
00280
00281 if (node->tempContext)
00282 MemoryContextDelete(node->tempContext);
00283 if (node->tableContext)
00284 MemoryContextDelete(node->tableContext);
00285
00286
00287
00288
00289 ExecClearTuple(node->ps.ps_ResultTupleSlot);
00290
00291
00292
00293
00294 ExecEndNode(outerPlanState(node));
00295 ExecEndNode(innerPlanState(node));
00296 }
00297
00298
00299
00300
00301
00302
00303
00304 void
00305 ExecReScanRecursiveUnion(RecursiveUnionState *node)
00306 {
00307 PlanState *outerPlan = outerPlanState(node);
00308 PlanState *innerPlan = innerPlanState(node);
00309 RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
00310
00311
00312
00313
00314
00315 innerPlan->chgParam = bms_add_member(innerPlan->chgParam, plan->wtParam);
00316
00317
00318
00319
00320
00321
00322 if (outerPlan->chgParam == NULL)
00323 ExecReScan(outerPlan);
00324
00325
00326 if (node->tableContext)
00327 MemoryContextResetAndDeleteChildren(node->tableContext);
00328
00329
00330 if (plan->numCols > 0)
00331 build_hash_table(node);
00332
00333
00334 node->recursing = false;
00335 node->intermediate_empty = true;
00336 tuplestore_clear(node->working_table);
00337 tuplestore_clear(node->intermediate_table);
00338 }