00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055 #include "postgres.h"
00056
00057 #include "access/htup_details.h"
00058 #include "commands/tablespace.h"
00059 #include "executor/executor.h"
00060 #include "storage/buffile.h"
00061 #include "utils/memutils.h"
00062 #include "utils/resowner.h"
00063
00064
00065
00066
00067
00068
/*
 * Storage states of a tuplestore.  A store starts out in TSS_INMEM; once it
 * spills to a temp file it alternates between TSS_WRITEFILE (file positioned
 * for appending) and TSS_READFILE (file positioned for the active reader).
 */
typedef enum
{
    TSS_INMEM = 0,              /* all tuples are held in the memtuples array */
    TSS_WRITEFILE = 1,          /* spilled; temp file is at the write position */
    TSS_READFILE = 2            /* spilled; temp file is at a read position */
} TupStoreStatus;
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
/*
 * State of one read pointer.
 *
 * While the store is in memory, "current" is the position that matters.
 * Once it has spilled, "file"/"offset" hold a saved temp-file position (as
 * reported by BufFileTell); for the active pointer in TSS_READFILE state the
 * real position is the file's seek point rather than these fields.
 */
typedef struct
{
    int         eflags;         /* capability flags (EXEC_FLAG_*) */
    bool        eof_reached;    /* reader has run off the end of the data */
    int         current;        /* index of next tuple in memtuples[] */
    int         file;           /* saved temp-file number */
    off_t       offset;         /* saved byte offset within that file */
} TSReadPointer;
00096
00097
00098
00099
00100 struct Tuplestorestate
00101 {
00102 TupStoreStatus status;
00103 int eflags;
00104 bool backward;
00105 bool interXact;
00106 bool truncated;
00107 long availMem;
00108 long allowedMem;
00109 BufFile *myfile;
00110 MemoryContext context;
00111 ResourceOwner resowner;
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127 void *(*copytup) (Tuplestorestate *state, void *tup);
00128
00129
00130
00131
00132
00133
00134
00135
00136 void (*writetup) (Tuplestorestate *state, void *tup);
00137
00138
00139
00140
00141
00142
00143
00144 void *(*readtup) (Tuplestorestate *state, unsigned int len);
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156 void **memtuples;
00157 int memtupdeleted;
00158 int memtupcount;
00159 int memtupsize;
00160 bool growmemtuples;
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170 TSReadPointer *readptrs;
00171 int activeptr;
00172 int readptrcount;
00173 int readptrsize;
00174
00175 int writepos_file;
00176 off_t writepos_offset;
00177 };
00178
/* Dispatch through the per-store tuple-format callbacks. */
#define COPYTUP(state,tup)      ((state)->copytup((state), (tup)))
#define WRITETUP(state,tup)     ((state)->writetup((state), (tup)))
#define READTUP(state,len)      ((state)->readtup((state), (len)))

/* Memory-budget accounting against state->availMem. */
#define LACKMEM(state)          (0 > (state)->availMem)
#define USEMEM(state,amt)       ((state)->availMem -= (amt))
#define FREEMEM(state,amt)      ((state)->availMem += (amt))
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232 static Tuplestorestate *tuplestore_begin_common(int eflags,
00233 bool interXact,
00234 int maxKBytes);
00235 static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
00236 static void dumptuples(Tuplestorestate *state);
00237 static unsigned int getlen(Tuplestorestate *state, bool eofOK);
00238 static void *copytup_heap(Tuplestorestate *state, void *tup);
00239 static void writetup_heap(Tuplestorestate *state, void *tup);
00240 static void *readtup_heap(Tuplestorestate *state, unsigned int len);
00241
00242
00243
00244
00245
00246
00247
00248 static Tuplestorestate *
00249 tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
00250 {
00251 Tuplestorestate *state;
00252
00253 state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
00254
00255 state->status = TSS_INMEM;
00256 state->eflags = eflags;
00257 state->interXact = interXact;
00258 state->truncated = false;
00259 state->allowedMem = maxKBytes * 1024L;
00260 state->availMem = state->allowedMem;
00261 state->myfile = NULL;
00262 state->context = CurrentMemoryContext;
00263 state->resowner = CurrentResourceOwner;
00264
00265 state->memtupdeleted = 0;
00266 state->memtupcount = 0;
00267 state->memtupsize = 1024;
00268 state->growmemtuples = true;
00269 state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
00270
00271 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
00272
00273 state->activeptr = 0;
00274 state->readptrcount = 1;
00275 state->readptrsize = 8;
00276 state->readptrs = (TSReadPointer *)
00277 palloc(state->readptrsize * sizeof(TSReadPointer));
00278
00279 state->readptrs[0].eflags = eflags;
00280 state->readptrs[0].eof_reached = false;
00281 state->readptrs[0].current = 0;
00282
00283 return state;
00284 }
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304
00305 Tuplestorestate *
00306 tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
00307 {
00308 Tuplestorestate *state;
00309 int eflags;
00310
00311
00312
00313
00314
00315 eflags = randomAccess ?
00316 (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
00317 (EXEC_FLAG_REWIND);
00318
00319 state = tuplestore_begin_common(eflags, interXact, maxKBytes);
00320
00321 state->copytup = copytup_heap;
00322 state->writetup = writetup_heap;
00323 state->readtup = readtup_heap;
00324
00325 return state;
00326 }
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346 void
00347 tuplestore_set_eflags(Tuplestorestate *state, int eflags)
00348 {
00349 int i;
00350
00351 if (state->status != TSS_INMEM || state->memtupcount != 0)
00352 elog(ERROR, "too late to call tuplestore_set_eflags");
00353
00354 state->readptrs[0].eflags = eflags;
00355 for (i = 1; i < state->readptrcount; i++)
00356 eflags |= state->readptrs[i].eflags;
00357 state->eflags = eflags;
00358 }
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370 int
00371 tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
00372 {
00373
00374 if (state->status != TSS_INMEM || state->memtupcount != 0)
00375 {
00376 if ((state->eflags | eflags) != state->eflags)
00377 elog(ERROR, "too late to require new tuplestore eflags");
00378 }
00379
00380
00381 if (state->readptrcount >= state->readptrsize)
00382 {
00383 int newcnt = state->readptrsize * 2;
00384
00385 state->readptrs = (TSReadPointer *)
00386 repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
00387 state->readptrsize = newcnt;
00388 }
00389
00390
00391 state->readptrs[state->readptrcount] = state->readptrs[0];
00392 state->readptrs[state->readptrcount].eflags = eflags;
00393
00394 state->eflags |= eflags;
00395
00396 return state->readptrcount++;
00397 }
00398
00399
00400
00401
00402
00403
00404
00405 void
00406 tuplestore_clear(Tuplestorestate *state)
00407 {
00408 int i;
00409 TSReadPointer *readptr;
00410
00411 if (state->myfile)
00412 BufFileClose(state->myfile);
00413 state->myfile = NULL;
00414 if (state->memtuples)
00415 {
00416 for (i = state->memtupdeleted; i < state->memtupcount; i++)
00417 {
00418 FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
00419 pfree(state->memtuples[i]);
00420 }
00421 }
00422 state->status = TSS_INMEM;
00423 state->truncated = false;
00424 state->memtupdeleted = 0;
00425 state->memtupcount = 0;
00426 readptr = state->readptrs;
00427 for (i = 0; i < state->readptrcount; readptr++, i++)
00428 {
00429 readptr->eof_reached = false;
00430 readptr->current = 0;
00431 }
00432 }
00433
00434
00435
00436
00437
00438
00439 void
00440 tuplestore_end(Tuplestorestate *state)
00441 {
00442 int i;
00443
00444 if (state->myfile)
00445 BufFileClose(state->myfile);
00446 if (state->memtuples)
00447 {
00448 for (i = state->memtupdeleted; i < state->memtupcount; i++)
00449 pfree(state->memtuples[i]);
00450 pfree(state->memtuples);
00451 }
00452 pfree(state->readptrs);
00453 pfree(state);
00454 }
00455
00456
00457
00458
00459 void
00460 tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
00461 {
00462 TSReadPointer *readptr;
00463 TSReadPointer *oldptr;
00464
00465 Assert(ptr >= 0 && ptr < state->readptrcount);
00466
00467
00468 if (ptr == state->activeptr)
00469 return;
00470
00471 readptr = &state->readptrs[ptr];
00472 oldptr = &state->readptrs[state->activeptr];
00473
00474 switch (state->status)
00475 {
00476 case TSS_INMEM:
00477 case TSS_WRITEFILE:
00478
00479 break;
00480 case TSS_READFILE:
00481
00482
00483
00484
00485
00486 if (!oldptr->eof_reached)
00487 BufFileTell(state->myfile,
00488 &oldptr->file,
00489 &oldptr->offset);
00490
00491
00492
00493
00494
00495
00496
00497 if (readptr->eof_reached)
00498 {
00499 if (BufFileSeek(state->myfile,
00500 state->writepos_file,
00501 state->writepos_offset,
00502 SEEK_SET) != 0)
00503 elog(ERROR, "tuplestore seek failed");
00504 }
00505 else
00506 {
00507 if (BufFileSeek(state->myfile,
00508 readptr->file,
00509 readptr->offset,
00510 SEEK_SET) != 0)
00511 elog(ERROR, "tuplestore seek failed");
00512 }
00513 break;
00514 default:
00515 elog(ERROR, "invalid tuplestore state");
00516 break;
00517 }
00518
00519 state->activeptr = ptr;
00520 }
00521
00522
00523
00524
00525
00526
00527 bool
00528 tuplestore_ateof(Tuplestorestate *state)
00529 {
00530 return state->readptrs[state->activeptr].eof_reached;
00531 }
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547 static bool
00548 grow_memtuples(Tuplestorestate *state)
00549 {
00550 int newmemtupsize;
00551 int memtupsize = state->memtupsize;
00552 long memNowUsed = state->allowedMem - state->availMem;
00553
00554
00555 if (!state->growmemtuples)
00556 return false;
00557
00558
00559 if (memNowUsed <= state->availMem)
00560 {
00561
00562
00563
00564
00565
00566
00567
00568
00569 newmemtupsize = memtupsize * 2;
00570 }
00571 else
00572 {
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599 double grow_ratio;
00600
00601 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
00602 newmemtupsize = (int) (memtupsize * grow_ratio);
00603
00604
00605 state->growmemtuples = false;
00606 }
00607
00608
00609 if (newmemtupsize <= memtupsize)
00610 goto noalloc;
00611
00612
00613
00614
00615
00616 if ((Size) newmemtupsize >= MaxAllocSize / sizeof(void *))
00617 {
00618 newmemtupsize = (int) (MaxAllocSize / sizeof(void *));
00619 state->growmemtuples = false;
00620 }
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633 if (state->availMem < (long) ((newmemtupsize - memtupsize) * sizeof(void *)))
00634 goto noalloc;
00635
00636
00637 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
00638 state->memtupsize = newmemtupsize;
00639 state->memtuples = (void **)
00640 repalloc(state->memtuples,
00641 state->memtupsize * sizeof(void *));
00642 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
00643 if (LACKMEM(state))
00644 elog(ERROR, "unexpected out-of-memory situation during sort");
00645 return true;
00646
00647 noalloc:
00648
00649 state->growmemtuples = false;
00650 return false;
00651 }
00652
00653
00654
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669 void
00670 tuplestore_puttupleslot(Tuplestorestate *state,
00671 TupleTableSlot *slot)
00672 {
00673 MinimalTuple tuple;
00674 MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
00675
00676
00677
00678
00679 tuple = ExecCopySlotMinimalTuple(slot);
00680 USEMEM(state, GetMemoryChunkSpace(tuple));
00681
00682 tuplestore_puttuple_common(state, (void *) tuple);
00683
00684 MemoryContextSwitchTo(oldcxt);
00685 }
00686
00687
00688
00689
00690
00691 void
00692 tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
00693 {
00694 MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
00695
00696
00697
00698
00699
00700 tuple = COPYTUP(state, tuple);
00701
00702 tuplestore_puttuple_common(state, (void *) tuple);
00703
00704 MemoryContextSwitchTo(oldcxt);
00705 }
00706
00707
00708
00709
00710
00711 void
00712 tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
00713 Datum *values, bool *isnull)
00714 {
00715 MinimalTuple tuple;
00716 MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
00717
00718 tuple = heap_form_minimal_tuple(tdesc, values, isnull);
00719 USEMEM(state, GetMemoryChunkSpace(tuple));
00720
00721 tuplestore_puttuple_common(state, (void *) tuple);
00722
00723 MemoryContextSwitchTo(oldcxt);
00724 }
00725
00726 static void
00727 tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
00728 {
00729 TSReadPointer *readptr;
00730 int i;
00731 ResourceOwner oldowner;
00732
00733 switch (state->status)
00734 {
00735 case TSS_INMEM:
00736
00737
00738
00739
00740 readptr = state->readptrs;
00741 for (i = 0; i < state->readptrcount; readptr++, i++)
00742 {
00743 if (readptr->eof_reached && i != state->activeptr)
00744 {
00745 readptr->eof_reached = false;
00746 readptr->current = state->memtupcount;
00747 }
00748 }
00749
00750
00751
00752
00753
00754
00755
00756 if (state->memtupcount >= state->memtupsize - 1)
00757 {
00758 (void) grow_memtuples(state);
00759 Assert(state->memtupcount < state->memtupsize);
00760 }
00761
00762
00763 state->memtuples[state->memtupcount++] = tuple;
00764
00765
00766
00767
00768 if (state->memtupcount < state->memtupsize && !LACKMEM(state))
00769 return;
00770
00771
00772
00773
00774
00775 PrepareTempTablespaces();
00776
00777
00778 oldowner = CurrentResourceOwner;
00779 CurrentResourceOwner = state->resowner;
00780
00781 state->myfile = BufFileCreateTemp(state->interXact);
00782
00783 CurrentResourceOwner = oldowner;
00784
00785
00786
00787
00788
00789
00790 state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
00791 state->status = TSS_WRITEFILE;
00792 dumptuples(state);
00793 break;
00794 case TSS_WRITEFILE:
00795
00796
00797
00798
00799
00800
00801 readptr = state->readptrs;
00802 for (i = 0; i < state->readptrcount; readptr++, i++)
00803 {
00804 if (readptr->eof_reached && i != state->activeptr)
00805 {
00806 readptr->eof_reached = false;
00807 BufFileTell(state->myfile,
00808 &readptr->file,
00809 &readptr->offset);
00810 }
00811 }
00812
00813 WRITETUP(state, tuple);
00814 break;
00815 case TSS_READFILE:
00816
00817
00818
00819
00820 if (!state->readptrs[state->activeptr].eof_reached)
00821 BufFileTell(state->myfile,
00822 &state->readptrs[state->activeptr].file,
00823 &state->readptrs[state->activeptr].offset);
00824 if (BufFileSeek(state->myfile,
00825 state->writepos_file, state->writepos_offset,
00826 SEEK_SET) != 0)
00827 elog(ERROR, "tuplestore seek to EOF failed");
00828 state->status = TSS_WRITEFILE;
00829
00830
00831
00832
00833 readptr = state->readptrs;
00834 for (i = 0; i < state->readptrcount; readptr++, i++)
00835 {
00836 if (readptr->eof_reached && i != state->activeptr)
00837 {
00838 readptr->eof_reached = false;
00839 readptr->file = state->writepos_file;
00840 readptr->offset = state->writepos_offset;
00841 }
00842 }
00843
00844 WRITETUP(state, tuple);
00845 break;
00846 default:
00847 elog(ERROR, "invalid tuplestore state");
00848 break;
00849 }
00850 }
00851
00852
00853
00854
00855
00856
00857
00858
00859
00860 static void *
00861 tuplestore_gettuple(Tuplestorestate *state, bool forward,
00862 bool *should_free)
00863 {
00864 TSReadPointer *readptr = &state->readptrs[state->activeptr];
00865 unsigned int tuplen;
00866 void *tup;
00867
00868 Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
00869
00870 switch (state->status)
00871 {
00872 case TSS_INMEM:
00873 *should_free = false;
00874 if (forward)
00875 {
00876 if (readptr->eof_reached)
00877 return NULL;
00878 if (readptr->current < state->memtupcount)
00879 {
00880
00881 return state->memtuples[readptr->current++];
00882 }
00883 readptr->eof_reached = true;
00884 return NULL;
00885 }
00886 else
00887 {
00888
00889
00890
00891
00892 if (readptr->eof_reached)
00893 {
00894 readptr->current = state->memtupcount;
00895 readptr->eof_reached = false;
00896 }
00897 else
00898 {
00899 if (readptr->current <= state->memtupdeleted)
00900 {
00901 Assert(!state->truncated);
00902 return NULL;
00903 }
00904 readptr->current--;
00905 }
00906 if (readptr->current <= state->memtupdeleted)
00907 {
00908 Assert(!state->truncated);
00909 return NULL;
00910 }
00911 return state->memtuples[readptr->current - 1];
00912 }
00913 break;
00914
00915 case TSS_WRITEFILE:
00916
00917 if (readptr->eof_reached && forward)
00918 return NULL;
00919
00920
00921
00922
00923 BufFileTell(state->myfile,
00924 &state->writepos_file, &state->writepos_offset);
00925 if (!readptr->eof_reached)
00926 if (BufFileSeek(state->myfile,
00927 readptr->file, readptr->offset,
00928 SEEK_SET) != 0)
00929 elog(ERROR, "tuplestore seek failed");
00930 state->status = TSS_READFILE;
00931
00932
00933 case TSS_READFILE:
00934 *should_free = true;
00935 if (forward)
00936 {
00937 if ((tuplen = getlen(state, true)) != 0)
00938 {
00939 tup = READTUP(state, tuplen);
00940 return tup;
00941 }
00942 else
00943 {
00944 readptr->eof_reached = true;
00945 return NULL;
00946 }
00947 }
00948
00949
00950
00951
00952
00953
00954
00955
00956
00957
00958 if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
00959 SEEK_CUR) != 0)
00960 {
00961
00962 readptr->eof_reached = false;
00963 Assert(!state->truncated);
00964 return NULL;
00965 }
00966 tuplen = getlen(state, false);
00967
00968 if (readptr->eof_reached)
00969 {
00970 readptr->eof_reached = false;
00971
00972 }
00973 else
00974 {
00975
00976
00977
00978 if (BufFileSeek(state->myfile, 0,
00979 -(long) (tuplen + 2 * sizeof(unsigned int)),
00980 SEEK_CUR) != 0)
00981 {
00982
00983
00984
00985
00986
00987
00988 if (BufFileSeek(state->myfile, 0,
00989 -(long) (tuplen + sizeof(unsigned int)),
00990 SEEK_CUR) != 0)
00991 elog(ERROR, "bogus tuple length in backward scan");
00992 Assert(!state->truncated);
00993 return NULL;
00994 }
00995 tuplen = getlen(state, false);
00996 }
00997
00998
00999
01000
01001
01002
01003 if (BufFileSeek(state->myfile, 0,
01004 -(long) tuplen,
01005 SEEK_CUR) != 0)
01006 elog(ERROR, "bogus tuple length in backward scan");
01007 tup = READTUP(state, tuplen);
01008 return tup;
01009
01010 default:
01011 elog(ERROR, "invalid tuplestore state");
01012 return NULL;
01013 }
01014 }
01015
01016
01017
01018
01019
01020
01021
01022
01023
01024
01025
01026
01027
01028
01029 bool
01030 tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
01031 bool copy, TupleTableSlot *slot)
01032 {
01033 MinimalTuple tuple;
01034 bool should_free;
01035
01036 tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);
01037
01038 if (tuple)
01039 {
01040 if (copy && !should_free)
01041 {
01042 tuple = heap_copy_minimal_tuple(tuple);
01043 should_free = true;
01044 }
01045 ExecStoreMinimalTuple(tuple, slot, should_free);
01046 return true;
01047 }
01048 else
01049 {
01050 ExecClearTuple(slot);
01051 return false;
01052 }
01053 }
01054
01055
01056
01057
01058
01059
01060
01061
01062 bool
01063 tuplestore_advance(Tuplestorestate *state, bool forward)
01064 {
01065 void *tuple;
01066 bool should_free;
01067
01068 tuple = tuplestore_gettuple(state, forward, &should_free);
01069
01070 if (tuple)
01071 {
01072 if (should_free)
01073 pfree(tuple);
01074 return true;
01075 }
01076 else
01077 {
01078 return false;
01079 }
01080 }
01081
01082
01083
01084
01085
01086
01087
01088
01089 static void
01090 dumptuples(Tuplestorestate *state)
01091 {
01092 int i;
01093
01094 for (i = state->memtupdeleted;; i++)
01095 {
01096 TSReadPointer *readptr = state->readptrs;
01097 int j;
01098
01099 for (j = 0; j < state->readptrcount; readptr++, j++)
01100 {
01101 if (i == readptr->current && !readptr->eof_reached)
01102 BufFileTell(state->myfile,
01103 &readptr->file, &readptr->offset);
01104 }
01105 if (i >= state->memtupcount)
01106 break;
01107 WRITETUP(state, state->memtuples[i]);
01108 }
01109 state->memtupdeleted = 0;
01110 state->memtupcount = 0;
01111 }
01112
01113
01114
01115
01116 void
01117 tuplestore_rescan(Tuplestorestate *state)
01118 {
01119 TSReadPointer *readptr = &state->readptrs[state->activeptr];
01120
01121 Assert(readptr->eflags & EXEC_FLAG_REWIND);
01122 Assert(!state->truncated);
01123
01124 switch (state->status)
01125 {
01126 case TSS_INMEM:
01127 readptr->eof_reached = false;
01128 readptr->current = 0;
01129 break;
01130 case TSS_WRITEFILE:
01131 readptr->eof_reached = false;
01132 readptr->file = 0;
01133 readptr->offset = 0L;
01134 break;
01135 case TSS_READFILE:
01136 readptr->eof_reached = false;
01137 if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
01138 elog(ERROR, "tuplestore seek to start failed");
01139 break;
01140 default:
01141 elog(ERROR, "invalid tuplestore state");
01142 break;
01143 }
01144 }
01145
01146
01147
01148
01149 void
01150 tuplestore_copy_read_pointer(Tuplestorestate *state,
01151 int srcptr, int destptr)
01152 {
01153 TSReadPointer *sptr = &state->readptrs[srcptr];
01154 TSReadPointer *dptr = &state->readptrs[destptr];
01155
01156 Assert(srcptr >= 0 && srcptr < state->readptrcount);
01157 Assert(destptr >= 0 && destptr < state->readptrcount);
01158
01159
01160 if (srcptr == destptr)
01161 return;
01162
01163 if (dptr->eflags != sptr->eflags)
01164 {
01165
01166 int eflags;
01167 int i;
01168
01169 *dptr = *sptr;
01170 eflags = state->readptrs[0].eflags;
01171 for (i = 1; i < state->readptrcount; i++)
01172 eflags |= state->readptrs[i].eflags;
01173 state->eflags = eflags;
01174 }
01175 else
01176 *dptr = *sptr;
01177
01178 switch (state->status)
01179 {
01180 case TSS_INMEM:
01181 case TSS_WRITEFILE:
01182
01183 break;
01184 case TSS_READFILE:
01185
01186
01187
01188
01189
01190
01191
01192
01193 if (destptr == state->activeptr)
01194 {
01195 if (dptr->eof_reached)
01196 {
01197 if (BufFileSeek(state->myfile,
01198 state->writepos_file,
01199 state->writepos_offset,
01200 SEEK_SET) != 0)
01201 elog(ERROR, "tuplestore seek failed");
01202 }
01203 else
01204 {
01205 if (BufFileSeek(state->myfile,
01206 dptr->file, dptr->offset,
01207 SEEK_SET) != 0)
01208 elog(ERROR, "tuplestore seek failed");
01209 }
01210 }
01211 else if (srcptr == state->activeptr)
01212 {
01213 if (!dptr->eof_reached)
01214 BufFileTell(state->myfile,
01215 &dptr->file,
01216 &dptr->offset);
01217 }
01218 break;
01219 default:
01220 elog(ERROR, "invalid tuplestore state");
01221 break;
01222 }
01223 }
01224
01225
01226
01227
01228
01229
01230
01231
01232
01233
01234
01235
01236
01237 void
01238 tuplestore_trim(Tuplestorestate *state)
01239 {
01240 int oldest;
01241 int nremove;
01242 int i;
01243
01244
01245
01246
01247
01248 if (state->eflags & EXEC_FLAG_REWIND)
01249 return;
01250
01251
01252
01253
01254
01255 if (state->status != TSS_INMEM)
01256 return;
01257
01258
01259 oldest = state->memtupcount;
01260 for (i = 0; i < state->readptrcount; i++)
01261 {
01262 if (!state->readptrs[i].eof_reached)
01263 oldest = Min(oldest, state->readptrs[i].current);
01264 }
01265
01266
01267
01268
01269
01270
01271
01272
01273
01274
01275
01276 nremove = oldest - 1;
01277 if (nremove <= 0)
01278 return;
01279
01280 Assert(nremove >= state->memtupdeleted);
01281 Assert(nremove <= state->memtupcount);
01282
01283
01284 for (i = state->memtupdeleted; i < nremove; i++)
01285 {
01286 FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
01287 pfree(state->memtuples[i]);
01288 state->memtuples[i] = NULL;
01289 }
01290 state->memtupdeleted = nremove;
01291
01292
01293 state->truncated = true;
01294
01295
01296
01297
01298
01299
01300
01301 if (nremove < state->memtupcount / 8)
01302 return;
01303
01304
01305
01306
01307
01308
01309
01310 if (nremove + 1 == state->memtupcount)
01311 state->memtuples[0] = state->memtuples[nremove];
01312 else
01313 memmove(state->memtuples, state->memtuples + nremove,
01314 (state->memtupcount - nremove) * sizeof(void *));
01315
01316 state->memtupdeleted = 0;
01317 state->memtupcount -= nremove;
01318 for (i = 0; i < state->readptrcount; i++)
01319 {
01320 if (!state->readptrs[i].eof_reached)
01321 state->readptrs[i].current -= nremove;
01322 }
01323 }
01324
01325
01326
01327
01328
01329
01330
01331
01332 bool
01333 tuplestore_in_memory(Tuplestorestate *state)
01334 {
01335 return (state->status == TSS_INMEM);
01336 }
01337
01338
01339
01340
01341
01342
01343 static unsigned int
01344 getlen(Tuplestorestate *state, bool eofOK)
01345 {
01346 unsigned int len;
01347 size_t nbytes;
01348
01349 nbytes = BufFileRead(state->myfile, (void *) &len, sizeof(len));
01350 if (nbytes == sizeof(len))
01351 return len;
01352 if (nbytes != 0)
01353 elog(ERROR, "unexpected end of tape");
01354 if (!eofOK)
01355 elog(ERROR, "unexpected end of data");
01356 return 0;
01357 }
01358
01359
01360
01361
01362
01363
01364
01365
01366
01367
01368
01369
01370 static void *
01371 copytup_heap(Tuplestorestate *state, void *tup)
01372 {
01373 MinimalTuple tuple;
01374
01375 tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup);
01376 USEMEM(state, GetMemoryChunkSpace(tuple));
01377 return (void *) tuple;
01378 }
01379
01380 static void
01381 writetup_heap(Tuplestorestate *state, void *tup)
01382 {
01383 MinimalTuple tuple = (MinimalTuple) tup;
01384
01385
01386 char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
01387 unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
01388
01389
01390 unsigned int tuplen = tupbodylen + sizeof(int);
01391
01392 if (BufFileWrite(state->myfile, (void *) &tuplen,
01393 sizeof(tuplen)) != sizeof(tuplen))
01394 elog(ERROR, "write failed");
01395 if (BufFileWrite(state->myfile, (void *) tupbody,
01396 tupbodylen) != (size_t) tupbodylen)
01397 elog(ERROR, "write failed");
01398 if (state->backward)
01399 if (BufFileWrite(state->myfile, (void *) &tuplen,
01400 sizeof(tuplen)) != sizeof(tuplen))
01401 elog(ERROR, "write failed");
01402
01403 FREEMEM(state, GetMemoryChunkSpace(tuple));
01404 heap_free_minimal_tuple(tuple);
01405 }
01406
01407 static void *
01408 readtup_heap(Tuplestorestate *state, unsigned int len)
01409 {
01410 unsigned int tupbodylen = len - sizeof(int);
01411 unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
01412 MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
01413 char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
01414
01415 USEMEM(state, GetMemoryChunkSpace(tuple));
01416
01417 tuple->t_len = tuplen;
01418 if (BufFileRead(state->myfile, (void *) tupbody,
01419 tupbodylen) != (size_t) tupbodylen)
01420 elog(ERROR, "unexpected end of data");
01421 if (state->backward)
01422 if (BufFileRead(state->myfile, (void *) &tuplen,
01423 sizeof(tuplen)) != sizeof(tuplen))
01424 elog(ERROR, "unexpected end of data");
01425 return (void *) tuple;
01426 }