Header And Logo

PostgreSQL
| The world's most advanced open source database.

nodeHash.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * nodeHash.c
00004  *    Routines to hash relations for hashjoin
00005  *
00006  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00007  * Portions Copyright (c) 1994, Regents of the University of California
00008  *
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/executor/nodeHash.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 /*
00016  * INTERFACE ROUTINES
00017  *      MultiExecHash   - generate an in-memory hash table of the relation
00018  *      ExecInitHash    - initialize node and subnodes
00019  *      ExecEndHash     - shutdown node and subnodes
00020  */
00021 
00022 #include "postgres.h"
00023 
00024 #include <math.h>
00025 #include <limits.h>
00026 
00027 #include "access/htup_details.h"
00028 #include "catalog/pg_statistic.h"
00029 #include "commands/tablespace.h"
00030 #include "executor/execdebug.h"
00031 #include "executor/hashjoin.h"
00032 #include "executor/nodeHash.h"
00033 #include "executor/nodeHashjoin.h"
00034 #include "miscadmin.h"
00035 #include "utils/dynahash.h"
00036 #include "utils/memutils.h"
00037 #include "utils/lsyscache.h"
00038 #include "utils/syscache.h"
00039 
00040 
00041 static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
00042 static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
00043                       int mcvsToUse);
00044 static void ExecHashSkewTableInsert(HashJoinTable hashtable,
00045                         TupleTableSlot *slot,
00046                         uint32 hashvalue,
00047                         int bucketNumber);
00048 static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
00049 
00050 
00051 /* ----------------------------------------------------------------
00052  *      ExecHash
00053  *
00054  *      stub for pro forma compliance
00055  * ----------------------------------------------------------------
00056  */
00057 TupleTableSlot *
00058 ExecHash(HashState *node)
00059 {
00060     elog(ERROR, "Hash node does not support ExecProcNode call convention");
00061     return NULL;
00062 }
00063 
00064 /* ----------------------------------------------------------------
00065  *      MultiExecHash
00066  *
00067  *      build hash table for hashjoin, doing partitioning if more
00068  *      than one batch is required.
00069  * ----------------------------------------------------------------
00070  */
00071 Node *
00072 MultiExecHash(HashState *node)
00073 {
00074     PlanState  *outerNode;
00075     List       *hashkeys;
00076     HashJoinTable hashtable;
00077     TupleTableSlot *slot;
00078     ExprContext *econtext;
00079     uint32      hashvalue;
00080 
00081     /* must provide our own instrumentation support */
00082     if (node->ps.instrument)
00083         InstrStartNode(node->ps.instrument);
00084 
00085     /*
00086      * get state info from node
00087      */
00088     outerNode = outerPlanState(node);
00089     hashtable = node->hashtable;
00090 
00091     /*
00092      * set expression context
00093      */
00094     hashkeys = node->hashkeys;
00095     econtext = node->ps.ps_ExprContext;
00096 
00097     /*
00098      * get all inner tuples and insert into the hash table (or temp files)
00099      */
00100     for (;;)
00101     {
00102         slot = ExecProcNode(outerNode);
00103         if (TupIsNull(slot))
00104             break;
00105         /* We have to compute the hash value */
00106         econtext->ecxt_innertuple = slot;
00107         if (ExecHashGetHashValue(hashtable, econtext, hashkeys,
00108                                  false, hashtable->keepNulls,
00109                                  &hashvalue))
00110         {
00111             int         bucketNumber;
00112 
00113             bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
00114             if (bucketNumber != INVALID_SKEW_BUCKET_NO)
00115             {
00116                 /* It's a skew tuple, so put it into that hash table */
00117                 ExecHashSkewTableInsert(hashtable, slot, hashvalue,
00118                                         bucketNumber);
00119             }
00120             else
00121             {
00122                 /* Not subject to skew optimization, so insert normally */
00123                 ExecHashTableInsert(hashtable, slot, hashvalue);
00124             }
00125             hashtable->totalTuples += 1;
00126         }
00127     }
00128 
00129     /* must provide our own instrumentation support */
00130     if (node->ps.instrument)
00131         InstrStopNode(node->ps.instrument, hashtable->totalTuples);
00132 
00133     /*
00134      * We do not return the hash table directly because it's not a subtype of
00135      * Node, and so would violate the MultiExecProcNode API.  Instead, our
00136      * parent Hashjoin node is expected to know how to fish it out of our node
00137      * state.  Ugly but not really worth cleaning up, since Hashjoin knows
00138      * quite a bit more about Hash besides that.
00139      */
00140     return NULL;
00141 }
00142 
00143 /* ----------------------------------------------------------------
00144  *      ExecInitHash
00145  *
00146  *      Init routine for Hash node
00147  * ----------------------------------------------------------------
00148  */
00149 HashState *
00150 ExecInitHash(Hash *node, EState *estate, int eflags)
00151 {
00152     HashState  *hashstate;
00153 
00154     /* check for unsupported flags */
00155     Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
00156 
00157     /*
00158      * create state structure
00159      */
00160     hashstate = makeNode(HashState);
00161     hashstate->ps.plan = (Plan *) node;
00162     hashstate->ps.state = estate;
00163     hashstate->hashtable = NULL;
00164     hashstate->hashkeys = NIL;  /* will be set by parent HashJoin */
00165 
00166     /*
00167      * Miscellaneous initialization
00168      *
00169      * create expression context for node
00170      */
00171     ExecAssignExprContext(estate, &hashstate->ps);
00172 
00173     /*
00174      * initialize our result slot
00175      */
00176     ExecInitResultTupleSlot(estate, &hashstate->ps);
00177 
00178     /*
00179      * initialize child expressions
00180      */
00181     hashstate->ps.targetlist = (List *)
00182         ExecInitExpr((Expr *) node->plan.targetlist,
00183                      (PlanState *) hashstate);
00184     hashstate->ps.qual = (List *)
00185         ExecInitExpr((Expr *) node->plan.qual,
00186                      (PlanState *) hashstate);
00187 
00188     /*
00189      * initialize child nodes
00190      */
00191     outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);
00192 
00193     /*
00194      * initialize tuple type. no need to initialize projection info because
00195      * this node doesn't do projections
00196      */
00197     ExecAssignResultTypeFromTL(&hashstate->ps);
00198     hashstate->ps.ps_ProjInfo = NULL;
00199 
00200     return hashstate;
00201 }
00202 
00203 /* ---------------------------------------------------------------
00204  *      ExecEndHash
00205  *
00206  *      clean up routine for Hash node
00207  * ----------------------------------------------------------------
00208  */
00209 void
00210 ExecEndHash(HashState *node)
00211 {
00212     PlanState  *outerPlan;
00213 
00214     /*
00215      * free exprcontext
00216      */
00217     ExecFreeExprContext(&node->ps);
00218 
00219     /*
00220      * shut down the subplan
00221      */
00222     outerPlan = outerPlanState(node);
00223     ExecEndNode(outerPlan);
00224 }
00225 
00226 
00227 /* ----------------------------------------------------------------
00228  *      ExecHashTableCreate
00229  *
00230  *      create an empty hashtable data structure for hashjoin.
00231  * ----------------------------------------------------------------
00232  */
00233 HashJoinTable
00234 ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
00235 {
00236     HashJoinTable hashtable;
00237     Plan       *outerNode;
00238     int         nbuckets;
00239     int         nbatch;
00240     int         num_skew_mcvs;
00241     int         log2_nbuckets;
00242     int         nkeys;
00243     int         i;
00244     ListCell   *ho;
00245     MemoryContext oldcxt;
00246 
00247     /*
00248      * Get information about the size of the relation to be hashed (it's the
00249      * "outer" subtree of this node, but the inner relation of the hashjoin).
00250      * Compute the appropriate size of the hash table.
00251      */
00252     outerNode = outerPlan(node);
00253 
00254     ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width,
00255                             OidIsValid(node->skewTable),
00256                             &nbuckets, &nbatch, &num_skew_mcvs);
00257 
00258 #ifdef HJDEBUG
00259     printf("nbatch = %d, nbuckets = %d\n", nbatch, nbuckets);
00260 #endif
00261 
00262     /* nbuckets must be a power of 2 */
00263     log2_nbuckets = my_log2(nbuckets);
00264     Assert(nbuckets == (1 << log2_nbuckets));
00265 
00266     /*
00267      * Initialize the hash table control block.
00268      *
00269      * The hashtable control block is just palloc'd from the executor's
00270      * per-query memory context.
00271      */
00272     hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
00273     hashtable->nbuckets = nbuckets;
00274     hashtable->log2_nbuckets = log2_nbuckets;
00275     hashtable->buckets = NULL;
00276     hashtable->keepNulls = keepNulls;
00277     hashtable->skewEnabled = false;
00278     hashtable->skewBucket = NULL;
00279     hashtable->skewBucketLen = 0;
00280     hashtable->nSkewBuckets = 0;
00281     hashtable->skewBucketNums = NULL;
00282     hashtable->nbatch = nbatch;
00283     hashtable->curbatch = 0;
00284     hashtable->nbatch_original = nbatch;
00285     hashtable->nbatch_outstart = nbatch;
00286     hashtable->growEnabled = true;
00287     hashtable->totalTuples = 0;
00288     hashtable->innerBatchFile = NULL;
00289     hashtable->outerBatchFile = NULL;
00290     hashtable->spaceUsed = 0;
00291     hashtable->spacePeak = 0;
00292     hashtable->spaceAllowed = work_mem * 1024L;
00293     hashtable->spaceUsedSkew = 0;
00294     hashtable->spaceAllowedSkew =
00295         hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
00296 
00297     /*
00298      * Get info about the hash functions to be used for each hash key. Also
00299      * remember whether the join operators are strict.
00300      */
00301     nkeys = list_length(hashOperators);
00302     hashtable->outer_hashfunctions =
00303         (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
00304     hashtable->inner_hashfunctions =
00305         (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
00306     hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));
00307     i = 0;
00308     foreach(ho, hashOperators)
00309     {
00310         Oid         hashop = lfirst_oid(ho);
00311         Oid         left_hashfn;
00312         Oid         right_hashfn;
00313 
00314         if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
00315             elog(ERROR, "could not find hash function for hash operator %u",
00316                  hashop);
00317         fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
00318         fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
00319         hashtable->hashStrict[i] = op_strict(hashop);
00320         i++;
00321     }
00322 
00323     /*
00324      * Create temporary memory contexts in which to keep the hashtable working
00325      * storage.  See notes in executor/hashjoin.h.
00326      */
00327     hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
00328                                                "HashTableContext",
00329                                                ALLOCSET_DEFAULT_MINSIZE,
00330                                                ALLOCSET_DEFAULT_INITSIZE,
00331                                                ALLOCSET_DEFAULT_MAXSIZE);
00332 
00333     hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
00334                                                 "HashBatchContext",
00335                                                 ALLOCSET_DEFAULT_MINSIZE,
00336                                                 ALLOCSET_DEFAULT_INITSIZE,
00337                                                 ALLOCSET_DEFAULT_MAXSIZE);
00338 
00339     /* Allocate data that will live for the life of the hashjoin */
00340 
00341     oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
00342 
00343     if (nbatch > 1)
00344     {
00345         /*
00346          * allocate and initialize the file arrays in hashCxt
00347          */
00348         hashtable->innerBatchFile = (BufFile **)
00349             palloc0(nbatch * sizeof(BufFile *));
00350         hashtable->outerBatchFile = (BufFile **)
00351             palloc0(nbatch * sizeof(BufFile *));
00352         /* The files will not be opened until needed... */
00353         /* ... but make sure we have temp tablespaces established for them */
00354         PrepareTempTablespaces();
00355     }
00356 
00357     /*
00358      * Prepare context for the first-scan space allocations; allocate the
00359      * hashbucket array therein, and set each bucket "empty".
00360      */
00361     MemoryContextSwitchTo(hashtable->batchCxt);
00362 
00363     hashtable->buckets = (HashJoinTuple *)
00364         palloc0(nbuckets * sizeof(HashJoinTuple));
00365 
00366     /*
00367      * Set up for skew optimization, if possible and there's a need for more
00368      * than one batch.  (In a one-batch join, there's no point in it.)
00369      */
00370     if (nbatch > 1)
00371         ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
00372 
00373     MemoryContextSwitchTo(oldcxt);
00374 
00375     return hashtable;
00376 }
00377 
00378 
00379 /*
00380  * Compute appropriate size for hashtable given the estimated size of the
00381  * relation to be hashed (number of rows and average row width).
00382  *
00383  * This is exported so that the planner's costsize.c can use it.
00384  */
00385 
00386 /* Target bucket loading (tuples per bucket) */
00387 #define NTUP_PER_BUCKET         10
00388 
00389 void
00390 ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
00391                         int *numbuckets,
00392                         int *numbatches,
00393                         int *num_skew_mcvs)
00394 {
00395     int         tupsize;
00396     double      inner_rel_bytes;
00397     long        hash_table_bytes;
00398     long        skew_table_bytes;
00399     long        max_pointers;
00400     int         nbatch;
00401     int         nbuckets;
00402     int         i;
00403 
00404     /* Force a plausible relation size if no info */
00405     if (ntuples <= 0.0)
00406         ntuples = 1000.0;
00407 
00408     /*
00409      * Estimate tupsize based on footprint of tuple in hashtable... note this
00410      * does not allow for any palloc overhead.  The manipulations of spaceUsed
00411      * don't count palloc overhead either.
00412      */
00413     tupsize = HJTUPLE_OVERHEAD +
00414         MAXALIGN(sizeof(MinimalTupleData)) +
00415         MAXALIGN(tupwidth);
00416     inner_rel_bytes = ntuples * tupsize;
00417 
00418     /*
00419      * Target in-memory hashtable size is work_mem kilobytes.
00420      */
00421     hash_table_bytes = work_mem * 1024L;
00422 
00423     /*
00424      * If skew optimization is possible, estimate the number of skew buckets
00425      * that will fit in the memory allowed, and decrement the assumed space
00426      * available for the main hash table accordingly.
00427      *
00428      * We make the optimistic assumption that each skew bucket will contain
00429      * one inner-relation tuple.  If that turns out to be low, we will recover
00430      * at runtime by reducing the number of skew buckets.
00431      *
00432      * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
00433      * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
00434      * will round up to the next power of 2 and then multiply by 4 to reduce
00435      * collisions.
00436      */
00437     if (useskew)
00438     {
00439         skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;
00440 
00441         /*----------
00442          * Divisor is:
00443          * size of a hash tuple +
00444          * worst-case size of skewBucket[] per MCV +
00445          * size of skewBucketNums[] entry +
00446          * size of skew bucket struct itself
00447          *----------
00448          */
00449         *num_skew_mcvs = skew_table_bytes / (tupsize +
00450                                              (8 * sizeof(HashSkewBucket *)) +
00451                                              sizeof(int) +
00452                                              SKEW_BUCKET_OVERHEAD);
00453         if (*num_skew_mcvs > 0)
00454             hash_table_bytes -= skew_table_bytes;
00455     }
00456     else
00457         *num_skew_mcvs = 0;
00458 
00459     /*
00460      * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
00461      * memory is filled.  Set nbatch to the smallest power of 2 that appears
00462      * sufficient.  The Min() steps limit the results so that the pointer
00463      * arrays we'll try to allocate do not exceed work_mem.
00464      */
00465     max_pointers = (work_mem * 1024L) / sizeof(void *);
00466     /* also ensure we avoid integer overflow in nbatch and nbuckets */
00467     max_pointers = Min(max_pointers, INT_MAX / 2);
00468 
00469     if (inner_rel_bytes > hash_table_bytes)
00470     {
00471         /* We'll need multiple batches */
00472         long        lbuckets;
00473         double      dbatch;
00474         int         minbatch;
00475 
00476         lbuckets = (hash_table_bytes / tupsize) / NTUP_PER_BUCKET;
00477         lbuckets = Min(lbuckets, max_pointers);
00478         nbuckets = (int) lbuckets;
00479 
00480         dbatch = ceil(inner_rel_bytes / hash_table_bytes);
00481         dbatch = Min(dbatch, max_pointers);
00482         minbatch = (int) dbatch;
00483         nbatch = 2;
00484         while (nbatch < minbatch)
00485             nbatch <<= 1;
00486     }
00487     else
00488     {
00489         /* We expect the hashtable to fit in memory */
00490         double      dbuckets;
00491 
00492         dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
00493         dbuckets = Min(dbuckets, max_pointers);
00494         nbuckets = (int) dbuckets;
00495 
00496         nbatch = 1;
00497     }
00498 
00499     /*
00500      * Both nbuckets and nbatch must be powers of 2 to make
00501      * ExecHashGetBucketAndBatch fast.  We already fixed nbatch; now inflate
00502      * nbuckets to the next larger power of 2.  We also force nbuckets to not
00503      * be real small, by starting the search at 2^10.  (Note: above we made
00504      * sure that nbuckets is not more than INT_MAX / 2, so this loop cannot
00505      * overflow, nor can the final shift to recalculate nbuckets.)
00506      */
00507     i = 10;
00508     while ((1 << i) < nbuckets)
00509         i++;
00510     nbuckets = (1 << i);
00511 
00512     *numbuckets = nbuckets;
00513     *numbatches = nbatch;
00514 }
00515 
00516 
00517 /* ----------------------------------------------------------------
00518  *      ExecHashTableDestroy
00519  *
00520  *      destroy a hash table
00521  * ----------------------------------------------------------------
00522  */
00523 void
00524 ExecHashTableDestroy(HashJoinTable hashtable)
00525 {
00526     int         i;
00527 
00528     /*
00529      * Make sure all the temp files are closed.  We skip batch 0, since it
00530      * can't have any temp files (and the arrays might not even exist if
00531      * nbatch is only 1).
00532      */
00533     for (i = 1; i < hashtable->nbatch; i++)
00534     {
00535         if (hashtable->innerBatchFile[i])
00536             BufFileClose(hashtable->innerBatchFile[i]);
00537         if (hashtable->outerBatchFile[i])
00538             BufFileClose(hashtable->outerBatchFile[i]);
00539     }
00540 
00541     /* Release working memory (batchCxt is a child, so it goes away too) */
00542     MemoryContextDelete(hashtable->hashCxt);
00543 
00544     /* And drop the control block */
00545     pfree(hashtable);
00546 }
00547 
00548 /*
00549  * ExecHashIncreaseNumBatches
00550  *      increase the original number of batches in order to reduce
00551  *      current memory consumption
00552  */
00553 static void
00554 ExecHashIncreaseNumBatches(HashJoinTable hashtable)
00555 {
00556     int         oldnbatch = hashtable->nbatch;
00557     int         curbatch = hashtable->curbatch;
00558     int         nbatch;
00559     int         i;
00560     MemoryContext oldcxt;
00561     long        ninmemory;
00562     long        nfreed;
00563 
00564     /* do nothing if we've decided to shut off growth */
00565     if (!hashtable->growEnabled)
00566         return;
00567 
00568     /* safety check to avoid overflow */
00569     if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2)))
00570         return;
00571 
00572     nbatch = oldnbatch * 2;
00573     Assert(nbatch > 1);
00574 
00575 #ifdef HJDEBUG
00576     printf("Increasing nbatch to %d because space = %lu\n",
00577            nbatch, (unsigned long) hashtable->spaceUsed);
00578 #endif
00579 
00580     oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
00581 
00582     if (hashtable->innerBatchFile == NULL)
00583     {
00584         /* we had no file arrays before */
00585         hashtable->innerBatchFile = (BufFile **)
00586             palloc0(nbatch * sizeof(BufFile *));
00587         hashtable->outerBatchFile = (BufFile **)
00588             palloc0(nbatch * sizeof(BufFile *));
00589         /* time to establish the temp tablespaces, too */
00590         PrepareTempTablespaces();
00591     }
00592     else
00593     {
00594         /* enlarge arrays and zero out added entries */
00595         hashtable->innerBatchFile = (BufFile **)
00596             repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
00597         hashtable->outerBatchFile = (BufFile **)
00598             repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
00599         MemSet(hashtable->innerBatchFile + oldnbatch, 0,
00600                (nbatch - oldnbatch) * sizeof(BufFile *));
00601         MemSet(hashtable->outerBatchFile + oldnbatch, 0,
00602                (nbatch - oldnbatch) * sizeof(BufFile *));
00603     }
00604 
00605     MemoryContextSwitchTo(oldcxt);
00606 
00607     hashtable->nbatch = nbatch;
00608 
00609     /*
00610      * Scan through the existing hash table entries and dump out any that are
00611      * no longer of the current batch.
00612      */
00613     ninmemory = nfreed = 0;
00614 
00615     for (i = 0; i < hashtable->nbuckets; i++)
00616     {
00617         HashJoinTuple prevtuple;
00618         HashJoinTuple tuple;
00619 
00620         prevtuple = NULL;
00621         tuple = hashtable->buckets[i];
00622 
00623         while (tuple != NULL)
00624         {
00625             /* save link in case we delete */
00626             HashJoinTuple nexttuple = tuple->next;
00627             int         bucketno;
00628             int         batchno;
00629 
00630             ninmemory++;
00631             ExecHashGetBucketAndBatch(hashtable, tuple->hashvalue,
00632                                       &bucketno, &batchno);
00633             Assert(bucketno == i);
00634             if (batchno == curbatch)
00635             {
00636                 /* keep tuple */
00637                 prevtuple = tuple;
00638             }
00639             else
00640             {
00641                 /* dump it out */
00642                 Assert(batchno > curbatch);
00643                 ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(tuple),
00644                                       tuple->hashvalue,
00645                                       &hashtable->innerBatchFile[batchno]);
00646                 /* and remove from hash table */
00647                 if (prevtuple)
00648                     prevtuple->next = nexttuple;
00649                 else
00650                     hashtable->buckets[i] = nexttuple;
00651                 /* prevtuple doesn't change */
00652                 hashtable->spaceUsed -=
00653                     HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(tuple)->t_len;
00654                 pfree(tuple);
00655                 nfreed++;
00656             }
00657 
00658             tuple = nexttuple;
00659         }
00660     }
00661 
00662 #ifdef HJDEBUG
00663     printf("Freed %ld of %ld tuples, space now %lu\n",
00664            nfreed, ninmemory, (unsigned long) hashtable->spaceUsed);
00665 #endif
00666 
00667     /*
00668      * If we dumped out either all or none of the tuples in the table, disable
00669      * further expansion of nbatch.  This situation implies that we have
00670      * enough tuples of identical hashvalues to overflow spaceAllowed.
00671      * Increasing nbatch will not fix it since there's no way to subdivide the
00672      * group any more finely. We have to just gut it out and hope the server
00673      * has enough RAM.
00674      */
00675     if (nfreed == 0 || nfreed == ninmemory)
00676     {
00677         hashtable->growEnabled = false;
00678 #ifdef HJDEBUG
00679         printf("Disabling further increase of nbatch\n");
00680 #endif
00681     }
00682 }
00683 
00684 /*
00685  * ExecHashTableInsert
00686  *      insert a tuple into the hash table depending on the hash value
00687  *      it may just go to a temp file for later batches
00688  *
00689  * Note: the passed TupleTableSlot may contain a regular, minimal, or virtual
00690  * tuple; the minimal case in particular is certain to happen while reloading
00691  * tuples from batch files.  We could save some cycles in the regular-tuple
00692  * case by not forcing the slot contents into minimal form; not clear if it's
00693  * worth the messiness required.
00694  */
00695 void
00696 ExecHashTableInsert(HashJoinTable hashtable,
00697                     TupleTableSlot *slot,
00698                     uint32 hashvalue)
00699 {
00700     MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
00701     int         bucketno;
00702     int         batchno;
00703 
00704     ExecHashGetBucketAndBatch(hashtable, hashvalue,
00705                               &bucketno, &batchno);
00706 
00707     /*
00708      * decide whether to put the tuple in the hash table or a temp file
00709      */
00710     if (batchno == hashtable->curbatch)
00711     {
00712         /*
00713          * put the tuple in hash table
00714          */
00715         HashJoinTuple hashTuple;
00716         int         hashTupleSize;
00717 
00718         /* Create the HashJoinTuple */
00719         hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
00720         hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
00721                                                        hashTupleSize);
00722         hashTuple->hashvalue = hashvalue;
00723         memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
00724 
00725         /*
00726          * We always reset the tuple-matched flag on insertion.  This is okay
00727          * even when reloading a tuple from a batch file, since the tuple
00728          * could not possibly have been matched to an outer tuple before it
00729          * went into the batch file.
00730          */
00731         HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
00732 
00733         /* Push it onto the front of the bucket's list */
00734         hashTuple->next = hashtable->buckets[bucketno];
00735         hashtable->buckets[bucketno] = hashTuple;
00736 
00737         /* Account for space used, and back off if we've used too much */
00738         hashtable->spaceUsed += hashTupleSize;
00739         if (hashtable->spaceUsed > hashtable->spacePeak)
00740             hashtable->spacePeak = hashtable->spaceUsed;
00741         if (hashtable->spaceUsed > hashtable->spaceAllowed)
00742             ExecHashIncreaseNumBatches(hashtable);
00743     }
00744     else
00745     {
00746         /*
00747          * put the tuple into a temp file for later batches
00748          */
00749         Assert(batchno > hashtable->curbatch);
00750         ExecHashJoinSaveTuple(tuple,
00751                               hashvalue,
00752                               &hashtable->innerBatchFile[batchno]);
00753     }
00754 }
00755 
00756 /*
00757  * ExecHashGetHashValue
00758  *      Compute the hash value for a tuple
00759  *
00760  * The tuple to be tested must be in either econtext->ecxt_outertuple or
00761  * econtext->ecxt_innertuple.  Vars in the hashkeys expressions should have
00762  * varno either OUTER_VAR or INNER_VAR.
00763  *
00764  * A TRUE result means the tuple's hash value has been successfully computed
00765  * and stored at *hashvalue.  A FALSE result means the tuple cannot match
00766  * because it contains a null attribute, and hence it should be discarded
00767  * immediately.  (If keep_nulls is true then FALSE is never returned.)
00768  */
00769 bool
00770 ExecHashGetHashValue(HashJoinTable hashtable,
00771                      ExprContext *econtext,
00772                      List *hashkeys,
00773                      bool outer_tuple,
00774                      bool keep_nulls,
00775                      uint32 *hashvalue)
00776 {
00777     uint32      hashkey = 0;
00778     FmgrInfo   *hashfunctions;
00779     ListCell   *hk;
00780     int         i = 0;
00781     MemoryContext oldContext;
00782 
00783     /*
00784      * We reset the eval context each time to reclaim any memory leaked in the
00785      * hashkey expressions.
00786      */
00787     ResetExprContext(econtext);
00788 
00789     oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
00790 
00791     if (outer_tuple)
00792         hashfunctions = hashtable->outer_hashfunctions;
00793     else
00794         hashfunctions = hashtable->inner_hashfunctions;
00795 
00796     foreach(hk, hashkeys)
00797     {
00798         ExprState  *keyexpr = (ExprState *) lfirst(hk);
00799         Datum       keyval;
00800         bool        isNull;
00801 
00802         /* rotate hashkey left 1 bit at each step */
00803         hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
00804 
00805         /*
00806          * Get the join attribute value of the tuple
00807          */
00808         keyval = ExecEvalExpr(keyexpr, econtext, &isNull, NULL);
00809 
00810         /*
00811          * If the attribute is NULL, and the join operator is strict, then
00812          * this tuple cannot pass the join qual so we can reject it
00813          * immediately (unless we're scanning the outside of an outer join, in
00814          * which case we must not reject it).  Otherwise we act like the
00815          * hashcode of NULL is zero (this will support operators that act like
00816          * IS NOT DISTINCT, though not any more-random behavior).  We treat
00817          * the hash support function as strict even if the operator is not.
00818          *
00819          * Note: currently, all hashjoinable operators must be strict since
00820          * the hash index AM assumes that.  However, it takes so little extra
00821          * code here to allow non-strict that we may as well do it.
00822          */
00823         if (isNull)
00824         {
00825             if (hashtable->hashStrict[i] && !keep_nulls)
00826             {
00827                 MemoryContextSwitchTo(oldContext);
00828                 return false;   /* cannot match */
00829             }
00830             /* else, leave hashkey unmodified, equivalent to hashcode 0 */
00831         }
00832         else
00833         {
00834             /* Compute the hash function */
00835             uint32      hkey;
00836 
00837             hkey = DatumGetUInt32(FunctionCall1(&hashfunctions[i], keyval));
00838             hashkey ^= hkey;
00839         }
00840 
00841         i++;
00842     }
00843 
00844     MemoryContextSwitchTo(oldContext);
00845 
00846     *hashvalue = hashkey;
00847     return true;
00848 }
00849 
00850 /*
00851  * ExecHashGetBucketAndBatch
00852  *      Determine the bucket number and batch number for a hash value
00853  *
00854  * Note: on-the-fly increases of nbatch must not change the bucket number
00855  * for a given hash code (since we don't move tuples to different hash
00856  * chains), and must only cause the batch number to remain the same or
00857  * increase.  Our algorithm is
00858  *      bucketno = hashvalue MOD nbuckets
00859  *      batchno = (hashvalue DIV nbuckets) MOD nbatch
00860  * where nbuckets and nbatch are both expected to be powers of 2, so we can
00861  * do the computations by shifting and masking.  (This assumes that all hash
00862  * functions are good about randomizing all their output bits, else we are
00863  * likely to have very skewed bucket or batch occupancy.)
00864  *
00865  * nbuckets doesn't change over the course of the join.
00866  *
00867  * nbatch is always a power of 2; we increase it only by doubling it.  This
00868  * effectively adds one more bit to the top of the batchno.
00869  */
00870 void
00871 ExecHashGetBucketAndBatch(HashJoinTable hashtable,
00872                           uint32 hashvalue,
00873                           int *bucketno,
00874                           int *batchno)
00875 {
00876     uint32      nbuckets = (uint32) hashtable->nbuckets;
00877     uint32      nbatch = (uint32) hashtable->nbatch;
00878 
00879     if (nbatch > 1)
00880     {
00881         /* we can do MOD by masking, DIV by shifting */
00882         *bucketno = hashvalue & (nbuckets - 1);
00883         *batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1);
00884     }
00885     else
00886     {
00887         *bucketno = hashvalue & (nbuckets - 1);
00888         *batchno = 0;
00889     }
00890 }
00891 
00892 /*
00893  * ExecScanHashBucket
00894  *      scan a hash bucket for matches to the current outer tuple
00895  *
00896  * The current outer tuple must be stored in econtext->ecxt_outertuple.
00897  *
00898  * On success, the inner tuple is stored into hjstate->hj_CurTuple and
00899  * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
00900  * for the latter.
00901  */
00902 bool
00903 ExecScanHashBucket(HashJoinState *hjstate,
00904                    ExprContext *econtext)
00905 {
00906     List       *hjclauses = hjstate->hashclauses;
00907     HashJoinTable hashtable = hjstate->hj_HashTable;
00908     HashJoinTuple hashTuple = hjstate->hj_CurTuple;
00909     uint32      hashvalue = hjstate->hj_CurHashValue;
00910 
00911     /*
00912      * hj_CurTuple is the address of the tuple last returned from the current
00913      * bucket, or NULL if it's time to start scanning a new bucket.
00914      *
00915      * If the tuple hashed to a skew bucket then scan the skew bucket
00916      * otherwise scan the standard hashtable bucket.
00917      */
00918     if (hashTuple != NULL)
00919         hashTuple = hashTuple->next;
00920     else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
00921         hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
00922     else
00923         hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
00924 
00925     while (hashTuple != NULL)
00926     {
00927         if (hashTuple->hashvalue == hashvalue)
00928         {
00929             TupleTableSlot *inntuple;
00930 
00931             /* insert hashtable's tuple into exec slot so ExecQual sees it */
00932             inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
00933                                              hjstate->hj_HashTupleSlot,
00934                                              false);    /* do not pfree */
00935             econtext->ecxt_innertuple = inntuple;
00936 
00937             /* reset temp memory each time to avoid leaks from qual expr */
00938             ResetExprContext(econtext);
00939 
00940             if (ExecQual(hjclauses, econtext, false))
00941             {
00942                 hjstate->hj_CurTuple = hashTuple;
00943                 return true;
00944             }
00945         }
00946 
00947         hashTuple = hashTuple->next;
00948     }
00949 
00950     /*
00951      * no match
00952      */
00953     return false;
00954 }
00955 
00956 /*
00957  * ExecPrepHashTableForUnmatched
00958  *      set up for a series of ExecScanHashTableForUnmatched calls
00959  */
00960 void
00961 ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
00962 {
00963     /*
00964      * ---------- During this scan we use the HashJoinState fields as follows:
00965      *
00966      * hj_CurBucketNo: next regular bucket to scan hj_CurSkewBucketNo: next
00967      * skew bucket (an index into skewBucketNums) hj_CurTuple: last tuple
00968      * returned, or NULL to start next bucket ----------
00969      */
00970     hjstate->hj_CurBucketNo = 0;
00971     hjstate->hj_CurSkewBucketNo = 0;
00972     hjstate->hj_CurTuple = NULL;
00973 }
00974 
00975 /*
00976  * ExecScanHashTableForUnmatched
00977  *      scan the hash table for unmatched inner tuples
00978  *
00979  * On success, the inner tuple is stored into hjstate->hj_CurTuple and
00980  * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
00981  * for the latter.
00982  */
00983 bool
00984 ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
00985 {
00986     HashJoinTable hashtable = hjstate->hj_HashTable;
00987     HashJoinTuple hashTuple = hjstate->hj_CurTuple;
00988 
00989     for (;;)
00990     {
00991         /*
00992          * hj_CurTuple is the address of the tuple last returned from the
00993          * current bucket, or NULL if it's time to start scanning a new
00994          * bucket.
00995          */
00996         if (hashTuple != NULL)
00997             hashTuple = hashTuple->next;
00998         else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
00999         {
01000             hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
01001             hjstate->hj_CurBucketNo++;
01002         }
01003         else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
01004         {
01005             int         j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
01006 
01007             hashTuple = hashtable->skewBucket[j]->tuples;
01008             hjstate->hj_CurSkewBucketNo++;
01009         }
01010         else
01011             break;              /* finished all buckets */
01012 
01013         while (hashTuple != NULL)
01014         {
01015             if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
01016             {
01017                 TupleTableSlot *inntuple;
01018 
01019                 /* insert hashtable's tuple into exec slot */
01020                 inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
01021                                                  hjstate->hj_HashTupleSlot,
01022                                                  false);        /* do not pfree */
01023                 econtext->ecxt_innertuple = inntuple;
01024 
01025                 /*
01026                  * Reset temp memory each time; although this function doesn't
01027                  * do any qual eval, the caller will, so let's keep it
01028                  * parallel to ExecScanHashBucket.
01029                  */
01030                 ResetExprContext(econtext);
01031 
01032                 hjstate->hj_CurTuple = hashTuple;
01033                 return true;
01034             }
01035 
01036             hashTuple = hashTuple->next;
01037         }
01038     }
01039 
01040     /*
01041      * no more unmatched tuples
01042      */
01043     return false;
01044 }
01045 
01046 /*
01047  * ExecHashTableReset
01048  *
01049  *      reset hash table header for new batch
01050  */
01051 void
01052 ExecHashTableReset(HashJoinTable hashtable)
01053 {
01054     MemoryContext oldcxt;
01055     int         nbuckets = hashtable->nbuckets;
01056 
01057     /*
01058      * Release all the hash buckets and tuples acquired in the prior pass, and
01059      * reinitialize the context for a new pass.
01060      */
01061     MemoryContextReset(hashtable->batchCxt);
01062     oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
01063 
01064     /* Reallocate and reinitialize the hash bucket headers. */
01065     hashtable->buckets = (HashJoinTuple *)
01066         palloc0(nbuckets * sizeof(HashJoinTuple));
01067 
01068     hashtable->spaceUsed = 0;
01069 
01070     MemoryContextSwitchTo(oldcxt);
01071 }
01072 
01073 /*
01074  * ExecHashTableResetMatchFlags
01075  *      Clear all the HeapTupleHeaderHasMatch flags in the table
01076  */
01077 void
01078 ExecHashTableResetMatchFlags(HashJoinTable hashtable)
01079 {
01080     HashJoinTuple tuple;
01081     int         i;
01082 
01083     /* Reset all flags in the main table ... */
01084     for (i = 0; i < hashtable->nbuckets; i++)
01085     {
01086         for (tuple = hashtable->buckets[i]; tuple != NULL; tuple = tuple->next)
01087             HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
01088     }
01089 
01090     /* ... and the same for the skew buckets, if any */
01091     for (i = 0; i < hashtable->nSkewBuckets; i++)
01092     {
01093         int         j = hashtable->skewBucketNums[i];
01094         HashSkewBucket *skewBucket = hashtable->skewBucket[j];
01095 
01096         for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next)
01097             HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
01098     }
01099 }
01100 
01101 
01102 void
01103 ExecReScanHash(HashState *node)
01104 {
01105     /*
01106      * if chgParam of subnode is not null then plan will be re-scanned by
01107      * first ExecProcNode.
01108      */
01109     if (node->ps.lefttree->chgParam == NULL)
01110         ExecReScan(node->ps.lefttree);
01111 }
01112 
01113 
01114 /*
01115  * ExecHashBuildSkewHash
01116  *
01117  *      Set up for skew optimization if we can identify the most common values
01118  *      (MCVs) of the outer relation's join key.  We make a skew hash bucket
01119  *      for the hash value of each MCV, up to the number of slots allowed
01120  *      based on available memory.
01121  */
01122 static void
01123 ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
01124 {
01125     HeapTupleData *statsTuple;
01126     Datum      *values;
01127     int         nvalues;
01128     float4     *numbers;
01129     int         nnumbers;
01130 
01131     /* Do nothing if planner didn't identify the outer relation's join key */
01132     if (!OidIsValid(node->skewTable))
01133         return;
01134     /* Also, do nothing if we don't have room for at least one skew bucket */
01135     if (mcvsToUse <= 0)
01136         return;
01137 
01138     /*
01139      * Try to find the MCV statistics for the outer relation's join key.
01140      */
01141     statsTuple = SearchSysCache3(STATRELATTINH,
01142                                  ObjectIdGetDatum(node->skewTable),
01143                                  Int16GetDatum(node->skewColumn),
01144                                  BoolGetDatum(node->skewInherit));
01145     if (!HeapTupleIsValid(statsTuple))
01146         return;
01147 
01148     if (get_attstatsslot(statsTuple, node->skewColType, node->skewColTypmod,
01149                          STATISTIC_KIND_MCV, InvalidOid,
01150                          NULL,
01151                          &values, &nvalues,
01152                          &numbers, &nnumbers))
01153     {
01154         double      frac;
01155         int         nbuckets;
01156         FmgrInfo   *hashfunctions;
01157         int         i;
01158 
01159         if (mcvsToUse > nvalues)
01160             mcvsToUse = nvalues;
01161 
01162         /*
01163          * Calculate the expected fraction of outer relation that will
01164          * participate in the skew optimization.  If this isn't at least
01165          * SKEW_MIN_OUTER_FRACTION, don't use skew optimization.
01166          */
01167         frac = 0;
01168         for (i = 0; i < mcvsToUse; i++)
01169             frac += numbers[i];
01170         if (frac < SKEW_MIN_OUTER_FRACTION)
01171         {
01172             free_attstatsslot(node->skewColType,
01173                               values, nvalues, numbers, nnumbers);
01174             ReleaseSysCache(statsTuple);
01175             return;
01176         }
01177 
01178         /*
01179          * Okay, set up the skew hashtable.
01180          *
01181          * skewBucket[] is an open addressing hashtable with a power of 2 size
01182          * that is greater than the number of MCV values.  (This ensures there
01183          * will be at least one null entry, so searches will always
01184          * terminate.)
01185          *
01186          * Note: this code could fail if mcvsToUse exceeds INT_MAX/8 or
01187          * MaxAllocSize/sizeof(void *)/8, but that is not currently possible
01188          * since we limit pg_statistic entries to much less than that.
01189          */
01190         nbuckets = 2;
01191         while (nbuckets <= mcvsToUse)
01192             nbuckets <<= 1;
01193         /* use two more bits just to help avoid collisions */
01194         nbuckets <<= 2;
01195 
01196         hashtable->skewEnabled = true;
01197         hashtable->skewBucketLen = nbuckets;
01198 
01199         /*
01200          * We allocate the bucket memory in the hashtable's batch context. It
01201          * is only needed during the first batch, and this ensures it will be
01202          * automatically removed once the first batch is done.
01203          */
01204         hashtable->skewBucket = (HashSkewBucket **)
01205             MemoryContextAllocZero(hashtable->batchCxt,
01206                                    nbuckets * sizeof(HashSkewBucket *));
01207         hashtable->skewBucketNums = (int *)
01208             MemoryContextAllocZero(hashtable->batchCxt,
01209                                    mcvsToUse * sizeof(int));
01210 
01211         hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *)
01212             + mcvsToUse * sizeof(int);
01213         hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
01214             + mcvsToUse * sizeof(int);
01215         if (hashtable->spaceUsed > hashtable->spacePeak)
01216             hashtable->spacePeak = hashtable->spaceUsed;
01217 
01218         /*
01219          * Create a skew bucket for each MCV hash value.
01220          *
01221          * Note: it is very important that we create the buckets in order of
01222          * decreasing MCV frequency.  If we have to remove some buckets, they
01223          * must be removed in reverse order of creation (see notes in
01224          * ExecHashRemoveNextSkewBucket) and we want the least common MCVs to
01225          * be removed first.
01226          */
01227         hashfunctions = hashtable->outer_hashfunctions;
01228 
01229         for (i = 0; i < mcvsToUse; i++)
01230         {
01231             uint32      hashvalue;
01232             int         bucket;
01233 
01234             hashvalue = DatumGetUInt32(FunctionCall1(&hashfunctions[0],
01235                                                      values[i]));
01236 
01237             /*
01238              * While we have not hit a hole in the hashtable and have not hit
01239              * the desired bucket, we have collided with some previous hash
01240              * value, so try the next bucket location.  NB: this code must
01241              * match ExecHashGetSkewBucket.
01242              */
01243             bucket = hashvalue & (nbuckets - 1);
01244             while (hashtable->skewBucket[bucket] != NULL &&
01245                    hashtable->skewBucket[bucket]->hashvalue != hashvalue)
01246                 bucket = (bucket + 1) & (nbuckets - 1);
01247 
01248             /*
01249              * If we found an existing bucket with the same hashvalue, leave
01250              * it alone.  It's okay for two MCVs to share a hashvalue.
01251              */
01252             if (hashtable->skewBucket[bucket] != NULL)
01253                 continue;
01254 
01255             /* Okay, create a new skew bucket for this hashvalue. */
01256             hashtable->skewBucket[bucket] = (HashSkewBucket *)
01257                 MemoryContextAlloc(hashtable->batchCxt,
01258                                    sizeof(HashSkewBucket));
01259             hashtable->skewBucket[bucket]->hashvalue = hashvalue;
01260             hashtable->skewBucket[bucket]->tuples = NULL;
01261             hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket;
01262             hashtable->nSkewBuckets++;
01263             hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
01264             hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
01265             if (hashtable->spaceUsed > hashtable->spacePeak)
01266                 hashtable->spacePeak = hashtable->spaceUsed;
01267         }
01268 
01269         free_attstatsslot(node->skewColType,
01270                           values, nvalues, numbers, nnumbers);
01271     }
01272 
01273     ReleaseSysCache(statsTuple);
01274 }
01275 
01276 /*
01277  * ExecHashGetSkewBucket
01278  *
01279  *      Returns the index of the skew bucket for this hashvalue,
01280  *      or INVALID_SKEW_BUCKET_NO if the hashvalue is not
01281  *      associated with any active skew bucket.
01282  */
01283 int
01284 ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
01285 {
01286     int         bucket;
01287 
01288     /*
01289      * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in
01290      * particular, this happens after the initial batch is done).
01291      */
01292     if (!hashtable->skewEnabled)
01293         return INVALID_SKEW_BUCKET_NO;
01294 
01295     /*
01296      * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
01297      */
01298     bucket = hashvalue & (hashtable->skewBucketLen - 1);
01299 
01300     /*
01301      * While we have not hit a hole in the hashtable and have not hit the
01302      * desired bucket, we have collided with some other hash value, so try the
01303      * next bucket location.
01304      */
01305     while (hashtable->skewBucket[bucket] != NULL &&
01306            hashtable->skewBucket[bucket]->hashvalue != hashvalue)
01307         bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
01308 
01309     /*
01310      * Found the desired bucket?
01311      */
01312     if (hashtable->skewBucket[bucket] != NULL)
01313         return bucket;
01314 
01315     /*
01316      * There must not be any hashtable entry for this hash value.
01317      */
01318     return INVALID_SKEW_BUCKET_NO;
01319 }
01320 
01321 /*
01322  * ExecHashSkewTableInsert
01323  *
01324  *      Insert a tuple into the skew hashtable.
01325  *
01326  * This should generally match up with the current-batch case in
01327  * ExecHashTableInsert.
01328  */
01329 static void
01330 ExecHashSkewTableInsert(HashJoinTable hashtable,
01331                         TupleTableSlot *slot,
01332                         uint32 hashvalue,
01333                         int bucketNumber)
01334 {
01335     MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
01336     HashJoinTuple hashTuple;
01337     int         hashTupleSize;
01338 
01339     /* Create the HashJoinTuple */
01340     hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
01341     hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
01342                                                    hashTupleSize);
01343     hashTuple->hashvalue = hashvalue;
01344     memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
01345     HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
01346 
01347     /* Push it onto the front of the skew bucket's list */
01348     hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples;
01349     hashtable->skewBucket[bucketNumber]->tuples = hashTuple;
01350 
01351     /* Account for space used, and back off if we've used too much */
01352     hashtable->spaceUsed += hashTupleSize;
01353     hashtable->spaceUsedSkew += hashTupleSize;
01354     if (hashtable->spaceUsed > hashtable->spacePeak)
01355         hashtable->spacePeak = hashtable->spaceUsed;
01356     while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
01357         ExecHashRemoveNextSkewBucket(hashtable);
01358 
01359     /* Check we are not over the total spaceAllowed, either */
01360     if (hashtable->spaceUsed > hashtable->spaceAllowed)
01361         ExecHashIncreaseNumBatches(hashtable);
01362 }
01363 
01364 /*
01365  *      ExecHashRemoveNextSkewBucket
01366  *
01367  *      Remove the least valuable skew bucket by pushing its tuples into
01368  *      the main hash table.
01369  */
01370 static void
01371 ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
01372 {
01373     int         bucketToRemove;
01374     HashSkewBucket *bucket;
01375     uint32      hashvalue;
01376     int         bucketno;
01377     int         batchno;
01378     HashJoinTuple hashTuple;
01379 
01380     /* Locate the bucket to remove */
01381     bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1];
01382     bucket = hashtable->skewBucket[bucketToRemove];
01383 
01384     /*
01385      * Calculate which bucket and batch the tuples belong to in the main
01386      * hashtable.  They all have the same hash value, so it's the same for all
01387      * of them.  Also note that it's not possible for nbatch to increase while
01388      * we are processing the tuples.
01389      */
01390     hashvalue = bucket->hashvalue;
01391     ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
01392 
01393     /* Process all tuples in the bucket */
01394     hashTuple = bucket->tuples;
01395     while (hashTuple != NULL)
01396     {
01397         HashJoinTuple nextHashTuple = hashTuple->next;
01398         MinimalTuple tuple;
01399         Size        tupleSize;
01400 
01401         /*
01402          * This code must agree with ExecHashTableInsert.  We do not use
01403          * ExecHashTableInsert directly as ExecHashTableInsert expects a
01404          * TupleTableSlot while we already have HashJoinTuples.
01405          */
01406         tuple = HJTUPLE_MINTUPLE(hashTuple);
01407         tupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
01408 
01409         /* Decide whether to put the tuple in the hash table or a temp file */
01410         if (batchno == hashtable->curbatch)
01411         {
01412             /* Move the tuple to the main hash table */
01413             hashTuple->next = hashtable->buckets[bucketno];
01414             hashtable->buckets[bucketno] = hashTuple;
01415             /* We have reduced skew space, but overall space doesn't change */
01416             hashtable->spaceUsedSkew -= tupleSize;
01417         }
01418         else
01419         {
01420             /* Put the tuple into a temp file for later batches */
01421             Assert(batchno > hashtable->curbatch);
01422             ExecHashJoinSaveTuple(tuple, hashvalue,
01423                                   &hashtable->innerBatchFile[batchno]);
01424             pfree(hashTuple);
01425             hashtable->spaceUsed -= tupleSize;
01426             hashtable->spaceUsedSkew -= tupleSize;
01427         }
01428 
01429         hashTuple = nextHashTuple;
01430     }
01431 
01432     /*
01433      * Free the bucket struct itself and reset the hashtable entry to NULL.
01434      *
01435      * NOTE: this is not nearly as simple as it looks on the surface, because
01436      * of the possibility of collisions in the hashtable.  Suppose that hash
01437      * values A and B collide at a particular hashtable entry, and that A was
01438      * entered first so B gets shifted to a different table entry.  If we were
01439      * to remove A first then ExecHashGetSkewBucket would mistakenly start
01440      * reporting that B is not in the hashtable, because it would hit the NULL
01441      * before finding B.  However, we always remove entries in the reverse
01442      * order of creation, so this failure cannot happen.
01443      */
01444     hashtable->skewBucket[bucketToRemove] = NULL;
01445     hashtable->nSkewBuckets--;
01446     pfree(bucket);
01447     hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD;
01448     hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD;
01449 
01450     /*
01451      * If we have removed all skew buckets then give up on skew optimization.
01452      * Release the arrays since they aren't useful any more.
01453      */
01454     if (hashtable->nSkewBuckets == 0)
01455     {
01456         hashtable->skewEnabled = false;
01457         pfree(hashtable->skewBucket);
01458         pfree(hashtable->skewBucketNums);
01459         hashtable->skewBucket = NULL;
01460         hashtable->skewBucketNums = NULL;
01461         hashtable->spaceUsed -= hashtable->spaceUsedSkew;
01462         hashtable->spaceUsedSkew = 0;
01463     }
01464 }