00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include "postgres.h"
00032
00033 #include <limits.h>
00034
00035 #include "access/genam.h"
00036 #include "access/heapam.h"
00037 #include "access/sysattr.h"
00038 #include "access/tuptoaster.h"
00039 #include "access/xact.h"
00040 #include "catalog/dependency.h"
00041 #include "catalog/indexing.h"
00042 #include "catalog/objectaccess.h"
00043 #include "catalog/pg_largeobject.h"
00044 #include "catalog/pg_largeobject_metadata.h"
00045 #include "libpq/libpq-fs.h"
00046 #include "miscadmin.h"
00047 #include "storage/large_object.h"
00048 #include "utils/fmgroids.h"
00049 #include "utils/rel.h"
00050 #include "utils/snapmgr.h"
00051 #include "utils/tqual.h"
00052
00053
00054
00055
00056
00057
00058
00059
00060
/*
 * Cached relation handles shared by the routines in this file.
 * open_lo_relation() fills them in on first use (lo_heap_r from
 * LargeObjectRelationId, lo_index_r from LargeObjectLOidPNIndexId) and
 * close_lo_relation() resets both to NULL.  NULL means "not currently open".
 */
00061 static Relation lo_heap_r = NULL;
00062 static Relation lo_index_r = NULL;
00063
00064
00065
00066
00067
00068 static void
00069 open_lo_relation(void)
00070 {
00071 ResourceOwner currentOwner;
00072
00073 if (lo_heap_r && lo_index_r)
00074 return;
00075
00076
00077 currentOwner = CurrentResourceOwner;
00078 PG_TRY();
00079 {
00080 CurrentResourceOwner = TopTransactionResourceOwner;
00081
00082
00083 if (lo_heap_r == NULL)
00084 lo_heap_r = heap_open(LargeObjectRelationId, RowExclusiveLock);
00085 if (lo_index_r == NULL)
00086 lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
00087 }
00088 PG_CATCH();
00089 {
00090
00091 CurrentResourceOwner = currentOwner;
00092 PG_RE_THROW();
00093 }
00094 PG_END_TRY();
00095 CurrentResourceOwner = currentOwner;
00096 }
00097
00098
00099
00100
00101 void
00102 close_lo_relation(bool isCommit)
00103 {
00104 if (lo_heap_r || lo_index_r)
00105 {
00106
00107
00108
00109
00110 if (isCommit)
00111 {
00112 ResourceOwner currentOwner;
00113
00114 currentOwner = CurrentResourceOwner;
00115 PG_TRY();
00116 {
00117 CurrentResourceOwner = TopTransactionResourceOwner;
00118
00119 if (lo_index_r)
00120 index_close(lo_index_r, NoLock);
00121 if (lo_heap_r)
00122 heap_close(lo_heap_r, NoLock);
00123 }
00124 PG_CATCH();
00125 {
00126
00127 CurrentResourceOwner = currentOwner;
00128 PG_RE_THROW();
00129 }
00130 PG_END_TRY();
00131 CurrentResourceOwner = currentOwner;
00132 }
00133 lo_heap_r = NULL;
00134 lo_index_r = NULL;
00135 }
00136 }
00137
00138
00139
00140
00141
00142
00143 static bool
00144 myLargeObjectExists(Oid loid, Snapshot snapshot)
00145 {
00146 Relation pg_lo_meta;
00147 ScanKeyData skey[1];
00148 SysScanDesc sd;
00149 HeapTuple tuple;
00150 bool retval = false;
00151
00152 ScanKeyInit(&skey[0],
00153 ObjectIdAttributeNumber,
00154 BTEqualStrategyNumber, F_OIDEQ,
00155 ObjectIdGetDatum(loid));
00156
00157 pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
00158 AccessShareLock);
00159
00160 sd = systable_beginscan(pg_lo_meta,
00161 LargeObjectMetadataOidIndexId, true,
00162 snapshot, 1, skey);
00163
00164 tuple = systable_getnext(sd);
00165 if (HeapTupleIsValid(tuple))
00166 retval = true;
00167
00168 systable_endscan(sd);
00169
00170 heap_close(pg_lo_meta, AccessShareLock);
00171
00172 return retval;
00173 }
00174
00175
00176 static int32
00177 getbytealen(bytea *data)
00178 {
00179 Assert(!VARATT_IS_EXTENDED(data));
00180 if (VARSIZE(data) < VARHDRSZ)
00181 elog(ERROR, "invalid VARSIZE(data)");
00182 return (VARSIZE(data) - VARHDRSZ);
00183 }
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197
00198 Oid
00199 inv_create(Oid lobjId)
00200 {
00201 Oid lobjId_new;
00202
00203
00204
00205
00206 lobjId_new = LargeObjectCreate(lobjId);
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217 recordDependencyOnOwner(LargeObjectRelationId,
00218 lobjId_new, GetUserId());
00219
00220
00221 InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
00222
00223
00224
00225
00226 CommandCounterIncrement();
00227
00228 return lobjId_new;
00229 }
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239 LargeObjectDesc *
00240 inv_open(Oid lobjId, int flags, MemoryContext mcxt)
00241 {
00242 LargeObjectDesc *retval;
00243
00244 retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
00245 sizeof(LargeObjectDesc));
00246
00247 retval->id = lobjId;
00248 retval->subid = GetCurrentSubTransactionId();
00249 retval->offset = 0;
00250
00251 if (flags & INV_WRITE)
00252 {
00253 retval->snapshot = SnapshotNow;
00254 retval->flags = IFS_WRLOCK | IFS_RDLOCK;
00255 }
00256 else if (flags & INV_READ)
00257 {
00258
00259
00260
00261
00262
00263 retval->snapshot = RegisterSnapshotOnOwner(GetActiveSnapshot(),
00264 TopTransactionResourceOwner);
00265 retval->flags = IFS_RDLOCK;
00266 }
00267 else
00268 ereport(ERROR,
00269 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00270 errmsg("invalid flags for opening a large object: %d",
00271 flags)));
00272
00273
00274 if (!myLargeObjectExists(lobjId, retval->snapshot))
00275 ereport(ERROR,
00276 (errcode(ERRCODE_UNDEFINED_OBJECT),
00277 errmsg("large object %u does not exist", lobjId)));
00278
00279 return retval;
00280 }
00281
00282
00283
00284
00285
00286 void
00287 inv_close(LargeObjectDesc *obj_desc)
00288 {
00289 Assert(PointerIsValid(obj_desc));
00290
00291 if (obj_desc->snapshot != SnapshotNow)
00292 UnregisterSnapshotFromOwner(obj_desc->snapshot,
00293 TopTransactionResourceOwner);
00294
00295 pfree(obj_desc);
00296 }
00297
00298
00299
00300
00301
00302
00303 int
00304 inv_drop(Oid lobjId)
00305 {
00306 ObjectAddress object;
00307
00308
00309
00310
00311 object.classId = LargeObjectRelationId;
00312 object.objectId = lobjId;
00313 object.objectSubId = 0;
00314 performDeletion(&object, DROP_CASCADE, 0);
00315
00316
00317
00318
00319
00320 CommandCounterIncrement();
00321
00322 return 1;
00323 }
00324
00325
00326
00327
00328
00329
00330
00331 static uint64
00332 inv_getsize(LargeObjectDesc *obj_desc)
00333 {
00334 uint64 lastbyte = 0;
00335 ScanKeyData skey[1];
00336 SysScanDesc sd;
00337 HeapTuple tuple;
00338
00339 Assert(PointerIsValid(obj_desc));
00340
00341 open_lo_relation();
00342
00343 ScanKeyInit(&skey[0],
00344 Anum_pg_largeobject_loid,
00345 BTEqualStrategyNumber, F_OIDEQ,
00346 ObjectIdGetDatum(obj_desc->id));
00347
00348 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
00349 obj_desc->snapshot, 1, skey);
00350
00351
00352
00353
00354
00355
00356
00357 tuple = systable_getnext_ordered(sd, BackwardScanDirection);
00358 if (HeapTupleIsValid(tuple))
00359 {
00360 Form_pg_largeobject data;
00361 bytea *datafield;
00362 bool pfreeit;
00363
00364 if (HeapTupleHasNulls(tuple))
00365 elog(ERROR, "null field found in pg_largeobject");
00366 data = (Form_pg_largeobject) GETSTRUCT(tuple);
00367 datafield = &(data->data);
00368 pfreeit = false;
00369 if (VARATT_IS_EXTENDED(datafield))
00370 {
00371 datafield = (bytea *)
00372 heap_tuple_untoast_attr((struct varlena *) datafield);
00373 pfreeit = true;
00374 }
00375 lastbyte = (uint64) data->pageno * LOBLKSIZE + getbytealen(datafield);
00376 if (pfreeit)
00377 pfree(datafield);
00378 }
00379
00380 systable_endscan_ordered(sd);
00381
00382 return lastbyte;
00383 }
00384
00385 int64
00386 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
00387 {
00388 int64 newoffset;
00389
00390 Assert(PointerIsValid(obj_desc));
00391
00392
00393
00394
00395
00396 switch (whence)
00397 {
00398 case SEEK_SET:
00399 newoffset = offset;
00400 break;
00401 case SEEK_CUR:
00402 newoffset = obj_desc->offset + offset;
00403 break;
00404 case SEEK_END:
00405 newoffset = inv_getsize(obj_desc) + offset;
00406 break;
00407 default:
00408 ereport(ERROR,
00409 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00410 errmsg("invalid whence setting: %d", whence)));
00411 newoffset = 0;
00412 break;
00413 }
00414
00415
00416
00417
00418
00419 if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
00420 ereport(ERROR,
00421 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00422 errmsg_internal("invalid large object seek target: " INT64_FORMAT,
00423 newoffset)));
00424
00425 obj_desc->offset = newoffset;
00426 return newoffset;
00427 }
00428
00429 int64
00430 inv_tell(LargeObjectDesc *obj_desc)
00431 {
00432 Assert(PointerIsValid(obj_desc));
00433
00434 return obj_desc->offset;
00435 }
00436
00437 int
00438 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
00439 {
00440 int nread = 0;
00441 int64 n;
00442 int64 off;
00443 int len;
00444 int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
00445 uint64 pageoff;
00446 ScanKeyData skey[2];
00447 SysScanDesc sd;
00448 HeapTuple tuple;
00449
00450 Assert(PointerIsValid(obj_desc));
00451 Assert(buf != NULL);
00452
00453 if (nbytes <= 0)
00454 return 0;
00455
00456 open_lo_relation();
00457
00458 ScanKeyInit(&skey[0],
00459 Anum_pg_largeobject_loid,
00460 BTEqualStrategyNumber, F_OIDEQ,
00461 ObjectIdGetDatum(obj_desc->id));
00462
00463 ScanKeyInit(&skey[1],
00464 Anum_pg_largeobject_pageno,
00465 BTGreaterEqualStrategyNumber, F_INT4GE,
00466 Int32GetDatum(pageno));
00467
00468 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
00469 obj_desc->snapshot, 2, skey);
00470
00471 while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
00472 {
00473 Form_pg_largeobject data;
00474 bytea *datafield;
00475 bool pfreeit;
00476
00477 if (HeapTupleHasNulls(tuple))
00478 elog(ERROR, "null field found in pg_largeobject");
00479 data = (Form_pg_largeobject) GETSTRUCT(tuple);
00480
00481
00482
00483
00484
00485
00486 pageoff = ((uint64) data->pageno) * LOBLKSIZE;
00487 if (pageoff > obj_desc->offset)
00488 {
00489 n = pageoff - obj_desc->offset;
00490 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
00491 MemSet(buf + nread, 0, n);
00492 nread += n;
00493 obj_desc->offset += n;
00494 }
00495
00496 if (nread < nbytes)
00497 {
00498 Assert(obj_desc->offset >= pageoff);
00499 off = (int) (obj_desc->offset - pageoff);
00500 Assert(off >= 0 && off < LOBLKSIZE);
00501
00502 datafield = &(data->data);
00503 pfreeit = false;
00504 if (VARATT_IS_EXTENDED(datafield))
00505 {
00506 datafield = (bytea *)
00507 heap_tuple_untoast_attr((struct varlena *) datafield);
00508 pfreeit = true;
00509 }
00510 len = getbytealen(datafield);
00511 if (len > off)
00512 {
00513 n = len - off;
00514 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
00515 memcpy(buf + nread, VARDATA(datafield) + off, n);
00516 nread += n;
00517 obj_desc->offset += n;
00518 }
00519 if (pfreeit)
00520 pfree(datafield);
00521 }
00522
00523 if (nread >= nbytes)
00524 break;
00525 }
00526
00527 systable_endscan_ordered(sd);
00528
00529 return nread;
00530 }
00531
00532 int
00533 inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
00534 {
00535 int nwritten = 0;
00536 int n;
00537 int off;
00538 int len;
00539 int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
00540 ScanKeyData skey[2];
00541 SysScanDesc sd;
00542 HeapTuple oldtuple;
00543 Form_pg_largeobject olddata;
00544 bool neednextpage;
00545 bytea *datafield;
00546 bool pfreeit;
00547 struct
00548 {
00549 bytea hdr;
00550 char data[LOBLKSIZE];
00551 int32 align_it;
00552 } workbuf;
00553 char *workb = VARDATA(&workbuf.hdr);
00554 HeapTuple newtup;
00555 Datum values[Natts_pg_largeobject];
00556 bool nulls[Natts_pg_largeobject];
00557 bool replace[Natts_pg_largeobject];
00558 CatalogIndexState indstate;
00559
00560 Assert(PointerIsValid(obj_desc));
00561 Assert(buf != NULL);
00562
00563
00564 Assert(obj_desc->flags & IFS_WRLOCK);
00565
00566 if (nbytes <= 0)
00567 return 0;
00568
00569
00570 if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
00571 ereport(ERROR,
00572 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00573 errmsg("invalid large object write request size: %d",
00574 nbytes)));
00575
00576 open_lo_relation();
00577
00578 indstate = CatalogOpenIndexes(lo_heap_r);
00579
00580 ScanKeyInit(&skey[0],
00581 Anum_pg_largeobject_loid,
00582 BTEqualStrategyNumber, F_OIDEQ,
00583 ObjectIdGetDatum(obj_desc->id));
00584
00585 ScanKeyInit(&skey[1],
00586 Anum_pg_largeobject_pageno,
00587 BTGreaterEqualStrategyNumber, F_INT4GE,
00588 Int32GetDatum(pageno));
00589
00590 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
00591 obj_desc->snapshot, 2, skey);
00592
00593 oldtuple = NULL;
00594 olddata = NULL;
00595 neednextpage = true;
00596
00597 while (nwritten < nbytes)
00598 {
00599
00600
00601
00602
00603 if (neednextpage)
00604 {
00605 if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
00606 {
00607 if (HeapTupleHasNulls(oldtuple))
00608 elog(ERROR, "null field found in pg_largeobject");
00609 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
00610 Assert(olddata->pageno >= pageno);
00611 }
00612 neednextpage = false;
00613 }
00614
00615
00616
00617
00618
00619 if (olddata != NULL && olddata->pageno == pageno)
00620 {
00621
00622
00623
00624
00625
00626 datafield = &(olddata->data);
00627 pfreeit = false;
00628 if (VARATT_IS_EXTENDED(datafield))
00629 {
00630 datafield = (bytea *)
00631 heap_tuple_untoast_attr((struct varlena *) datafield);
00632 pfreeit = true;
00633 }
00634 len = getbytealen(datafield);
00635 Assert(len <= LOBLKSIZE);
00636 memcpy(workb, VARDATA(datafield), len);
00637 if (pfreeit)
00638 pfree(datafield);
00639
00640
00641
00642
00643 off = (int) (obj_desc->offset % LOBLKSIZE);
00644 if (off > len)
00645 MemSet(workb + len, 0, off - len);
00646
00647
00648
00649
00650 n = LOBLKSIZE - off;
00651 n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
00652 memcpy(workb + off, buf + nwritten, n);
00653 nwritten += n;
00654 obj_desc->offset += n;
00655 off += n;
00656
00657 len = (len >= off) ? len : off;
00658 SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
00659
00660
00661
00662
00663 memset(values, 0, sizeof(values));
00664 memset(nulls, false, sizeof(nulls));
00665 memset(replace, false, sizeof(replace));
00666 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
00667 replace[Anum_pg_largeobject_data - 1] = true;
00668 newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
00669 values, nulls, replace);
00670 simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
00671 CatalogIndexInsert(indstate, newtup);
00672 heap_freetuple(newtup);
00673
00674
00675
00676
00677 oldtuple = NULL;
00678 olddata = NULL;
00679 neednextpage = true;
00680 }
00681 else
00682 {
00683
00684
00685
00686
00687
00688 off = (int) (obj_desc->offset % LOBLKSIZE);
00689 if (off > 0)
00690 MemSet(workb, 0, off);
00691
00692
00693
00694
00695 n = LOBLKSIZE - off;
00696 n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
00697 memcpy(workb + off, buf + nwritten, n);
00698 nwritten += n;
00699 obj_desc->offset += n;
00700
00701 len = off + n;
00702 SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
00703
00704
00705
00706
00707 memset(values, 0, sizeof(values));
00708 memset(nulls, false, sizeof(nulls));
00709 values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
00710 values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
00711 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
00712 newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
00713 simple_heap_insert(lo_heap_r, newtup);
00714 CatalogIndexInsert(indstate, newtup);
00715 heap_freetuple(newtup);
00716 }
00717 pageno++;
00718 }
00719
00720 systable_endscan_ordered(sd);
00721
00722 CatalogCloseIndexes(indstate);
00723
00724
00725
00726
00727
00728 CommandCounterIncrement();
00729
00730 return nwritten;
00731 }
00732
00733 void
00734 inv_truncate(LargeObjectDesc *obj_desc, int64 len)
00735 {
00736 int32 pageno = (int32) (len / LOBLKSIZE);
00737 int32 off;
00738 ScanKeyData skey[2];
00739 SysScanDesc sd;
00740 HeapTuple oldtuple;
00741 Form_pg_largeobject olddata;
00742 struct
00743 {
00744 bytea hdr;
00745 char data[LOBLKSIZE];
00746 int32 align_it;
00747 } workbuf;
00748 char *workb = VARDATA(&workbuf.hdr);
00749 HeapTuple newtup;
00750 Datum values[Natts_pg_largeobject];
00751 bool nulls[Natts_pg_largeobject];
00752 bool replace[Natts_pg_largeobject];
00753 CatalogIndexState indstate;
00754
00755 Assert(PointerIsValid(obj_desc));
00756
00757
00758 Assert(obj_desc->flags & IFS_WRLOCK);
00759
00760
00761
00762
00763
00764 if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
00765 ereport(ERROR,
00766 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00767 errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
00768 len)));
00769
00770 open_lo_relation();
00771
00772 indstate = CatalogOpenIndexes(lo_heap_r);
00773
00774
00775
00776
00777 ScanKeyInit(&skey[0],
00778 Anum_pg_largeobject_loid,
00779 BTEqualStrategyNumber, F_OIDEQ,
00780 ObjectIdGetDatum(obj_desc->id));
00781
00782 ScanKeyInit(&skey[1],
00783 Anum_pg_largeobject_pageno,
00784 BTGreaterEqualStrategyNumber, F_INT4GE,
00785 Int32GetDatum(pageno));
00786
00787 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
00788 obj_desc->snapshot, 2, skey);
00789
00790
00791
00792
00793
00794 olddata = NULL;
00795 if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
00796 {
00797 if (HeapTupleHasNulls(oldtuple))
00798 elog(ERROR, "null field found in pg_largeobject");
00799 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
00800 Assert(olddata->pageno >= pageno);
00801 }
00802
00803
00804
00805
00806
00807
00808 if (olddata != NULL && olddata->pageno == pageno)
00809 {
00810
00811 bytea *datafield = &(olddata->data);
00812
00813 bool pfreeit = false;
00814 int pagelen;
00815
00816 if (VARATT_IS_EXTENDED(datafield))
00817 {
00818 datafield = (bytea *)
00819 heap_tuple_untoast_attr((struct varlena *) datafield);
00820 pfreeit = true;
00821 }
00822 pagelen = getbytealen(datafield);
00823 Assert(pagelen <= LOBLKSIZE);
00824 memcpy(workb, VARDATA(datafield), pagelen);
00825 if (pfreeit)
00826 pfree(datafield);
00827
00828
00829
00830
00831 off = len % LOBLKSIZE;
00832 if (off > pagelen)
00833 MemSet(workb + pagelen, 0, off - pagelen);
00834
00835
00836 SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
00837
00838
00839
00840
00841 memset(values, 0, sizeof(values));
00842 memset(nulls, false, sizeof(nulls));
00843 memset(replace, false, sizeof(replace));
00844 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
00845 replace[Anum_pg_largeobject_data - 1] = true;
00846 newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
00847 values, nulls, replace);
00848 simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
00849 CatalogIndexInsert(indstate, newtup);
00850 heap_freetuple(newtup);
00851 }
00852 else
00853 {
00854
00855
00856
00857
00858
00859 if (olddata != NULL)
00860 {
00861 Assert(olddata->pageno > pageno);
00862 simple_heap_delete(lo_heap_r, &oldtuple->t_self);
00863 }
00864
00865
00866
00867
00868
00869
00870 off = len % LOBLKSIZE;
00871 if (off > 0)
00872 MemSet(workb, 0, off);
00873
00874
00875 SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
00876
00877
00878
00879
00880 memset(values, 0, sizeof(values));
00881 memset(nulls, false, sizeof(nulls));
00882 values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
00883 values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
00884 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
00885 newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
00886 simple_heap_insert(lo_heap_r, newtup);
00887 CatalogIndexInsert(indstate, newtup);
00888 heap_freetuple(newtup);
00889 }
00890
00891
00892
00893
00894
00895 if (olddata != NULL)
00896 {
00897 while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
00898 {
00899 simple_heap_delete(lo_heap_r, &oldtuple->t_self);
00900 }
00901 }
00902
00903 systable_endscan_ordered(sd);
00904
00905 CatalogCloseIndexes(indstate);
00906
00907
00908
00909
00910
00911 CommandCounterIncrement();
00912 }