00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "postgres.h"
00023
00024 #include <math.h>
00025 #include <limits.h>
00026
00027 #include "access/htup_details.h"
00028 #include "catalog/pg_statistic.h"
00029 #include "commands/tablespace.h"
00030 #include "executor/execdebug.h"
00031 #include "executor/hashjoin.h"
00032 #include "executor/nodeHash.h"
00033 #include "executor/nodeHashjoin.h"
00034 #include "miscadmin.h"
00035 #include "utils/dynahash.h"
00036 #include "utils/memutils.h"
00037 #include "utils/lsyscache.h"
00038 #include "utils/syscache.h"
00039
00040
00041 static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
00042 static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
00043 int mcvsToUse);
00044 static void ExecHashSkewTableInsert(HashJoinTable hashtable,
00045 TupleTableSlot *slot,
00046 uint32 hashvalue,
00047 int bucketNumber);
00048 static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
00049
00050
00051
00052
00053
00054
00055
00056
00057 TupleTableSlot *
00058 ExecHash(HashState *node)
00059 {
00060 elog(ERROR, "Hash node does not support ExecProcNode call convention");
00061 return NULL;
00062 }
00063
00064
00065
00066
00067
00068
00069
00070
00071 Node *
00072 MultiExecHash(HashState *node)
00073 {
00074 PlanState *outerNode;
00075 List *hashkeys;
00076 HashJoinTable hashtable;
00077 TupleTableSlot *slot;
00078 ExprContext *econtext;
00079 uint32 hashvalue;
00080
00081
00082 if (node->ps.instrument)
00083 InstrStartNode(node->ps.instrument);
00084
00085
00086
00087
00088 outerNode = outerPlanState(node);
00089 hashtable = node->hashtable;
00090
00091
00092
00093
00094 hashkeys = node->hashkeys;
00095 econtext = node->ps.ps_ExprContext;
00096
00097
00098
00099
00100 for (;;)
00101 {
00102 slot = ExecProcNode(outerNode);
00103 if (TupIsNull(slot))
00104 break;
00105
00106 econtext->ecxt_innertuple = slot;
00107 if (ExecHashGetHashValue(hashtable, econtext, hashkeys,
00108 false, hashtable->keepNulls,
00109 &hashvalue))
00110 {
00111 int bucketNumber;
00112
00113 bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
00114 if (bucketNumber != INVALID_SKEW_BUCKET_NO)
00115 {
00116
00117 ExecHashSkewTableInsert(hashtable, slot, hashvalue,
00118 bucketNumber);
00119 }
00120 else
00121 {
00122
00123 ExecHashTableInsert(hashtable, slot, hashvalue);
00124 }
00125 hashtable->totalTuples += 1;
00126 }
00127 }
00128
00129
00130 if (node->ps.instrument)
00131 InstrStopNode(node->ps.instrument, hashtable->totalTuples);
00132
00133
00134
00135
00136
00137
00138
00139
00140 return NULL;
00141 }
00142
00143
00144
00145
00146
00147
00148
00149 HashState *
00150 ExecInitHash(Hash *node, EState *estate, int eflags)
00151 {
00152 HashState *hashstate;
00153
00154
00155 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
00156
00157
00158
00159
00160 hashstate = makeNode(HashState);
00161 hashstate->ps.plan = (Plan *) node;
00162 hashstate->ps.state = estate;
00163 hashstate->hashtable = NULL;
00164 hashstate->hashkeys = NIL;
00165
00166
00167
00168
00169
00170
00171 ExecAssignExprContext(estate, &hashstate->ps);
00172
00173
00174
00175
00176 ExecInitResultTupleSlot(estate, &hashstate->ps);
00177
00178
00179
00180
00181 hashstate->ps.targetlist = (List *)
00182 ExecInitExpr((Expr *) node->plan.targetlist,
00183 (PlanState *) hashstate);
00184 hashstate->ps.qual = (List *)
00185 ExecInitExpr((Expr *) node->plan.qual,
00186 (PlanState *) hashstate);
00187
00188
00189
00190
00191 outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);
00192
00193
00194
00195
00196
00197 ExecAssignResultTypeFromTL(&hashstate->ps);
00198 hashstate->ps.ps_ProjInfo = NULL;
00199
00200 return hashstate;
00201 }
00202
00203
00204
00205
00206
00207
00208
00209 void
00210 ExecEndHash(HashState *node)
00211 {
00212 PlanState *outerPlan;
00213
00214
00215
00216
00217 ExecFreeExprContext(&node->ps);
00218
00219
00220
00221
00222 outerPlan = outerPlanState(node);
00223 ExecEndNode(outerPlan);
00224 }
00225
00226
00227
00228
00229
00230
00231
00232
00233 HashJoinTable
00234 ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
00235 {
00236 HashJoinTable hashtable;
00237 Plan *outerNode;
00238 int nbuckets;
00239 int nbatch;
00240 int num_skew_mcvs;
00241 int log2_nbuckets;
00242 int nkeys;
00243 int i;
00244 ListCell *ho;
00245 MemoryContext oldcxt;
00246
00247
00248
00249
00250
00251
00252 outerNode = outerPlan(node);
00253
00254 ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width,
00255 OidIsValid(node->skewTable),
00256 &nbuckets, &nbatch, &num_skew_mcvs);
00257
00258 #ifdef HJDEBUG
00259 printf("nbatch = %d, nbuckets = %d\n", nbatch, nbuckets);
00260 #endif
00261
00262
00263 log2_nbuckets = my_log2(nbuckets);
00264 Assert(nbuckets == (1 << log2_nbuckets));
00265
00266
00267
00268
00269
00270
00271
00272 hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
00273 hashtable->nbuckets = nbuckets;
00274 hashtable->log2_nbuckets = log2_nbuckets;
00275 hashtable->buckets = NULL;
00276 hashtable->keepNulls = keepNulls;
00277 hashtable->skewEnabled = false;
00278 hashtable->skewBucket = NULL;
00279 hashtable->skewBucketLen = 0;
00280 hashtable->nSkewBuckets = 0;
00281 hashtable->skewBucketNums = NULL;
00282 hashtable->nbatch = nbatch;
00283 hashtable->curbatch = 0;
00284 hashtable->nbatch_original = nbatch;
00285 hashtable->nbatch_outstart = nbatch;
00286 hashtable->growEnabled = true;
00287 hashtable->totalTuples = 0;
00288 hashtable->innerBatchFile = NULL;
00289 hashtable->outerBatchFile = NULL;
00290 hashtable->spaceUsed = 0;
00291 hashtable->spacePeak = 0;
00292 hashtable->spaceAllowed = work_mem * 1024L;
00293 hashtable->spaceUsedSkew = 0;
00294 hashtable->spaceAllowedSkew =
00295 hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
00296
00297
00298
00299
00300
00301 nkeys = list_length(hashOperators);
00302 hashtable->outer_hashfunctions =
00303 (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
00304 hashtable->inner_hashfunctions =
00305 (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
00306 hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));
00307 i = 0;
00308 foreach(ho, hashOperators)
00309 {
00310 Oid hashop = lfirst_oid(ho);
00311 Oid left_hashfn;
00312 Oid right_hashfn;
00313
00314 if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
00315 elog(ERROR, "could not find hash function for hash operator %u",
00316 hashop);
00317 fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
00318 fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
00319 hashtable->hashStrict[i] = op_strict(hashop);
00320 i++;
00321 }
00322
00323
00324
00325
00326
00327 hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
00328 "HashTableContext",
00329 ALLOCSET_DEFAULT_MINSIZE,
00330 ALLOCSET_DEFAULT_INITSIZE,
00331 ALLOCSET_DEFAULT_MAXSIZE);
00332
00333 hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
00334 "HashBatchContext",
00335 ALLOCSET_DEFAULT_MINSIZE,
00336 ALLOCSET_DEFAULT_INITSIZE,
00337 ALLOCSET_DEFAULT_MAXSIZE);
00338
00339
00340
00341 oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
00342
00343 if (nbatch > 1)
00344 {
00345
00346
00347
00348 hashtable->innerBatchFile = (BufFile **)
00349 palloc0(nbatch * sizeof(BufFile *));
00350 hashtable->outerBatchFile = (BufFile **)
00351 palloc0(nbatch * sizeof(BufFile *));
00352
00353
00354 PrepareTempTablespaces();
00355 }
00356
00357
00358
00359
00360
00361 MemoryContextSwitchTo(hashtable->batchCxt);
00362
00363 hashtable->buckets = (HashJoinTuple *)
00364 palloc0(nbuckets * sizeof(HashJoinTuple));
00365
00366
00367
00368
00369
00370 if (nbatch > 1)
00371 ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
00372
00373 MemoryContextSwitchTo(oldcxt);
00374
00375 return hashtable;
00376 }
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387 #define NTUP_PER_BUCKET 10
00388
00389 void
00390 ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
00391 int *numbuckets,
00392 int *numbatches,
00393 int *num_skew_mcvs)
00394 {
00395 int tupsize;
00396 double inner_rel_bytes;
00397 long hash_table_bytes;
00398 long skew_table_bytes;
00399 long max_pointers;
00400 int nbatch;
00401 int nbuckets;
00402 int i;
00403
00404
00405 if (ntuples <= 0.0)
00406 ntuples = 1000.0;
00407
00408
00409
00410
00411
00412
00413 tupsize = HJTUPLE_OVERHEAD +
00414 MAXALIGN(sizeof(MinimalTupleData)) +
00415 MAXALIGN(tupwidth);
00416 inner_rel_bytes = ntuples * tupsize;
00417
00418
00419
00420
00421 hash_table_bytes = work_mem * 1024L;
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437 if (useskew)
00438 {
00439 skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449 *num_skew_mcvs = skew_table_bytes / (tupsize +
00450 (8 * sizeof(HashSkewBucket *)) +
00451 sizeof(int) +
00452 SKEW_BUCKET_OVERHEAD);
00453 if (*num_skew_mcvs > 0)
00454 hash_table_bytes -= skew_table_bytes;
00455 }
00456 else
00457 *num_skew_mcvs = 0;
00458
00459
00460
00461
00462
00463
00464
00465 max_pointers = (work_mem * 1024L) / sizeof(void *);
00466
00467 max_pointers = Min(max_pointers, INT_MAX / 2);
00468
00469 if (inner_rel_bytes > hash_table_bytes)
00470 {
00471
00472 long lbuckets;
00473 double dbatch;
00474 int minbatch;
00475
00476 lbuckets = (hash_table_bytes / tupsize) / NTUP_PER_BUCKET;
00477 lbuckets = Min(lbuckets, max_pointers);
00478 nbuckets = (int) lbuckets;
00479
00480 dbatch = ceil(inner_rel_bytes / hash_table_bytes);
00481 dbatch = Min(dbatch, max_pointers);
00482 minbatch = (int) dbatch;
00483 nbatch = 2;
00484 while (nbatch < minbatch)
00485 nbatch <<= 1;
00486 }
00487 else
00488 {
00489
00490 double dbuckets;
00491
00492 dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
00493 dbuckets = Min(dbuckets, max_pointers);
00494 nbuckets = (int) dbuckets;
00495
00496 nbatch = 1;
00497 }
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507 i = 10;
00508 while ((1 << i) < nbuckets)
00509 i++;
00510 nbuckets = (1 << i);
00511
00512 *numbuckets = nbuckets;
00513 *numbatches = nbatch;
00514 }
00515
00516
00517
00518
00519
00520
00521
00522
00523 void
00524 ExecHashTableDestroy(HashJoinTable hashtable)
00525 {
00526 int i;
00527
00528
00529
00530
00531
00532
00533 for (i = 1; i < hashtable->nbatch; i++)
00534 {
00535 if (hashtable->innerBatchFile[i])
00536 BufFileClose(hashtable->innerBatchFile[i]);
00537 if (hashtable->outerBatchFile[i])
00538 BufFileClose(hashtable->outerBatchFile[i]);
00539 }
00540
00541
00542 MemoryContextDelete(hashtable->hashCxt);
00543
00544
00545 pfree(hashtable);
00546 }
00547
00548
00549
00550
00551
00552
00553 static void
00554 ExecHashIncreaseNumBatches(HashJoinTable hashtable)
00555 {
00556 int oldnbatch = hashtable->nbatch;
00557 int curbatch = hashtable->curbatch;
00558 int nbatch;
00559 int i;
00560 MemoryContext oldcxt;
00561 long ninmemory;
00562 long nfreed;
00563
00564
00565 if (!hashtable->growEnabled)
00566 return;
00567
00568
00569 if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2)))
00570 return;
00571
00572 nbatch = oldnbatch * 2;
00573 Assert(nbatch > 1);
00574
00575 #ifdef HJDEBUG
00576 printf("Increasing nbatch to %d because space = %lu\n",
00577 nbatch, (unsigned long) hashtable->spaceUsed);
00578 #endif
00579
00580 oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
00581
00582 if (hashtable->innerBatchFile == NULL)
00583 {
00584
00585 hashtable->innerBatchFile = (BufFile **)
00586 palloc0(nbatch * sizeof(BufFile *));
00587 hashtable->outerBatchFile = (BufFile **)
00588 palloc0(nbatch * sizeof(BufFile *));
00589
00590 PrepareTempTablespaces();
00591 }
00592 else
00593 {
00594
00595 hashtable->innerBatchFile = (BufFile **)
00596 repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
00597 hashtable->outerBatchFile = (BufFile **)
00598 repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
00599 MemSet(hashtable->innerBatchFile + oldnbatch, 0,
00600 (nbatch - oldnbatch) * sizeof(BufFile *));
00601 MemSet(hashtable->outerBatchFile + oldnbatch, 0,
00602 (nbatch - oldnbatch) * sizeof(BufFile *));
00603 }
00604
00605 MemoryContextSwitchTo(oldcxt);
00606
00607 hashtable->nbatch = nbatch;
00608
00609
00610
00611
00612
00613 ninmemory = nfreed = 0;
00614
00615 for (i = 0; i < hashtable->nbuckets; i++)
00616 {
00617 HashJoinTuple prevtuple;
00618 HashJoinTuple tuple;
00619
00620 prevtuple = NULL;
00621 tuple = hashtable->buckets[i];
00622
00623 while (tuple != NULL)
00624 {
00625
00626 HashJoinTuple nexttuple = tuple->next;
00627 int bucketno;
00628 int batchno;
00629
00630 ninmemory++;
00631 ExecHashGetBucketAndBatch(hashtable, tuple->hashvalue,
00632 &bucketno, &batchno);
00633 Assert(bucketno == i);
00634 if (batchno == curbatch)
00635 {
00636
00637 prevtuple = tuple;
00638 }
00639 else
00640 {
00641
00642 Assert(batchno > curbatch);
00643 ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(tuple),
00644 tuple->hashvalue,
00645 &hashtable->innerBatchFile[batchno]);
00646
00647 if (prevtuple)
00648 prevtuple->next = nexttuple;
00649 else
00650 hashtable->buckets[i] = nexttuple;
00651
00652 hashtable->spaceUsed -=
00653 HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(tuple)->t_len;
00654 pfree(tuple);
00655 nfreed++;
00656 }
00657
00658 tuple = nexttuple;
00659 }
00660 }
00661
00662 #ifdef HJDEBUG
00663 printf("Freed %ld of %ld tuples, space now %lu\n",
00664 nfreed, ninmemory, (unsigned long) hashtable->spaceUsed);
00665 #endif
00666
00667
00668
00669
00670
00671
00672
00673
00674
00675 if (nfreed == 0 || nfreed == ninmemory)
00676 {
00677 hashtable->growEnabled = false;
00678 #ifdef HJDEBUG
00679 printf("Disabling further increase of nbatch\n");
00680 #endif
00681 }
00682 }
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692
00693
00694
00695 void
00696 ExecHashTableInsert(HashJoinTable hashtable,
00697 TupleTableSlot *slot,
00698 uint32 hashvalue)
00699 {
00700 MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
00701 int bucketno;
00702 int batchno;
00703
00704 ExecHashGetBucketAndBatch(hashtable, hashvalue,
00705 &bucketno, &batchno);
00706
00707
00708
00709
00710 if (batchno == hashtable->curbatch)
00711 {
00712
00713
00714
00715 HashJoinTuple hashTuple;
00716 int hashTupleSize;
00717
00718
00719 hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
00720 hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
00721 hashTupleSize);
00722 hashTuple->hashvalue = hashvalue;
00723 memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
00724
00725
00726
00727
00728
00729
00730
00731 HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
00732
00733
00734 hashTuple->next = hashtable->buckets[bucketno];
00735 hashtable->buckets[bucketno] = hashTuple;
00736
00737
00738 hashtable->spaceUsed += hashTupleSize;
00739 if (hashtable->spaceUsed > hashtable->spacePeak)
00740 hashtable->spacePeak = hashtable->spaceUsed;
00741 if (hashtable->spaceUsed > hashtable->spaceAllowed)
00742 ExecHashIncreaseNumBatches(hashtable);
00743 }
00744 else
00745 {
00746
00747
00748
00749 Assert(batchno > hashtable->curbatch);
00750 ExecHashJoinSaveTuple(tuple,
00751 hashvalue,
00752 &hashtable->innerBatchFile[batchno]);
00753 }
00754 }
00755
00756
00757
00758
00759
00760
00761
00762
00763
00764
00765
00766
00767
00768
00769 bool
00770 ExecHashGetHashValue(HashJoinTable hashtable,
00771 ExprContext *econtext,
00772 List *hashkeys,
00773 bool outer_tuple,
00774 bool keep_nulls,
00775 uint32 *hashvalue)
00776 {
00777 uint32 hashkey = 0;
00778 FmgrInfo *hashfunctions;
00779 ListCell *hk;
00780 int i = 0;
00781 MemoryContext oldContext;
00782
00783
00784
00785
00786
00787 ResetExprContext(econtext);
00788
00789 oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
00790
00791 if (outer_tuple)
00792 hashfunctions = hashtable->outer_hashfunctions;
00793 else
00794 hashfunctions = hashtable->inner_hashfunctions;
00795
00796 foreach(hk, hashkeys)
00797 {
00798 ExprState *keyexpr = (ExprState *) lfirst(hk);
00799 Datum keyval;
00800 bool isNull;
00801
00802
00803 hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
00804
00805
00806
00807
00808 keyval = ExecEvalExpr(keyexpr, econtext, &isNull, NULL);
00809
00810
00811
00812
00813
00814
00815
00816
00817
00818
00819
00820
00821
00822
00823 if (isNull)
00824 {
00825 if (hashtable->hashStrict[i] && !keep_nulls)
00826 {
00827 MemoryContextSwitchTo(oldContext);
00828 return false;
00829 }
00830
00831 }
00832 else
00833 {
00834
00835 uint32 hkey;
00836
00837 hkey = DatumGetUInt32(FunctionCall1(&hashfunctions[i], keyval));
00838 hashkey ^= hkey;
00839 }
00840
00841 i++;
00842 }
00843
00844 MemoryContextSwitchTo(oldContext);
00845
00846 *hashvalue = hashkey;
00847 return true;
00848 }
00849
00850
00851
00852
00853
00854
00855
00856
00857
00858
00859
00860
00861
00862
00863
00864
00865
00866
00867
00868
00869
00870 void
00871 ExecHashGetBucketAndBatch(HashJoinTable hashtable,
00872 uint32 hashvalue,
00873 int *bucketno,
00874 int *batchno)
00875 {
00876 uint32 nbuckets = (uint32) hashtable->nbuckets;
00877 uint32 nbatch = (uint32) hashtable->nbatch;
00878
00879 if (nbatch > 1)
00880 {
00881
00882 *bucketno = hashvalue & (nbuckets - 1);
00883 *batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1);
00884 }
00885 else
00886 {
00887 *bucketno = hashvalue & (nbuckets - 1);
00888 *batchno = 0;
00889 }
00890 }
00891
00892
00893
00894
00895
00896
00897
00898
00899
00900
00901
00902 bool
00903 ExecScanHashBucket(HashJoinState *hjstate,
00904 ExprContext *econtext)
00905 {
00906 List *hjclauses = hjstate->hashclauses;
00907 HashJoinTable hashtable = hjstate->hj_HashTable;
00908 HashJoinTuple hashTuple = hjstate->hj_CurTuple;
00909 uint32 hashvalue = hjstate->hj_CurHashValue;
00910
00911
00912
00913
00914
00915
00916
00917
00918 if (hashTuple != NULL)
00919 hashTuple = hashTuple->next;
00920 else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
00921 hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
00922 else
00923 hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
00924
00925 while (hashTuple != NULL)
00926 {
00927 if (hashTuple->hashvalue == hashvalue)
00928 {
00929 TupleTableSlot *inntuple;
00930
00931
00932 inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
00933 hjstate->hj_HashTupleSlot,
00934 false);
00935 econtext->ecxt_innertuple = inntuple;
00936
00937
00938 ResetExprContext(econtext);
00939
00940 if (ExecQual(hjclauses, econtext, false))
00941 {
00942 hjstate->hj_CurTuple = hashTuple;
00943 return true;
00944 }
00945 }
00946
00947 hashTuple = hashTuple->next;
00948 }
00949
00950
00951
00952
00953 return false;
00954 }
00955
00956
00957
00958
00959
00960 void
00961 ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
00962 {
00963
00964
00965
00966
00967
00968
00969
00970 hjstate->hj_CurBucketNo = 0;
00971 hjstate->hj_CurSkewBucketNo = 0;
00972 hjstate->hj_CurTuple = NULL;
00973 }
00974
00975
00976
00977
00978
00979
00980
00981
00982
00983 bool
00984 ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
00985 {
00986 HashJoinTable hashtable = hjstate->hj_HashTable;
00987 HashJoinTuple hashTuple = hjstate->hj_CurTuple;
00988
00989 for (;;)
00990 {
00991
00992
00993
00994
00995
00996 if (hashTuple != NULL)
00997 hashTuple = hashTuple->next;
00998 else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
00999 {
01000 hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
01001 hjstate->hj_CurBucketNo++;
01002 }
01003 else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
01004 {
01005 int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
01006
01007 hashTuple = hashtable->skewBucket[j]->tuples;
01008 hjstate->hj_CurSkewBucketNo++;
01009 }
01010 else
01011 break;
01012
01013 while (hashTuple != NULL)
01014 {
01015 if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
01016 {
01017 TupleTableSlot *inntuple;
01018
01019
01020 inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
01021 hjstate->hj_HashTupleSlot,
01022 false);
01023 econtext->ecxt_innertuple = inntuple;
01024
01025
01026
01027
01028
01029
01030 ResetExprContext(econtext);
01031
01032 hjstate->hj_CurTuple = hashTuple;
01033 return true;
01034 }
01035
01036 hashTuple = hashTuple->next;
01037 }
01038 }
01039
01040
01041
01042
01043 return false;
01044 }
01045
01046
01047
01048
01049
01050
01051 void
01052 ExecHashTableReset(HashJoinTable hashtable)
01053 {
01054 MemoryContext oldcxt;
01055 int nbuckets = hashtable->nbuckets;
01056
01057
01058
01059
01060
01061 MemoryContextReset(hashtable->batchCxt);
01062 oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
01063
01064
01065 hashtable->buckets = (HashJoinTuple *)
01066 palloc0(nbuckets * sizeof(HashJoinTuple));
01067
01068 hashtable->spaceUsed = 0;
01069
01070 MemoryContextSwitchTo(oldcxt);
01071 }
01072
01073
01074
01075
01076
01077 void
01078 ExecHashTableResetMatchFlags(HashJoinTable hashtable)
01079 {
01080 HashJoinTuple tuple;
01081 int i;
01082
01083
01084 for (i = 0; i < hashtable->nbuckets; i++)
01085 {
01086 for (tuple = hashtable->buckets[i]; tuple != NULL; tuple = tuple->next)
01087 HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
01088 }
01089
01090
01091 for (i = 0; i < hashtable->nSkewBuckets; i++)
01092 {
01093 int j = hashtable->skewBucketNums[i];
01094 HashSkewBucket *skewBucket = hashtable->skewBucket[j];
01095
01096 for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next)
01097 HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
01098 }
01099 }
01100
01101
01102 void
01103 ExecReScanHash(HashState *node)
01104 {
01105
01106
01107
01108
01109 if (node->ps.lefttree->chgParam == NULL)
01110 ExecReScan(node->ps.lefttree);
01111 }
01112
01113
01114
01115
01116
01117
01118
01119
01120
01121
01122 static void
01123 ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
01124 {
01125 HeapTupleData *statsTuple;
01126 Datum *values;
01127 int nvalues;
01128 float4 *numbers;
01129 int nnumbers;
01130
01131
01132 if (!OidIsValid(node->skewTable))
01133 return;
01134
01135 if (mcvsToUse <= 0)
01136 return;
01137
01138
01139
01140
01141 statsTuple = SearchSysCache3(STATRELATTINH,
01142 ObjectIdGetDatum(node->skewTable),
01143 Int16GetDatum(node->skewColumn),
01144 BoolGetDatum(node->skewInherit));
01145 if (!HeapTupleIsValid(statsTuple))
01146 return;
01147
01148 if (get_attstatsslot(statsTuple, node->skewColType, node->skewColTypmod,
01149 STATISTIC_KIND_MCV, InvalidOid,
01150 NULL,
01151 &values, &nvalues,
01152 &numbers, &nnumbers))
01153 {
01154 double frac;
01155 int nbuckets;
01156 FmgrInfo *hashfunctions;
01157 int i;
01158
01159 if (mcvsToUse > nvalues)
01160 mcvsToUse = nvalues;
01161
01162
01163
01164
01165
01166
01167 frac = 0;
01168 for (i = 0; i < mcvsToUse; i++)
01169 frac += numbers[i];
01170 if (frac < SKEW_MIN_OUTER_FRACTION)
01171 {
01172 free_attstatsslot(node->skewColType,
01173 values, nvalues, numbers, nnumbers);
01174 ReleaseSysCache(statsTuple);
01175 return;
01176 }
01177
01178
01179
01180
01181
01182
01183
01184
01185
01186
01187
01188
01189
01190 nbuckets = 2;
01191 while (nbuckets <= mcvsToUse)
01192 nbuckets <<= 1;
01193
01194 nbuckets <<= 2;
01195
01196 hashtable->skewEnabled = true;
01197 hashtable->skewBucketLen = nbuckets;
01198
01199
01200
01201
01202
01203
01204 hashtable->skewBucket = (HashSkewBucket **)
01205 MemoryContextAllocZero(hashtable->batchCxt,
01206 nbuckets * sizeof(HashSkewBucket *));
01207 hashtable->skewBucketNums = (int *)
01208 MemoryContextAllocZero(hashtable->batchCxt,
01209 mcvsToUse * sizeof(int));
01210
01211 hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *)
01212 + mcvsToUse * sizeof(int);
01213 hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
01214 + mcvsToUse * sizeof(int);
01215 if (hashtable->spaceUsed > hashtable->spacePeak)
01216 hashtable->spacePeak = hashtable->spaceUsed;
01217
01218
01219
01220
01221
01222
01223
01224
01225
01226
01227 hashfunctions = hashtable->outer_hashfunctions;
01228
01229 for (i = 0; i < mcvsToUse; i++)
01230 {
01231 uint32 hashvalue;
01232 int bucket;
01233
01234 hashvalue = DatumGetUInt32(FunctionCall1(&hashfunctions[0],
01235 values[i]));
01236
01237
01238
01239
01240
01241
01242
01243 bucket = hashvalue & (nbuckets - 1);
01244 while (hashtable->skewBucket[bucket] != NULL &&
01245 hashtable->skewBucket[bucket]->hashvalue != hashvalue)
01246 bucket = (bucket + 1) & (nbuckets - 1);
01247
01248
01249
01250
01251
01252 if (hashtable->skewBucket[bucket] != NULL)
01253 continue;
01254
01255
01256 hashtable->skewBucket[bucket] = (HashSkewBucket *)
01257 MemoryContextAlloc(hashtable->batchCxt,
01258 sizeof(HashSkewBucket));
01259 hashtable->skewBucket[bucket]->hashvalue = hashvalue;
01260 hashtable->skewBucket[bucket]->tuples = NULL;
01261 hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket;
01262 hashtable->nSkewBuckets++;
01263 hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
01264 hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
01265 if (hashtable->spaceUsed > hashtable->spacePeak)
01266 hashtable->spacePeak = hashtable->spaceUsed;
01267 }
01268
01269 free_attstatsslot(node->skewColType,
01270 values, nvalues, numbers, nnumbers);
01271 }
01272
01273 ReleaseSysCache(statsTuple);
01274 }
01275
01276
01277
01278
01279
01280
01281
01282
01283 int
01284 ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
01285 {
01286 int bucket;
01287
01288
01289
01290
01291
01292 if (!hashtable->skewEnabled)
01293 return INVALID_SKEW_BUCKET_NO;
01294
01295
01296
01297
01298 bucket = hashvalue & (hashtable->skewBucketLen - 1);
01299
01300
01301
01302
01303
01304
01305 while (hashtable->skewBucket[bucket] != NULL &&
01306 hashtable->skewBucket[bucket]->hashvalue != hashvalue)
01307 bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
01308
01309
01310
01311
01312 if (hashtable->skewBucket[bucket] != NULL)
01313 return bucket;
01314
01315
01316
01317
01318 return INVALID_SKEW_BUCKET_NO;
01319 }
01320
01321
01322
01323
01324
01325
01326
01327
01328
01329 static void
01330 ExecHashSkewTableInsert(HashJoinTable hashtable,
01331 TupleTableSlot *slot,
01332 uint32 hashvalue,
01333 int bucketNumber)
01334 {
01335 MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
01336 HashJoinTuple hashTuple;
01337 int hashTupleSize;
01338
01339
01340 hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
01341 hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
01342 hashTupleSize);
01343 hashTuple->hashvalue = hashvalue;
01344 memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
01345 HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
01346
01347
01348 hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples;
01349 hashtable->skewBucket[bucketNumber]->tuples = hashTuple;
01350
01351
01352 hashtable->spaceUsed += hashTupleSize;
01353 hashtable->spaceUsedSkew += hashTupleSize;
01354 if (hashtable->spaceUsed > hashtable->spacePeak)
01355 hashtable->spacePeak = hashtable->spaceUsed;
01356 while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
01357 ExecHashRemoveNextSkewBucket(hashtable);
01358
01359
01360 if (hashtable->spaceUsed > hashtable->spaceAllowed)
01361 ExecHashIncreaseNumBatches(hashtable);
01362 }
01363
01364
01365
01366
01367
01368
01369
01370 static void
01371 ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
01372 {
01373 int bucketToRemove;
01374 HashSkewBucket *bucket;
01375 uint32 hashvalue;
01376 int bucketno;
01377 int batchno;
01378 HashJoinTuple hashTuple;
01379
01380
01381 bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1];
01382 bucket = hashtable->skewBucket[bucketToRemove];
01383
01384
01385
01386
01387
01388
01389
01390 hashvalue = bucket->hashvalue;
01391 ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
01392
01393
01394 hashTuple = bucket->tuples;
01395 while (hashTuple != NULL)
01396 {
01397 HashJoinTuple nextHashTuple = hashTuple->next;
01398 MinimalTuple tuple;
01399 Size tupleSize;
01400
01401
01402
01403
01404
01405
01406 tuple = HJTUPLE_MINTUPLE(hashTuple);
01407 tupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
01408
01409
01410 if (batchno == hashtable->curbatch)
01411 {
01412
01413 hashTuple->next = hashtable->buckets[bucketno];
01414 hashtable->buckets[bucketno] = hashTuple;
01415
01416 hashtable->spaceUsedSkew -= tupleSize;
01417 }
01418 else
01419 {
01420
01421 Assert(batchno > hashtable->curbatch);
01422 ExecHashJoinSaveTuple(tuple, hashvalue,
01423 &hashtable->innerBatchFile[batchno]);
01424 pfree(hashTuple);
01425 hashtable->spaceUsed -= tupleSize;
01426 hashtable->spaceUsedSkew -= tupleSize;
01427 }
01428
01429 hashTuple = nextHashTuple;
01430 }
01431
01432
01433
01434
01435
01436
01437
01438
01439
01440
01441
01442
01443
01444 hashtable->skewBucket[bucketToRemove] = NULL;
01445 hashtable->nSkewBuckets--;
01446 pfree(bucket);
01447 hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD;
01448 hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD;
01449
01450
01451
01452
01453
01454 if (hashtable->nSkewBuckets == 0)
01455 {
01456 hashtable->skewEnabled = false;
01457 pfree(hashtable->skewBucket);
01458 pfree(hashtable->skewBucketNums);
01459 hashtable->skewBucket = NULL;
01460 hashtable->skewBucketNums = NULL;
01461 hashtable->spaceUsed -= hashtable->spaceUsedSkew;
01462 hashtable->spaceUsedSkew = 0;
01463 }
01464 }