00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099 #include "postgres.h"
00100
00101 #include <limits.h>
00102
00103 #include "access/htup_details.h"
00104 #include "access/nbtree.h"
00105 #include "catalog/index.h"
00106 #include "commands/tablespace.h"
00107 #include "executor/executor.h"
00108 #include "miscadmin.h"
00109 #include "pg_trace.h"
00110 #include "utils/datum.h"
00111 #include "utils/logtape.h"
00112 #include "utils/lsyscache.h"
00113 #include "utils/memutils.h"
00114 #include "utils/pg_rusage.h"
00115 #include "utils/rel.h"
00116 #include "utils/sortsupport.h"
00117 #include "utils/tuplesort.h"
00118
00119
00120
/* Sort-kind codes passed to the TRACE_POSTGRESQL_SORT_START probe */
#define HEAP_SORT 0
#define INDEX_SORT 1
#define DATUM_SORT 2
#define CLUSTER_SORT 3


/* GUC flag: when true, emit LOG-level progress/resource messages for sorts */
#ifdef TRACE_SORT
bool trace_sort = false;
#endif

/* Debug GUC flag: allows disabling the bounded (top-N) heapsort optimization */
#ifdef DEBUG_BOUNDED_SORT
bool optimize_bounded_sort = true;
#endif
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
/*
 * One entry in the in-memory tuple array.
 *
 * tuple points at the tuple proper; for datum sorts it is NULL for nulls and
 * pass-by-value datums, else a pointer to a copied pass-by-reference value
 * (see tuplesort_putdatum).  datum1/isnull1 hold the leading sort-key value;
 * presumably this lets most comparisons avoid touching the full tuple --
 * confirm against the comparetup_* routines.  tupindex is the run number
 * while building runs (see puttuple_common, TSS_BUILDRUNS) and the source
 * tape number during the final merge (see tuplesort_gettuple_common).
 */
typedef struct
{
	void *tuple;				/* the tuple itself, or NULL (see above) */
	Datum datum1;				/* value of first key column */
	bool isnull1;				/* is first key column NULL? */
	int tupindex;				/* run number / source-tape number */
} SortTuple;
00168
00169
00170
00171
00172
00173
/*
 * Possible states of a sort; transitions are driven by puttuple_common and
 * tuplesort_performsort.
 */
typedef enum
{
	TSS_INITIAL,				/* loading tuples; still fits in memory */
	TSS_BOUNDED,				/* loading tuples into a bounded (top-N) heap */
	TSS_BUILDRUNS,				/* loading tuples; writing runs to tape */
	TSS_SORTEDINMEM,			/* sort complete, result entirely in memory */
	TSS_SORTEDONTAPE,			/* sort complete, final run on one tape */
	TSS_FINALMERGE,				/* performing final merge on-the-fly */
} TupSortStatus;
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
/* Minimum merge order (number of input tapes) to use */
#define MINORDER 6
/* Memory charged per tape for tape buffering -- presumably logtape.c I/O
 * buffers; confirm against inittapes/mergeruns usage */
#define TAPE_BUFFER_OVERHEAD (BLCKSZ * 3)
/* Target preread buffer space per input tape during merging -- TODO confirm
 * against the mergepreread code (not visible in this chunk) */
#define MERGE_BUFFER_SIZE (BLCKSZ * 32)

/* qsort-style comparator: returns <0, 0, or >0 per the sort's key rules */
typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
								Tuplesortstate *state);
00201
00202
00203
00204
/*
 * Private state of a single sort operation.  One instance is created by each
 * tuplesort_begin_xxx variant; the function pointers below select the
 * per-tuple-kind behavior.
 */
struct Tuplesortstate
{
	TupSortStatus status;		/* enumerated value as shown above */
	int nKeys;					/* number of columns in sort key */
	bool randomAccess;			/* did caller request random access? */
	bool bounded;				/* did caller specify a maximum number of
								 * tuples to return? */
	bool boundUsed;				/* true if we made use of a bounded heap */
	int bound;					/* if bounded, the maximum number of tuples */
	long availMem;				/* remaining memory allowance (bytes) */
	long allowedMem;			/* total memory allowance (bytes) */
	int maxTapes;				/* number of tapes (Knuth's T) -- set in code
								 * outside this chunk */
	int tapeRange;				/* maxTapes-1 (Knuth's P) -- TODO confirm */
	MemoryContext sortcontext;	/* memory context holding all sort data */
	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file;
								 * NULL while sort is all-in-memory */

	/*
	 * Function to compare two tuples; result is per qsort() convention.
	 * Worker functions may use state->sortKeys and/or state->indexScanKey.
	 */
	SortTupleComparator comparetup;

	/*
	 * Function to copy a supplied input tuple into palloc'd space and set up
	 * its SortTuple representation.  Also charges the tuple's space against
	 * availMem (see USEMEM usage in callers).
	 */
	void (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);

	/*
	 * Function to write a stored tuple onto tape.  The representation of the
	 * tuple on tape must be read back compatibly by readtup.
	 */
	void (*writetup) (Tuplesortstate *state, int tapenum,
					SortTuple *stup);

	/*
	 * Function to read a stored tuple from tape back into memory.  'len' is
	 * the already-read length of the stored tuple.
	 */
	void (*readtup) (Tuplesortstate *state, SortTuple *stup,
				int tapenum, unsigned int len);

	/*
	 * Function to reverse the sort direction from its current state.
	 */
	void (*reversedirection) (Tuplesortstate *state);

	/*
	 * The in-memory tuple array.  While in TSS_INITIAL/TSS_SORTEDINMEM it
	 * holds all tuples; in the heap-based states it is managed as a heap
	 * (see tuplesort_heap_insert/tuplesort_heap_siftup).
	 */
	SortTuple *memtuples;		/* array of SortTuple structs */
	int memtupcount;			/* number of tuples currently present */
	int memtupsize;				/* allocated length of memtuples array */
	bool growmemtuples;			/* memtuples' growth still underway? */

	/*
	 * While building initial runs, this is the current output run number
	 * (starting at 0).
	 */
	int currentRun;

	/*
	 * Merge bookkeeping, used during merging (TSS_FINALMERGE).  Entries are
	 * indexed by tape number; preread tuples for each tape form linked lists
	 * threaded through memtuples[].tupindex -- see tuplesort_gettuple_common.
	 */
	bool *mergeactive;			/* active input run source? */
	int *mergenext;				/* first preread tuple for each source */
	int *mergelast;				/* last preread tuple for each source */
	int *mergeavailslots;		/* slots left for prereading each tape */
	long *mergeavailmem;		/* availMem for prereading each tape */
	int mergefreelist;			/* head of freelist of recycled slots */
	int mergefirstfree;			/* first slot never used in this merge */

	/*
	 * Variables for Knuth's Algorithm 5.4.2D (polyphase merge) -- managed by
	 * code outside this chunk (inittapes/selectnewtape/mergeruns).
	 */
	int Level;					/* Knuth's l */
	int destTape;				/* current output tape (Knuth's j, less 1) */
	int *tp_fib;				/* Target Fibonacci run counts (A[]) */
	int *tp_runs;				/* # of real runs on each tape */
	int *tp_dummy;				/* # of dummy runs for each tape (D[]) */
	int *tp_tapenum;			/* Actual tape numbers (TAPE[]) */
	int activeTapes;			/* # of active input tapes in merge pass */

	/*
	 * These variables are used after completion of sorting to keep track of
	 * the next tuple to return.
	 */
	int result_tape;			/* tape of finished output (-1 = in memory) */
	int current;				/* array index (only used if SORTEDINMEM) */
	bool eof_reached;			/* reached EOF (needed for cursors) */

	/* markpos_xxx holds marked position for mark and restore */
	long markpos_block;			/* tape block# (only used if SORTEDONTAPE) */
	int markpos_offset;			/* saved "current", or offset in tape block */
	bool markpos_eof;			/* saved "eof_reached" */

	/*
	 * These variables are specific to the MinimalTuple case; they are set by
	 * tuplesort_begin_heap and used only by the MinimalTuple routines.
	 */
	TupleDesc tupDesc;
	SortSupport sortKeys;		/* array of length nKeys */

	/*
	 * This variable is shared by the single-key MinimalTuple case and the
	 * Datum case (otherwise both are NULL).  It is non-NULL if the sort can
	 * use the specialized single-key qsort path (see tuplesort_performsort).
	 */
	SortSupport onlyKey;

	/*
	 * These variables are specific to the CLUSTER case; they are set by
	 * tuplesort_begin_cluster.
	 */
	IndexInfo *indexInfo;		/* info about index being used for reference */
	EState *estate;				/* for evaluating index expressions */

	/*
	 * These variables are specific to the IndexTuple case; they are set by
	 * the tuplesort_begin_index_xxx routines.
	 */
	Relation heapRel;			/* table the index is being built on */
	Relation indexRel;			/* index being built */

	/* These are specific to the index_btree subcase: */
	ScanKey indexScanKey;
	bool enforceUnique;			/* complain if we find duplicate tuples */

	/* These are specific to the index_hash subcase: */
	uint32 hash_mask;			/* mask for sortable part of hash code */

	/*
	 * These variables are specific to the Datum case; they are set by
	 * tuplesort_begin_datum.
	 */
	Oid datumType;
	/* cached physical properties of datumType (from get_typlenbyval) */
	int datumTypeLen;
	bool datumTypeByVal;

	/*
	 * Resource snapshot for time of sort start.
	 */
#ifdef TRACE_SORT
	PGRUsage ru_start;
#endif
};
00393
/* Dispatch through the per-tuple-kind function pointers */
#define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state))
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
#define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state))
/* Memory accounting: availMem may go negative, which LACKMEM detects */
#define LACKMEM(state) ((state)->availMem < 0)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt))
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
/*
 * Read 'len' bytes from the given tape, erroring out if fewer are available;
 * wraps LogicalTapeRead, which returns the number of bytes actually read.
 */
#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
do { \
if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
elog(ERROR, "unexpected end of data"); \
} while(0)
00450
00451
00452 static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
00453 static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
00454 static void inittapes(Tuplesortstate *state);
00455 static void selectnewtape(Tuplesortstate *state);
00456 static void mergeruns(Tuplesortstate *state);
00457 static void mergeonerun(Tuplesortstate *state);
00458 static void beginmerge(Tuplesortstate *state);
00459 static void mergepreread(Tuplesortstate *state);
00460 static void mergeprereadone(Tuplesortstate *state, int srcTape);
00461 static void dumptuples(Tuplesortstate *state, bool alltuples);
00462 static void make_bounded_heap(Tuplesortstate *state);
00463 static void sort_bounded_heap(Tuplesortstate *state);
00464 static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
00465 int tupleindex, bool checkIndex);
00466 static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
00467 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
00468 static void markrunend(Tuplesortstate *state, int tapenum);
00469 static int comparetup_heap(const SortTuple *a, const SortTuple *b,
00470 Tuplesortstate *state);
00471 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
00472 static void writetup_heap(Tuplesortstate *state, int tapenum,
00473 SortTuple *stup);
00474 static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
00475 int tapenum, unsigned int len);
00476 static void reversedirection_heap(Tuplesortstate *state);
00477 static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
00478 Tuplesortstate *state);
00479 static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
00480 static void writetup_cluster(Tuplesortstate *state, int tapenum,
00481 SortTuple *stup);
00482 static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
00483 int tapenum, unsigned int len);
00484 static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
00485 Tuplesortstate *state);
00486 static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
00487 Tuplesortstate *state);
00488 static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
00489 static void writetup_index(Tuplesortstate *state, int tapenum,
00490 SortTuple *stup);
00491 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
00492 int tapenum, unsigned int len);
00493 static void reversedirection_index_btree(Tuplesortstate *state);
00494 static void reversedirection_index_hash(Tuplesortstate *state);
00495 static int comparetup_datum(const SortTuple *a, const SortTuple *b,
00496 Tuplesortstate *state);
00497 static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
00498 static void writetup_datum(Tuplesortstate *state, int tapenum,
00499 SortTuple *stup);
00500 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
00501 int tapenum, unsigned int len);
00502 static void reversedirection_datum(Tuplesortstate *state);
00503 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
00504
00505
00506
00507
00508
00509
00510
00511
00512 #include "qsort_tuple.c"
00513
00514
00515
00516
00517
00518
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534 static Tuplesortstate *
00535 tuplesort_begin_common(int workMem, bool randomAccess)
00536 {
00537 Tuplesortstate *state;
00538 MemoryContext sortcontext;
00539 MemoryContext oldcontext;
00540
00541
00542
00543
00544
00545 sortcontext = AllocSetContextCreate(CurrentMemoryContext,
00546 "TupleSort",
00547 ALLOCSET_DEFAULT_MINSIZE,
00548 ALLOCSET_DEFAULT_INITSIZE,
00549 ALLOCSET_DEFAULT_MAXSIZE);
00550
00551
00552
00553
00554
00555 oldcontext = MemoryContextSwitchTo(sortcontext);
00556
00557 state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
00558
00559 #ifdef TRACE_SORT
00560 if (trace_sort)
00561 pg_rusage_init(&state->ru_start);
00562 #endif
00563
00564 state->status = TSS_INITIAL;
00565 state->randomAccess = randomAccess;
00566 state->bounded = false;
00567 state->boundUsed = false;
00568 state->allowedMem = workMem * 1024L;
00569 state->availMem = state->allowedMem;
00570 state->sortcontext = sortcontext;
00571 state->tapeset = NULL;
00572
00573 state->memtupcount = 0;
00574 state->memtupsize = 1024;
00575 state->growmemtuples = true;
00576 state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
00577
00578 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
00579
00580
00581 if (LACKMEM(state))
00582 elog(ERROR, "insufficient memory allowed for sort");
00583
00584 state->currentRun = 0;
00585
00586
00587
00588
00589
00590
00591 state->result_tape = -1;
00592
00593 MemoryContextSwitchTo(oldcontext);
00594
00595 return state;
00596 }
00597
00598 Tuplesortstate *
00599 tuplesort_begin_heap(TupleDesc tupDesc,
00600 int nkeys, AttrNumber *attNums,
00601 Oid *sortOperators, Oid *sortCollations,
00602 bool *nullsFirstFlags,
00603 int workMem, bool randomAccess)
00604 {
00605 Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
00606 MemoryContext oldcontext;
00607 int i;
00608
00609 oldcontext = MemoryContextSwitchTo(state->sortcontext);
00610
00611 AssertArg(nkeys > 0);
00612
00613 #ifdef TRACE_SORT
00614 if (trace_sort)
00615 elog(LOG,
00616 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
00617 nkeys, workMem, randomAccess ? 't' : 'f');
00618 #endif
00619
00620 state->nKeys = nkeys;
00621
00622 TRACE_POSTGRESQL_SORT_START(HEAP_SORT,
00623 false,
00624 nkeys,
00625 workMem,
00626 randomAccess);
00627
00628 state->comparetup = comparetup_heap;
00629 state->copytup = copytup_heap;
00630 state->writetup = writetup_heap;
00631 state->readtup = readtup_heap;
00632 state->reversedirection = reversedirection_heap;
00633
00634 state->tupDesc = tupDesc;
00635
00636
00637 state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));
00638
00639 for (i = 0; i < nkeys; i++)
00640 {
00641 SortSupport sortKey = state->sortKeys + i;
00642
00643 AssertArg(attNums[i] != 0);
00644 AssertArg(sortOperators[i] != 0);
00645
00646 sortKey->ssup_cxt = CurrentMemoryContext;
00647 sortKey->ssup_collation = sortCollations[i];
00648 sortKey->ssup_nulls_first = nullsFirstFlags[i];
00649 sortKey->ssup_attno = attNums[i];
00650
00651 PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
00652 }
00653
00654 if (nkeys == 1)
00655 state->onlyKey = state->sortKeys;
00656
00657 MemoryContextSwitchTo(oldcontext);
00658
00659 return state;
00660 }
00661
00662 Tuplesortstate *
00663 tuplesort_begin_cluster(TupleDesc tupDesc,
00664 Relation indexRel,
00665 int workMem, bool randomAccess)
00666 {
00667 Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
00668 MemoryContext oldcontext;
00669
00670 Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
00671
00672 oldcontext = MemoryContextSwitchTo(state->sortcontext);
00673
00674 #ifdef TRACE_SORT
00675 if (trace_sort)
00676 elog(LOG,
00677 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
00678 RelationGetNumberOfAttributes(indexRel),
00679 workMem, randomAccess ? 't' : 'f');
00680 #endif
00681
00682 state->nKeys = RelationGetNumberOfAttributes(indexRel);
00683
00684 TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
00685 false,
00686 state->nKeys,
00687 workMem,
00688 randomAccess);
00689
00690 state->comparetup = comparetup_cluster;
00691 state->copytup = copytup_cluster;
00692 state->writetup = writetup_cluster;
00693 state->readtup = readtup_cluster;
00694 state->reversedirection = reversedirection_index_btree;
00695
00696 state->indexInfo = BuildIndexInfo(indexRel);
00697 state->indexScanKey = _bt_mkscankey_nodata(indexRel);
00698
00699 state->tupDesc = tupDesc;
00700
00701 if (state->indexInfo->ii_Expressions != NULL)
00702 {
00703 TupleTableSlot *slot;
00704 ExprContext *econtext;
00705
00706
00707
00708
00709
00710
00711
00712 state->estate = CreateExecutorState();
00713 slot = MakeSingleTupleTableSlot(tupDesc);
00714 econtext = GetPerTupleExprContext(state->estate);
00715 econtext->ecxt_scantuple = slot;
00716 }
00717
00718 MemoryContextSwitchTo(oldcontext);
00719
00720 return state;
00721 }
00722
00723 Tuplesortstate *
00724 tuplesort_begin_index_btree(Relation heapRel,
00725 Relation indexRel,
00726 bool enforceUnique,
00727 int workMem, bool randomAccess)
00728 {
00729 Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
00730 MemoryContext oldcontext;
00731
00732 oldcontext = MemoryContextSwitchTo(state->sortcontext);
00733
00734 #ifdef TRACE_SORT
00735 if (trace_sort)
00736 elog(LOG,
00737 "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
00738 enforceUnique ? 't' : 'f',
00739 workMem, randomAccess ? 't' : 'f');
00740 #endif
00741
00742 state->nKeys = RelationGetNumberOfAttributes(indexRel);
00743
00744 TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
00745 enforceUnique,
00746 state->nKeys,
00747 workMem,
00748 randomAccess);
00749
00750 state->comparetup = comparetup_index_btree;
00751 state->copytup = copytup_index;
00752 state->writetup = writetup_index;
00753 state->readtup = readtup_index;
00754 state->reversedirection = reversedirection_index_btree;
00755
00756 state->heapRel = heapRel;
00757 state->indexRel = indexRel;
00758 state->indexScanKey = _bt_mkscankey_nodata(indexRel);
00759 state->enforceUnique = enforceUnique;
00760
00761 MemoryContextSwitchTo(oldcontext);
00762
00763 return state;
00764 }
00765
00766 Tuplesortstate *
00767 tuplesort_begin_index_hash(Relation heapRel,
00768 Relation indexRel,
00769 uint32 hash_mask,
00770 int workMem, bool randomAccess)
00771 {
00772 Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
00773 MemoryContext oldcontext;
00774
00775 oldcontext = MemoryContextSwitchTo(state->sortcontext);
00776
00777 #ifdef TRACE_SORT
00778 if (trace_sort)
00779 elog(LOG,
00780 "begin index sort: hash_mask = 0x%x, workMem = %d, randomAccess = %c",
00781 hash_mask,
00782 workMem, randomAccess ? 't' : 'f');
00783 #endif
00784
00785 state->nKeys = 1;
00786
00787 state->comparetup = comparetup_index_hash;
00788 state->copytup = copytup_index;
00789 state->writetup = writetup_index;
00790 state->readtup = readtup_index;
00791 state->reversedirection = reversedirection_index_hash;
00792
00793 state->heapRel = heapRel;
00794 state->indexRel = indexRel;
00795
00796 state->hash_mask = hash_mask;
00797
00798 MemoryContextSwitchTo(oldcontext);
00799
00800 return state;
00801 }
00802
00803 Tuplesortstate *
00804 tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
00805 bool nullsFirstFlag,
00806 int workMem, bool randomAccess)
00807 {
00808 Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
00809 MemoryContext oldcontext;
00810 int16 typlen;
00811 bool typbyval;
00812
00813 oldcontext = MemoryContextSwitchTo(state->sortcontext);
00814
00815 #ifdef TRACE_SORT
00816 if (trace_sort)
00817 elog(LOG,
00818 "begin datum sort: workMem = %d, randomAccess = %c",
00819 workMem, randomAccess ? 't' : 'f');
00820 #endif
00821
00822 state->nKeys = 1;
00823
00824 TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
00825 false,
00826 1,
00827 workMem,
00828 randomAccess);
00829
00830 state->comparetup = comparetup_datum;
00831 state->copytup = copytup_datum;
00832 state->writetup = writetup_datum;
00833 state->readtup = readtup_datum;
00834 state->reversedirection = reversedirection_datum;
00835
00836 state->datumType = datumType;
00837
00838
00839 state->onlyKey = (SortSupport) palloc0(sizeof(SortSupportData));
00840
00841 state->onlyKey->ssup_cxt = CurrentMemoryContext;
00842 state->onlyKey->ssup_collation = sortCollation;
00843 state->onlyKey->ssup_nulls_first = nullsFirstFlag;
00844
00845 PrepareSortSupportFromOrderingOp(sortOperator, state->onlyKey);
00846
00847
00848 get_typlenbyval(datumType, &typlen, &typbyval);
00849 state->datumTypeLen = typlen;
00850 state->datumTypeByVal = typbyval;
00851
00852 MemoryContextSwitchTo(oldcontext);
00853
00854 return state;
00855 }
00856
00857
00858
00859
00860
00861
00862
00863
00864
00865
00866
00867
00868
/*
 * tuplesort_set_bound
 *
 * Advise tuplesort that at most the first N sorted tuples will be fetched,
 * which permits the bounded (top-N heapsort) optimization.  Must be called
 * before any tuples have been loaded.  Bounds that are too large to be worth
 * optimizing for are silently ignored.
 */
void
tuplesort_set_bound(Tuplesortstate *state, int64 bound)
{
	/* Assert we're called before loading any tuples */
	Assert(state->status == TSS_INITIAL);
	Assert(state->memtupcount == 0);
	Assert(!state->bounded);

#ifdef DEBUG_BOUNDED_SORT
	/* Honor the debug GUC that disables the top-N optimization */
	if (!optimize_bounded_sort)
		return;
#endif

	/* We want to be able to compute bound * 2 without int overflow */
	if (bound > (int64) (INT_MAX / 2))
		return;

	state->bounded = true;
	state->bound = (int) bound;
}
00890
00891
00892
00893
00894
00895
00896
00897
00898
00899
/*
 * tuplesort_end
 *
 * Release resources and clean up.  Closes the tape set (which releases the
 * underlying temp-file space), frees any executor state created for CLUSTER
 * expression evaluation, and finally deletes the sort's private memory
 * context, which releases everything else at once.
 *
 * NOTE: after this, the Tuplesortstate itself is gone; any slots or tuples
 * the caller obtained that point into the sort context are invalid.
 */
void
tuplesort_end(Tuplesortstate *state)
{
	/* context swap probably not needed, but let's be safe */
	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
	long spaceUsed;

	/* Report disk blocks for external sorts, KB of memory for internal. */
	if (state->tapeset)
		spaceUsed = LogicalTapeSetBlocks(state->tapeset);
	else
		spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
#endif

	/*
	 * Close the tape set first; this releases the temp-file space before we
	 * tear down the rest of the state.
	 */
	if (state->tapeset)
		LogicalTapeSetClose(state->tapeset);

#ifdef TRACE_SORT
	if (trace_sort)
	{
		if (state->tapeset)
			elog(LOG, "external sort ended, %ld disk blocks used: %s",
				 spaceUsed, pg_rusage_show(&state->ru_start));
		else
			elog(LOG, "internal sort ended, %ld KB used: %s",
				 spaceUsed, pg_rusage_show(&state->ru_start));
	}

	TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
#else

	/*
	 * Without TRACE_SORT we don't compute spaceUsed, so the probe reports 0.
	 */
	TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
#endif

	/* Free the per-sort executor state set up by tuplesort_begin_cluster */
	if (state->estate != NULL)
	{
		ExprContext *econtext = GetPerTupleExprContext(state->estate);

		ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
		FreeExecutorState(state->estate);
	}

	MemoryContextSwitchTo(oldcontext);

	/*
	 * Deleting the sort context frees the state struct and all remaining
	 * working data in one go.
	 */
	MemoryContextDelete(state->sortcontext);
}
00962
00963
00964
00965
00966
00967
00968
00969
00970
00971
00972
00973
00974
00975
00976
/*
 * grow_memtuples
 *
 * Attempt to enlarge the memtuples array so more tuples fit in memory before
 * we must spill to tape.  Returns true if the array was enlarged, false if
 * not (in which case growmemtuples is cleared so we never try again).
 *
 * Strategy: while the array itself accounts for no more than half of the
 * memory used so far, simply double it.  Once it dominates, make one final
 * sizing pass that scales the current size by allowedMem/memNowUsed, then
 * give up on further growth.
 */
static bool
grow_memtuples(Tuplesortstate *state)
{
	int newmemtupsize;
	int memtupsize = state->memtupsize;
	long memNowUsed = state->allowedMem - state->availMem;

	/* Forget it if we've already maxed out memtuples, per comment above */
	if (!state->growmemtuples)
		return false;

	/* Select new value of memtupsize */
	if (memNowUsed <= state->availMem)
	{
		/*
		 * We've used no more than half of allowedMem; double our usage.
		 * NOTE(review): memtupsize * 2 presumably cannot overflow int here
		 * because of the MaxAllocSize clamp applied on earlier growths --
		 * confirm; signed overflow would be undefined behavior.
		 */
		newmemtupsize = memtupsize * 2;
	}
	else
	{
		/*
		 * This will be the last increment of memtupsize.  Scale the array
		 * size by the ratio of total allowance to what we've actually used,
		 * estimating that the rest of memory use grows proportionally with
		 * tuple count.  Clear growmemtuples so we don't come back here.
		 */
		double grow_ratio;

		grow_ratio = (double) state->allowedMem / (double) memNowUsed;
		newmemtupsize = (int) (memtupsize * grow_ratio);

		/* We won't make any further enlargement attempts */
		state->growmemtuples = false;
	}

	/* Must enlarge array by at least one element, else report failure */
	if (newmemtupsize <= memtupsize)
		goto noalloc;

	/*
	 * Clamp to MaxAllocSize: palloc cannot allocate more than that in one
	 * chunk, so this is a hard ceiling on the array size.
	 */
	if ((Size) newmemtupsize >= MaxAllocSize / sizeof(SortTuple))
	{
		newmemtupsize = (int) (MaxAllocSize / sizeof(SortTuple));
		state->growmemtuples = false;
	}

	/*
	 * The growth must also fit within our remaining memory allowance; if it
	 * doesn't, the enlarged array would put us over budget immediately.
	 */
	if (state->availMem < (long) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
		goto noalloc;

	/* OK, do it: un-charge the old array, repalloc, re-charge the new one */
	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
	state->memtupsize = newmemtupsize;
	state->memtuples = (SortTuple *)
		repalloc(state->memtuples,
				 state->memtupsize * sizeof(SortTuple));
	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
	if (LACKMEM(state))
		elog(ERROR, "unexpected out-of-memory situation during sort");
	return true;

noalloc:
	/* If for any reason we didn't realloc, shut off future attempts */
	state->growmemtuples = false;
	return false;
}
01082
01083
01084
01085
01086
01087
01088 void
01089 tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
01090 {
01091 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
01092 SortTuple stup;
01093
01094
01095
01096
01097
01098 COPYTUP(state, &stup, (void *) slot);
01099
01100 puttuple_common(state, &stup);
01101
01102 MemoryContextSwitchTo(oldcontext);
01103 }
01104
01105
01106
01107
01108
01109
01110 void
01111 tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup)
01112 {
01113 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
01114 SortTuple stup;
01115
01116
01117
01118
01119
01120 COPYTUP(state, &stup, (void *) tup);
01121
01122 puttuple_common(state, &stup);
01123
01124 MemoryContextSwitchTo(oldcontext);
01125 }
01126
01127
01128
01129
01130
01131
01132 void
01133 tuplesort_putindextuple(Tuplesortstate *state, IndexTuple tuple)
01134 {
01135 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
01136 SortTuple stup;
01137
01138
01139
01140
01141
01142 COPYTUP(state, &stup, (void *) tuple);
01143
01144 puttuple_common(state, &stup);
01145
01146 MemoryContextSwitchTo(oldcontext);
01147 }
01148
01149
01150
01151
01152
01153
01154 void
01155 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
01156 {
01157 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
01158 SortTuple stup;
01159
01160
01161
01162
01163
01164 if (isNull || state->datumTypeByVal)
01165 {
01166 stup.datum1 = val;
01167 stup.isnull1 = isNull;
01168 stup.tuple = NULL;
01169 }
01170 else
01171 {
01172 stup.datum1 = datumCopy(val, false, state->datumTypeLen);
01173 stup.isnull1 = false;
01174 stup.tuple = DatumGetPointer(stup.datum1);
01175 USEMEM(state, GetMemoryChunkSpace(stup.tuple));
01176 }
01177
01178 puttuple_common(state, &stup);
01179
01180 MemoryContextSwitchTo(oldcontext);
01181 }
01182
01183
01184
01185
/*
 * puttuple_common
 *
 * Shared tail of the tuplesort_put* routines: file the already-copied tuple
 * into the sort according to the current state.  May transition the sort from
 * TSS_INITIAL to TSS_BOUNDED (top-N heapsort) or to TSS_BUILDRUNS (external
 * sort), as memory and bound conditions dictate.
 */
static void
puttuple_common(Tuplesortstate *state, SortTuple *tuple)
{
	switch (state->status)
	{
		case TSS_INITIAL:

			/*
			 * Grow the array as needed.  Note that we try to grow the array
			 * when there is still one free slot remaining, so the insertion
			 * below always succeeds even if grow_memtuples fails.
			 */
			if (state->memtupcount >= state->memtupsize - 1)
			{
				(void) grow_memtuples(state);
				Assert(state->memtupcount < state->memtupsize);
			}
			state->memtuples[state->memtupcount++] = *tuple;

			/*
			 * Check if it's time to switch to a bounded heapsort: only when
			 * a bound was given, and either we've collected twice the bound
			 * (so discarding the excess halves memory use) or we've hit the
			 * memory limit with more than bound tuples in hand.
			 */
			if (state->bounded &&
				(state->memtupcount > state->bound * 2 ||
				 (state->memtupcount > state->bound && LACKMEM(state))))
			{
#ifdef TRACE_SORT
				if (trace_sort)
					elog(LOG, "switching to bounded heapsort at %d tuples: %s",
						 state->memtupcount,
						 pg_rusage_show(&state->ru_start));
#endif
				make_bounded_heap(state);
				return;
			}

			/*
			 * Done if we still fit in available memory and have array slots.
			 */
			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
				return;

			/*
			 * Nope; time to switch to tape-based operation.
			 */
			inittapes(state);

			/*
			 * Dump tuples until we are back under the limit.
			 */
			dumptuples(state, false);
			break;

		case TSS_BOUNDED:

			/*
			 * We don't want to grow the array here, so check whether the new
			 * tuple can be discarded before putting it in.  memtuples[0] is
			 * the root of a max-heap of the bound smallest tuples seen so
			 * far, so a new tuple <= the root cannot be in the final result.
			 */
			if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
			{
				/* new tuple <= top of the heap, so we can discard it */
				free_sort_tuple(state, tuple);
				CHECK_FOR_INTERRUPTS();
			}
			else
			{
				/* discard the old root and insert the new tuple */
				free_sort_tuple(state, &state->memtuples[0]);
				tuplesort_heap_siftup(state, false);
				tuplesort_heap_insert(state, tuple, 0, false);
			}
			break;

		case TSS_BUILDRUNS:

			/*
			 * Insert the tuple into the heap, with run number currentRun if
			 * it can go into the current run (i.e., it is >= the tuple most
			 * recently output, represented by the heap root), else run
			 * number currentRun+1.  The tuple can never belong to an earlier
			 * run, because we output tuples in heap (run, key) order.
			 *
			 * NOTE(review): the "belongs to current run" test against the
			 * heap root presumes the root is the last/least tuple written to
			 * the current run -- maintained by dumptuples, not visible here.
			 */
			Assert(state->memtupcount > 0);
			if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
				tuplesort_heap_insert(state, tuple, state->currentRun, true);
			else
				tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);

			/*
			 * If we are over the memory limit, dump tuples till we're under.
			 */
			dumptuples(state, false);
			break;

		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}
01307
01308
01309
01310
/*
 * tuplesort_performsort
 *
 * All tuples have been provided; finish the sort.  After this, the sort is
 * in a "done" state (TSS_SORTEDINMEM, TSS_SORTEDONTAPE, or TSS_FINALMERGE as
 * set by mergeruns) and tuples may be fetched.
 */
void
tuplesort_performsort(Tuplesortstate *state)
{
	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
	if (trace_sort)
		elog(LOG, "performsort starting: %s",
			 pg_rusage_show(&state->ru_start));
#endif

	switch (state->status)
	{
		case TSS_INITIAL:

			/*
			 * We were able to accumulate all the tuples within the allowed
			 * amount of memory.  Just qsort 'em and we're done.  The
			 * single-key path (onlyKey) is used when available.
			 */
			if (state->memtupcount > 1)
			{
				if (state->onlyKey != NULL)
					qsort_ssup(state->memtuples, state->memtupcount,
							   state->onlyKey);
				else
					qsort_tuple(state->memtuples,
								state->memtupcount,
								state->comparetup,
								state);
			}
			state->current = 0;
			state->eof_reached = false;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			state->status = TSS_SORTEDINMEM;
			break;

		case TSS_BOUNDED:

			/*
			 * We were able to accumulate all the tuples required for output
			 * in memory, using a heap to eliminate excess tuples.  Now we
			 * have to transform the heap to a properly-sorted array.
			 */
			sort_bounded_heap(state);
			state->current = 0;
			state->eof_reached = false;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			state->status = TSS_SORTEDINMEM;
			break;

		case TSS_BUILDRUNS:

			/*
			 * Finish tape-based sort: flush all remaining tuples to tape,
			 * then merge the runs.  mergeruns sets the correct final state
			 * (TSS_SORTEDONTAPE or TSS_FINALMERGE).
			 */
			dumptuples(state, true);
			mergeruns(state);
			state->eof_reached = false;
			state->markpos_block = 0L;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			break;

		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}

#ifdef TRACE_SORT
	if (trace_sort)
	{
		if (state->status == TSS_FINALMERGE)
			elog(LOG, "performsort done (except %d-way final merge): %s",
				 state->activeTapes,
				 pg_rusage_show(&state->ru_start));
		else
			elog(LOG, "performsort done: %s",
				 pg_rusage_show(&state->ru_start));
	}
#endif

	MemoryContextSwitchTo(oldcontext);
}
01400
01401
01402
01403
01404
01405
/*
 * tuplesort_gettuple_common
 *
 * Internal routine to fetch the next tuple in either direction.  Returns
 * false if no more tuples.  On success, *stup is filled in; *should_free
 * tells the caller whether it owns (and must pfree) stup->tuple.
 *
 * Backward fetches require randomAccess; they work by backspacing over the
 * length words that writetup routines place both before and after each tuple
 * on tape (hence the 2 * sizeof(unsigned int) adjustments below).
 */
static bool
tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
						  SortTuple *stup, bool *should_free)
{
	unsigned int tuplen;

	switch (state->status)
	{
		case TSS_SORTEDINMEM:
			Assert(forward || state->randomAccess);
			/* tuple remains owned by memtuples[]; caller must not free */
			*should_free = false;
			if (forward)
			{
				if (state->current < state->memtupcount)
				{
					*stup = state->memtuples[state->current++];
					return true;
				}
				state->eof_reached = true;

				/*
				 * Complain if caller tries to retrieve more tuples than
				 * originally asked for in a bounded sort.  This is because
				 * returning EOF here might be the wrong thing.
				 */
				if (state->bounded && state->current >= state->bound)
					elog(ERROR, "retrieved too many tuples in a bounded sort");

				return false;
			}
			else
			{
				/* backward scan through the in-memory array */
				if (state->current <= 0)
					return false;

				/*
				 * if all tuples are fetched already then we return last
				 * tuple, else - tuple before last returned.
				 */
				if (state->eof_reached)
					state->eof_reached = false;
				else
				{
					state->current--;	/* last returned tuple */
					if (state->current <= 0)
						return false;
				}
				*stup = state->memtuples[state->current - 1];
				return true;
			}
			break;

		case TSS_SORTEDONTAPE:
			Assert(forward || state->randomAccess);
			/* tape tuples are freshly palloc'd; caller owns them */
			*should_free = true;
			if (forward)
			{
				if (state->eof_reached)
					return false;
				if ((tuplen = getlen(state, state->result_tape, true)) != 0)
				{
					READTUP(state, stup, state->result_tape, tuplen);
					return true;
				}
				else
				{
					state->eof_reached = true;
					return false;
				}
			}

			/*
			 * Backward.
			 *
			 * if all tuples are fetched already then we return last tuple,
			 * else - tuple before last returned.
			 */
			if (state->eof_reached)
			{
				/*
				 * Seek position is pointing just past the zero tuplen at the
				 * end of file; back up to fetch last tuple's ending length
				 * word.  If seek fails we must have a completely empty file.
				 */
				if (!LogicalTapeBackspace(state->tapeset,
										  state->result_tape,
										  2 * sizeof(unsigned int)))
					return false;
				state->eof_reached = false;
			}
			else
			{
				/*
				 * Back up and fetch previously-returned tuple's ending
				 * length word.  If seek fails, assume we are at start of
				 * file.
				 */
				if (!LogicalTapeBackspace(state->tapeset,
										  state->result_tape,
										  sizeof(unsigned int)))
					return false;
				tuplen = getlen(state, state->result_tape, false);

				/*
				 * Back up to get ending length word of tuple before it.
				 */
				if (!LogicalTapeBackspace(state->tapeset,
										  state->result_tape,
										  tuplen + 2 * sizeof(unsigned int)))
				{
					/*
					 * If that fails, presumably the prev tuple is the first
					 * in the file.  Back up so that it becomes next to read
					 * in forward direction (not obviously right, but that is
					 * what in-memory case does).
					 */
					if (!LogicalTapeBackspace(state->tapeset,
											  state->result_tape,
											  tuplen + sizeof(unsigned int)))
						elog(ERROR, "bogus tuple length in backward scan");
					return false;
				}
			}

			tuplen = getlen(state, state->result_tape, false);

			/*
			 * Now we have the length of the prior tuple, back up and read
			 * it.  Note: READTUP expects we are positioned after the initial
			 * length word of the tuple, so back up to that point.
			 */
			if (!LogicalTapeBackspace(state->tapeset,
									  state->result_tape,
									  tuplen))
				elog(ERROR, "bogus tuple length in backward scan");
			READTUP(state, stup, state->result_tape, tuplen);
			return true;

		case TSS_FINALMERGE:
			Assert(forward);
			/* merge tuples are palloc'd; caller owns them */
			*should_free = true;

			/*
			 * This code should match the inner loop of mergeonerun().
			 */
			if (state->memtupcount > 0)
			{
				int srcTape = state->memtuples[0].tupindex;
				Size tuplen;	/* NOTE(review): shadows the outer 'tuplen' */
				int tupIndex;
				SortTuple *newtup;

				*stup = state->memtuples[0];
				/* returned tuple is no longer counted in our memory space */
				if (stup->tuple)
				{
					tuplen = GetMemoryChunkSpace(stup->tuple);
					state->availMem += tuplen;
					state->mergeavailmem[srcTape] += tuplen;
				}
				tuplesort_heap_siftup(state, false);
				if ((tupIndex = state->mergenext[srcTape]) == 0)
				{
					/*
					 * out of preloaded data on this tape, try to read more
					 */
					mergeprereadone(state, srcTape);

					/*
					 * if still no data, we've reached end of run on this
					 * tape
					 */
					if ((tupIndex = state->mergenext[srcTape]) == 0)
						return true;
				}
				/* pull next preread tuple from list, insert in heap */
				newtup = &state->memtuples[tupIndex];
				state->mergenext[srcTape] = newtup->tupindex;
				if (state->mergenext[srcTape] == 0)
					state->mergelast[srcTape] = 0;
				tuplesort_heap_insert(state, newtup, srcTape, false);
				/* put the now-unused memtuples entry on the freelist */
				newtup->tupindex = state->mergefreelist;
				state->mergefreelist = tupIndex;
				state->mergeavailslots[srcTape]++;
				return true;
			}
			return false;

		default:
			elog(ERROR, "invalid tuplesort state");
			return false;		/* keep compiler quiet */
	}
}
01601
01602
01603
01604
01605
01606
01607 bool
01608 tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
01609 TupleTableSlot *slot)
01610 {
01611 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
01612 SortTuple stup;
01613 bool should_free;
01614
01615 if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
01616 stup.tuple = NULL;
01617
01618 MemoryContextSwitchTo(oldcontext);
01619
01620 if (stup.tuple)
01621 {
01622 ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, should_free);
01623 return true;
01624 }
01625 else
01626 {
01627 ExecClearTuple(slot);
01628 return false;
01629 }
01630 }
01631
01632
01633
01634
01635
01636
01637 HeapTuple
01638 tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
01639 {
01640 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
01641 SortTuple stup;
01642
01643 if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
01644 stup.tuple = NULL;
01645
01646 MemoryContextSwitchTo(oldcontext);
01647
01648 return stup.tuple;
01649 }
01650
01651
01652
01653
01654
01655
01656 IndexTuple
01657 tuplesort_getindextuple(Tuplesortstate *state, bool forward,
01658 bool *should_free)
01659 {
01660 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
01661 SortTuple stup;
01662
01663 if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
01664 stup.tuple = NULL;
01665
01666 MemoryContextSwitchTo(oldcontext);
01667
01668 return (IndexTuple) stup.tuple;
01669 }
01670
01671
01672
01673
01674
01675
01676
01677
01678 bool
01679 tuplesort_getdatum(Tuplesortstate *state, bool forward,
01680 Datum *val, bool *isNull)
01681 {
01682 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
01683 SortTuple stup;
01684 bool should_free;
01685
01686 if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
01687 {
01688 MemoryContextSwitchTo(oldcontext);
01689 return false;
01690 }
01691
01692 if (stup.isnull1 || state->datumTypeByVal)
01693 {
01694 *val = stup.datum1;
01695 *isNull = stup.isnull1;
01696 }
01697 else
01698 {
01699 if (should_free)
01700 *val = stup.datum1;
01701 else
01702 *val = datumCopy(stup.datum1, false, state->datumTypeLen);
01703 *isNull = false;
01704 }
01705
01706 MemoryContextSwitchTo(oldcontext);
01707
01708 return true;
01709 }
01710
01711
01712
01713
01714
01715
01716
01717 int
01718 tuplesort_merge_order(long allowedMem)
01719 {
01720 int mOrder;
01721
01722
01723
01724
01725
01726
01727
01728
01729
01730
01731
01732 mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
01733 (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
01734
01735
01736 mOrder = Max(mOrder, MINORDER);
01737
01738 return mOrder;
01739 }
01740
01741
01742
01743
01744
01745
/*
 * inittapes - initialize for tape sorting.
 *
 * This is called only if we have found we don't have room to sort in memory.
 */
static void
inittapes(Tuplesortstate *state)
{
    int         maxTapes,
                ntuples,
                j;
    long        tapeSpace;

    /* Compute number of tapes to use: merge order plus one output tape */
    maxTapes = tuplesort_merge_order(state->allowedMem) + 1;

    /*
     * We must have at least 2*maxTapes slots in the memtuples[] array, else
     * we'd not have room for one in-use tuple per tape plus preread space;
     * hence cap the tape count at half the array size.
     */
    maxTapes = Min(maxTapes, state->memtupsize / 2);

    state->maxTapes = maxTapes;
    state->tapeRange = maxTapes - 1;    /* tape maxTapes-1 is the output tape */

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "switching to external sort with %d tapes: %s",
             maxTapes, pg_rusage_show(&state->ru_start));
#endif

    /*
     * Charge the tape buffers against availMem, but only if that still
     * leaves room for the memtuples[] array itself; otherwise skip the
     * charge and accept that LACKMEM may be slightly inaccurate.
     */
    tapeSpace = maxTapes * TAPE_BUFFER_OVERHEAD;
    if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
        USEMEM(state, tapeSpace);

    /*
     * Make sure that the temp file(s) underlying the tape set are created in
     * suitable temp tablespaces.
     */
    PrepareTempTablespaces();

    /*
     * Create the tape set and allocate the per-tape merge/distribution
     * bookkeeping arrays (all zero-initialized).
     */
    state->tapeset = LogicalTapeSetCreate(maxTapes);

    state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
    state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
    state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
    state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
    state->mergeavailmem = (long *) palloc0(maxTapes * sizeof(long));
    state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
    state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
    state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
    state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));

    /*
     * Convert the unsorted contents of memtuples[] into a heap.  Each tuple
     * is marked as belonging to run number zero.
     *
     * NOTE: we pass false for checkIndex since there's no point in comparing
     * run indexes in this step: they are all zero.
     */
    ntuples = state->memtupcount;
    state->memtupcount = 0;     /* make the heap empty */
    for (j = 0; j < ntuples; j++)
    {
        /* Must copy source tuple to avoid possible overwrite during insert */
        SortTuple stup = state->memtuples[j];

        tuplesort_heap_insert(state, &stup, 0, false);
    }
    Assert(state->memtupcount == ntuples);

    state->currentRun = 0;

    /*
     * Initialize variables of Knuth's Algorithm D: the "Fibonacci" run
     * distribution counts (tp_fib), real and dummy run counts, and the
     * logical-to-physical tape map.
     */
    for (j = 0; j < maxTapes; j++)
    {
        state->tp_fib[j] = 1;
        state->tp_runs[j] = 0;
        state->tp_dummy[j] = 1;
        state->tp_tapenum[j] = j;
    }
    /* the output tape starts with no target runs at all */
    state->tp_fib[state->tapeRange] = 0;
    state->tp_dummy[state->tapeRange] = 0;

    state->Level = 1;
    state->destTape = 0;

    state->status = TSS_BUILDRUNS;
}
01846
01847
01848
01849
01850
01851
01852
01853 static void
01854 selectnewtape(Tuplesortstate *state)
01855 {
01856 int j;
01857 int a;
01858
01859
01860 if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
01861 {
01862 state->destTape++;
01863 return;
01864 }
01865 if (state->tp_dummy[state->destTape] != 0)
01866 {
01867 state->destTape = 0;
01868 return;
01869 }
01870
01871
01872 state->Level++;
01873 a = state->tp_fib[0];
01874 for (j = 0; j < state->tapeRange; j++)
01875 {
01876 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
01877 state->tp_fib[j] = a + state->tp_fib[j + 1];
01878 }
01879 state->destTape = 0;
01880 }
01881
01882
01883
01884
01885
01886
01887
/*
 * mergeruns -- merge all the completed initial runs.
 *
 * This implements steps D5, D6 of Knuth's Algorithm D (polyphase merge).
 * All input data has already been written to initial runs on tape (see
 * dumptuples).
 */
static void
mergeruns(Tuplesortstate *state)
{
    int         tapenum,
                svTape,
                svRuns,
                svDummy;

    Assert(state->status == TSS_BUILDRUNS);
    Assert(state->memtupcount == 0);

    /*
     * If we produced only one initial run, we can just use that tape as the
     * finished output rather than doing a useless merge.
     */
    if (state->currentRun == 1)
    {
        state->result_tape = state->tp_tapenum[state->destTape];
        /* must freeze and rewind the finished output tape */
        LogicalTapeFreeze(state->tapeset, state->result_tape);
        state->status = TSS_SORTEDONTAPE;
        return;
    }

    /* Rewind all the input tapes for reading */
    for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
        LogicalTapeRewind(state->tapeset, tapenum, false);

    for (;;)
    {
        /*
         * If just one (real or dummy) run remains on each input tape, then
         * only one merge pass is left.  If we don't need random access to
         * the result, we can skip materializing it and do the final merge
         * on-the-fly as tuples are requested.
         */
        if (!state->randomAccess)
        {
            bool allOneRun = true;

            Assert(state->tp_runs[state->tapeRange] == 0);
            for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
            {
                if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
                {
                    allOneRun = false;
                    break;
                }
            }
            if (allOneRun)
            {
                /* Tell logtape.c we won't be writing anymore */
                LogicalTapeSetForgetFreeSpace(state->tapeset);
                /* Initialize for the final merge pass */
                beginmerge(state);
                state->status = TSS_FINALMERGE;
                return;
            }
        }

        /* Step D5: merge runs onto the output tape until tape P is empty */
        while (state->tp_runs[state->tapeRange - 1] ||
               state->tp_dummy[state->tapeRange - 1])
        {
            bool allDummy = true;

            for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
            {
                if (state->tp_dummy[tapenum] == 0)
                {
                    allDummy = false;
                    break;
                }
            }

            if (allDummy)
            {
                /* "merging" only dummy runs just moves a dummy to output */
                state->tp_dummy[state->tapeRange]++;
                for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
                    state->tp_dummy[tapenum]--;
            }
            else
                mergeonerun(state);
        }

        /* Step D6: decrease level; done when level reaches zero */
        if (--state->Level == 0)
            break;
        /* rewind output tape T to use as new input */
        LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
                          false);
        /* rewind used-up input tape P, and prepare it for rewriting */
        LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
                          true);
        state->tp_runs[state->tapeRange - 1] = 0;

        /*
         * Reassign tape units per step D6: rotate the tape assignments so
         * the old output tape becomes logical tape 0.
         */
        svTape = state->tp_tapenum[state->tapeRange];
        svDummy = state->tp_dummy[state->tapeRange];
        svRuns = state->tp_runs[state->tapeRange];
        for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
        {
            state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
            state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
            state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
        }
        state->tp_tapenum[0] = svTape;
        state->tp_dummy[0] = svDummy;
        state->tp_runs[0] = svRuns;
    }

    /*
     * Done.  Since we exited the loop without performing the last rotation
     * of step D6, the sorted result is on the tape currently assigned to
     * logical position T (tapeRange), not tape 0.  Freeze it so it can be
     * read (possibly multiple times, for random access).
     */
    state->result_tape = state->tp_tapenum[state->tapeRange];
    LogicalTapeFreeze(state->tapeset, state->result_tape);
    state->status = TSS_SORTEDONTAPE;
}
02015
02016
02017
02018
02019
02020
02021
/*
 * Merge one run from each input tape, except ones with dummy runs.
 *
 * This is the inner loop of Algorithm D step D5.  The tapes holding real
 * runs are determined by beginmerge(), which also loads the first tuple
 * from each into the merge heap.
 */
static void
mergeonerun(Tuplesortstate *state)
{
    int         destTape = state->tp_tapenum[state->tapeRange];
    int         srcTape;
    int         tupIndex;
    SortTuple  *tup;
    long        priorAvail,
                spaceFreed;

    /*
     * Start the merge by loading one tuple from each active source tape into
     * the heap.  This also decreases the input run/dummy-run counts.
     */
    beginmerge(state);

    /*
     * Execute merge by repeatedly extracting the lowest tuple in the heap,
     * writing it out, and replacing it with the next tuple from the same
     * tape (if there is another one).
     */
    while (state->memtupcount > 0)
    {
        /* write the heap's frontmost tuple to destTape */
        priorAvail = state->availMem;
        srcTape = state->memtuples[0].tupindex;
        WRITETUP(state, destTape, &state->memtuples[0]);
        /* WRITETUP adjusted total free space; credit it to the source tape */
        spaceFreed = state->availMem - priorAvail;
        state->mergeavailmem[srcTape] += spaceFreed;
        /* compact the heap */
        tuplesort_heap_siftup(state, false);
        if ((tupIndex = state->mergenext[srcTape]) == 0)
        {
            /* out of preloaded data on this tape, try to read more */
            mergepreread(state);
            /* if still no data, we've reached end of run on this tape */
            if ((tupIndex = state->mergenext[srcTape]) == 0)
                continue;
        }
        /* pull next preread tuple from its tape's list, insert into heap */
        tup = &state->memtuples[tupIndex];
        state->mergenext[srcTape] = tup->tupindex;
        if (state->mergenext[srcTape] == 0)
            state->mergelast[srcTape] = 0;
        tuplesort_heap_insert(state, tup, srcTape, false);
        /* put the now-unused memtuples entry on the freelist */
        tup->tupindex = state->mergefreelist;
        state->mergefreelist = tupIndex;
        state->mergeavailslots[srcTape]++;
    }

    /*
     * When the heap empties, we're done.  Write an end-of-run marker on the
     * output tape, and increment its count of real runs.
     */
    markrunend(state, destTape);
    state->tp_runs[state->tapeRange]++;

#ifdef TRACE_SORT
    if (trace_sort)
        elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
             pg_rusage_show(&state->ru_start));
#endif
}
02087
02088
02089
02090
02091
02092
02093
02094
02095
/*
 * beginmerge - initialize for a merge pass
 *
 * We decrease the counts of real and dummy runs for each tape, and mark
 * which tapes contain active input runs in mergeactive[].  Then we divide
 * the preread space (memtuples[] slots and availMem) evenly among the
 * active tapes, preread an initial batch of tuples from each, and load
 * the first tuple from each active tape into the merge heap.
 */
static void
beginmerge(Tuplesortstate *state)
{
    int         activeTapes;
    int         tapenum;
    int         srcTape;
    int         slotsPerTape;
    long        spacePerTape;

    /* Heap should be empty here */
    Assert(state->memtupcount == 0);

    /* Adjust run counts and mark the active tapes */
    memset(state->mergeactive, 0,
           state->maxTapes * sizeof(*state->mergeactive));
    activeTapes = 0;
    for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
    {
        if (state->tp_dummy[tapenum] > 0)
            state->tp_dummy[tapenum]--; /* consume a dummy (empty) run */
        else
        {
            Assert(state->tp_runs[tapenum] > 0);
            state->tp_runs[tapenum]--;
            srcTape = state->tp_tapenum[tapenum];
            state->mergeactive[srcTape] = true;
            activeTapes++;
        }
    }
    state->activeTapes = activeTapes;

    /* Clear merge-pass state variables */
    memset(state->mergenext, 0,
           state->maxTapes * sizeof(*state->mergenext));
    memset(state->mergelast, 0,
           state->maxTapes * sizeof(*state->mergelast));
    state->mergefreelist = 0;   /* nothing in the freelist */
    state->mergefirstfree = activeTapes;    /* first slot available for preread */

    /*
     * Initialize space allocation to let each active input tape have an
     * equal share of preread space.
     */
    Assert(activeTapes > 0);
    slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
    Assert(slotsPerTape > 0);
    spacePerTape = state->availMem / activeTapes;
    for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
    {
        if (state->mergeactive[srcTape])
        {
            state->mergeavailslots[srcTape] = slotsPerTape;
            state->mergeavailmem[srcTape] = spacePerTape;
        }
    }

    /*
     * Preread as many tuples as possible (and at least one) from each active
     * tape
     */
    mergepreread(state);

    /* Load the merge heap with the first tuple from each input tape */
    for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
    {
        int     tupIndex = state->mergenext[srcTape];
        SortTuple *tup;

        if (tupIndex)
        {
            /* unlink the head of the tape's preread list */
            tup = &state->memtuples[tupIndex];
            state->mergenext[srcTape] = tup->tupindex;
            if (state->mergenext[srcTape] == 0)
                state->mergelast[srcTape] = 0;
            tuplesort_heap_insert(state, tup, srcTape, false);
            /* put the now-unused memtuples entry on the freelist */
            tup->tupindex = state->mergefreelist;
            state->mergefreelist = tupIndex;
            state->mergeavailslots[srcTape]++;
        }
    }
}
02178
02179
02180
02181
02182
02183
02184
02185
02186
02187
02188
02189
02190
02191
02192
02193
02194
02195
02196
02197
02198
02199
02200
02201
02202 static void
02203 mergepreread(Tuplesortstate *state)
02204 {
02205 int srcTape;
02206
02207 for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
02208 mergeprereadone(state, srcTape);
02209 }
02210
02211
02212
02213
02214
02215
02216
02217
/*
 * mergeprereadone - load tuples from one merge input tape
 *
 * Read tuples from the specified tape until it has used up its free memory
 * or array slots; but ensure that we have at least one tuple queued, if
 * any are to be had.
 */
static void
mergeprereadone(Tuplesortstate *state, int srcTape)
{
    unsigned int tuplen;
    SortTuple   stup;
    int         tupIndex;
    long        priorAvail,
                spaceUsed;

    if (!state->mergeactive[srcTape])
        return;                 /* tape's run is already exhausted */

    /*
     * Temporarily swap this tape's private memory budget into availMem, so
     * that READTUP's USEMEM/LACKMEM accounting is charged against it.
     */
    priorAvail = state->availMem;
    state->availMem = state->mergeavailmem[srcTape];
    while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
           state->mergenext[srcTape] == 0)
    {
        /* read next tuple, if any; zero length means end of run */
        if ((tuplen = getlen(state, srcTape, true)) == 0)
        {
            state->mergeactive[srcTape] = false;
            break;
        }
        READTUP(state, &stup, srcTape, tuplen);
        /* find a free slot in memtuples[]: freelist first, then grow */
        tupIndex = state->mergefreelist;
        if (tupIndex)
            state->mergefreelist = state->memtuples[tupIndex].tupindex;
        else
        {
            tupIndex = state->mergefirstfree++;
            Assert(tupIndex < state->memtupsize);
        }
        state->mergeavailslots[srcTape]--;
        /* store tuple, append to this tape's singly-linked preread list */
        stup.tupindex = 0;
        state->memtuples[tupIndex] = stup;
        if (state->mergelast[srcTape])
            state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
        else
            state->mergenext[srcTape] = tupIndex;
        state->mergelast[srcTape] = tupIndex;
    }
    /* swap the budgets back, recording how much this tape consumed */
    spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
    state->mergeavailmem[srcTape] = state->availMem;
    state->availMem = priorAvail - spaceUsed;
}
02265
02266
02267
02268
02269
02270
02271
02272
02273
02274
02275
02276
02277
02278
02279
02280
02281
02282
/*
 * dumptuples - remove tuples from heap and write to tape
 *
 * This is used during initial-run building, but not during merging.
 *
 * When alltuples = false, dump only enough tuples to get under the
 * availMem limit (and leave at least one tuple in the heap in any case).
 * When alltuples = true, dump everything currently in memory (used at
 * end of input data).
 */
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
    while (alltuples ||
           (LACKMEM(state) && state->memtupcount > 1) ||
           state->memtupcount >= state->memtupsize)
    {
        /*
         * Dump the heap's frontmost entry, and sift up to remove it from the
         * heap.
         */
        Assert(state->memtupcount > 0);
        WRITETUP(state, state->tp_tapenum[state->destTape],
                 &state->memtuples[0]);
        tuplesort_heap_siftup(state, true);

        /*
         * If the heap is empty *or* the frontmost tuple's run number has
         * changed, we've finished the current run.
         */
        if (state->memtupcount == 0 ||
            state->currentRun != state->memtuples[0].tupindex)
        {
            markrunend(state, state->tp_tapenum[state->destTape]);
            state->currentRun++;
            state->tp_runs[state->destTape]++;
            state->tp_dummy[state->destTape]--; /* per Algorithm D step D2 */

#ifdef TRACE_SORT
            if (trace_sort)
                elog(LOG, "finished writing%s run %d to tape %d: %s",
                     (state->memtupcount == 0) ? " final" : "",
                     state->currentRun, state->destTape,
                     pg_rusage_show(&state->ru_start));
#endif

            /*
             * Done if heap is empty, else prepare for a new run.
             */
            if (state->memtupcount == 0)
                break;
            Assert(state->currentRun == state->memtuples[0].tupindex);
            selectnewtape(state);
        }
    }
}
02329
02330
02331
02332
02333 void
02334 tuplesort_rescan(Tuplesortstate *state)
02335 {
02336 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
02337
02338 Assert(state->randomAccess);
02339
02340 switch (state->status)
02341 {
02342 case TSS_SORTEDINMEM:
02343 state->current = 0;
02344 state->eof_reached = false;
02345 state->markpos_offset = 0;
02346 state->markpos_eof = false;
02347 break;
02348 case TSS_SORTEDONTAPE:
02349 LogicalTapeRewind(state->tapeset,
02350 state->result_tape,
02351 false);
02352 state->eof_reached = false;
02353 state->markpos_block = 0L;
02354 state->markpos_offset = 0;
02355 state->markpos_eof = false;
02356 break;
02357 default:
02358 elog(ERROR, "invalid tuplesort state");
02359 break;
02360 }
02361
02362 MemoryContextSwitchTo(oldcontext);
02363 }
02364
02365
02366
02367
02368 void
02369 tuplesort_markpos(Tuplesortstate *state)
02370 {
02371 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
02372
02373 Assert(state->randomAccess);
02374
02375 switch (state->status)
02376 {
02377 case TSS_SORTEDINMEM:
02378 state->markpos_offset = state->current;
02379 state->markpos_eof = state->eof_reached;
02380 break;
02381 case TSS_SORTEDONTAPE:
02382 LogicalTapeTell(state->tapeset,
02383 state->result_tape,
02384 &state->markpos_block,
02385 &state->markpos_offset);
02386 state->markpos_eof = state->eof_reached;
02387 break;
02388 default:
02389 elog(ERROR, "invalid tuplesort state");
02390 break;
02391 }
02392
02393 MemoryContextSwitchTo(oldcontext);
02394 }
02395
02396
02397
02398
02399
02400 void
02401 tuplesort_restorepos(Tuplesortstate *state)
02402 {
02403 MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
02404
02405 Assert(state->randomAccess);
02406
02407 switch (state->status)
02408 {
02409 case TSS_SORTEDINMEM:
02410 state->current = state->markpos_offset;
02411 state->eof_reached = state->markpos_eof;
02412 break;
02413 case TSS_SORTEDONTAPE:
02414 if (!LogicalTapeSeek(state->tapeset,
02415 state->result_tape,
02416 state->markpos_block,
02417 state->markpos_offset))
02418 elog(ERROR, "tuplesort_restorepos failed");
02419 state->eof_reached = state->markpos_eof;
02420 break;
02421 default:
02422 elog(ERROR, "invalid tuplesort state");
02423 break;
02424 }
02425
02426 MemoryContextSwitchTo(oldcontext);
02427 }
02428
02429
02430
02431
02432
02433
02434
02435
02436 void
02437 tuplesort_get_stats(Tuplesortstate *state,
02438 const char **sortMethod,
02439 const char **spaceType,
02440 long *spaceUsed)
02441 {
02442
02443
02444
02445
02446
02447
02448
02449
02450
02451 if (state->tapeset)
02452 {
02453 *spaceType = "Disk";
02454 *spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
02455 }
02456 else
02457 {
02458 *spaceType = "Memory";
02459 *spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
02460 }
02461
02462 switch (state->status)
02463 {
02464 case TSS_SORTEDINMEM:
02465 if (state->boundUsed)
02466 *sortMethod = "top-N heapsort";
02467 else
02468 *sortMethod = "quicksort";
02469 break;
02470 case TSS_SORTEDONTAPE:
02471 *sortMethod = "external sort";
02472 break;
02473 case TSS_FINALMERGE:
02474 *sortMethod = "external merge";
02475 break;
02476 default:
02477 *sortMethod = "still in progress";
02478 break;
02479 }
02480 }
02481
02482
02483
02484
02485
02486
02487
02488
02489
/*
 * Heap manipulation comparison macro.  When checkIndex is true (during run
 * building), tuples are ordered first by run number (tupindex), so tuples
 * assigned to a later run sort after all tuples of earlier runs; otherwise
 * we delegate to the sort's regular comparator.  Relies on "state" and
 * "checkIndex" being in scope at the call site.
 */
#define HEAPCOMPARE(tup1,tup2) \
    (checkIndex && ((tup1)->tupindex != (tup2)->tupindex) ? \
     ((tup1)->tupindex) - ((tup2)->tupindex) : \
     COMPARETUP(state, tup1, tup2))
02494
02495
02496
02497
02498
02499
02500
02501
02502
02503
02504
02505
02506
02507
/*
 * Convert the existing unordered array of SortTuples to a bounded heap,
 * discarding all but the best "state->bound" tuples.
 *
 * For a bounded sort we want the *largest* surviving entry at the heap
 * root, so it can be discarded cheaply when a better tuple arrives.
 * Therefore the sort direction is temporarily reversed while the heap is
 * in use (undone later by sort_bounded_heap).
 */
static void
make_bounded_heap(Tuplesortstate *state)
{
    int         tupcount = state->memtupcount;
    int         i;

    Assert(state->status == TSS_INITIAL);
    Assert(state->bounded);
    Assert(tupcount >= state->bound);

    /* Reverse sort direction so largest entry will be at root */
    REVERSEDIRECTION(state);

    state->memtupcount = 0;     /* make the heap empty */
    for (i = 0; i < tupcount; i++)
    {
        if (state->memtupcount >= state->bound &&
            COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
        {
            /* New tuple would just get thrown out, so skip it */
            free_sort_tuple(state, &state->memtuples[i]);
            CHECK_FOR_INTERRUPTS();
        }
        else
        {
            /*
             * Insert next tuple into heap.  Must copy the source tuple,
             * since the insert may overwrite its array slot.
             */
            SortTuple stup = state->memtuples[i];

            tuplesort_heap_insert(state, &stup, 0, false);

            /* If heap too full, discard largest entry (at the root) */
            if (state->memtupcount > state->bound)
            {
                free_sort_tuple(state, &state->memtuples[0]);
                tuplesort_heap_siftup(state, false);
            }
        }
    }

    Assert(state->memtupcount == state->bound);
    state->status = TSS_BOUNDED;
}
02551
02552
02553
02554
/*
 * Convert the bounded heap to a properly-sorted array
 */
static void
sort_bounded_heap(Tuplesortstate *state)
{
    int         tupcount = state->memtupcount;

    Assert(state->status == TSS_BOUNDED);
    Assert(state->bounded);
    Assert(tupcount == state->bound);

    /*
     * We can unheapify in place because each siftup removes the current
     * largest entry (the root) and decrements memtupcount, freeing exactly
     * the array slot at the new end where we then store that entry.  Once
     * down to a single-entry heap, the array is fully sorted.
     */
    while (state->memtupcount > 1)
    {
        SortTuple stup = state->memtuples[0];

        /* this sifts-up the next-largest entry and decreases memtupcount */
        tuplesort_heap_siftup(state, false);
        state->memtuples[state->memtupcount] = stup;
    }
    state->memtupcount = tupcount;

    /*
     * Reverse sort direction back to the original state (it was flipped by
     * make_bounded_heap so the heap kept its largest entry at the root).
     */
    REVERSEDIRECTION(state);

    state->status = TSS_SORTEDINMEM;
    state->boundUsed = true;
}
02588
02589
02590
02591
02592
02593
02594
02595
02596
02597
02598
/*
 * Insert a new tuple into an empty or existing heap, maintaining the
 * heap invariant.  Caller is responsible for ensuring there's room.
 *
 * Note: we assume *tuple is a temporary variable that can be scribbled on.
 * For some callers, tuple actually points at a memtuples[] entry above the
 * end of the heap; that is safe because the grow step below copies the
 * [memtupcount] slot before anything is written over it.
 */
static void
tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
                      int tupleindex, bool checkIndex)
{
    SortTuple  *memtuples;
    int         j;

    /*
     * Save the tupleindex --- see notes above about writing on *tuple.  The
     * run index participates in comparisons when checkIndex is true (see
     * HEAPCOMPARE), so it must be stored before sifting.
     */
    tuple->tupindex = tupleindex;

    memtuples = state->memtuples;
    Assert(state->memtupcount < state->memtupsize);

    CHECK_FOR_INTERRUPTS();

    /*
     * Sift-up the new entry: start at the new end of the heap and move the
     * hole toward the root until the parent is not larger than the tuple.
     * (0-based indexing: parent of j is (j-1)/2.)
     */
    j = state->memtupcount++;
    while (j > 0)
    {
        int i = (j - 1) >> 1;

        if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
            break;
        memtuples[j] = memtuples[i];
        j = i;
    }
    memtuples[j] = *tuple;
}
02635
02636
02637
02638
02639
/*
 * The tuple at state->memtuples[0] has been removed from the heap.
 * Decrement memtupcount, and sift down to re-establish the heap invariant,
 * reinserting the entry that was at the old end of the array.
 */
static void
tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
{
    SortTuple  *memtuples = state->memtuples;
    SortTuple  *tuple;
    int         i,
                n;

    if (--state->memtupcount <= 0)
        return;                 /* heap is now empty; nothing to fix */

    CHECK_FOR_INTERRUPTS();

    n = state->memtupcount;
    tuple = &memtuples[n];      /* tuple that must be reinserted */
    i = 0;                      /* i is where the "hole" is */
    for (;;)
    {
        int j = 2 * i + 1;      /* left child (0-based heap) */

        if (j >= n)
            break;
        /* pick the smaller of the two children */
        if (j + 1 < n &&
            HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
            j++;
        if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
            break;
        memtuples[i] = memtuples[j];
        i = j;
    }
    memtuples[i] = *tuple;
}
02672
02673
02674
02675
02676
02677
02678 static unsigned int
02679 getlen(Tuplesortstate *state, int tapenum, bool eofOK)
02680 {
02681 unsigned int len;
02682
02683 if (LogicalTapeRead(state->tapeset, tapenum,
02684 &len, sizeof(len)) != sizeof(len))
02685 elog(ERROR, "unexpected end of tape");
02686 if (len == 0 && !eofOK)
02687 elog(ERROR, "unexpected end of data");
02688 return len;
02689 }
02690
02691 static void
02692 markrunend(Tuplesortstate *state, int tapenum)
02693 {
02694 unsigned int len = 0;
02695
02696 LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
02697 }
02698
02699
02700
02701
02702
/*
 * Inline-able two-argument collation-aware function call, avoiding the
 * overhead of the general fmgr entry points in this hot comparison path.
 * Errors out if the called function returns NULL, since sort comparators
 * are strict by assumption.
 */
static inline Datum
myFunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
{
    FunctionCallInfoData fcinfo;
    Datum       result;

    InitFunctionCallInfoData(fcinfo, flinfo, 2, collation, NULL, NULL);

    fcinfo.arg[0] = arg1;
    fcinfo.arg[1] = arg2;
    fcinfo.argnull[0] = false;
    fcinfo.argnull[1] = false;

    result = FunctionCallInvoke(&fcinfo);

    /* Check for null result, since caller is clearly not expecting one */
    if (fcinfo.isnull)
        elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);

    return result;
}
02724
02725
02726
02727
02728
02729
02730
02731 static inline int32
02732 inlineApplySortFunction(FmgrInfo *sortFunction, int sk_flags, Oid collation,
02733 Datum datum1, bool isNull1,
02734 Datum datum2, bool isNull2)
02735 {
02736 int32 compare;
02737
02738 if (isNull1)
02739 {
02740 if (isNull2)
02741 compare = 0;
02742 else if (sk_flags & SK_BT_NULLS_FIRST)
02743 compare = -1;
02744 else
02745 compare = 1;
02746 }
02747 else if (isNull2)
02748 {
02749 if (sk_flags & SK_BT_NULLS_FIRST)
02750 compare = 1;
02751 else
02752 compare = -1;
02753 }
02754 else
02755 {
02756 compare = DatumGetInt32(myFunctionCall2Coll(sortFunction, collation,
02757 datum1, datum2));
02758
02759 if (sk_flags & SK_BT_DESC)
02760 compare = -compare;
02761 }
02762
02763 return compare;
02764 }
02765
02766
02767
02768
02769
02770
02771 static int
02772 comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
02773 {
02774 SortSupport sortKey = state->sortKeys;
02775 HeapTupleData ltup;
02776 HeapTupleData rtup;
02777 TupleDesc tupDesc;
02778 int nkey;
02779 int32 compare;
02780
02781
02782 compare = ApplySortComparator(a->datum1, a->isnull1,
02783 b->datum1, b->isnull1,
02784 sortKey);
02785 if (compare != 0)
02786 return compare;
02787
02788
02789 ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
02790 ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
02791 rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
02792 rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
02793 tupDesc = state->tupDesc;
02794 sortKey++;
02795 for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
02796 {
02797 AttrNumber attno = sortKey->ssup_attno;
02798 Datum datum1,
02799 datum2;
02800 bool isnull1,
02801 isnull2;
02802
02803 datum1 = heap_getattr(<up, attno, tupDesc, &isnull1);
02804 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
02805
02806 compare = ApplySortComparator(datum1, isnull1,
02807 datum2, isnull2,
02808 sortKey);
02809 if (compare != 0)
02810 return compare;
02811 }
02812
02813 return 0;
02814 }
02815
02816 static void
02817 copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
02818 {
02819
02820
02821
02822
02823 TupleTableSlot *slot = (TupleTableSlot *) tup;
02824 MinimalTuple tuple;
02825 HeapTupleData htup;
02826
02827
02828 tuple = ExecCopySlotMinimalTuple(slot);
02829 stup->tuple = (void *) tuple;
02830 USEMEM(state, GetMemoryChunkSpace(tuple));
02831
02832 htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
02833 htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
02834 stup->datum1 = heap_getattr(&htup,
02835 state->sortKeys[0].ssup_attno,
02836 state->tupDesc,
02837 &stup->isnull1);
02838 }
02839
02840 static void
02841 writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
02842 {
02843 MinimalTuple tuple = (MinimalTuple) stup->tuple;
02844
02845
02846 char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
02847 unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
02848
02849
02850 unsigned int tuplen = tupbodylen + sizeof(int);
02851
02852 LogicalTapeWrite(state->tapeset, tapenum,
02853 (void *) &tuplen, sizeof(tuplen));
02854 LogicalTapeWrite(state->tapeset, tapenum,
02855 (void *) tupbody, tupbodylen);
02856 if (state->randomAccess)
02857 LogicalTapeWrite(state->tapeset, tapenum,
02858 (void *) &tuplen, sizeof(tuplen));
02859
02860 FREEMEM(state, GetMemoryChunkSpace(tuple));
02861 heap_free_minimal_tuple(tuple);
02862 }
02863
02864 static void
02865 readtup_heap(Tuplesortstate *state, SortTuple *stup,
02866 int tapenum, unsigned int len)
02867 {
02868 unsigned int tupbodylen = len - sizeof(int);
02869 unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
02870 MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
02871 char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
02872 HeapTupleData htup;
02873
02874 USEMEM(state, GetMemoryChunkSpace(tuple));
02875
02876 tuple->t_len = tuplen;
02877 LogicalTapeReadExact(state->tapeset, tapenum,
02878 tupbody, tupbodylen);
02879 if (state->randomAccess)
02880 LogicalTapeReadExact(state->tapeset, tapenum,
02881 &tuplen, sizeof(tuplen));
02882 stup->tuple = (void *) tuple;
02883
02884 htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
02885 htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
02886 stup->datum1 = heap_getattr(&htup,
02887 state->sortKeys[0].ssup_attno,
02888 state->tupDesc,
02889 &stup->isnull1);
02890 }
02891
02892 static void
02893 reversedirection_heap(Tuplesortstate *state)
02894 {
02895 SortSupport sortKey = state->sortKeys;
02896 int nkey;
02897
02898 for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
02899 {
02900 sortKey->ssup_reverse = !sortKey->ssup_reverse;
02901 sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
02902 }
02903 }
02904
02905
02906
02907
02908
02909
02910
/*
 * Routines specialized for the CLUSTER case (HeapTuple data, with
 * comparisons per a btree index definition)
 */
static int
comparetup_cluster(const SortTuple *a, const SortTuple *b,
                   Tuplesortstate *state)
{
    ScanKey     scanKey = state->indexScanKey;
    HeapTuple   ltup;
    HeapTuple   rtup;
    TupleDesc   tupDesc;
    int         nkey;
    int32       compare;

    /*
     * If the leading index column is a plain heap column, its value is
     * cached in datum1/isnull1; compare that first.
     */
    if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
    {
        compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
                                          scanKey->sk_collation,
                                          a->datum1, a->isnull1,
                                          b->datum1, b->isnull1);
        if (compare != 0 || state->nKeys == 1)
            return compare;
        /* Compare additional columns the hard way */
        scanKey++;
        nkey = 1;
    }
    else
    {
        /* Leading column is an expression; must compare all keys below */
        nkey = 0;
    }

    /* Compare additional sort keys */
    ltup = (HeapTuple) a->tuple;
    rtup = (HeapTuple) b->tuple;

    if (state->indexInfo->ii_Expressions == NULL)
    {
        /* If not expression index, just compare the proper heap attrs */
        tupDesc = state->tupDesc;

        for (; nkey < state->nKeys; nkey++, scanKey++)
        {
            AttrNumber  attno = state->indexInfo->ii_KeyAttrNumbers[nkey];
            Datum       datum1,
                        datum2;
            bool        isnull1,
                        isnull2;

            datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
            datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);

            compare = inlineApplySortFunction(&scanKey->sk_func,
                                              scanKey->sk_flags,
                                              scanKey->sk_collation,
                                              datum1, isnull1,
                                              datum2, isnull2);
            if (compare != 0)
                return compare;
        }
    }
    else
    {
        /*
         * In the expression index case, compute the whole index tuple for
         * each heap tuple and then compare values.  It would perhaps be
         * faster to compute only as many columns as we need to compare, but
         * that would require duplicating the logic in FormIndexDatum.
         */
        Datum       l_index_values[INDEX_MAX_KEYS];
        bool        l_index_isnull[INDEX_MAX_KEYS];
        Datum       r_index_values[INDEX_MAX_KEYS];
        bool        r_index_isnull[INDEX_MAX_KEYS];
        TupleTableSlot *ecxt_scantuple;

        /* Reset the per-tuple context each time to prevent memory leakage */
        ResetPerTupleExprContext(state->estate);

        ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;

        ExecStoreTuple(ltup, ecxt_scantuple, InvalidBuffer, false);
        FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
                       l_index_values, l_index_isnull);

        ExecStoreTuple(rtup, ecxt_scantuple, InvalidBuffer, false);
        FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
                       r_index_values, r_index_isnull);

        for (; nkey < state->nKeys; nkey++, scanKey++)
        {
            compare = inlineApplySortFunction(&scanKey->sk_func,
                                              scanKey->sk_flags,
                                              scanKey->sk_collation,
                                              l_index_values[nkey],
                                              l_index_isnull[nkey],
                                              r_index_values[nkey],
                                              r_index_isnull[nkey]);
            if (compare != 0)
                return compare;
        }
    }

    return 0;
}
03013
03014 static void
03015 copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
03016 {
03017 HeapTuple tuple = (HeapTuple) tup;
03018
03019
03020 tuple = heap_copytuple(tuple);
03021 stup->tuple = (void *) tuple;
03022 USEMEM(state, GetMemoryChunkSpace(tuple));
03023
03024 if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
03025 stup->datum1 = heap_getattr(tuple,
03026 state->indexInfo->ii_KeyAttrNumbers[0],
03027 state->tupDesc,
03028 &stup->isnull1);
03029 }
03030
03031 static void
03032 writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
03033 {
03034 HeapTuple tuple = (HeapTuple) stup->tuple;
03035 unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
03036
03037
03038 LogicalTapeWrite(state->tapeset, tapenum,
03039 &tuplen, sizeof(tuplen));
03040 LogicalTapeWrite(state->tapeset, tapenum,
03041 &tuple->t_self, sizeof(ItemPointerData));
03042 LogicalTapeWrite(state->tapeset, tapenum,
03043 tuple->t_data, tuple->t_len);
03044 if (state->randomAccess)
03045 LogicalTapeWrite(state->tapeset, tapenum,
03046 &tuplen, sizeof(tuplen));
03047
03048 FREEMEM(state, GetMemoryChunkSpace(tuple));
03049 heap_freetuple(tuple);
03050 }
03051
03052 static void
03053 readtup_cluster(Tuplesortstate *state, SortTuple *stup,
03054 int tapenum, unsigned int tuplen)
03055 {
03056 unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
03057 HeapTuple tuple = (HeapTuple) palloc(t_len + HEAPTUPLESIZE);
03058
03059 USEMEM(state, GetMemoryChunkSpace(tuple));
03060
03061 tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
03062 tuple->t_len = t_len;
03063 LogicalTapeReadExact(state->tapeset, tapenum,
03064 &tuple->t_self, sizeof(ItemPointerData));
03065
03066 tuple->t_tableOid = InvalidOid;
03067
03068 LogicalTapeReadExact(state->tapeset, tapenum,
03069 tuple->t_data, tuple->t_len);
03070 if (state->randomAccess)
03071 LogicalTapeReadExact(state->tapeset, tapenum,
03072 &tuplen, sizeof(tuplen));
03073 stup->tuple = (void *) tuple;
03074
03075 if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
03076 stup->datum1 = heap_getattr(tuple,
03077 state->indexInfo->ii_KeyAttrNumbers[0],
03078 state->tupDesc,
03079 &stup->isnull1);
03080 }
03081
03082
03083
03084
03085
03086
03087
03088
03089
03090
/*
 * Compare two index tuples for a btree index build.  Returns <0, 0, or >0
 * per the index's sort ordering; exact duplicates of a unique index raise
 * a unique-violation error instead of returning 0.
 */
static int
comparetup_index_btree(const SortTuple *a, const SortTuple *b,
					   Tuplesortstate *state)
{
	/*
	 * The leading key's value and null flag are cached in the SortTuple
	 * (datum1/isnull1), so the first comparison needs no tuple access.
	 * equal_hasnull tracks whether any compared-equal column pair was NULL,
	 * since NULLs are never considered duplicates for unique enforcement.
	 */
	ScanKey		scanKey = state->indexScanKey;
	IndexTuple	tuple1;
	IndexTuple	tuple2;
	int			keysz;
	TupleDesc	tupDes;
	bool		equal_hasnull = false;
	int			nkey;
	int32		compare;

	/* Compare the leading sort key using the cached first-column values. */
	compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
									  scanKey->sk_collation,
									  a->datum1, a->isnull1,
									  b->datum1, b->isnull1);
	if (compare != 0)
		return compare;

	/* They compared equal, so one null flag tells us about both. */
	if (a->isnull1)
		equal_hasnull = true;

	/* Compare the remaining keys, fetched from the tuples themselves. */
	tuple1 = (IndexTuple) a->tuple;
	tuple2 = (IndexTuple) b->tuple;
	keysz = state->nKeys;
	tupDes = RelationGetDescr(state->indexRel);
	scanKey++;
	for (nkey = 2; nkey <= keysz; nkey++, scanKey++)
	{
		Datum		datum1,
					datum2;
		bool		isnull1,
					isnull2;

		datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
		datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);

		compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
										  scanKey->sk_collation,
										  datum1, isnull1,
										  datum2, isnull2);
		if (compare != 0)
			return compare;

		/* As above: equal columns, so one null flag suffices. */
		if (isnull1)
			equal_hasnull = true;
	}

	/*
	 * All key columns compared equal.  If we are building a unique index and
	 * no column was NULL, that is a duplicate-key violation: report it now,
	 * naming the offending key values.  (NOTE(review): raising the error
	 * from inside the comparator means it fires during the sort, before the
	 * index is built.)
	 */
	if (state->enforceUnique && !equal_hasnull)
	{
		Datum		values[INDEX_MAX_KEYS];
		bool		isnull[INDEX_MAX_KEYS];

		/* A tuple should never be compared against itself here. */
		Assert(tuple1 != tuple2);

		index_deform_tuple(tuple1, tupDes, values, isnull);
		ereport(ERROR,
				(errcode(ERRCODE_UNIQUE_VIOLATION),
				 errmsg("could not create unique index \"%s\"",
						RelationGetRelationName(state->indexRel)),
				 errdetail("Key %s is duplicated.",
						   BuildIndexValueDescription(state->indexRel,
													  values, isnull)),
				 errtableconstraint(state->heapRel,
									RelationGetRelationName(state->indexRel))));
	}

	/*
	 * Break remaining ties by heap TID (block number, then offset), giving
	 * equal-keyed entries a deterministic physical order.
	 */
	{
		BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
		BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);

		if (blk1 != blk2)
			return (blk1 < blk2) ? -1 : 1;
	}
	{
		OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
		OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);

		if (pos1 != pos2)
			return (pos1 < pos2) ? -1 : 1;
	}

	return 0;
}
03206
03207 static int
03208 comparetup_index_hash(const SortTuple *a, const SortTuple *b,
03209 Tuplesortstate *state)
03210 {
03211 uint32 hash1;
03212 uint32 hash2;
03213 IndexTuple tuple1;
03214 IndexTuple tuple2;
03215
03216
03217
03218
03219
03220 Assert(!a->isnull1);
03221 hash1 = DatumGetUInt32(a->datum1) & state->hash_mask;
03222 Assert(!b->isnull1);
03223 hash2 = DatumGetUInt32(b->datum1) & state->hash_mask;
03224
03225 if (hash1 > hash2)
03226 return 1;
03227 else if (hash1 < hash2)
03228 return -1;
03229
03230
03231
03232
03233
03234
03235 tuple1 = (IndexTuple) a->tuple;
03236 tuple2 = (IndexTuple) b->tuple;
03237
03238 {
03239 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
03240 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
03241
03242 if (blk1 != blk2)
03243 return (blk1 < blk2) ? -1 : 1;
03244 }
03245 {
03246 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
03247 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
03248
03249 if (pos1 != pos2)
03250 return (pos1 < pos2) ? -1 : 1;
03251 }
03252
03253 return 0;
03254 }
03255
03256 static void
03257 copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
03258 {
03259 IndexTuple tuple = (IndexTuple) tup;
03260 unsigned int tuplen = IndexTupleSize(tuple);
03261 IndexTuple newtuple;
03262
03263
03264 newtuple = (IndexTuple) palloc(tuplen);
03265 memcpy(newtuple, tuple, tuplen);
03266 USEMEM(state, GetMemoryChunkSpace(newtuple));
03267 stup->tuple = (void *) newtuple;
03268
03269 stup->datum1 = index_getattr(newtuple,
03270 1,
03271 RelationGetDescr(state->indexRel),
03272 &stup->isnull1);
03273 }
03274
03275 static void
03276 writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
03277 {
03278 IndexTuple tuple = (IndexTuple) stup->tuple;
03279 unsigned int tuplen;
03280
03281 tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
03282 LogicalTapeWrite(state->tapeset, tapenum,
03283 (void *) &tuplen, sizeof(tuplen));
03284 LogicalTapeWrite(state->tapeset, tapenum,
03285 (void *) tuple, IndexTupleSize(tuple));
03286 if (state->randomAccess)
03287 LogicalTapeWrite(state->tapeset, tapenum,
03288 (void *) &tuplen, sizeof(tuplen));
03289
03290 FREEMEM(state, GetMemoryChunkSpace(tuple));
03291 pfree(tuple);
03292 }
03293
03294 static void
03295 readtup_index(Tuplesortstate *state, SortTuple *stup,
03296 int tapenum, unsigned int len)
03297 {
03298 unsigned int tuplen = len - sizeof(unsigned int);
03299 IndexTuple tuple = (IndexTuple) palloc(tuplen);
03300
03301 USEMEM(state, GetMemoryChunkSpace(tuple));
03302 LogicalTapeReadExact(state->tapeset, tapenum,
03303 tuple, tuplen);
03304 if (state->randomAccess)
03305 LogicalTapeReadExact(state->tapeset, tapenum,
03306 &tuplen, sizeof(tuplen));
03307 stup->tuple = (void *) tuple;
03308
03309 stup->datum1 = index_getattr(tuple,
03310 1,
03311 RelationGetDescr(state->indexRel),
03312 &stup->isnull1);
03313 }
03314
03315 static void
03316 reversedirection_index_btree(Tuplesortstate *state)
03317 {
03318 ScanKey scanKey = state->indexScanKey;
03319 int nkey;
03320
03321 for (nkey = 0; nkey < state->nKeys; nkey++, scanKey++)
03322 {
03323 scanKey->sk_flags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST);
03324 }
03325 }
03326
static void
reversedirection_index_hash(Tuplesortstate *state)
{
	/* We don't support reversing direction in a hash index sort */
	elog(ERROR, "reversedirection_index_hash is not implemented");
}
03333
03334
03335
03336
03337
03338
03339 static int
03340 comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
03341 {
03342 return ApplySortComparator(a->datum1, a->isnull1,
03343 b->datum1, b->isnull1,
03344 state->onlyKey);
03345 }
03346
static void
copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
{
	/*
	 * Deliberately unimplemented: presumably datum sorts place values into
	 * the SortTuple directly rather than routing through copytup --
	 * NOTE(review): confirm against the putdatum path elsewhere in this file.
	 */
	elog(ERROR, "copytup_datum() should not be called");
}
03353
03354 static void
03355 writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
03356 {
03357 void *waddr;
03358 unsigned int tuplen;
03359 unsigned int writtenlen;
03360
03361 if (stup->isnull1)
03362 {
03363 waddr = NULL;
03364 tuplen = 0;
03365 }
03366 else if (state->datumTypeByVal)
03367 {
03368 waddr = &stup->datum1;
03369 tuplen = sizeof(Datum);
03370 }
03371 else
03372 {
03373 waddr = DatumGetPointer(stup->datum1);
03374 tuplen = datumGetSize(stup->datum1, false, state->datumTypeLen);
03375 Assert(tuplen != 0);
03376 }
03377
03378 writtenlen = tuplen + sizeof(unsigned int);
03379
03380 LogicalTapeWrite(state->tapeset, tapenum,
03381 (void *) &writtenlen, sizeof(writtenlen));
03382 LogicalTapeWrite(state->tapeset, tapenum,
03383 waddr, tuplen);
03384 if (state->randomAccess)
03385 LogicalTapeWrite(state->tapeset, tapenum,
03386 (void *) &writtenlen, sizeof(writtenlen));
03387
03388 if (stup->tuple)
03389 {
03390 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
03391 pfree(stup->tuple);
03392 }
03393 }
03394
03395 static void
03396 readtup_datum(Tuplesortstate *state, SortTuple *stup,
03397 int tapenum, unsigned int len)
03398 {
03399 unsigned int tuplen = len - sizeof(unsigned int);
03400
03401 if (tuplen == 0)
03402 {
03403
03404 stup->datum1 = (Datum) 0;
03405 stup->isnull1 = true;
03406 stup->tuple = NULL;
03407 }
03408 else if (state->datumTypeByVal)
03409 {
03410 Assert(tuplen == sizeof(Datum));
03411 LogicalTapeReadExact(state->tapeset, tapenum,
03412 &stup->datum1, tuplen);
03413 stup->isnull1 = false;
03414 stup->tuple = NULL;
03415 }
03416 else
03417 {
03418 void *raddr = palloc(tuplen);
03419
03420 LogicalTapeReadExact(state->tapeset, tapenum,
03421 raddr, tuplen);
03422 stup->datum1 = PointerGetDatum(raddr);
03423 stup->isnull1 = false;
03424 stup->tuple = raddr;
03425 USEMEM(state, GetMemoryChunkSpace(raddr));
03426 }
03427
03428 if (state->randomAccess)
03429 LogicalTapeReadExact(state->tapeset, tapenum,
03430 &tuplen, sizeof(tuplen));
03431 }
03432
03433 static void
03434 reversedirection_datum(Tuplesortstate *state)
03435 {
03436 state->onlyKey->ssup_reverse = !state->onlyKey->ssup_reverse;
03437 state->onlyKey->ssup_nulls_first = !state->onlyKey->ssup_nulls_first;
03438 }
03439
03440
03441
03442
03443 static void
03444 free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
03445 {
03446 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
03447 pfree(stup->tuple);
03448 }