/*-------------------------------------------------------------------------
 *
 * inv_api.c
 *	  routines for manipulating inversion fs large objects. This file
 *	  contains the user-level large object application interface routines.
 *
 *
 * Note: we access pg_largeobject.data using its C struct declaration.
 * This is safe because it immediately follows pageno which is an int4 field,
 * and therefore the data field will always be 4-byte aligned, even if it
 * is in the short 1-byte-header format.  We have to detoast it since it's
 * quite likely to be in compressed or short format.  We also need to check
 * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
 *
 * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
 * does most of the backend code.  We expect that CurrentMemoryContext will
 * be a short-lived context.  Data that must persist across function calls
 * is kept either in CacheMemoryContext (the Relation structs) or in the
 * memory context given to inv_open (for LargeObjectDesc structs).
 *
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/pg_largeobject.h"
#include "commands/comment.h"
#include "libpq/libpq-fs.h"
#include "storage/large_object.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"
#include "utils/tqual.h"
/*
 * All accesses to pg_largeobject and its index make use of a single Relation
 * reference, so that we only need to open pg_largeobject once per
 * transaction.  To avoid problems when the first such reference occurs
 * inside a subtransaction, we execute a slightly klugy maneuver to assign
 * ownership of the Relation reference to TopTransactionResourceOwner.
 */
static Relation lo_heap_r = NULL;
static Relation lo_index_r = NULL;
/*
 * Open pg_largeobject and its index, if not already done in current xact
 */
static void
open_lo_relation(void)
{
	ResourceOwner currentOwner;

	if (lo_heap_r && lo_index_r)
		return;					/* already open in current xact */

	/* Arrange for the top xact to own these relation references */
	currentOwner = CurrentResourceOwner;
	PG_TRY();
	{
		CurrentResourceOwner = TopTransactionResourceOwner;

		/* Use RowExclusiveLock since we might either read or write */
		if (lo_heap_r == NULL)
			lo_heap_r = heap_open(LargeObjectRelationId, RowExclusiveLock);
		if (lo_index_r == NULL)
			lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
	}
	PG_CATCH();
	{
		/* Ensure CurrentResourceOwner is restored on error */
		CurrentResourceOwner = currentOwner;
		PG_RE_THROW();
	}
	PG_END_TRY();
	CurrentResourceOwner = currentOwner;
}
/*
 * Clean up at main transaction end
 */
void
close_lo_relation(bool isCommit)
{
	if (lo_heap_r || lo_index_r)
	{
		/*
		 * Only bother to close if committing; else abort cleanup will handle
		 * it
		 */
		if (isCommit)
		{
			ResourceOwner currentOwner;

			currentOwner = CurrentResourceOwner;
			PG_TRY();
			{
				CurrentResourceOwner = TopTransactionResourceOwner;

				if (lo_index_r)
					index_close(lo_index_r, NoLock);
				if (lo_heap_r)
					heap_close(lo_heap_r, NoLock);
			}
			PG_CATCH();
			{
				/* Ensure CurrentResourceOwner is restored on error */
				CurrentResourceOwner = currentOwner;
				PG_RE_THROW();
			}
			PG_END_TRY();
			CurrentResourceOwner = currentOwner;
		}
		lo_heap_r = NULL;
		lo_index_r = NULL;
	}
}
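/*
 * (Illustrative note, not from the original file: the expected caller here
 * is the large-object cleanup hook run at main-transaction end --- e.g.
 * AtEOXact_LargeObject() in be-fsstubs.c --- which passes isCommit so that
 * the relation references are closed cleanly only when committing.)
 */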
/*
 * Same as pg_largeobject.c's LargeObjectExists(), except that the snapshot
 * to read with can be specified.
 */
static bool
myLargeObjectExists(Oid loid, Snapshot snapshot)
{
	bool		retval = false;
	Relation	pg_largeobject;
	ScanKeyData skey[1];
	SysScanDesc sd;

	/*
	 * See if we can find any tuples belonging to the specified LO
	 */
	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(loid));

	pg_largeobject = heap_open(LargeObjectRelationId, AccessShareLock);

	sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true,
							snapshot, 1, skey);

	if (systable_getnext(sd) != NULL)
		retval = true;

	systable_endscan(sd);

	heap_close(pg_largeobject, AccessShareLock);

	return retval;
}
static int32
getbytealen(bytea *data)
{
	Assert(!VARATT_IS_EXTENDED(data));
	if (VARSIZE(data) < VARHDRSZ)
		elog(ERROR, "invalid VARSIZE(data)");
	return (VARSIZE(data) - VARHDRSZ);
}
/*
 * inv_create -- create a new large object
 *
 * Arguments:
 *	  lobjId - OID to use for new large object, or InvalidOid to pick one
 *
 * Returns:
 *	  OID of new object
 *
 * If lobjId is not InvalidOid, then an error occurs if the OID is already
 * in use.
 */
Oid
inv_create(Oid lobjId)
{
	/*
	 * Allocate an OID to be the LO's identifier, unless we were told what to
	 * use.  We can use the index on pg_largeobject for checking OID
	 * uniqueness, even though it has additional columns besides OID.
	 */
	if (!OidIsValid(lobjId))
	{
		open_lo_relation();

		lobjId = GetNewOidWithIndex(lo_heap_r, LargeObjectLOidPNIndexId,
									Anum_pg_largeobject_loid);
	}

	/*
	 * Create the LO by writing an empty first page for it in pg_largeobject
	 * (will fail if duplicate)
	 */
	LargeObjectCreate(lobjId);

	/*
	 * Advance command counter to make new tuple visible to later operations.
	 */
	CommandCounterIncrement();

	return lobjId;
}
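/*
 * Illustrative usage, not part of the original file: create an LO and let
 * the server choose its OID.
 *
 *		Oid		lobjId = inv_create(InvalidOid);
 *
 * Passing an explicit OID instead makes LargeObjectCreate() fail if that
 * OID is already in use.
 */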
/*
 * inv_open -- access an existing large object.
 *
 * Returns:
 *	  Large object descriptor, appropriately filled in.  The descriptor
 *	  and subsidiary data are allocated in the specified memory context,
 *	  which must be suitably long-lived for the caller's purposes.
 */
LargeObjectDesc *
inv_open(Oid lobjId, int flags, MemoryContext mcxt)
{
	LargeObjectDesc *retval;

	retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
													sizeof(LargeObjectDesc));

	retval->id = lobjId;
	retval->subid = GetCurrentSubTransactionId();
	retval->offset = 0;

	if (flags & INV_WRITE)
	{
		retval->snapshot = SnapshotNow;
		retval->flags = IFS_WRLOCK | IFS_RDLOCK;
	}
	else if (flags & INV_READ)
	{
		/*
		 * We must register the snapshot in TopTransaction's resowner,
		 * because it must stay alive until the LO is closed rather than
		 * until the current portal shuts down.
		 */
		retval->snapshot = RegisterSnapshotOnOwner(GetActiveSnapshot(),
												   TopTransactionResourceOwner);
		retval->flags = IFS_RDLOCK;
	}
	else
		elog(ERROR, "invalid flags: %d", flags);

	/* Can't use LargeObjectExists here because it always uses SnapshotNow */
	if (!myLargeObjectExists(lobjId, retval->snapshot))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("large object %u does not exist", lobjId)));

	return retval;
}
/*
 * Closes a large object descriptor previously made by inv_open(), and
 * releases the long-term memory used by it.
 */
void
inv_close(LargeObjectDesc *obj_desc)
{
	Assert(PointerIsValid(obj_desc));

	if (obj_desc->snapshot != SnapshotNow)
		UnregisterSnapshotFromOwner(obj_desc->snapshot,
									TopTransactionResourceOwner);

	pfree(obj_desc);
}
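/*
 * Illustrative usage, not part of the original file: a minimal read
 * session, assuming "fscxt" is a sufficiently long-lived memory context and
 * "loid" names an existing large object:
 *
 *		LargeObjectDesc *lod = inv_open(loid, INV_READ, fscxt);
 *		char	buf[32];
 *		int		nread = inv_read(lod, buf, sizeof(buf));
 *
 *		inv_close(lod);
 *
 * A read-mode descriptor pins a registered snapshot for the whole session,
 * which is why inv_close() must eventually be called to unregister it.
 */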
/*
 * Destroys an existing large object (not to be confused with a descriptor!)
 *
 * Always returns 1; failure is reported by raising an error instead.
 */
int
inv_drop(Oid lobjId)
{
	LargeObjectDrop(lobjId);

	/* Delete any comments on the large object */
	DeleteComments(lobjId, LargeObjectRelationId, 0);

	/*
	 * Advance command counter so that tuple removal will be seen by later
	 * large-object operations in this transaction.
	 */
	CommandCounterIncrement();

	return 1;
}
/*
 * Determine size of a large object
 *
 * NOTE: LOs can contain gaps, just like Unix files.  We actually return
 * the offset of the last byte + 1.
 */
static uint32
inv_getsize(LargeObjectDesc *obj_desc)
{
	bool		found = false;
	uint32		lastbyte = 0;
	ScanKeyData skey[1];
	SysScanDesc sd;
	HeapTuple	tuple;

	Assert(PointerIsValid(obj_desc));

	open_lo_relation();

	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(obj_desc->id));

	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
									obj_desc->snapshot, 1, skey);

	/*
	 * Because the pg_largeobject index is on both loid and pageno, but we
	 * constrain only loid, a backwards scan should visit all pages of the
	 * large object in reverse pageno order.  So, it's sufficient to examine
	 * the first valid tuple (== last valid page).
	 */
	while ((tuple = systable_getnext_ordered(sd, BackwardScanDirection)) != NULL)
	{
		Form_pg_largeobject data;
		bytea	   *datafield;
		bool		pfreeit;

		found = true;
		if (HeapTupleHasNulls(tuple))	/* paranoia */
			elog(ERROR, "null field found in pg_largeobject");
		data = (Form_pg_largeobject) GETSTRUCT(tuple);
		datafield = &(data->data);		/* see note at top of file */
		pfreeit = false;
		if (VARATT_IS_EXTENDED(datafield))
		{
			datafield = (bytea *)
				heap_tuple_untoast_attr((struct varlena *) datafield);
			pfreeit = true;
		}
		lastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield);
		if (pfreeit)
			pfree(datafield);
		break;
	}

	systable_endscan_ordered(sd);

	if (!found)
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("large object %u does not exist", obj_desc->id)));
	return lastbyte;
}
int
inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
{
	Assert(PointerIsValid(obj_desc));

	switch (whence)
	{
		case SEEK_SET:
			if (offset < 0)
				elog(ERROR, "invalid seek offset: %d", offset);
			obj_desc->offset = offset;
			break;
		case SEEK_CUR:
			if (offset < 0 && obj_desc->offset < ((uint32) (-offset)))
				elog(ERROR, "invalid seek offset: %d", offset);
			obj_desc->offset += offset;
			break;
		case SEEK_END:
			{
				uint32		size = inv_getsize(obj_desc);

				if (offset < 0 && size < ((uint32) (-offset)))
					elog(ERROR, "invalid seek offset: %d", offset);
				obj_desc->offset = size + offset;
			}
			break;
		default:
			elog(ERROR, "invalid whence: %d", whence);
	}
	return obj_desc->offset;
}
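/*
 * Illustrative usage, not part of the original file: position 16 bytes
 * before the current end of the object, given a hypothetical descriptor
 * "lod":
 *
 *		inv_seek(lod, -16, SEEK_END);
 *
 * Because stored offsets are unsigned 32-bit values, the SEEK_CUR and
 * SEEK_END cases above must reject any negative offset whose magnitude
 * exceeds the current position or size, rather than let the addition wrap.
 */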
int
inv_tell(LargeObjectDesc *obj_desc)
{
	Assert(PointerIsValid(obj_desc));

	return obj_desc->offset;
}
int
inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
{
	int			nread = 0;
	int			n;
	int			off;
	int			len;
	int32		pageno = (int32) (obj_desc->offset / LOBLKSIZE);
	uint32		pageoff;
	ScanKeyData skey[2];
	SysScanDesc sd;
	HeapTuple	tuple;

	Assert(PointerIsValid(obj_desc));
	Assert(buf != NULL);

	if (nbytes <= 0)
		return 0;

	open_lo_relation();

	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(obj_desc->id));

	ScanKeyInit(&skey[1],
				Anum_pg_largeobject_pageno,
				BTGreaterEqualStrategyNumber, F_INT4GE,
				Int32GetDatum(pageno));

	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
									obj_desc->snapshot, 2, skey);

	while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
	{
		Form_pg_largeobject data;
		bytea	   *datafield;
		bool		pfreeit;

		if (HeapTupleHasNulls(tuple))	/* paranoia */
			elog(ERROR, "null field found in pg_largeobject");
		data = (Form_pg_largeobject) GETSTRUCT(tuple);

		/*
		 * We expect the indexscan will deliver pages in order.  However,
		 * there may be missing pages if the LO contains unwritten "holes".
		 * We want missing sections to read out as zeroes.
		 */
		pageoff = ((uint32) data->pageno) * LOBLKSIZE;
		if (pageoff > obj_desc->offset)
		{
			n = pageoff - obj_desc->offset;
			n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
			MemSet(buf + nread, 0, n);
			nread += n;
			obj_desc->offset += n;
		}

		if (nread < nbytes)
		{
			Assert(obj_desc->offset >= pageoff);
			off = (int) (obj_desc->offset - pageoff);
			Assert(off >= 0 && off < LOBLKSIZE);

			datafield = &(data->data);	/* see note at top of file */
			pfreeit = false;
			if (VARATT_IS_EXTENDED(datafield))
			{
				datafield = (bytea *)
					heap_tuple_untoast_attr((struct varlena *) datafield);
				pfreeit = true;
			}
			len = getbytealen(datafield);
			if (len > off)
			{
				n = len - off;
				n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
				memcpy(buf + nread, VARDATA(datafield) + off, n);
				nread += n;
				obj_desc->offset += n;
			}
			if (pfreeit)
				pfree(datafield);
		}

		if (nread >= nbytes)
			break;
	}

	systable_endscan_ordered(sd);

	return nread;
}
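/*
 * Illustrative example, not part of the original file: if pages 0 and 2 of
 * an LO exist but page 1 was never written, a read spanning page 1 yields
 * LOBLKSIZE zero bytes for the hole; the "pageoff > obj_desc->offset"
 * branch above supplies those zeroes instead of skipping ahead.
 */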
int
inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
{
	int			nwritten = 0;
	int			n;
	int			off;
	int			len;
	int32		pageno = (int32) (obj_desc->offset / LOBLKSIZE);
	ScanKeyData skey[2];
	SysScanDesc sd;
	HeapTuple	oldtuple;
	Form_pg_largeobject olddata;
	bool		neednextpage;
	bytea	   *datafield;
	bool		pfreeit;
	struct
	{
		bytea		hdr;
		char		data[LOBLKSIZE];	/* make struct big enough */
		int32		align_it;	/* ensure struct is aligned well enough */
	}			workbuf;
	char	   *workb = VARDATA(&workbuf.hdr);
	HeapTuple	newtup;
	Datum		values[Natts_pg_largeobject];
	bool		nulls[Natts_pg_largeobject];
	bool		replace[Natts_pg_largeobject];
	CatalogIndexState indstate;

	Assert(PointerIsValid(obj_desc));
	Assert(buf != NULL);

	/* enforce writability because snapshot is probably wrong otherwise */
	if ((obj_desc->flags & IFS_WRLOCK) == 0)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("large object %u was not opened for writing",
						obj_desc->id)));

	if (nbytes <= 0)
		return 0;

	open_lo_relation();

	indstate = CatalogOpenIndexes(lo_heap_r);

	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(obj_desc->id));

	ScanKeyInit(&skey[1],
				Anum_pg_largeobject_pageno,
				BTGreaterEqualStrategyNumber, F_INT4GE,
				Int32GetDatum(pageno));

	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
									obj_desc->snapshot, 2, skey);

	oldtuple = NULL;
	olddata = NULL;
	neednextpage = true;

	while (nwritten < nbytes)
	{
		/*
		 * If possible, get next pre-existing page of the LO.  We expect the
		 * indexscan will deliver these in order --- but there may be holes.
		 */
		if (neednextpage)
		{
			if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
			{
				if (HeapTupleHasNulls(oldtuple))		/* paranoia */
					elog(ERROR, "null field found in pg_largeobject");
				olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
				Assert(olddata->pageno >= pageno);
			}
			neednextpage = false;
		}

		/*
		 * If we have a pre-existing page, see if it is the page we want to
		 * write, or a later one.
		 */
		if (olddata != NULL && olddata->pageno == pageno)
		{
			/*
			 * Update an existing page with fresh data.
			 *
			 * First, load old data into workbuf
			 */
			datafield = &(olddata->data);		/* see note at top of file */
			pfreeit = false;
			if (VARATT_IS_EXTENDED(datafield))
			{
				datafield = (bytea *)
					heap_tuple_untoast_attr((struct varlena *) datafield);
				pfreeit = true;
			}
			len = getbytealen(datafield);
			Assert(len <= LOBLKSIZE);
			memcpy(workb, VARDATA(datafield), len);
			if (pfreeit)
				pfree(datafield);

			/*
			 * Fill any hole
			 */
			off = (int) (obj_desc->offset % LOBLKSIZE);
			if (off > len)
				MemSet(workb + len, 0, off - len);

			/*
			 * Insert appropriate portion of new data
			 */
			n = LOBLKSIZE - off;
			n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
			memcpy(workb + off, buf + nwritten, n);
			nwritten += n;
			obj_desc->offset += n;
			off += n;
			/* compute valid length of new page */
			len = (len >= off) ? len : off;
			SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);

			/*
			 * Form and insert updated tuple
			 */
			memset(values, 0, sizeof(values));
			memset(nulls, false, sizeof(nulls));
			memset(replace, false, sizeof(replace));
			values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
			replace[Anum_pg_largeobject_data - 1] = true;
			newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
									   values, nulls, replace);
			simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
			CatalogIndexInsert(indstate, newtup);
			heap_freetuple(newtup);

			/*
			 * We're done with this old page.
			 */
			oldtuple = NULL;
			olddata = NULL;
			neednextpage = true;
		}
		else
		{
			/*
			 * Write a brand new page.
			 *
			 * First, fill any hole
			 */
			off = (int) (obj_desc->offset % LOBLKSIZE);
			if (off > 0)
				MemSet(workb, 0, off);

			/*
			 * Insert appropriate portion of new data
			 */
			n = LOBLKSIZE - off;
			n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
			memcpy(workb + off, buf + nwritten, n);
			nwritten += n;
			obj_desc->offset += n;
			/* compute valid length of new page */
			len = off + n;
			SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);

			/*
			 * Form and insert updated tuple
			 */
			memset(values, 0, sizeof(values));
			memset(nulls, false, sizeof(nulls));
			values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
			values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
			values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
			newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
			simple_heap_insert(lo_heap_r, newtup);
			CatalogIndexInsert(indstate, newtup);
			heap_freetuple(newtup);
		}
		pageno++;
	}

	systable_endscan_ordered(sd);

	CatalogCloseIndexes(indstate);

	/*
	 * Advance command counter so that my tuple updates will be seen by later
	 * large-object operations in this transaction.
	 */
	CommandCounterIncrement();

	return nwritten;
}
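/*
 * Illustrative usage, not part of the original file: overwrite three bytes
 * in the middle of page 0, given a hypothetical descriptor "lod" opened
 * with INV_WRITE:
 *
 *		inv_seek(lod, 100, SEEK_SET);
 *		inv_write(lod, "abc", 3);
 *
 * The loop above finds the existing page 0, copies it into workbuf,
 * splices the new bytes in at offset 100, and updates the tuple in place.
 */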
void
inv_truncate(LargeObjectDesc *obj_desc, int len)
{
	int32		pageno = (int32) (len / LOBLKSIZE);
	int			off;
	ScanKeyData skey[2];
	SysScanDesc sd;
	HeapTuple	oldtuple;
	Form_pg_largeobject olddata;
	struct
	{
		bytea		hdr;
		char		data[LOBLKSIZE];	/* make struct big enough */
		int32		align_it;	/* ensure struct is aligned well enough */
	}			workbuf;
	char	   *workb = VARDATA(&workbuf.hdr);
	HeapTuple	newtup;
	Datum		values[Natts_pg_largeobject];
	bool		nulls[Natts_pg_largeobject];
	bool		replace[Natts_pg_largeobject];
	CatalogIndexState indstate;

	Assert(PointerIsValid(obj_desc));

	/* enforce writability because snapshot is probably wrong otherwise */
	if ((obj_desc->flags & IFS_WRLOCK) == 0)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("large object %u was not opened for writing",
						obj_desc->id)));

	open_lo_relation();

	indstate = CatalogOpenIndexes(lo_heap_r);

	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(obj_desc->id));

	ScanKeyInit(&skey[1],
				Anum_pg_largeobject_pageno,
				BTGreaterEqualStrategyNumber, F_INT4GE,
				Int32GetDatum(pageno));

	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
									obj_desc->snapshot, 2, skey);

	/*
	 * If possible, get the page the truncation point is in.  The truncation
	 * point may be beyond the end of the LO or in a hole.
	 */
	olddata = NULL;
	if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
	{
		if (HeapTupleHasNulls(oldtuple))		/* paranoia */
			elog(ERROR, "null field found in pg_largeobject");
		olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
		Assert(olddata->pageno >= pageno);
	}

	/*
	 * If we found the page of the truncation point we need to truncate the
	 * data in it.  Otherwise if we're in a hole, we need to create a page to
	 * mark the end of data.
	 */
	if (olddata != NULL && olddata->pageno == pageno)
	{
		/* First, load old data into workbuf */
		bytea	   *datafield = &(olddata->data);	/* see note at top of
													 * file */
		bool		pfreeit = false;
		int			pagelen;

		if (VARATT_IS_EXTENDED(datafield))
		{
			datafield = (bytea *)
				heap_tuple_untoast_attr((struct varlena *) datafield);
			pfreeit = true;
		}
		pagelen = getbytealen(datafield);
		Assert(pagelen <= LOBLKSIZE);
		memcpy(workb, VARDATA(datafield), pagelen);
		if (pfreeit)
			pfree(datafield);

		/*
		 * Fill any hole
		 */
		off = len % LOBLKSIZE;
		if (off > pagelen)
			MemSet(workb + pagelen, 0, off - pagelen);

		/* compute length of new page */
		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);

		/*
		 * Form and insert updated tuple
		 */
		memset(values, 0, sizeof(values));
		memset(nulls, false, sizeof(nulls));
		memset(replace, false, sizeof(replace));
		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
		replace[Anum_pg_largeobject_data - 1] = true;
		newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
								   values, nulls, replace);
		simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
		CatalogIndexInsert(indstate, newtup);
		heap_freetuple(newtup);
	}
	else
	{
		/*
		 * If the first page we found was after the truncation point, we're
		 * in a hole that we'll fill, but we need to delete the later page.
		 */
		if (olddata != NULL && olddata->pageno > pageno)
			simple_heap_delete(lo_heap_r, &oldtuple->t_self);

		/*
		 * Write a brand new page.
		 *
		 * Fill the hole up to the truncation point
		 */
		off = len % LOBLKSIZE;
		if (off > 0)
			MemSet(workb, 0, off);

		/* compute length of new page */
		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);

		/*
		 * Form and insert new tuple
		 */
		memset(values, 0, sizeof(values));
		memset(nulls, false, sizeof(nulls));
		values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
		values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
		newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
		simple_heap_insert(lo_heap_r, newtup);
		CatalogIndexInsert(indstate, newtup);
		heap_freetuple(newtup);
	}

	/*
	 * Delete any pages after the truncation point
	 */
	while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
	{
		simple_heap_delete(lo_heap_r, &oldtuple->t_self);
	}

	systable_endscan_ordered(sd);

	CatalogCloseIndexes(indstate);

	/*
	 * Advance command counter so that tuple updates will be seen by later
	 * large-object operations in this transaction.
	 */
	CommandCounterIncrement();
}
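/*
 * Illustrative usage, not part of the original file: shorten an LO to 100
 * bytes, given a hypothetical descriptor "lod" opened with INV_WRITE:
 *
 *		inv_truncate(lod, 100);
 *
 * Page 0 is rewritten with its VARSIZE reduced to 100 + VARHDRSZ and all
 * higher-numbered pages are deleted; had byte 100 fallen in a hole, a
 * zero-filled page up to the truncation point would mark the new end.
 */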