00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "postgres.h"
00017
00018 #include "access/hash.h"
00019 #include "utils/rel.h"
00020
00021
00022
00023
00024
00025
00026
00027
00028 void
00029 _hash_doinsert(Relation rel, IndexTuple itup)
00030 {
00031 Buffer buf;
00032 Buffer metabuf;
00033 HashMetaPage metap;
00034 BlockNumber blkno;
00035 BlockNumber oldblkno = InvalidBlockNumber;
00036 bool retry = false;
00037 Page page;
00038 HashPageOpaque pageopaque;
00039 Size itemsz;
00040 bool do_expand;
00041 uint32 hashkey;
00042 Bucket bucket;
00043
00044
00045
00046
00047 hashkey = _hash_get_indextuple_hashkey(itup);
00048
00049
00050 itemsz = IndexTupleDSize(*itup);
00051 itemsz = MAXALIGN(itemsz);
00052
00053
00054
00055 metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
00056 metap = HashPageGetMeta(BufferGetPage(metabuf));
00057
00058
00059
00060
00061
00062
00063
00064
00065 if (itemsz > HashMaxItemSize((Page) metap))
00066 ereport(ERROR,
00067 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
00068 errmsg("index row size %lu exceeds hash maximum %lu",
00069 (unsigned long) itemsz,
00070 (unsigned long) HashMaxItemSize((Page) metap)),
00071 errhint("Values larger than a buffer page cannot be indexed.")));
00072
00073
00074
00075
00076 for (;;)
00077 {
00078
00079
00080
00081 bucket = _hash_hashkey2bucket(hashkey,
00082 metap->hashm_maxbucket,
00083 metap->hashm_highmask,
00084 metap->hashm_lowmask);
00085
00086 blkno = BUCKET_TO_BLKNO(metap, bucket);
00087
00088
00089 _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
00090
00091
00092
00093
00094
00095
00096 if (retry)
00097 {
00098 if (oldblkno == blkno)
00099 break;
00100 _hash_droplock(rel, oldblkno, HASH_SHARE);
00101 }
00102 _hash_getlock(rel, blkno, HASH_SHARE);
00103
00104
00105
00106
00107
00108 _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_READ);
00109 oldblkno = blkno;
00110 retry = true;
00111 }
00112
00113
00114 buf = _hash_getbuf(rel, blkno, HASH_WRITE, LH_BUCKET_PAGE);
00115 page = BufferGetPage(buf);
00116 pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
00117 Assert(pageopaque->hasho_bucket == bucket);
00118
00119
00120 while (PageGetFreeSpace(page) < itemsz)
00121 {
00122
00123
00124
00125 BlockNumber nextblkno = pageopaque->hasho_nextblkno;
00126
00127 if (BlockNumberIsValid(nextblkno))
00128 {
00129
00130
00131
00132
00133 _hash_relbuf(rel, buf);
00134 buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
00135 page = BufferGetPage(buf);
00136 }
00137 else
00138 {
00139
00140
00141
00142
00143
00144
00145 _hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK);
00146
00147
00148 buf = _hash_addovflpage(rel, metabuf, buf);
00149 page = BufferGetPage(buf);
00150
00151
00152 Assert(PageGetFreeSpace(page) >= itemsz);
00153 }
00154 pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
00155 Assert(pageopaque->hasho_flag == LH_OVERFLOW_PAGE);
00156 Assert(pageopaque->hasho_bucket == bucket);
00157 }
00158
00159
00160 (void) _hash_pgaddtup(rel, buf, itemsz, itup);
00161
00162
00163 _hash_wrtbuf(rel, buf);
00164
00165
00166 _hash_droplock(rel, blkno, HASH_SHARE);
00167
00168
00169
00170
00171
00172 _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
00173
00174 metap->hashm_ntuples += 1;
00175
00176
00177 do_expand = metap->hashm_ntuples >
00178 (double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1);
00179
00180
00181 _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);
00182
00183
00184 if (do_expand)
00185 _hash_expandtable(rel, metabuf);
00186
00187
00188 _hash_dropbuf(rel, metabuf);
00189 }
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202 OffsetNumber
00203 _hash_pgaddtup(Relation rel, Buffer buf, Size itemsize, IndexTuple itup)
00204 {
00205 OffsetNumber itup_off;
00206 Page page;
00207 uint32 hashkey;
00208
00209 _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
00210 page = BufferGetPage(buf);
00211
00212
00213 hashkey = _hash_get_indextuple_hashkey(itup);
00214 itup_off = _hash_binsearch(page, hashkey);
00215
00216 if (PageAddItem(page, (Item) itup, itemsize, itup_off, false, false)
00217 == InvalidOffsetNumber)
00218 elog(ERROR, "failed to add index item to \"%s\"",
00219 RelationGetRelationName(rel));
00220
00221 return itup_off;
00222 }