1 /*-------------------------------------------------------------------------
4 * routines for manipulating inversion fs large objects. This file
5 * contains the user-level large object application interface routines.
8 * Note: we access pg_largeobject.data using its C struct declaration.
9 * This is safe because it immediately follows pageno which is an int4 field,
10 * and therefore the data field will always be 4-byte aligned, even if it
11 * is in the short 1-byte-header format. We have to detoast it since it's
12 * quite likely to be in compressed or short format. We also need to check
13 * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
15 * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16 * does most of the backend code. We expect that CurrentMemoryContext will
17 * be a short-lived context. Data that must persist across function calls
18 * is kept either in CacheMemoryContext (the Relation structs) or in the
19 * memory context given to inv_open (for LargeObjectDesc structs).
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/pg_largeobject.h"
#include "commands/comment.h"
#include "libpq/libpq-fs.h"
#include "storage/large_object.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"
#include "utils/tqual.h"
51 * All accesses to pg_largeobject and its index make use of a single Relation
52 * reference, so that we only need to open pg_relation once per transaction.
53 * To avoid problems when the first such reference occurs inside a
54 * subtransaction, we execute a slightly klugy maneuver to assign ownership of
55 * the Relation reference to TopTransactionResourceOwner.
57 static Relation lo_heap_r
= NULL
;
58 static Relation lo_index_r
= NULL
;
62 * Open pg_largeobject and its index, if not already done in current xact
65 open_lo_relation(void)
67 ResourceOwner currentOwner
;
69 if (lo_heap_r
&& lo_index_r
)
70 return; /* already open in current xact */
72 /* Arrange for the top xact to own these relation references */
73 currentOwner
= CurrentResourceOwner
;
76 CurrentResourceOwner
= TopTransactionResourceOwner
;
78 /* Use RowExclusiveLock since we might either read or write */
79 if (lo_heap_r
== NULL
)
80 lo_heap_r
= heap_open(LargeObjectRelationId
, RowExclusiveLock
);
81 if (lo_index_r
== NULL
)
82 lo_index_r
= index_open(LargeObjectLOidPNIndexId
, RowExclusiveLock
);
86 /* Ensure CurrentResourceOwner is restored on error */
87 CurrentResourceOwner
= currentOwner
;
91 CurrentResourceOwner
= currentOwner
;
95 * Clean up at main transaction end
98 close_lo_relation(bool isCommit
)
100 if (lo_heap_r
|| lo_index_r
)
103 * Only bother to close if committing; else abort cleanup will handle
108 ResourceOwner currentOwner
;
110 currentOwner
= CurrentResourceOwner
;
113 CurrentResourceOwner
= TopTransactionResourceOwner
;
116 index_close(lo_index_r
, NoLock
);
118 heap_close(lo_heap_r
, NoLock
);
122 /* Ensure CurrentResourceOwner is restored on error */
123 CurrentResourceOwner
= currentOwner
;
127 CurrentResourceOwner
= currentOwner
;
136 * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
137 * read with can be specified.
140 myLargeObjectExists(Oid loid
, Snapshot snapshot
)
143 Relation pg_largeobject
;
148 * See if we can find any tuples belonging to the specified LO
150 ScanKeyInit(&skey
[0],
151 Anum_pg_largeobject_loid
,
152 BTEqualStrategyNumber
, F_OIDEQ
,
153 ObjectIdGetDatum(loid
));
155 pg_largeobject
= heap_open(LargeObjectRelationId
, AccessShareLock
);
157 sd
= systable_beginscan(pg_largeobject
, LargeObjectLOidPNIndexId
, true,
160 if (systable_getnext(sd
) != NULL
)
163 systable_endscan(sd
);
165 heap_close(pg_largeobject
, AccessShareLock
);
172 getbytealen(bytea
*data
)
174 Assert(!VARATT_IS_EXTENDED(data
));
175 if (VARSIZE(data
) < VARHDRSZ
)
176 elog(ERROR
, "invalid VARSIZE(data)");
177 return (VARSIZE(data
) - VARHDRSZ
);
182 * inv_create -- create a new large object
185 * lobjId - OID to use for new large object, or InvalidOid to pick one
190 * If lobjId is not InvalidOid, then an error occurs if the OID is already
194 inv_create(Oid lobjId
)
197 * Allocate an OID to be the LO's identifier, unless we were told what to
198 * use. We can use the index on pg_largeobject for checking OID
199 * uniqueness, even though it has additional columns besides OID.
201 if (!OidIsValid(lobjId
))
205 lobjId
= GetNewOidWithIndex(lo_heap_r
, LargeObjectLOidPNIndexId
,
206 Anum_pg_largeobject_loid
);
210 * Create the LO by writing an empty first page for it in pg_largeobject
211 * (will fail if duplicate)
213 LargeObjectCreate(lobjId
);
216 * Advance command counter to make new tuple visible to later operations.
218 CommandCounterIncrement();
224 * inv_open -- access an existing large object.
227 * Large object descriptor, appropriately filled in. The descriptor
228 * and subsidiary data are allocated in the specified memory context,
229 * which must be suitably long-lived for the caller's purposes.
232 inv_open(Oid lobjId
, int flags
, MemoryContext mcxt
)
234 LargeObjectDesc
*retval
;
236 retval
= (LargeObjectDesc
*) MemoryContextAlloc(mcxt
,
237 sizeof(LargeObjectDesc
));
240 retval
->subid
= GetCurrentSubTransactionId();
243 if (flags
& INV_WRITE
)
245 retval
->snapshot
= SnapshotNow
;
246 retval
->flags
= IFS_WRLOCK
| IFS_RDLOCK
;
248 else if (flags
& INV_READ
)
251 * We must register the snapshot in TopTransaction's resowner, because
252 * it must stay alive until the LO is closed rather than until the
253 * current portal shuts down.
255 retval
->snapshot
= RegisterSnapshotOnOwner(GetActiveSnapshot(),
256 TopTransactionResourceOwner
);
257 retval
->flags
= IFS_RDLOCK
;
260 elog(ERROR
, "invalid flags: %d", flags
);
262 /* Can't use LargeObjectExists here because it always uses SnapshotNow */
263 if (!myLargeObjectExists(lobjId
, retval
->snapshot
))
265 (errcode(ERRCODE_UNDEFINED_OBJECT
),
266 errmsg("large object %u does not exist", lobjId
)));
272 * Closes a large object descriptor previously made by inv_open(), and
273 * releases the long-term memory used by it.
276 inv_close(LargeObjectDesc
*obj_desc
)
278 Assert(PointerIsValid(obj_desc
));
280 if (obj_desc
->snapshot
!= SnapshotNow
)
281 UnregisterSnapshotFromOwner(obj_desc
->snapshot
,
282 TopTransactionResourceOwner
);
288 * Destroys an existing large object (not to be confused with a descriptor!)
290 * returns -1 if failed
295 LargeObjectDrop(lobjId
);
297 /* Delete any comments on the large object */
298 DeleteComments(lobjId
, LargeObjectRelationId
, 0);
301 * Advance command counter so that tuple removal will be seen by later
302 * large-object operations in this transaction.
304 CommandCounterIncrement();
310 * Determine size of a large object
312 * NOTE: LOs can contain gaps, just like Unix files. We actually return
313 * the offset of the last byte + 1.
316 inv_getsize(LargeObjectDesc
*obj_desc
)
324 Assert(PointerIsValid(obj_desc
));
328 ScanKeyInit(&skey
[0],
329 Anum_pg_largeobject_loid
,
330 BTEqualStrategyNumber
, F_OIDEQ
,
331 ObjectIdGetDatum(obj_desc
->id
));
333 sd
= systable_beginscan_ordered(lo_heap_r
, lo_index_r
,
334 obj_desc
->snapshot
, 1, skey
);
337 * Because the pg_largeobject index is on both loid and pageno, but we
338 * constrain only loid, a backwards scan should visit all pages of the
339 * large object in reverse pageno order. So, it's sufficient to examine
340 * the first valid tuple (== last valid page).
342 while ((tuple
= systable_getnext_ordered(sd
, BackwardScanDirection
)) != NULL
)
344 Form_pg_largeobject data
;
349 if (HeapTupleHasNulls(tuple
)) /* paranoia */
350 elog(ERROR
, "null field found in pg_largeobject");
351 data
= (Form_pg_largeobject
) GETSTRUCT(tuple
);
352 datafield
= &(data
->data
); /* see note at top of file */
354 if (VARATT_IS_EXTENDED(datafield
))
356 datafield
= (bytea
*)
357 heap_tuple_untoast_attr((struct varlena
*) datafield
);
360 lastbyte
= data
->pageno
* LOBLKSIZE
+ getbytealen(datafield
);
366 systable_endscan_ordered(sd
);
370 (errcode(ERRCODE_UNDEFINED_OBJECT
),
371 errmsg("large object %u does not exist", obj_desc
->id
)));
376 inv_seek(LargeObjectDesc
*obj_desc
, int offset
, int whence
)
378 Assert(PointerIsValid(obj_desc
));
384 elog(ERROR
, "invalid seek offset: %d", offset
);
385 obj_desc
->offset
= offset
;
388 if (offset
< 0 && obj_desc
->offset
< ((uint32
) (-offset
)))
389 elog(ERROR
, "invalid seek offset: %d", offset
);
390 obj_desc
->offset
+= offset
;
394 uint32 size
= inv_getsize(obj_desc
);
396 if (offset
< 0 && size
< ((uint32
) (-offset
)))
397 elog(ERROR
, "invalid seek offset: %d", offset
);
398 obj_desc
->offset
= size
+ offset
;
402 elog(ERROR
, "invalid whence: %d", whence
);
404 return obj_desc
->offset
;
408 inv_tell(LargeObjectDesc
*obj_desc
)
410 Assert(PointerIsValid(obj_desc
));
412 return obj_desc
->offset
;
416 inv_read(LargeObjectDesc
*obj_desc
, char *buf
, int nbytes
)
422 int32 pageno
= (int32
) (obj_desc
->offset
/ LOBLKSIZE
);
428 Assert(PointerIsValid(obj_desc
));
436 ScanKeyInit(&skey
[0],
437 Anum_pg_largeobject_loid
,
438 BTEqualStrategyNumber
, F_OIDEQ
,
439 ObjectIdGetDatum(obj_desc
->id
));
441 ScanKeyInit(&skey
[1],
442 Anum_pg_largeobject_pageno
,
443 BTGreaterEqualStrategyNumber
, F_INT4GE
,
444 Int32GetDatum(pageno
));
446 sd
= systable_beginscan_ordered(lo_heap_r
, lo_index_r
,
447 obj_desc
->snapshot
, 2, skey
);
449 while ((tuple
= systable_getnext_ordered(sd
, ForwardScanDirection
)) != NULL
)
451 Form_pg_largeobject data
;
455 if (HeapTupleHasNulls(tuple
)) /* paranoia */
456 elog(ERROR
, "null field found in pg_largeobject");
457 data
= (Form_pg_largeobject
) GETSTRUCT(tuple
);
460 * We expect the indexscan will deliver pages in order. However,
461 * there may be missing pages if the LO contains unwritten "holes". We
462 * want missing sections to read out as zeroes.
464 pageoff
= ((uint32
) data
->pageno
) * LOBLKSIZE
;
465 if (pageoff
> obj_desc
->offset
)
467 n
= pageoff
- obj_desc
->offset
;
468 n
= (n
<= (nbytes
- nread
)) ? n
: (nbytes
- nread
);
469 MemSet(buf
+ nread
, 0, n
);
471 obj_desc
->offset
+= n
;
476 Assert(obj_desc
->offset
>= pageoff
);
477 off
= (int) (obj_desc
->offset
- pageoff
);
478 Assert(off
>= 0 && off
< LOBLKSIZE
);
480 datafield
= &(data
->data
); /* see note at top of file */
482 if (VARATT_IS_EXTENDED(datafield
))
484 datafield
= (bytea
*)
485 heap_tuple_untoast_attr((struct varlena
*) datafield
);
488 len
= getbytealen(datafield
);
492 n
= (n
<= (nbytes
- nread
)) ? n
: (nbytes
- nread
);
493 memcpy(buf
+ nread
, VARDATA(datafield
) + off
, n
);
495 obj_desc
->offset
+= n
;
505 systable_endscan_ordered(sd
);
511 inv_write(LargeObjectDesc
*obj_desc
, const char *buf
, int nbytes
)
517 int32 pageno
= (int32
) (obj_desc
->offset
/ LOBLKSIZE
);
521 Form_pg_largeobject olddata
;
528 char data
[LOBLKSIZE
]; /* make struct big enough */
529 int32 align_it
; /* ensure struct is aligned well enough */
531 char *workb
= VARDATA(&workbuf
.hdr
);
533 Datum values
[Natts_pg_largeobject
];
534 bool nulls
[Natts_pg_largeobject
];
535 bool replace
[Natts_pg_largeobject
];
536 CatalogIndexState indstate
;
538 Assert(PointerIsValid(obj_desc
));
541 /* enforce writability because snapshot is probably wrong otherwise */
542 if ((obj_desc
->flags
& IFS_WRLOCK
) == 0)
544 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
545 errmsg("large object %u was not opened for writing",
553 indstate
= CatalogOpenIndexes(lo_heap_r
);
555 ScanKeyInit(&skey
[0],
556 Anum_pg_largeobject_loid
,
557 BTEqualStrategyNumber
, F_OIDEQ
,
558 ObjectIdGetDatum(obj_desc
->id
));
560 ScanKeyInit(&skey
[1],
561 Anum_pg_largeobject_pageno
,
562 BTGreaterEqualStrategyNumber
, F_INT4GE
,
563 Int32GetDatum(pageno
));
565 sd
= systable_beginscan_ordered(lo_heap_r
, lo_index_r
,
566 obj_desc
->snapshot
, 2, skey
);
572 while (nwritten
< nbytes
)
575 * If possible, get next pre-existing page of the LO. We expect the
576 * indexscan will deliver these in order --- but there may be holes.
580 if ((oldtuple
= systable_getnext_ordered(sd
, ForwardScanDirection
)) != NULL
)
582 if (HeapTupleHasNulls(oldtuple
)) /* paranoia */
583 elog(ERROR
, "null field found in pg_largeobject");
584 olddata
= (Form_pg_largeobject
) GETSTRUCT(oldtuple
);
585 Assert(olddata
->pageno
>= pageno
);
587 neednextpage
= false;
591 * If we have a pre-existing page, see if it is the page we want to
592 * write, or a later one.
594 if (olddata
!= NULL
&& olddata
->pageno
== pageno
)
597 * Update an existing page with fresh data.
599 * First, load old data into workbuf
601 datafield
= &(olddata
->data
); /* see note at top of file */
603 if (VARATT_IS_EXTENDED(datafield
))
605 datafield
= (bytea
*)
606 heap_tuple_untoast_attr((struct varlena
*) datafield
);
609 len
= getbytealen(datafield
);
610 Assert(len
<= LOBLKSIZE
);
611 memcpy(workb
, VARDATA(datafield
), len
);
618 off
= (int) (obj_desc
->offset
% LOBLKSIZE
);
620 MemSet(workb
+ len
, 0, off
- len
);
623 * Insert appropriate portion of new data
626 n
= (n
<= (nbytes
- nwritten
)) ? n
: (nbytes
- nwritten
);
627 memcpy(workb
+ off
, buf
+ nwritten
, n
);
629 obj_desc
->offset
+= n
;
631 /* compute valid length of new page */
632 len
= (len
>= off
) ? len
: off
;
633 SET_VARSIZE(&workbuf
.hdr
, len
+ VARHDRSZ
);
636 * Form and insert updated tuple
638 memset(values
, 0, sizeof(values
));
639 memset(nulls
, false, sizeof(nulls
));
640 memset(replace
, false, sizeof(replace
));
641 values
[Anum_pg_largeobject_data
- 1] = PointerGetDatum(&workbuf
);
642 replace
[Anum_pg_largeobject_data
- 1] = true;
643 newtup
= heap_modify_tuple(oldtuple
, RelationGetDescr(lo_heap_r
),
644 values
, nulls
, replace
);
645 simple_heap_update(lo_heap_r
, &newtup
->t_self
, newtup
);
646 CatalogIndexInsert(indstate
, newtup
);
647 heap_freetuple(newtup
);
650 * We're done with this old page.
659 * Write a brand new page.
661 * First, fill any hole
663 off
= (int) (obj_desc
->offset
% LOBLKSIZE
);
665 MemSet(workb
, 0, off
);
668 * Insert appropriate portion of new data
671 n
= (n
<= (nbytes
- nwritten
)) ? n
: (nbytes
- nwritten
);
672 memcpy(workb
+ off
, buf
+ nwritten
, n
);
674 obj_desc
->offset
+= n
;
675 /* compute valid length of new page */
677 SET_VARSIZE(&workbuf
.hdr
, len
+ VARHDRSZ
);
680 * Form and insert updated tuple
682 memset(values
, 0, sizeof(values
));
683 memset(nulls
, false, sizeof(nulls
));
684 values
[Anum_pg_largeobject_loid
- 1] = ObjectIdGetDatum(obj_desc
->id
);
685 values
[Anum_pg_largeobject_pageno
- 1] = Int32GetDatum(pageno
);
686 values
[Anum_pg_largeobject_data
- 1] = PointerGetDatum(&workbuf
);
687 newtup
= heap_form_tuple(lo_heap_r
->rd_att
, values
, nulls
);
688 simple_heap_insert(lo_heap_r
, newtup
);
689 CatalogIndexInsert(indstate
, newtup
);
690 heap_freetuple(newtup
);
695 systable_endscan_ordered(sd
);
697 CatalogCloseIndexes(indstate
);
700 * Advance command counter so that my tuple updates will be seen by later
701 * large-object operations in this transaction.
703 CommandCounterIncrement();
709 inv_truncate(LargeObjectDesc
*obj_desc
, int len
)
711 int32 pageno
= (int32
) (len
/ LOBLKSIZE
);
716 Form_pg_largeobject olddata
;
720 char data
[LOBLKSIZE
]; /* make struct big enough */
721 int32 align_it
; /* ensure struct is aligned well enough */
723 char *workb
= VARDATA(&workbuf
.hdr
);
725 Datum values
[Natts_pg_largeobject
];
726 bool nulls
[Natts_pg_largeobject
];
727 bool replace
[Natts_pg_largeobject
];
728 CatalogIndexState indstate
;
730 Assert(PointerIsValid(obj_desc
));
732 /* enforce writability because snapshot is probably wrong otherwise */
733 if ((obj_desc
->flags
& IFS_WRLOCK
) == 0)
735 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
736 errmsg("large object %u was not opened for writing",
741 indstate
= CatalogOpenIndexes(lo_heap_r
);
743 ScanKeyInit(&skey
[0],
744 Anum_pg_largeobject_loid
,
745 BTEqualStrategyNumber
, F_OIDEQ
,
746 ObjectIdGetDatum(obj_desc
->id
));
748 ScanKeyInit(&skey
[1],
749 Anum_pg_largeobject_pageno
,
750 BTGreaterEqualStrategyNumber
, F_INT4GE
,
751 Int32GetDatum(pageno
));
753 sd
= systable_beginscan_ordered(lo_heap_r
, lo_index_r
,
754 obj_desc
->snapshot
, 2, skey
);
757 * If possible, get the page the truncation point is in. The truncation
758 * point may be beyond the end of the LO or in a hole.
761 if ((oldtuple
= systable_getnext_ordered(sd
, ForwardScanDirection
)) != NULL
)
763 if (HeapTupleHasNulls(oldtuple
)) /* paranoia */
764 elog(ERROR
, "null field found in pg_largeobject");
765 olddata
= (Form_pg_largeobject
) GETSTRUCT(oldtuple
);
766 Assert(olddata
->pageno
>= pageno
);
770 * If we found the page of the truncation point we need to truncate the
771 * data in it. Otherwise if we're in a hole, we need to create a page to
772 * mark the end of data.
774 if (olddata
!= NULL
&& olddata
->pageno
== pageno
)
776 /* First, load old data into workbuf */
777 bytea
*datafield
= &(olddata
->data
); /* see note at top of
779 bool pfreeit
= false;
782 if (VARATT_IS_EXTENDED(datafield
))
784 datafield
= (bytea
*)
785 heap_tuple_untoast_attr((struct varlena
*) datafield
);
788 pagelen
= getbytealen(datafield
);
789 Assert(pagelen
<= LOBLKSIZE
);
790 memcpy(workb
, VARDATA(datafield
), pagelen
);
797 off
= len
% LOBLKSIZE
;
799 MemSet(workb
+ pagelen
, 0, off
- pagelen
);
801 /* compute length of new page */
802 SET_VARSIZE(&workbuf
.hdr
, off
+ VARHDRSZ
);
805 * Form and insert updated tuple
807 memset(values
, 0, sizeof(values
));
808 memset(nulls
, false, sizeof(nulls
));
809 memset(replace
, false, sizeof(replace
));
810 values
[Anum_pg_largeobject_data
- 1] = PointerGetDatum(&workbuf
);
811 replace
[Anum_pg_largeobject_data
- 1] = true;
812 newtup
= heap_modify_tuple(oldtuple
, RelationGetDescr(lo_heap_r
),
813 values
, nulls
, replace
);
814 simple_heap_update(lo_heap_r
, &newtup
->t_self
, newtup
);
815 CatalogIndexInsert(indstate
, newtup
);
816 heap_freetuple(newtup
);
821 * If the first page we found was after the truncation point, we're in
822 * a hole that we'll fill, but we need to delete the later page.
824 if (olddata
!= NULL
&& olddata
->pageno
> pageno
)
825 simple_heap_delete(lo_heap_r
, &oldtuple
->t_self
);
828 * Write a brand new page.
830 * Fill the hole up to the truncation point
832 off
= len
% LOBLKSIZE
;
834 MemSet(workb
, 0, off
);
836 /* compute length of new page */
837 SET_VARSIZE(&workbuf
.hdr
, off
+ VARHDRSZ
);
840 * Form and insert new tuple
842 memset(values
, 0, sizeof(values
));
843 memset(nulls
, false, sizeof(nulls
));
844 values
[Anum_pg_largeobject_loid
- 1] = ObjectIdGetDatum(obj_desc
->id
);
845 values
[Anum_pg_largeobject_pageno
- 1] = Int32GetDatum(pageno
);
846 values
[Anum_pg_largeobject_data
- 1] = PointerGetDatum(&workbuf
);
847 newtup
= heap_form_tuple(lo_heap_r
->rd_att
, values
, nulls
);
848 simple_heap_insert(lo_heap_r
, newtup
);
849 CatalogIndexInsert(indstate
, newtup
);
850 heap_freetuple(newtup
);
854 * Delete any pages after the truncation point
856 while ((oldtuple
= systable_getnext_ordered(sd
, ForwardScanDirection
)) != NULL
)
858 simple_heap_delete(lo_heap_r
, &oldtuple
->t_self
);
861 systable_endscan_ordered(sd
);
863 CatalogCloseIndexes(indstate
);
866 * Advance command counter so that tuple updates will be seen by later
867 * large-object operations in this transaction.
869 CommandCounterIncrement();