Header And Logo

PostgreSQL
| The world's most advanced open source database.

inv_api.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * inv_api.c
00004  *    routines for manipulating inversion fs large objects. This file
00005  *    contains the user-level large object application interface routines.
00006  *
00007  *
00008  * Note: we access pg_largeobject.data using its C struct declaration.
00009  * This is safe because it immediately follows pageno which is an int4 field,
00010  * and therefore the data field will always be 4-byte aligned, even if it
00011  * is in the short 1-byte-header format.  We have to detoast it since it's
00012  * quite likely to be in compressed or short format.  We also need to check
00013  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
00014  *
00015  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
00016  * does most of the backend code.  We expect that CurrentMemoryContext will
00017  * be a short-lived context.  Data that must persist across function calls
00018  * is kept either in CacheMemoryContext (the Relation structs) or in the
00019  * memory context given to inv_open (for LargeObjectDesc structs).
00020  *
00021  *
00022  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00023  * Portions Copyright (c) 1994, Regents of the University of California
00024  *
00025  *
00026  * IDENTIFICATION
00027  *    src/backend/storage/large_object/inv_api.c
00028  *
00029  *-------------------------------------------------------------------------
00030  */
00031 #include "postgres.h"
00032 
00033 #include <limits.h>
00034 
00035 #include "access/genam.h"
00036 #include "access/heapam.h"
00037 #include "access/sysattr.h"
00038 #include "access/tuptoaster.h"
00039 #include "access/xact.h"
00040 #include "catalog/dependency.h"
00041 #include "catalog/indexing.h"
00042 #include "catalog/objectaccess.h"
00043 #include "catalog/pg_largeobject.h"
00044 #include "catalog/pg_largeobject_metadata.h"
00045 #include "libpq/libpq-fs.h"
00046 #include "miscadmin.h"
00047 #include "storage/large_object.h"
00048 #include "utils/fmgroids.h"
00049 #include "utils/rel.h"
00050 #include "utils/snapmgr.h"
00051 #include "utils/tqual.h"
00052 
00053 
00054 /*
00055  * All accesses to pg_largeobject and its index make use of a single Relation
00056  * reference, so that we only need to open pg_largeobject once per transaction.
00057  * To avoid problems when the first such reference occurs inside a
00058  * subtransaction, we execute a slightly klugy maneuver to assign ownership of
00059  * the Relation reference to TopTransactionResourceOwner.
00060  */
00061 static Relation lo_heap_r = NULL;   /* pg_largeobject heap; NULL when not open */
00062 static Relation lo_index_r = NULL;  /* its (loid, pageno) index; NULL when not open */
00063 
00064 
00065 /*
00066  * Open pg_largeobject and its index, if not already done in current xact
00067  */
00068 static void
00069 open_lo_relation(void)
00070 {
00071     ResourceOwner currentOwner;
00072 
00073     if (lo_heap_r && lo_index_r)
00074         return;                 /* already open in current xact */
00075 
00076     /* Arrange for the top xact to own these relation references */
00077     currentOwner = CurrentResourceOwner;
00078     PG_TRY();
00079     {
00080         CurrentResourceOwner = TopTransactionResourceOwner;
00081 
00082         /* Use RowExclusiveLock since we might either read or write */
00083         if (lo_heap_r == NULL)
00084             lo_heap_r = heap_open(LargeObjectRelationId, RowExclusiveLock);
00085         if (lo_index_r == NULL)
00086             lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
00087     }
00088     PG_CATCH();
00089     {
00090         /* Ensure CurrentResourceOwner is restored on error */
00091         CurrentResourceOwner = currentOwner;
00092         PG_RE_THROW();
00093     }
00094     PG_END_TRY();
00095     CurrentResourceOwner = currentOwner;
00096 }
00097 
00098 /*
00099  * Clean up at main transaction end
00100  */
00101 void
00102 close_lo_relation(bool isCommit)
00103 {
00104     if (lo_heap_r || lo_index_r)
00105     {
00106         /*
00107          * Only bother to close if committing; else abort cleanup will handle
00108          * it
00109          */
00110         if (isCommit)
00111         {
00112             ResourceOwner currentOwner;
00113 
00114             currentOwner = CurrentResourceOwner;
00115             PG_TRY();
00116             {
00117                 CurrentResourceOwner = TopTransactionResourceOwner;
00118 
00119                 if (lo_index_r)
00120                     index_close(lo_index_r, NoLock);
00121                 if (lo_heap_r)
00122                     heap_close(lo_heap_r, NoLock);
00123             }
00124             PG_CATCH();
00125             {
00126                 /* Ensure CurrentResourceOwner is restored on error */
00127                 CurrentResourceOwner = currentOwner;
00128                 PG_RE_THROW();
00129             }
00130             PG_END_TRY();
00131             CurrentResourceOwner = currentOwner;
00132         }
00133         lo_heap_r = NULL;
00134         lo_index_r = NULL;
00135     }
00136 }
00137 
00138 
00139 /*
00140  * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
00141  * read with can be specified.
00142  */
00143 static bool
00144 myLargeObjectExists(Oid loid, Snapshot snapshot)
00145 {
00146     Relation    pg_lo_meta;
00147     ScanKeyData skey[1];
00148     SysScanDesc sd;
00149     HeapTuple   tuple;
00150     bool        retval = false;
00151 
00152     ScanKeyInit(&skey[0],
00153                 ObjectIdAttributeNumber,
00154                 BTEqualStrategyNumber, F_OIDEQ,
00155                 ObjectIdGetDatum(loid));
00156 
00157     pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
00158                            AccessShareLock);
00159 
00160     sd = systable_beginscan(pg_lo_meta,
00161                             LargeObjectMetadataOidIndexId, true,
00162                             snapshot, 1, skey);
00163 
00164     tuple = systable_getnext(sd);
00165     if (HeapTupleIsValid(tuple))
00166         retval = true;
00167 
00168     systable_endscan(sd);
00169 
00170     heap_close(pg_lo_meta, AccessShareLock);
00171 
00172     return retval;
00173 }
00174 
00175 
00176 static int32
00177 getbytealen(bytea *data)
00178 {
00179     Assert(!VARATT_IS_EXTENDED(data));
00180     if (VARSIZE(data) < VARHDRSZ)
00181         elog(ERROR, "invalid VARSIZE(data)");
00182     return (VARSIZE(data) - VARHDRSZ);
00183 }
00184 
00185 
00186 /*
00187  *  inv_create -- create a new large object
00188  *
00189  *  Arguments:
00190  *    lobjId - OID to use for new large object, or InvalidOid to pick one
00191  *
00192  *  Returns:
00193  *    OID of new object
00194  *
00195  * If lobjId is not InvalidOid, then an error occurs if the OID is already
00196  * in use.
00197  */
00198 Oid
00199 inv_create(Oid lobjId)
00200 {
00201     Oid         lobjId_new;
00202 
00203     /*
00204      * Create a new largeobject with empty data pages
00205      */
00206     lobjId_new = LargeObjectCreate(lobjId);
00207 
00208     /*
00209      * dependency on the owner of largeobject
00210      *
00211      * The reason why we use LargeObjectRelationId instead of
00212      * LargeObjectMetadataRelationId here is to provide backward compatibility
00213      * to the applications which utilize a knowledge about internal layout of
00214      * system catalogs. OID of pg_largeobject_metadata and loid of
00215      * pg_largeobject are same value, so there are no actual differences here.
00216      */
00217     recordDependencyOnOwner(LargeObjectRelationId,
00218                             lobjId_new, GetUserId());
00219 
00220     /* Post creation hook for new large object */
00221     InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
00222 
00223     /*
00224      * Advance command counter to make new tuple visible to later operations.
00225      */
00226     CommandCounterIncrement();
00227 
00228     return lobjId_new;
00229 }
00230 
00231 /*
00232  *  inv_open -- access an existing large object.
00233  *
00234  *      Returns:
00235  *        Large object descriptor, appropriately filled in.  The descriptor
00236  *        and subsidiary data are allocated in the specified memory context,
00237  *        which must be suitably long-lived for the caller's purposes.
00238  */
00239 LargeObjectDesc *
00240 inv_open(Oid lobjId, int flags, MemoryContext mcxt)
00241 {
00242     LargeObjectDesc *retval;
00243 
00244     retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
00245                                                     sizeof(LargeObjectDesc));
00246 
00247     retval->id = lobjId;
00248     retval->subid = GetCurrentSubTransactionId();
00249     retval->offset = 0;
00250 
00251     if (flags & INV_WRITE)
00252     {
00253         retval->snapshot = SnapshotNow;
00254         retval->flags = IFS_WRLOCK | IFS_RDLOCK;
00255     }
00256     else if (flags & INV_READ)
00257     {
00258         /*
00259          * We must register the snapshot in TopTransaction's resowner, because
00260          * it must stay alive until the LO is closed rather than until the
00261          * current portal shuts down.
00262          */
00263         retval->snapshot = RegisterSnapshotOnOwner(GetActiveSnapshot(),
00264                                                 TopTransactionResourceOwner);
00265         retval->flags = IFS_RDLOCK;
00266     }
00267     else
00268         ereport(ERROR,
00269                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00270                  errmsg("invalid flags for opening a large object: %d",
00271                         flags)));
00272 
00273     /* Can't use LargeObjectExists here because it always uses SnapshotNow */
00274     if (!myLargeObjectExists(lobjId, retval->snapshot))
00275         ereport(ERROR,
00276                 (errcode(ERRCODE_UNDEFINED_OBJECT),
00277                  errmsg("large object %u does not exist", lobjId)));
00278 
00279     return retval;
00280 }
00281 
00282 /*
00283  * Closes a large object descriptor previously made by inv_open(), and
00284  * releases the long-term memory used by it.
00285  */
00286 void
00287 inv_close(LargeObjectDesc *obj_desc)
00288 {
00289     Assert(PointerIsValid(obj_desc));
00290 
00291     if (obj_desc->snapshot != SnapshotNow)
00292         UnregisterSnapshotFromOwner(obj_desc->snapshot,
00293                                     TopTransactionResourceOwner);
00294 
00295     pfree(obj_desc);
00296 }
00297 
00298 /*
00299  * Destroys an existing large object (not to be confused with a descriptor!)
00300  *
00301  * returns -1 if failed
00302  */
00303 int
00304 inv_drop(Oid lobjId)
00305 {
00306     ObjectAddress object;
00307 
00308     /*
00309      * Delete any comments and dependencies on the large object
00310      */
00311     object.classId = LargeObjectRelationId;
00312     object.objectId = lobjId;
00313     object.objectSubId = 0;
00314     performDeletion(&object, DROP_CASCADE, 0);
00315 
00316     /*
00317      * Advance command counter so that tuple removal will be seen by later
00318      * large-object operations in this transaction.
00319      */
00320     CommandCounterIncrement();
00321 
00322     return 1;
00323 }
00324 
00325 /*
00326  * Determine size of a large object
00327  *
00328  * NOTE: LOs can contain gaps, just like Unix files.  We actually return
00329  * the offset of the last byte + 1.
00330  */
00331 static uint64
00332 inv_getsize(LargeObjectDesc *obj_desc)
00333 {
00334     uint64      lastbyte = 0;
00335     ScanKeyData skey[1];
00336     SysScanDesc sd;
00337     HeapTuple   tuple;
00338 
00339     Assert(PointerIsValid(obj_desc));
00340 
00341     open_lo_relation();
00342 
00343     ScanKeyInit(&skey[0],
00344                 Anum_pg_largeobject_loid,
00345                 BTEqualStrategyNumber, F_OIDEQ,
00346                 ObjectIdGetDatum(obj_desc->id));
00347 
00348     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
00349                                     obj_desc->snapshot, 1, skey);
00350 
00351     /*
00352      * Because the pg_largeobject index is on both loid and pageno, but we
00353      * constrain only loid, a backwards scan should visit all pages of the
00354      * large object in reverse pageno order.  So, it's sufficient to examine
00355      * the first valid tuple (== last valid page).
00356      */
00357     tuple = systable_getnext_ordered(sd, BackwardScanDirection);
00358     if (HeapTupleIsValid(tuple))
00359     {
00360         Form_pg_largeobject data;
00361         bytea      *datafield;
00362         bool        pfreeit;
00363 
00364         if (HeapTupleHasNulls(tuple))   /* paranoia */
00365             elog(ERROR, "null field found in pg_largeobject");
00366         data = (Form_pg_largeobject) GETSTRUCT(tuple);
00367         datafield = &(data->data);      /* see note at top of file */
00368         pfreeit = false;
00369         if (VARATT_IS_EXTENDED(datafield))
00370         {
00371             datafield = (bytea *)
00372                 heap_tuple_untoast_attr((struct varlena *) datafield);
00373             pfreeit = true;
00374         }
00375         lastbyte = (uint64) data->pageno * LOBLKSIZE + getbytealen(datafield);
00376         if (pfreeit)
00377             pfree(datafield);
00378     }
00379 
00380     systable_endscan_ordered(sd);
00381 
00382     return lastbyte;
00383 }
00384 
00385 int64
00386 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
00387 {
00388     int64       newoffset;
00389 
00390     Assert(PointerIsValid(obj_desc));
00391 
00392     /*
00393      * Note: overflow in the additions is possible, but since we will reject
00394      * negative results, we don't need any extra test for that.
00395      */
00396     switch (whence)
00397     {
00398         case SEEK_SET:
00399             newoffset = offset;
00400             break;
00401         case SEEK_CUR:
00402             newoffset = obj_desc->offset + offset;
00403             break;
00404         case SEEK_END:
00405             newoffset = inv_getsize(obj_desc) + offset;
00406             break;
00407         default:
00408             ereport(ERROR,
00409                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00410                      errmsg("invalid whence setting: %d", whence)));
00411             newoffset = 0;      /* keep compiler quiet */
00412             break;
00413     }
00414 
00415     /*
00416      * use errmsg_internal here because we don't want to expose INT64_FORMAT
00417      * in translatable strings; doing better is not worth the trouble
00418      */
00419     if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
00420         ereport(ERROR,
00421                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00422            errmsg_internal("invalid large object seek target: " INT64_FORMAT,
00423                            newoffset)));
00424 
00425     obj_desc->offset = newoffset;
00426     return newoffset;
00427 }
00428 
00429 int64
00430 inv_tell(LargeObjectDesc *obj_desc)
00431 {
00432     Assert(PointerIsValid(obj_desc));
00433 
00434     return obj_desc->offset;
00435 }
00436 
00437 int
00438 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
00439 {
00440     int         nread = 0;
00441     int64       n;
00442     int64       off;
00443     int         len;
00444     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
00445     uint64      pageoff;
00446     ScanKeyData skey[2];
00447     SysScanDesc sd;
00448     HeapTuple   tuple;
00449 
00450     Assert(PointerIsValid(obj_desc));
00451     Assert(buf != NULL);
00452 
00453     if (nbytes <= 0)
00454         return 0;
00455 
00456     open_lo_relation();
00457 
00458     ScanKeyInit(&skey[0],
00459                 Anum_pg_largeobject_loid,
00460                 BTEqualStrategyNumber, F_OIDEQ,
00461                 ObjectIdGetDatum(obj_desc->id));
00462 
00463     ScanKeyInit(&skey[1],
00464                 Anum_pg_largeobject_pageno,
00465                 BTGreaterEqualStrategyNumber, F_INT4GE,
00466                 Int32GetDatum(pageno));
00467 
00468     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
00469                                     obj_desc->snapshot, 2, skey);
00470 
00471     while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
00472     {
00473         Form_pg_largeobject data;
00474         bytea      *datafield;
00475         bool        pfreeit;
00476 
00477         if (HeapTupleHasNulls(tuple))   /* paranoia */
00478             elog(ERROR, "null field found in pg_largeobject");
00479         data = (Form_pg_largeobject) GETSTRUCT(tuple);
00480 
00481         /*
00482          * We expect the indexscan will deliver pages in order.  However,
00483          * there may be missing pages if the LO contains unwritten "holes". We
00484          * want missing sections to read out as zeroes.
00485          */
00486         pageoff = ((uint64) data->pageno) * LOBLKSIZE;
00487         if (pageoff > obj_desc->offset)
00488         {
00489             n = pageoff - obj_desc->offset;
00490             n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
00491             MemSet(buf + nread, 0, n);
00492             nread += n;
00493             obj_desc->offset += n;
00494         }
00495 
00496         if (nread < nbytes)
00497         {
00498             Assert(obj_desc->offset >= pageoff);
00499             off = (int) (obj_desc->offset - pageoff);
00500             Assert(off >= 0 && off < LOBLKSIZE);
00501 
00502             datafield = &(data->data);  /* see note at top of file */
00503             pfreeit = false;
00504             if (VARATT_IS_EXTENDED(datafield))
00505             {
00506                 datafield = (bytea *)
00507                     heap_tuple_untoast_attr((struct varlena *) datafield);
00508                 pfreeit = true;
00509             }
00510             len = getbytealen(datafield);
00511             if (len > off)
00512             {
00513                 n = len - off;
00514                 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
00515                 memcpy(buf + nread, VARDATA(datafield) + off, n);
00516                 nread += n;
00517                 obj_desc->offset += n;
00518             }
00519             if (pfreeit)
00520                 pfree(datafield);
00521         }
00522 
00523         if (nread >= nbytes)
00524             break;
00525     }
00526 
00527     systable_endscan_ordered(sd);
00528 
00529     return nread;
00530 }
00531 
00532 int
00533 inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
00534 {
00535     int         nwritten = 0;
00536     int         n;
00537     int         off;
00538     int         len;
00539     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
00540     ScanKeyData skey[2];
00541     SysScanDesc sd;
00542     HeapTuple   oldtuple;
00543     Form_pg_largeobject olddata;
00544     bool        neednextpage;
00545     bytea      *datafield;
00546     bool        pfreeit;
00547     struct
00548     {
00549         bytea       hdr;
00550         char        data[LOBLKSIZE];    /* make struct big enough */
00551         int32       align_it;   /* ensure struct is aligned well enough */
00552     }           workbuf;
00553     char       *workb = VARDATA(&workbuf.hdr);
00554     HeapTuple   newtup;
00555     Datum       values[Natts_pg_largeobject];
00556     bool        nulls[Natts_pg_largeobject];
00557     bool        replace[Natts_pg_largeobject];
00558     CatalogIndexState indstate;
00559 
00560     Assert(PointerIsValid(obj_desc));
00561     Assert(buf != NULL);
00562 
00563     /* enforce writability because snapshot is probably wrong otherwise */
00564     Assert(obj_desc->flags & IFS_WRLOCK);
00565 
00566     if (nbytes <= 0)
00567         return 0;
00568 
00569     /* this addition can't overflow because nbytes is only int32 */
00570     if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
00571         ereport(ERROR,
00572                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00573                  errmsg("invalid large object write request size: %d",
00574                         nbytes)));
00575 
00576     open_lo_relation();
00577 
00578     indstate = CatalogOpenIndexes(lo_heap_r);
00579 
00580     ScanKeyInit(&skey[0],
00581                 Anum_pg_largeobject_loid,
00582                 BTEqualStrategyNumber, F_OIDEQ,
00583                 ObjectIdGetDatum(obj_desc->id));
00584 
00585     ScanKeyInit(&skey[1],
00586                 Anum_pg_largeobject_pageno,
00587                 BTGreaterEqualStrategyNumber, F_INT4GE,
00588                 Int32GetDatum(pageno));
00589 
00590     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
00591                                     obj_desc->snapshot, 2, skey);
00592 
00593     oldtuple = NULL;
00594     olddata = NULL;
00595     neednextpage = true;
00596 
00597     while (nwritten < nbytes)
00598     {
00599         /*
00600          * If possible, get next pre-existing page of the LO.  We expect the
00601          * indexscan will deliver these in order --- but there may be holes.
00602          */
00603         if (neednextpage)
00604         {
00605             if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
00606             {
00607                 if (HeapTupleHasNulls(oldtuple))        /* paranoia */
00608                     elog(ERROR, "null field found in pg_largeobject");
00609                 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
00610                 Assert(olddata->pageno >= pageno);
00611             }
00612             neednextpage = false;
00613         }
00614 
00615         /*
00616          * If we have a pre-existing page, see if it is the page we want to
00617          * write, or a later one.
00618          */
00619         if (olddata != NULL && olddata->pageno == pageno)
00620         {
00621             /*
00622              * Update an existing page with fresh data.
00623              *
00624              * First, load old data into workbuf
00625              */
00626             datafield = &(olddata->data);       /* see note at top of file */
00627             pfreeit = false;
00628             if (VARATT_IS_EXTENDED(datafield))
00629             {
00630                 datafield = (bytea *)
00631                     heap_tuple_untoast_attr((struct varlena *) datafield);
00632                 pfreeit = true;
00633             }
00634             len = getbytealen(datafield);
00635             Assert(len <= LOBLKSIZE);
00636             memcpy(workb, VARDATA(datafield), len);
00637             if (pfreeit)
00638                 pfree(datafield);
00639 
00640             /*
00641              * Fill any hole
00642              */
00643             off = (int) (obj_desc->offset % LOBLKSIZE);
00644             if (off > len)
00645                 MemSet(workb + len, 0, off - len);
00646 
00647             /*
00648              * Insert appropriate portion of new data
00649              */
00650             n = LOBLKSIZE - off;
00651             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
00652             memcpy(workb + off, buf + nwritten, n);
00653             nwritten += n;
00654             obj_desc->offset += n;
00655             off += n;
00656             /* compute valid length of new page */
00657             len = (len >= off) ? len : off;
00658             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
00659 
00660             /*
00661              * Form and insert updated tuple
00662              */
00663             memset(values, 0, sizeof(values));
00664             memset(nulls, false, sizeof(nulls));
00665             memset(replace, false, sizeof(replace));
00666             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
00667             replace[Anum_pg_largeobject_data - 1] = true;
00668             newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
00669                                        values, nulls, replace);
00670             simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
00671             CatalogIndexInsert(indstate, newtup);
00672             heap_freetuple(newtup);
00673 
00674             /*
00675              * We're done with this old page.
00676              */
00677             oldtuple = NULL;
00678             olddata = NULL;
00679             neednextpage = true;
00680         }
00681         else
00682         {
00683             /*
00684              * Write a brand new page.
00685              *
00686              * First, fill any hole
00687              */
00688             off = (int) (obj_desc->offset % LOBLKSIZE);
00689             if (off > 0)
00690                 MemSet(workb, 0, off);
00691 
00692             /*
00693              * Insert appropriate portion of new data
00694              */
00695             n = LOBLKSIZE - off;
00696             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
00697             memcpy(workb + off, buf + nwritten, n);
00698             nwritten += n;
00699             obj_desc->offset += n;
00700             /* compute valid length of new page */
00701             len = off + n;
00702             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
00703 
00704             /*
00705              * Form and insert updated tuple
00706              */
00707             memset(values, 0, sizeof(values));
00708             memset(nulls, false, sizeof(nulls));
00709             values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
00710             values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
00711             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
00712             newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
00713             simple_heap_insert(lo_heap_r, newtup);
00714             CatalogIndexInsert(indstate, newtup);
00715             heap_freetuple(newtup);
00716         }
00717         pageno++;
00718     }
00719 
00720     systable_endscan_ordered(sd);
00721 
00722     CatalogCloseIndexes(indstate);
00723 
00724     /*
00725      * Advance command counter so that my tuple updates will be seen by later
00726      * large-object operations in this transaction.
00727      */
00728     CommandCounterIncrement();
00729 
00730     return nwritten;
00731 }
00732 
00733 void
00734 inv_truncate(LargeObjectDesc *obj_desc, int64 len)
00735 {
00736     int32       pageno = (int32) (len / LOBLKSIZE);
00737     int32       off;
00738     ScanKeyData skey[2];
00739     SysScanDesc sd;
00740     HeapTuple   oldtuple;
00741     Form_pg_largeobject olddata;
00742     struct
00743     {
00744         bytea       hdr;
00745         char        data[LOBLKSIZE];    /* make struct big enough */
00746         int32       align_it;   /* ensure struct is aligned well enough */
00747     }           workbuf;
00748     char       *workb = VARDATA(&workbuf.hdr);
00749     HeapTuple   newtup;
00750     Datum       values[Natts_pg_largeobject];
00751     bool        nulls[Natts_pg_largeobject];
00752     bool        replace[Natts_pg_largeobject];
00753     CatalogIndexState indstate;
00754 
00755     Assert(PointerIsValid(obj_desc));
00756 
00757     /* enforce writability because snapshot is probably wrong otherwise */
00758     Assert(obj_desc->flags & IFS_WRLOCK);
00759 
00760     /*
00761      * use errmsg_internal here because we don't want to expose INT64_FORMAT
00762      * in translatable strings; doing better is not worth the trouble
00763      */
00764     if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
00765         ereport(ERROR,
00766                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00767                  errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
00768                                  len)));
00769 
00770     open_lo_relation();
00771 
00772     indstate = CatalogOpenIndexes(lo_heap_r);
00773 
00774     /*
00775      * Set up to find all pages with desired loid and pageno >= target
00776      */
00777     ScanKeyInit(&skey[0],
00778                 Anum_pg_largeobject_loid,
00779                 BTEqualStrategyNumber, F_OIDEQ,
00780                 ObjectIdGetDatum(obj_desc->id));
00781 
00782     ScanKeyInit(&skey[1],
00783                 Anum_pg_largeobject_pageno,
00784                 BTGreaterEqualStrategyNumber, F_INT4GE,
00785                 Int32GetDatum(pageno));
00786 
00787     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
00788                                     obj_desc->snapshot, 2, skey);
00789 
00790     /*
00791      * If possible, get the page the truncation point is in. The truncation
00792      * point may be beyond the end of the LO or in a hole.
00793      */
00794     olddata = NULL;
00795     if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
00796     {
00797         if (HeapTupleHasNulls(oldtuple))        /* paranoia */
00798             elog(ERROR, "null field found in pg_largeobject");
00799         olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
00800         Assert(olddata->pageno >= pageno);
00801     }
00802 
00803     /*
00804      * If we found the page of the truncation point we need to truncate the
00805      * data in it.  Otherwise if we're in a hole, we need to create a page to
00806      * mark the end of data.
00807      */
00808     if (olddata != NULL && olddata->pageno == pageno)
00809     {
00810         /* First, load old data into workbuf */
00811         bytea      *datafield = &(olddata->data);       /* see note at top of
00812                                                          * file */
00813         bool        pfreeit = false;
00814         int         pagelen;
00815 
00816         if (VARATT_IS_EXTENDED(datafield))
00817         {
00818             datafield = (bytea *)
00819                 heap_tuple_untoast_attr((struct varlena *) datafield);
00820             pfreeit = true;
00821         }
00822         pagelen = getbytealen(datafield);
00823         Assert(pagelen <= LOBLKSIZE);
00824         memcpy(workb, VARDATA(datafield), pagelen);
00825         if (pfreeit)
00826             pfree(datafield);
00827 
00828         /*
00829          * Fill any hole
00830          */
00831         off = len % LOBLKSIZE;
00832         if (off > pagelen)
00833             MemSet(workb + pagelen, 0, off - pagelen);
00834 
00835         /* compute length of new page */
00836         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
00837 
00838         /*
00839          * Form and insert updated tuple
00840          */
00841         memset(values, 0, sizeof(values));
00842         memset(nulls, false, sizeof(nulls));
00843         memset(replace, false, sizeof(replace));
00844         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
00845         replace[Anum_pg_largeobject_data - 1] = true;
00846         newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
00847                                    values, nulls, replace);
00848         simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
00849         CatalogIndexInsert(indstate, newtup);
00850         heap_freetuple(newtup);
00851     }
00852     else
00853     {
00854         /*
00855          * If the first page we found was after the truncation point, we're in
00856          * a hole that we'll fill, but we need to delete the later page
00857          * because the loop below won't visit it again.
00858          */
00859         if (olddata != NULL)
00860         {
00861             Assert(olddata->pageno > pageno);
00862             simple_heap_delete(lo_heap_r, &oldtuple->t_self);
00863         }
00864 
00865         /*
00866          * Write a brand new page.
00867          *
00868          * Fill the hole up to the truncation point
00869          */
00870         off = len % LOBLKSIZE;
00871         if (off > 0)
00872             MemSet(workb, 0, off);
00873 
00874         /* compute length of new page */
00875         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
00876 
00877         /*
00878          * Form and insert new tuple
00879          */
00880         memset(values, 0, sizeof(values));
00881         memset(nulls, false, sizeof(nulls));
00882         values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
00883         values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
00884         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
00885         newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
00886         simple_heap_insert(lo_heap_r, newtup);
00887         CatalogIndexInsert(indstate, newtup);
00888         heap_freetuple(newtup);
00889     }
00890 
00891     /*
00892      * Delete any pages after the truncation point.  If the initial search
00893      * didn't find a page, then of course there's nothing more to do.
00894      */
00895     if (olddata != NULL)
00896     {
00897         while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
00898         {
00899             simple_heap_delete(lo_heap_r, &oldtuple->t_self);
00900         }
00901     }
00902 
00903     systable_endscan_ordered(sd);
00904 
00905     CatalogCloseIndexes(indstate);
00906 
00907     /*
00908      * Advance command counter so that tuple updates will be seen by later
00909      * large-object operations in this transaction.
00910      */
00911     CommandCounterIncrement();
00912 }