/*-------------------------------------------------------------------------
 *
 * proto.c
 *		logical replication protocol functions
 *
 * Copyright (c) 2015-2024, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		src/backend/replication/logical/proto.c
 *
 *-------------------------------------------------------------------------
 */
15 #include "access/sysattr.h"
16 #include "catalog/pg_namespace.h"
17 #include "catalog/pg_type.h"
18 #include "libpq/pqformat.h"
19 #include "replication/logicalproto.h"
20 #include "utils/lsyscache.h"
21 #include "utils/syscache.h"
/*
 * Protocol message flags.
 *
 * Per-attribute flag sent in RELATION messages: the column is part of the
 * replica identity.
 */
#define LOGICALREP_IS_REPLICA_IDENTITY	1

/* Flag bit of the MESSAGE message: the message is transactional. */
#define MESSAGE_TRANSACTIONAL			(1 << 0)

/* Flag bits of the TRUNCATE message. */
#define TRUNCATE_CASCADE				(1 << 0)
#define TRUNCATE_RESTART_SEQS			(1 << 1)
32 static void logicalrep_write_attrs(StringInfo out
, Relation rel
,
33 Bitmapset
*columns
, bool include_gencols
);
34 static void logicalrep_write_tuple(StringInfo out
, Relation rel
,
36 bool binary
, Bitmapset
*columns
,
37 bool include_gencols
);
38 static void logicalrep_read_attrs(StringInfo in
, LogicalRepRelation
*rel
);
39 static void logicalrep_read_tuple(StringInfo in
, LogicalRepTupleData
*tuple
);
41 static void logicalrep_write_namespace(StringInfo out
, Oid nspid
);
42 static const char *logicalrep_read_namespace(StringInfo in
);
45 * Write BEGIN to the output stream.
48 logicalrep_write_begin(StringInfo out
, ReorderBufferTXN
*txn
)
50 pq_sendbyte(out
, LOGICAL_REP_MSG_BEGIN
);
53 pq_sendint64(out
, txn
->final_lsn
);
54 pq_sendint64(out
, txn
->xact_time
.commit_time
);
55 pq_sendint32(out
, txn
->xid
);
59 * Read transaction BEGIN from the stream.
62 logicalrep_read_begin(StringInfo in
, LogicalRepBeginData
*begin_data
)
65 begin_data
->final_lsn
= pq_getmsgint64(in
);
66 if (begin_data
->final_lsn
== InvalidXLogRecPtr
)
67 elog(ERROR
, "final_lsn not set in begin message");
68 begin_data
->committime
= pq_getmsgint64(in
);
69 begin_data
->xid
= pq_getmsgint(in
, 4);
74 * Write COMMIT to the output stream.
77 logicalrep_write_commit(StringInfo out
, ReorderBufferTXN
*txn
,
78 XLogRecPtr commit_lsn
)
82 pq_sendbyte(out
, LOGICAL_REP_MSG_COMMIT
);
84 /* send the flags field (unused for now) */
85 pq_sendbyte(out
, flags
);
88 pq_sendint64(out
, commit_lsn
);
89 pq_sendint64(out
, txn
->end_lsn
);
90 pq_sendint64(out
, txn
->xact_time
.commit_time
);
94 * Read transaction COMMIT from the stream.
97 logicalrep_read_commit(StringInfo in
, LogicalRepCommitData
*commit_data
)
99 /* read flags (unused for now) */
100 uint8 flags
= pq_getmsgbyte(in
);
103 elog(ERROR
, "unrecognized flags %u in commit message", flags
);
106 commit_data
->commit_lsn
= pq_getmsgint64(in
);
107 commit_data
->end_lsn
= pq_getmsgint64(in
);
108 commit_data
->committime
= pq_getmsgint64(in
);
112 * Write BEGIN PREPARE to the output stream.
115 logicalrep_write_begin_prepare(StringInfo out
, ReorderBufferTXN
*txn
)
117 pq_sendbyte(out
, LOGICAL_REP_MSG_BEGIN_PREPARE
);
120 pq_sendint64(out
, txn
->final_lsn
);
121 pq_sendint64(out
, txn
->end_lsn
);
122 pq_sendint64(out
, txn
->xact_time
.prepare_time
);
123 pq_sendint32(out
, txn
->xid
);
126 pq_sendstring(out
, txn
->gid
);
130 * Read transaction BEGIN PREPARE from the stream.
133 logicalrep_read_begin_prepare(StringInfo in
, LogicalRepPreparedTxnData
*begin_data
)
136 begin_data
->prepare_lsn
= pq_getmsgint64(in
);
137 if (begin_data
->prepare_lsn
== InvalidXLogRecPtr
)
138 elog(ERROR
, "prepare_lsn not set in begin prepare message");
139 begin_data
->end_lsn
= pq_getmsgint64(in
);
140 if (begin_data
->end_lsn
== InvalidXLogRecPtr
)
141 elog(ERROR
, "end_lsn not set in begin prepare message");
142 begin_data
->prepare_time
= pq_getmsgint64(in
);
143 begin_data
->xid
= pq_getmsgint(in
, 4);
145 /* read gid (copy it into a pre-allocated buffer) */
146 strlcpy(begin_data
->gid
, pq_getmsgstring(in
), sizeof(begin_data
->gid
));
150 * The core functionality for logicalrep_write_prepare and
151 * logicalrep_write_stream_prepare.
154 logicalrep_write_prepare_common(StringInfo out
, LogicalRepMsgType type
,
155 ReorderBufferTXN
*txn
, XLogRecPtr prepare_lsn
)
159 pq_sendbyte(out
, type
);
162 * This should only ever happen for two-phase commit transactions, in
163 * which case we expect to have a valid GID.
165 Assert(txn
->gid
!= NULL
);
166 Assert(rbtxn_prepared(txn
));
167 Assert(TransactionIdIsValid(txn
->xid
));
169 /* send the flags field */
170 pq_sendbyte(out
, flags
);
173 pq_sendint64(out
, prepare_lsn
);
174 pq_sendint64(out
, txn
->end_lsn
);
175 pq_sendint64(out
, txn
->xact_time
.prepare_time
);
176 pq_sendint32(out
, txn
->xid
);
179 pq_sendstring(out
, txn
->gid
);
183 * Write PREPARE to the output stream.
186 logicalrep_write_prepare(StringInfo out
, ReorderBufferTXN
*txn
,
187 XLogRecPtr prepare_lsn
)
189 logicalrep_write_prepare_common(out
, LOGICAL_REP_MSG_PREPARE
,
194 * The core functionality for logicalrep_read_prepare and
195 * logicalrep_read_stream_prepare.
198 logicalrep_read_prepare_common(StringInfo in
, char *msgtype
,
199 LogicalRepPreparedTxnData
*prepare_data
)
202 uint8 flags
= pq_getmsgbyte(in
);
205 elog(ERROR
, "unrecognized flags %u in %s message", flags
, msgtype
);
208 prepare_data
->prepare_lsn
= pq_getmsgint64(in
);
209 if (prepare_data
->prepare_lsn
== InvalidXLogRecPtr
)
210 elog(ERROR
, "prepare_lsn is not set in %s message", msgtype
);
211 prepare_data
->end_lsn
= pq_getmsgint64(in
);
212 if (prepare_data
->end_lsn
== InvalidXLogRecPtr
)
213 elog(ERROR
, "end_lsn is not set in %s message", msgtype
);
214 prepare_data
->prepare_time
= pq_getmsgint64(in
);
215 prepare_data
->xid
= pq_getmsgint(in
, 4);
216 if (prepare_data
->xid
== InvalidTransactionId
)
217 elog(ERROR
, "invalid two-phase transaction ID in %s message", msgtype
);
219 /* read gid (copy it into a pre-allocated buffer) */
220 strlcpy(prepare_data
->gid
, pq_getmsgstring(in
), sizeof(prepare_data
->gid
));
224 * Read transaction PREPARE from the stream.
227 logicalrep_read_prepare(StringInfo in
, LogicalRepPreparedTxnData
*prepare_data
)
229 logicalrep_read_prepare_common(in
, "prepare", prepare_data
);
233 * Write COMMIT PREPARED to the output stream.
236 logicalrep_write_commit_prepared(StringInfo out
, ReorderBufferTXN
*txn
,
237 XLogRecPtr commit_lsn
)
241 pq_sendbyte(out
, LOGICAL_REP_MSG_COMMIT_PREPARED
);
244 * This should only ever happen for two-phase commit transactions, in
245 * which case we expect to have a valid GID.
247 Assert(txn
->gid
!= NULL
);
249 /* send the flags field */
250 pq_sendbyte(out
, flags
);
253 pq_sendint64(out
, commit_lsn
);
254 pq_sendint64(out
, txn
->end_lsn
);
255 pq_sendint64(out
, txn
->xact_time
.commit_time
);
256 pq_sendint32(out
, txn
->xid
);
259 pq_sendstring(out
, txn
->gid
);
263 * Read transaction COMMIT PREPARED from the stream.
266 logicalrep_read_commit_prepared(StringInfo in
, LogicalRepCommitPreparedTxnData
*prepare_data
)
269 uint8 flags
= pq_getmsgbyte(in
);
272 elog(ERROR
, "unrecognized flags %u in commit prepared message", flags
);
275 prepare_data
->commit_lsn
= pq_getmsgint64(in
);
276 if (prepare_data
->commit_lsn
== InvalidXLogRecPtr
)
277 elog(ERROR
, "commit_lsn is not set in commit prepared message");
278 prepare_data
->end_lsn
= pq_getmsgint64(in
);
279 if (prepare_data
->end_lsn
== InvalidXLogRecPtr
)
280 elog(ERROR
, "end_lsn is not set in commit prepared message");
281 prepare_data
->commit_time
= pq_getmsgint64(in
);
282 prepare_data
->xid
= pq_getmsgint(in
, 4);
284 /* read gid (copy it into a pre-allocated buffer) */
285 strlcpy(prepare_data
->gid
, pq_getmsgstring(in
), sizeof(prepare_data
->gid
));
289 * Write ROLLBACK PREPARED to the output stream.
292 logicalrep_write_rollback_prepared(StringInfo out
, ReorderBufferTXN
*txn
,
293 XLogRecPtr prepare_end_lsn
,
294 TimestampTz prepare_time
)
298 pq_sendbyte(out
, LOGICAL_REP_MSG_ROLLBACK_PREPARED
);
301 * This should only ever happen for two-phase commit transactions, in
302 * which case we expect to have a valid GID.
304 Assert(txn
->gid
!= NULL
);
306 /* send the flags field */
307 pq_sendbyte(out
, flags
);
310 pq_sendint64(out
, prepare_end_lsn
);
311 pq_sendint64(out
, txn
->end_lsn
);
312 pq_sendint64(out
, prepare_time
);
313 pq_sendint64(out
, txn
->xact_time
.commit_time
);
314 pq_sendint32(out
, txn
->xid
);
317 pq_sendstring(out
, txn
->gid
);
321 * Read transaction ROLLBACK PREPARED from the stream.
324 logicalrep_read_rollback_prepared(StringInfo in
,
325 LogicalRepRollbackPreparedTxnData
*rollback_data
)
328 uint8 flags
= pq_getmsgbyte(in
);
331 elog(ERROR
, "unrecognized flags %u in rollback prepared message", flags
);
334 rollback_data
->prepare_end_lsn
= pq_getmsgint64(in
);
335 if (rollback_data
->prepare_end_lsn
== InvalidXLogRecPtr
)
336 elog(ERROR
, "prepare_end_lsn is not set in rollback prepared message");
337 rollback_data
->rollback_end_lsn
= pq_getmsgint64(in
);
338 if (rollback_data
->rollback_end_lsn
== InvalidXLogRecPtr
)
339 elog(ERROR
, "rollback_end_lsn is not set in rollback prepared message");
340 rollback_data
->prepare_time
= pq_getmsgint64(in
);
341 rollback_data
->rollback_time
= pq_getmsgint64(in
);
342 rollback_data
->xid
= pq_getmsgint(in
, 4);
344 /* read gid (copy it into a pre-allocated buffer) */
345 strlcpy(rollback_data
->gid
, pq_getmsgstring(in
), sizeof(rollback_data
->gid
));
349 * Write STREAM PREPARE to the output stream.
352 logicalrep_write_stream_prepare(StringInfo out
,
353 ReorderBufferTXN
*txn
,
354 XLogRecPtr prepare_lsn
)
356 logicalrep_write_prepare_common(out
, LOGICAL_REP_MSG_STREAM_PREPARE
,
361 * Read STREAM PREPARE from the stream.
364 logicalrep_read_stream_prepare(StringInfo in
, LogicalRepPreparedTxnData
*prepare_data
)
366 logicalrep_read_prepare_common(in
, "stream prepare", prepare_data
);
370 * Write ORIGIN to the output stream.
373 logicalrep_write_origin(StringInfo out
, const char *origin
,
374 XLogRecPtr origin_lsn
)
376 pq_sendbyte(out
, LOGICAL_REP_MSG_ORIGIN
);
379 pq_sendint64(out
, origin_lsn
);
382 pq_sendstring(out
, origin
);
386 * Read ORIGIN from the output stream.
389 logicalrep_read_origin(StringInfo in
, XLogRecPtr
*origin_lsn
)
392 *origin_lsn
= pq_getmsgint64(in
);
395 return pstrdup(pq_getmsgstring(in
));
399 * Write INSERT to the output stream.
402 logicalrep_write_insert(StringInfo out
, TransactionId xid
, Relation rel
,
403 TupleTableSlot
*newslot
, bool binary
,
404 Bitmapset
*columns
, bool include_gencols
)
406 pq_sendbyte(out
, LOGICAL_REP_MSG_INSERT
);
408 /* transaction ID (if not valid, we're not streaming) */
409 if (TransactionIdIsValid(xid
))
410 pq_sendint32(out
, xid
);
412 /* use Oid as relation identifier */
413 pq_sendint32(out
, RelationGetRelid(rel
));
415 pq_sendbyte(out
, 'N'); /* new tuple follows */
416 logicalrep_write_tuple(out
, rel
, newslot
, binary
, columns
, include_gencols
);
420 * Read INSERT from stream.
422 * Fills the new tuple.
425 logicalrep_read_insert(StringInfo in
, LogicalRepTupleData
*newtup
)
428 LogicalRepRelId relid
;
430 /* read the relation id */
431 relid
= pq_getmsgint(in
, 4);
433 action
= pq_getmsgbyte(in
);
435 elog(ERROR
, "expected new tuple but got %d",
438 logicalrep_read_tuple(in
, newtup
);
444 * Write UPDATE to the output stream.
447 logicalrep_write_update(StringInfo out
, TransactionId xid
, Relation rel
,
448 TupleTableSlot
*oldslot
, TupleTableSlot
*newslot
,
449 bool binary
, Bitmapset
*columns
, bool include_gencols
)
451 pq_sendbyte(out
, LOGICAL_REP_MSG_UPDATE
);
453 Assert(rel
->rd_rel
->relreplident
== REPLICA_IDENTITY_DEFAULT
||
454 rel
->rd_rel
->relreplident
== REPLICA_IDENTITY_FULL
||
455 rel
->rd_rel
->relreplident
== REPLICA_IDENTITY_INDEX
);
457 /* transaction ID (if not valid, we're not streaming) */
458 if (TransactionIdIsValid(xid
))
459 pq_sendint32(out
, xid
);
461 /* use Oid as relation identifier */
462 pq_sendint32(out
, RelationGetRelid(rel
));
466 if (rel
->rd_rel
->relreplident
== REPLICA_IDENTITY_FULL
)
467 pq_sendbyte(out
, 'O'); /* old tuple follows */
469 pq_sendbyte(out
, 'K'); /* old key follows */
470 logicalrep_write_tuple(out
, rel
, oldslot
, binary
, columns
,
474 pq_sendbyte(out
, 'N'); /* new tuple follows */
475 logicalrep_write_tuple(out
, rel
, newslot
, binary
, columns
, include_gencols
);
479 * Read UPDATE from stream.
482 logicalrep_read_update(StringInfo in
, bool *has_oldtuple
,
483 LogicalRepTupleData
*oldtup
,
484 LogicalRepTupleData
*newtup
)
487 LogicalRepRelId relid
;
489 /* read the relation id */
490 relid
= pq_getmsgint(in
, 4);
492 /* read and verify action */
493 action
= pq_getmsgbyte(in
);
494 if (action
!= 'K' && action
!= 'O' && action
!= 'N')
495 elog(ERROR
, "expected action 'N', 'O' or 'K', got %c",
498 /* check for old tuple */
499 if (action
== 'K' || action
== 'O')
501 logicalrep_read_tuple(in
, oldtup
);
502 *has_oldtuple
= true;
504 action
= pq_getmsgbyte(in
);
507 *has_oldtuple
= false;
509 /* check for new tuple */
511 elog(ERROR
, "expected action 'N', got %c",
514 logicalrep_read_tuple(in
, newtup
);
520 * Write DELETE to the output stream.
523 logicalrep_write_delete(StringInfo out
, TransactionId xid
, Relation rel
,
524 TupleTableSlot
*oldslot
, bool binary
,
525 Bitmapset
*columns
, bool include_gencols
)
527 Assert(rel
->rd_rel
->relreplident
== REPLICA_IDENTITY_DEFAULT
||
528 rel
->rd_rel
->relreplident
== REPLICA_IDENTITY_FULL
||
529 rel
->rd_rel
->relreplident
== REPLICA_IDENTITY_INDEX
);
531 pq_sendbyte(out
, LOGICAL_REP_MSG_DELETE
);
533 /* transaction ID (if not valid, we're not streaming) */
534 if (TransactionIdIsValid(xid
))
535 pq_sendint32(out
, xid
);
537 /* use Oid as relation identifier */
538 pq_sendint32(out
, RelationGetRelid(rel
));
540 if (rel
->rd_rel
->relreplident
== REPLICA_IDENTITY_FULL
)
541 pq_sendbyte(out
, 'O'); /* old tuple follows */
543 pq_sendbyte(out
, 'K'); /* old key follows */
545 logicalrep_write_tuple(out
, rel
, oldslot
, binary
, columns
, include_gencols
);
549 * Read DELETE from stream.
551 * Fills the old tuple.
554 logicalrep_read_delete(StringInfo in
, LogicalRepTupleData
*oldtup
)
557 LogicalRepRelId relid
;
559 /* read the relation id */
560 relid
= pq_getmsgint(in
, 4);
562 /* read and verify action */
563 action
= pq_getmsgbyte(in
);
564 if (action
!= 'K' && action
!= 'O')
565 elog(ERROR
, "expected action 'O' or 'K', got %c", action
);
567 logicalrep_read_tuple(in
, oldtup
);
573 * Write TRUNCATE to the output stream.
576 logicalrep_write_truncate(StringInfo out
,
580 bool cascade
, bool restart_seqs
)
585 pq_sendbyte(out
, LOGICAL_REP_MSG_TRUNCATE
);
587 /* transaction ID (if not valid, we're not streaming) */
588 if (TransactionIdIsValid(xid
))
589 pq_sendint32(out
, xid
);
591 pq_sendint32(out
, nrelids
);
593 /* encode and send truncate flags */
595 flags
|= TRUNCATE_CASCADE
;
597 flags
|= TRUNCATE_RESTART_SEQS
;
598 pq_sendint8(out
, flags
);
600 for (i
= 0; i
< nrelids
; i
++)
601 pq_sendint32(out
, relids
[i
]);
605 * Read TRUNCATE from stream.
608 logicalrep_read_truncate(StringInfo in
,
609 bool *cascade
, bool *restart_seqs
)
616 nrelids
= pq_getmsgint(in
, 4);
618 /* read and decode truncate flags */
619 flags
= pq_getmsgint(in
, 1);
620 *cascade
= (flags
& TRUNCATE_CASCADE
) > 0;
621 *restart_seqs
= (flags
& TRUNCATE_RESTART_SEQS
) > 0;
623 for (i
= 0; i
< nrelids
; i
++)
624 relids
= lappend_oid(relids
, pq_getmsgint(in
, 4));
630 * Write MESSAGE to stream
633 logicalrep_write_message(StringInfo out
, TransactionId xid
, XLogRecPtr lsn
,
634 bool transactional
, const char *prefix
, Size sz
,
639 pq_sendbyte(out
, LOGICAL_REP_MSG_MESSAGE
);
641 /* encode and send message flags */
643 flags
|= MESSAGE_TRANSACTIONAL
;
645 /* transaction ID (if not valid, we're not streaming) */
646 if (TransactionIdIsValid(xid
))
647 pq_sendint32(out
, xid
);
649 pq_sendint8(out
, flags
);
650 pq_sendint64(out
, lsn
);
651 pq_sendstring(out
, prefix
);
652 pq_sendint32(out
, sz
);
653 pq_sendbytes(out
, message
, sz
);
657 * Write relation description to the output stream.
660 logicalrep_write_rel(StringInfo out
, TransactionId xid
, Relation rel
,
661 Bitmapset
*columns
, bool include_gencols
)
665 pq_sendbyte(out
, LOGICAL_REP_MSG_RELATION
);
667 /* transaction ID (if not valid, we're not streaming) */
668 if (TransactionIdIsValid(xid
))
669 pq_sendint32(out
, xid
);
671 /* use Oid as relation identifier */
672 pq_sendint32(out
, RelationGetRelid(rel
));
674 /* send qualified relation name */
675 logicalrep_write_namespace(out
, RelationGetNamespace(rel
));
676 relname
= RelationGetRelationName(rel
);
677 pq_sendstring(out
, relname
);
679 /* send replica identity */
680 pq_sendbyte(out
, rel
->rd_rel
->relreplident
);
682 /* send the attribute info */
683 logicalrep_write_attrs(out
, rel
, columns
, include_gencols
);
687 * Read the relation info from stream and return as LogicalRepRelation.
690 logicalrep_read_rel(StringInfo in
)
692 LogicalRepRelation
*rel
= palloc(sizeof(LogicalRepRelation
));
694 rel
->remoteid
= pq_getmsgint(in
, 4);
696 /* Read relation name from stream */
697 rel
->nspname
= pstrdup(logicalrep_read_namespace(in
));
698 rel
->relname
= pstrdup(pq_getmsgstring(in
));
700 /* Read the replica identity. */
701 rel
->replident
= pq_getmsgbyte(in
);
703 /* Get attribute description */
704 logicalrep_read_attrs(in
, rel
);
710 * Write type info to the output stream.
712 * This function will always write base type info.
715 logicalrep_write_typ(StringInfo out
, TransactionId xid
, Oid typoid
)
717 Oid basetypoid
= getBaseType(typoid
);
721 pq_sendbyte(out
, LOGICAL_REP_MSG_TYPE
);
723 /* transaction ID (if not valid, we're not streaming) */
724 if (TransactionIdIsValid(xid
))
725 pq_sendint32(out
, xid
);
727 tup
= SearchSysCache1(TYPEOID
, ObjectIdGetDatum(basetypoid
));
728 if (!HeapTupleIsValid(tup
))
729 elog(ERROR
, "cache lookup failed for type %u", basetypoid
);
730 typtup
= (Form_pg_type
) GETSTRUCT(tup
);
732 /* use Oid as type identifier */
733 pq_sendint32(out
, typoid
);
735 /* send qualified type name */
736 logicalrep_write_namespace(out
, typtup
->typnamespace
);
737 pq_sendstring(out
, NameStr(typtup
->typname
));
739 ReleaseSysCache(tup
);
743 * Read type info from the output stream.
746 logicalrep_read_typ(StringInfo in
, LogicalRepTyp
*ltyp
)
748 ltyp
->remoteid
= pq_getmsgint(in
, 4);
750 /* Read type name from stream */
751 ltyp
->nspname
= pstrdup(logicalrep_read_namespace(in
));
752 ltyp
->typname
= pstrdup(pq_getmsgstring(in
));
756 * Write a tuple to the outputstream, in the most efficient format possible.
759 logicalrep_write_tuple(StringInfo out
, Relation rel
, TupleTableSlot
*slot
,
760 bool binary
, Bitmapset
*columns
, bool include_gencols
)
766 uint16 nliveatts
= 0;
768 desc
= RelationGetDescr(rel
);
770 for (i
= 0; i
< desc
->natts
; i
++)
772 Form_pg_attribute att
= TupleDescAttr(desc
, i
);
774 if (!logicalrep_should_publish_column(att
, columns
, include_gencols
))
779 pq_sendint16(out
, nliveatts
);
781 slot_getallattrs(slot
);
782 values
= slot
->tts_values
;
783 isnull
= slot
->tts_isnull
;
785 /* Write the values */
786 for (i
= 0; i
< desc
->natts
; i
++)
789 Form_pg_type typclass
;
790 Form_pg_attribute att
= TupleDescAttr(desc
, i
);
792 if (!logicalrep_should_publish_column(att
, columns
, include_gencols
))
797 pq_sendbyte(out
, LOGICALREP_COLUMN_NULL
);
801 if (att
->attlen
== -1 && VARATT_IS_EXTERNAL_ONDISK(values
[i
]))
804 * Unchanged toasted datum. (Note that we don't promise to detect
805 * unchanged data in general; this is just a cheap check to avoid
806 * sending large values unnecessarily.)
808 pq_sendbyte(out
, LOGICALREP_COLUMN_UNCHANGED
);
812 typtup
= SearchSysCache1(TYPEOID
, ObjectIdGetDatum(att
->atttypid
));
813 if (!HeapTupleIsValid(typtup
))
814 elog(ERROR
, "cache lookup failed for type %u", att
->atttypid
);
815 typclass
= (Form_pg_type
) GETSTRUCT(typtup
);
818 * Send in binary if requested and type has suitable send function.
820 if (binary
&& OidIsValid(typclass
->typsend
))
825 pq_sendbyte(out
, LOGICALREP_COLUMN_BINARY
);
826 outputbytes
= OidSendFunctionCall(typclass
->typsend
, values
[i
]);
827 len
= VARSIZE(outputbytes
) - VARHDRSZ
;
828 pq_sendint(out
, len
, 4); /* length */
829 pq_sendbytes(out
, VARDATA(outputbytes
), len
); /* data */
836 pq_sendbyte(out
, LOGICALREP_COLUMN_TEXT
);
837 outputstr
= OidOutputFunctionCall(typclass
->typoutput
, values
[i
]);
838 pq_sendcountedtext(out
, outputstr
, strlen(outputstr
));
842 ReleaseSysCache(typtup
);
847 * Read tuple in logical replication format from stream.
850 logicalrep_read_tuple(StringInfo in
, LogicalRepTupleData
*tuple
)
855 /* Get number of attributes */
856 natts
= pq_getmsgint(in
, 2);
858 /* Allocate space for per-column values; zero out unused StringInfoDatas */
859 tuple
->colvalues
= (StringInfoData
*) palloc0(natts
* sizeof(StringInfoData
));
860 tuple
->colstatus
= (char *) palloc(natts
* sizeof(char));
861 tuple
->ncols
= natts
;
864 for (i
= 0; i
< natts
; i
++)
869 StringInfo value
= &tuple
->colvalues
[i
];
871 kind
= pq_getmsgbyte(in
);
872 tuple
->colstatus
[i
] = kind
;
876 case LOGICALREP_COLUMN_NULL
:
877 /* nothing more to do */
879 case LOGICALREP_COLUMN_UNCHANGED
:
880 /* we don't receive the value of an unchanged column */
882 case LOGICALREP_COLUMN_TEXT
:
883 case LOGICALREP_COLUMN_BINARY
:
884 len
= pq_getmsgint(in
, 4); /* read length */
887 buff
= palloc(len
+ 1);
888 pq_copymsgbytes(in
, buff
, len
);
891 * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
892 * as input functions require that. For
893 * LOGICALREP_COLUMN_BINARY it's not technically required, but
898 initStringInfoFromString(value
, buff
, len
);
901 elog(ERROR
, "unrecognized data representation type '%c'", kind
);
907 * Write relation attribute metadata to the stream.
910 logicalrep_write_attrs(StringInfo out
, Relation rel
, Bitmapset
*columns
,
911 bool include_gencols
)
915 uint16 nliveatts
= 0;
916 Bitmapset
*idattrs
= NULL
;
919 desc
= RelationGetDescr(rel
);
921 /* send number of live attributes */
922 for (i
= 0; i
< desc
->natts
; i
++)
924 Form_pg_attribute att
= TupleDescAttr(desc
, i
);
926 if (!logicalrep_should_publish_column(att
, columns
, include_gencols
))
931 pq_sendint16(out
, nliveatts
);
933 /* fetch bitmap of REPLICATION IDENTITY attributes */
934 replidentfull
= (rel
->rd_rel
->relreplident
== REPLICA_IDENTITY_FULL
);
936 idattrs
= RelationGetIdentityKeyBitmap(rel
);
938 /* send the attributes */
939 for (i
= 0; i
< desc
->natts
; i
++)
941 Form_pg_attribute att
= TupleDescAttr(desc
, i
);
944 if (!logicalrep_should_publish_column(att
, columns
, include_gencols
))
947 /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
949 bms_is_member(att
->attnum
- FirstLowInvalidHeapAttributeNumber
,
951 flags
|= LOGICALREP_IS_REPLICA_IDENTITY
;
953 pq_sendbyte(out
, flags
);
956 pq_sendstring(out
, NameStr(att
->attname
));
958 /* attribute type id */
959 pq_sendint32(out
, (int) att
->atttypid
);
962 pq_sendint32(out
, att
->atttypmod
);
969 * Read relation attribute metadata from the stream.
972 logicalrep_read_attrs(StringInfo in
, LogicalRepRelation
*rel
)
978 Bitmapset
*attkeys
= NULL
;
980 natts
= pq_getmsgint(in
, 2);
981 attnames
= palloc(natts
* sizeof(char *));
982 atttyps
= palloc(natts
* sizeof(Oid
));
984 /* read the attributes */
985 for (i
= 0; i
< natts
; i
++)
989 /* Check for replica identity column */
990 flags
= pq_getmsgbyte(in
);
991 if (flags
& LOGICALREP_IS_REPLICA_IDENTITY
)
992 attkeys
= bms_add_member(attkeys
, i
);
995 attnames
[i
] = pstrdup(pq_getmsgstring(in
));
997 /* attribute type id */
998 atttyps
[i
] = (Oid
) pq_getmsgint(in
, 4);
1000 /* we ignore attribute mode for now */
1001 (void) pq_getmsgint(in
, 4);
1004 rel
->attnames
= attnames
;
1005 rel
->atttyps
= atttyps
;
1006 rel
->attkeys
= attkeys
;
1011 * Write the namespace name or empty string for pg_catalog (to save space).
1014 logicalrep_write_namespace(StringInfo out
, Oid nspid
)
1016 if (nspid
== PG_CATALOG_NAMESPACE
)
1017 pq_sendbyte(out
, '\0');
1020 char *nspname
= get_namespace_name(nspid
);
1022 if (nspname
== NULL
)
1023 elog(ERROR
, "cache lookup failed for namespace %u",
1026 pq_sendstring(out
, nspname
);
1031 * Read the namespace name while treating empty string as pg_catalog.
1034 logicalrep_read_namespace(StringInfo in
)
1036 const char *nspname
= pq_getmsgstring(in
);
1038 if (nspname
[0] == '\0')
1039 nspname
= "pg_catalog";
1045 * Write the information for the start stream message to the output stream.
1048 logicalrep_write_stream_start(StringInfo out
,
1049 TransactionId xid
, bool first_segment
)
1051 pq_sendbyte(out
, LOGICAL_REP_MSG_STREAM_START
);
1053 Assert(TransactionIdIsValid(xid
));
1055 /* transaction ID (we're starting to stream, so must be valid) */
1056 pq_sendint32(out
, xid
);
1058 /* 1 if this is the first streaming segment for this xid */
1059 pq_sendbyte(out
, first_segment
? 1 : 0);
1063 * Read the information about the start stream message from output stream.
1066 logicalrep_read_stream_start(StringInfo in
, bool *first_segment
)
1070 Assert(first_segment
);
1072 xid
= pq_getmsgint(in
, 4);
1073 *first_segment
= (pq_getmsgbyte(in
) == 1);
1079 * Write the stop stream message to the output stream.
1082 logicalrep_write_stream_stop(StringInfo out
)
1084 pq_sendbyte(out
, LOGICAL_REP_MSG_STREAM_STOP
);
1088 * Write STREAM COMMIT to the output stream.
1091 logicalrep_write_stream_commit(StringInfo out
, ReorderBufferTXN
*txn
,
1092 XLogRecPtr commit_lsn
)
1096 pq_sendbyte(out
, LOGICAL_REP_MSG_STREAM_COMMIT
);
1098 Assert(TransactionIdIsValid(txn
->xid
));
1100 /* transaction ID */
1101 pq_sendint32(out
, txn
->xid
);
1103 /* send the flags field (unused for now) */
1104 pq_sendbyte(out
, flags
);
1107 pq_sendint64(out
, commit_lsn
);
1108 pq_sendint64(out
, txn
->end_lsn
);
1109 pq_sendint64(out
, txn
->xact_time
.commit_time
);
1113 * Read STREAM COMMIT from the output stream.
1116 logicalrep_read_stream_commit(StringInfo in
, LogicalRepCommitData
*commit_data
)
1121 xid
= pq_getmsgint(in
, 4);
1123 /* read flags (unused for now) */
1124 flags
= pq_getmsgbyte(in
);
1127 elog(ERROR
, "unrecognized flags %u in commit message", flags
);
1130 commit_data
->commit_lsn
= pq_getmsgint64(in
);
1131 commit_data
->end_lsn
= pq_getmsgint64(in
);
1132 commit_data
->committime
= pq_getmsgint64(in
);
1138 * Write STREAM ABORT to the output stream. Note that xid and subxid will be
1139 * same for the top-level transaction abort.
1141 * If write_abort_info is true, send the abort_lsn and abort_time fields,
1145 logicalrep_write_stream_abort(StringInfo out
, TransactionId xid
,
1146 TransactionId subxid
, XLogRecPtr abort_lsn
,
1147 TimestampTz abort_time
, bool write_abort_info
)
1149 pq_sendbyte(out
, LOGICAL_REP_MSG_STREAM_ABORT
);
1151 Assert(TransactionIdIsValid(xid
) && TransactionIdIsValid(subxid
));
1153 /* transaction ID */
1154 pq_sendint32(out
, xid
);
1155 pq_sendint32(out
, subxid
);
1157 if (write_abort_info
)
1159 pq_sendint64(out
, abort_lsn
);
1160 pq_sendint64(out
, abort_time
);
1165 * Read STREAM ABORT from the output stream.
1167 * If read_abort_info is true, read the abort_lsn and abort_time fields,
1171 logicalrep_read_stream_abort(StringInfo in
,
1172 LogicalRepStreamAbortData
*abort_data
,
1173 bool read_abort_info
)
1177 abort_data
->xid
= pq_getmsgint(in
, 4);
1178 abort_data
->subxid
= pq_getmsgint(in
, 4);
1180 if (read_abort_info
)
1182 abort_data
->abort_lsn
= pq_getmsgint64(in
);
1183 abort_data
->abort_time
= pq_getmsgint64(in
);
1187 abort_data
->abort_lsn
= InvalidXLogRecPtr
;
1188 abort_data
->abort_time
= 0;
1193 * Get string representing LogicalRepMsgType.
1196 logicalrep_message_type(LogicalRepMsgType action
)
1198 static char err_unknown
[20];
1202 case LOGICAL_REP_MSG_BEGIN
:
1204 case LOGICAL_REP_MSG_COMMIT
:
1206 case LOGICAL_REP_MSG_ORIGIN
:
1208 case LOGICAL_REP_MSG_INSERT
:
1210 case LOGICAL_REP_MSG_UPDATE
:
1212 case LOGICAL_REP_MSG_DELETE
:
1214 case LOGICAL_REP_MSG_TRUNCATE
:
1216 case LOGICAL_REP_MSG_RELATION
:
1218 case LOGICAL_REP_MSG_TYPE
:
1220 case LOGICAL_REP_MSG_MESSAGE
:
1222 case LOGICAL_REP_MSG_BEGIN_PREPARE
:
1223 return "BEGIN PREPARE";
1224 case LOGICAL_REP_MSG_PREPARE
:
1226 case LOGICAL_REP_MSG_COMMIT_PREPARED
:
1227 return "COMMIT PREPARED";
1228 case LOGICAL_REP_MSG_ROLLBACK_PREPARED
:
1229 return "ROLLBACK PREPARED";
1230 case LOGICAL_REP_MSG_STREAM_START
:
1231 return "STREAM START";
1232 case LOGICAL_REP_MSG_STREAM_STOP
:
1233 return "STREAM STOP";
1234 case LOGICAL_REP_MSG_STREAM_COMMIT
:
1235 return "STREAM COMMIT";
1236 case LOGICAL_REP_MSG_STREAM_ABORT
:
1237 return "STREAM ABORT";
1238 case LOGICAL_REP_MSG_STREAM_PREPARE
:
1239 return "STREAM PREPARE";
1243 * This message provides context in the error raised when applying a
1244 * logical message. So we can't throw an error here. Return an unknown
1245 * indicator value so that the original error is still reported.
1247 snprintf(err_unknown
, sizeof(err_unknown
), "??? (%d)", action
);
1253 * Check if the column 'att' of a table should be published.
1255 * 'columns' represents the publication column list (if any) for that table.
1257 * 'include_gencols' flag indicates whether generated columns should be
1258 * published when there is no column list. Typically, this will have the same
1259 * value as the 'publish_generated_columns' publication parameter.
1261 * Note that generated columns can be published only when present in a
1262 * publication column list, or when include_gencols is true.
1265 logicalrep_should_publish_column(Form_pg_attribute att
, Bitmapset
*columns
,
1266 bool include_gencols
)
1268 if (att
->attisdropped
)
1271 /* If a column list is provided, publish only the cols in that list. */
1273 return bms_is_member(att
->attnum
, columns
);
1275 /* All non-generated columns are always published. */
1276 return att
->attgenerated
? include_gencols
: true;