Now that we have non-Latin1 SGML detection, restore Latin1 chars
[pgsql.git] / src / backend / replication / logical / proto.c
blob2c2085b2f98854064031440320f0694dc5b48da7
1 /*-------------------------------------------------------------------------
3 * proto.c
4 * logical replication protocol functions
6 * Copyright (c) 2015-2024, PostgreSQL Global Development Group
8 * IDENTIFICATION
9 * src/backend/replication/logical/proto.c
11 *-------------------------------------------------------------------------
13 #include "postgres.h"
15 #include "access/sysattr.h"
16 #include "catalog/pg_namespace.h"
17 #include "catalog/pg_type.h"
18 #include "libpq/pqformat.h"
19 #include "replication/logicalproto.h"
20 #include "utils/lsyscache.h"
21 #include "utils/syscache.h"
24 * Protocol message flags.
26 #define LOGICALREP_IS_REPLICA_IDENTITY 1
28 #define MESSAGE_TRANSACTIONAL (1<<0)
29 #define TRUNCATE_CASCADE (1<<0)
30 #define TRUNCATE_RESTART_SEQS (1<<1)
32 static void logicalrep_write_attrs(StringInfo out, Relation rel,
33 Bitmapset *columns, bool include_gencols);
34 static void logicalrep_write_tuple(StringInfo out, Relation rel,
35 TupleTableSlot *slot,
36 bool binary, Bitmapset *columns,
37 bool include_gencols);
38 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
39 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
41 static void logicalrep_write_namespace(StringInfo out, Oid nspid);
42 static const char *logicalrep_read_namespace(StringInfo in);
45 * Write BEGIN to the output stream.
47 void
48 logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
50 pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
52 /* fixed fields */
53 pq_sendint64(out, txn->final_lsn);
54 pq_sendint64(out, txn->xact_time.commit_time);
55 pq_sendint32(out, txn->xid);
59 * Read transaction BEGIN from the stream.
61 void
62 logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
64 /* read fields */
65 begin_data->final_lsn = pq_getmsgint64(in);
66 if (begin_data->final_lsn == InvalidXLogRecPtr)
67 elog(ERROR, "final_lsn not set in begin message");
68 begin_data->committime = pq_getmsgint64(in);
69 begin_data->xid = pq_getmsgint(in, 4);
74 * Write COMMIT to the output stream.
76 void
77 logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
78 XLogRecPtr commit_lsn)
80 uint8 flags = 0;
82 pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
84 /* send the flags field (unused for now) */
85 pq_sendbyte(out, flags);
87 /* send fields */
88 pq_sendint64(out, commit_lsn);
89 pq_sendint64(out, txn->end_lsn);
90 pq_sendint64(out, txn->xact_time.commit_time);
94 * Read transaction COMMIT from the stream.
96 void
97 logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
99 /* read flags (unused for now) */
100 uint8 flags = pq_getmsgbyte(in);
102 if (flags != 0)
103 elog(ERROR, "unrecognized flags %u in commit message", flags);
105 /* read fields */
106 commit_data->commit_lsn = pq_getmsgint64(in);
107 commit_data->end_lsn = pq_getmsgint64(in);
108 commit_data->committime = pq_getmsgint64(in);
112 * Write BEGIN PREPARE to the output stream.
114 void
115 logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
117 pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
119 /* fixed fields */
120 pq_sendint64(out, txn->final_lsn);
121 pq_sendint64(out, txn->end_lsn);
122 pq_sendint64(out, txn->xact_time.prepare_time);
123 pq_sendint32(out, txn->xid);
125 /* send gid */
126 pq_sendstring(out, txn->gid);
130 * Read transaction BEGIN PREPARE from the stream.
132 void
133 logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
135 /* read fields */
136 begin_data->prepare_lsn = pq_getmsgint64(in);
137 if (begin_data->prepare_lsn == InvalidXLogRecPtr)
138 elog(ERROR, "prepare_lsn not set in begin prepare message");
139 begin_data->end_lsn = pq_getmsgint64(in);
140 if (begin_data->end_lsn == InvalidXLogRecPtr)
141 elog(ERROR, "end_lsn not set in begin prepare message");
142 begin_data->prepare_time = pq_getmsgint64(in);
143 begin_data->xid = pq_getmsgint(in, 4);
145 /* read gid (copy it into a pre-allocated buffer) */
146 strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
150 * The core functionality for logicalrep_write_prepare and
151 * logicalrep_write_stream_prepare.
153 static void
154 logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
155 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
157 uint8 flags = 0;
159 pq_sendbyte(out, type);
162 * This should only ever happen for two-phase commit transactions, in
163 * which case we expect to have a valid GID.
165 Assert(txn->gid != NULL);
166 Assert(rbtxn_prepared(txn));
167 Assert(TransactionIdIsValid(txn->xid));
169 /* send the flags field */
170 pq_sendbyte(out, flags);
172 /* send fields */
173 pq_sendint64(out, prepare_lsn);
174 pq_sendint64(out, txn->end_lsn);
175 pq_sendint64(out, txn->xact_time.prepare_time);
176 pq_sendint32(out, txn->xid);
178 /* send gid */
179 pq_sendstring(out, txn->gid);
183 * Write PREPARE to the output stream.
185 void
186 logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
187 XLogRecPtr prepare_lsn)
189 logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
190 txn, prepare_lsn);
194 * The core functionality for logicalrep_read_prepare and
195 * logicalrep_read_stream_prepare.
197 static void
198 logicalrep_read_prepare_common(StringInfo in, char *msgtype,
199 LogicalRepPreparedTxnData *prepare_data)
201 /* read flags */
202 uint8 flags = pq_getmsgbyte(in);
204 if (flags != 0)
205 elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
207 /* read fields */
208 prepare_data->prepare_lsn = pq_getmsgint64(in);
209 if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
210 elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
211 prepare_data->end_lsn = pq_getmsgint64(in);
212 if (prepare_data->end_lsn == InvalidXLogRecPtr)
213 elog(ERROR, "end_lsn is not set in %s message", msgtype);
214 prepare_data->prepare_time = pq_getmsgint64(in);
215 prepare_data->xid = pq_getmsgint(in, 4);
216 if (prepare_data->xid == InvalidTransactionId)
217 elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
219 /* read gid (copy it into a pre-allocated buffer) */
220 strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
224 * Read transaction PREPARE from the stream.
226 void
227 logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
229 logicalrep_read_prepare_common(in, "prepare", prepare_data);
233 * Write COMMIT PREPARED to the output stream.
235 void
236 logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
237 XLogRecPtr commit_lsn)
239 uint8 flags = 0;
241 pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
244 * This should only ever happen for two-phase commit transactions, in
245 * which case we expect to have a valid GID.
247 Assert(txn->gid != NULL);
249 /* send the flags field */
250 pq_sendbyte(out, flags);
252 /* send fields */
253 pq_sendint64(out, commit_lsn);
254 pq_sendint64(out, txn->end_lsn);
255 pq_sendint64(out, txn->xact_time.commit_time);
256 pq_sendint32(out, txn->xid);
258 /* send gid */
259 pq_sendstring(out, txn->gid);
263 * Read transaction COMMIT PREPARED from the stream.
265 void
266 logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
268 /* read flags */
269 uint8 flags = pq_getmsgbyte(in);
271 if (flags != 0)
272 elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
274 /* read fields */
275 prepare_data->commit_lsn = pq_getmsgint64(in);
276 if (prepare_data->commit_lsn == InvalidXLogRecPtr)
277 elog(ERROR, "commit_lsn is not set in commit prepared message");
278 prepare_data->end_lsn = pq_getmsgint64(in);
279 if (prepare_data->end_lsn == InvalidXLogRecPtr)
280 elog(ERROR, "end_lsn is not set in commit prepared message");
281 prepare_data->commit_time = pq_getmsgint64(in);
282 prepare_data->xid = pq_getmsgint(in, 4);
284 /* read gid (copy it into a pre-allocated buffer) */
285 strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
289 * Write ROLLBACK PREPARED to the output stream.
291 void
292 logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
293 XLogRecPtr prepare_end_lsn,
294 TimestampTz prepare_time)
296 uint8 flags = 0;
298 pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
301 * This should only ever happen for two-phase commit transactions, in
302 * which case we expect to have a valid GID.
304 Assert(txn->gid != NULL);
306 /* send the flags field */
307 pq_sendbyte(out, flags);
309 /* send fields */
310 pq_sendint64(out, prepare_end_lsn);
311 pq_sendint64(out, txn->end_lsn);
312 pq_sendint64(out, prepare_time);
313 pq_sendint64(out, txn->xact_time.commit_time);
314 pq_sendint32(out, txn->xid);
316 /* send gid */
317 pq_sendstring(out, txn->gid);
321 * Read transaction ROLLBACK PREPARED from the stream.
323 void
324 logicalrep_read_rollback_prepared(StringInfo in,
325 LogicalRepRollbackPreparedTxnData *rollback_data)
327 /* read flags */
328 uint8 flags = pq_getmsgbyte(in);
330 if (flags != 0)
331 elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
333 /* read fields */
334 rollback_data->prepare_end_lsn = pq_getmsgint64(in);
335 if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
336 elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
337 rollback_data->rollback_end_lsn = pq_getmsgint64(in);
338 if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
339 elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
340 rollback_data->prepare_time = pq_getmsgint64(in);
341 rollback_data->rollback_time = pq_getmsgint64(in);
342 rollback_data->xid = pq_getmsgint(in, 4);
344 /* read gid (copy it into a pre-allocated buffer) */
345 strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
349 * Write STREAM PREPARE to the output stream.
351 void
352 logicalrep_write_stream_prepare(StringInfo out,
353 ReorderBufferTXN *txn,
354 XLogRecPtr prepare_lsn)
356 logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
357 txn, prepare_lsn);
361 * Read STREAM PREPARE from the stream.
363 void
364 logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
366 logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
370 * Write ORIGIN to the output stream.
372 void
373 logicalrep_write_origin(StringInfo out, const char *origin,
374 XLogRecPtr origin_lsn)
376 pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
378 /* fixed fields */
379 pq_sendint64(out, origin_lsn);
381 /* origin string */
382 pq_sendstring(out, origin);
386 * Read ORIGIN from the output stream.
388 char *
389 logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
391 /* fixed fields */
392 *origin_lsn = pq_getmsgint64(in);
394 /* return origin */
395 return pstrdup(pq_getmsgstring(in));
399 * Write INSERT to the output stream.
401 void
402 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
403 TupleTableSlot *newslot, bool binary,
404 Bitmapset *columns, bool include_gencols)
406 pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
408 /* transaction ID (if not valid, we're not streaming) */
409 if (TransactionIdIsValid(xid))
410 pq_sendint32(out, xid);
412 /* use Oid as relation identifier */
413 pq_sendint32(out, RelationGetRelid(rel));
415 pq_sendbyte(out, 'N'); /* new tuple follows */
416 logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
420 * Read INSERT from stream.
422 * Fills the new tuple.
424 LogicalRepRelId
425 logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
427 char action;
428 LogicalRepRelId relid;
430 /* read the relation id */
431 relid = pq_getmsgint(in, 4);
433 action = pq_getmsgbyte(in);
434 if (action != 'N')
435 elog(ERROR, "expected new tuple but got %d",
436 action);
438 logicalrep_read_tuple(in, newtup);
440 return relid;
444 * Write UPDATE to the output stream.
446 void
447 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
448 TupleTableSlot *oldslot, TupleTableSlot *newslot,
449 bool binary, Bitmapset *columns, bool include_gencols)
451 pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
453 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
454 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
455 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
457 /* transaction ID (if not valid, we're not streaming) */
458 if (TransactionIdIsValid(xid))
459 pq_sendint32(out, xid);
461 /* use Oid as relation identifier */
462 pq_sendint32(out, RelationGetRelid(rel));
464 if (oldslot != NULL)
466 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
467 pq_sendbyte(out, 'O'); /* old tuple follows */
468 else
469 pq_sendbyte(out, 'K'); /* old key follows */
470 logicalrep_write_tuple(out, rel, oldslot, binary, columns,
471 include_gencols);
474 pq_sendbyte(out, 'N'); /* new tuple follows */
475 logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols);
479 * Read UPDATE from stream.
481 LogicalRepRelId
482 logicalrep_read_update(StringInfo in, bool *has_oldtuple,
483 LogicalRepTupleData *oldtup,
484 LogicalRepTupleData *newtup)
486 char action;
487 LogicalRepRelId relid;
489 /* read the relation id */
490 relid = pq_getmsgint(in, 4);
492 /* read and verify action */
493 action = pq_getmsgbyte(in);
494 if (action != 'K' && action != 'O' && action != 'N')
495 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
496 action);
498 /* check for old tuple */
499 if (action == 'K' || action == 'O')
501 logicalrep_read_tuple(in, oldtup);
502 *has_oldtuple = true;
504 action = pq_getmsgbyte(in);
506 else
507 *has_oldtuple = false;
509 /* check for new tuple */
510 if (action != 'N')
511 elog(ERROR, "expected action 'N', got %c",
512 action);
514 logicalrep_read_tuple(in, newtup);
516 return relid;
520 * Write DELETE to the output stream.
522 void
523 logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
524 TupleTableSlot *oldslot, bool binary,
525 Bitmapset *columns, bool include_gencols)
527 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
528 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
529 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
531 pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
533 /* transaction ID (if not valid, we're not streaming) */
534 if (TransactionIdIsValid(xid))
535 pq_sendint32(out, xid);
537 /* use Oid as relation identifier */
538 pq_sendint32(out, RelationGetRelid(rel));
540 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
541 pq_sendbyte(out, 'O'); /* old tuple follows */
542 else
543 pq_sendbyte(out, 'K'); /* old key follows */
545 logicalrep_write_tuple(out, rel, oldslot, binary, columns, include_gencols);
549 * Read DELETE from stream.
551 * Fills the old tuple.
553 LogicalRepRelId
554 logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
556 char action;
557 LogicalRepRelId relid;
559 /* read the relation id */
560 relid = pq_getmsgint(in, 4);
562 /* read and verify action */
563 action = pq_getmsgbyte(in);
564 if (action != 'K' && action != 'O')
565 elog(ERROR, "expected action 'O' or 'K', got %c", action);
567 logicalrep_read_tuple(in, oldtup);
569 return relid;
573 * Write TRUNCATE to the output stream.
575 void
576 logicalrep_write_truncate(StringInfo out,
577 TransactionId xid,
578 int nrelids,
579 Oid relids[],
580 bool cascade, bool restart_seqs)
582 int i;
583 uint8 flags = 0;
585 pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
587 /* transaction ID (if not valid, we're not streaming) */
588 if (TransactionIdIsValid(xid))
589 pq_sendint32(out, xid);
591 pq_sendint32(out, nrelids);
593 /* encode and send truncate flags */
594 if (cascade)
595 flags |= TRUNCATE_CASCADE;
596 if (restart_seqs)
597 flags |= TRUNCATE_RESTART_SEQS;
598 pq_sendint8(out, flags);
600 for (i = 0; i < nrelids; i++)
601 pq_sendint32(out, relids[i]);
605 * Read TRUNCATE from stream.
607 List *
608 logicalrep_read_truncate(StringInfo in,
609 bool *cascade, bool *restart_seqs)
611 int i;
612 int nrelids;
613 List *relids = NIL;
614 uint8 flags;
616 nrelids = pq_getmsgint(in, 4);
618 /* read and decode truncate flags */
619 flags = pq_getmsgint(in, 1);
620 *cascade = (flags & TRUNCATE_CASCADE) > 0;
621 *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
623 for (i = 0; i < nrelids; i++)
624 relids = lappend_oid(relids, pq_getmsgint(in, 4));
626 return relids;
630 * Write MESSAGE to stream
632 void
633 logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
634 bool transactional, const char *prefix, Size sz,
635 const char *message)
637 uint8 flags = 0;
639 pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
641 /* encode and send message flags */
642 if (transactional)
643 flags |= MESSAGE_TRANSACTIONAL;
645 /* transaction ID (if not valid, we're not streaming) */
646 if (TransactionIdIsValid(xid))
647 pq_sendint32(out, xid);
649 pq_sendint8(out, flags);
650 pq_sendint64(out, lsn);
651 pq_sendstring(out, prefix);
652 pq_sendint32(out, sz);
653 pq_sendbytes(out, message, sz);
657 * Write relation description to the output stream.
659 void
660 logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
661 Bitmapset *columns, bool include_gencols)
663 char *relname;
665 pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
667 /* transaction ID (if not valid, we're not streaming) */
668 if (TransactionIdIsValid(xid))
669 pq_sendint32(out, xid);
671 /* use Oid as relation identifier */
672 pq_sendint32(out, RelationGetRelid(rel));
674 /* send qualified relation name */
675 logicalrep_write_namespace(out, RelationGetNamespace(rel));
676 relname = RelationGetRelationName(rel);
677 pq_sendstring(out, relname);
679 /* send replica identity */
680 pq_sendbyte(out, rel->rd_rel->relreplident);
682 /* send the attribute info */
683 logicalrep_write_attrs(out, rel, columns, include_gencols);
687 * Read the relation info from stream and return as LogicalRepRelation.
689 LogicalRepRelation *
690 logicalrep_read_rel(StringInfo in)
692 LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
694 rel->remoteid = pq_getmsgint(in, 4);
696 /* Read relation name from stream */
697 rel->nspname = pstrdup(logicalrep_read_namespace(in));
698 rel->relname = pstrdup(pq_getmsgstring(in));
700 /* Read the replica identity. */
701 rel->replident = pq_getmsgbyte(in);
703 /* Get attribute description */
704 logicalrep_read_attrs(in, rel);
706 return rel;
710 * Write type info to the output stream.
712 * This function will always write base type info.
714 void
715 logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
717 Oid basetypoid = getBaseType(typoid);
718 HeapTuple tup;
719 Form_pg_type typtup;
721 pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
723 /* transaction ID (if not valid, we're not streaming) */
724 if (TransactionIdIsValid(xid))
725 pq_sendint32(out, xid);
727 tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
728 if (!HeapTupleIsValid(tup))
729 elog(ERROR, "cache lookup failed for type %u", basetypoid);
730 typtup = (Form_pg_type) GETSTRUCT(tup);
732 /* use Oid as type identifier */
733 pq_sendint32(out, typoid);
735 /* send qualified type name */
736 logicalrep_write_namespace(out, typtup->typnamespace);
737 pq_sendstring(out, NameStr(typtup->typname));
739 ReleaseSysCache(tup);
743 * Read type info from the output stream.
745 void
746 logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
748 ltyp->remoteid = pq_getmsgint(in, 4);
750 /* Read type name from stream */
751 ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
752 ltyp->typname = pstrdup(pq_getmsgstring(in));
756 * Write a tuple to the outputstream, in the most efficient format possible.
758 static void
759 logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
760 bool binary, Bitmapset *columns, bool include_gencols)
762 TupleDesc desc;
763 Datum *values;
764 bool *isnull;
765 int i;
766 uint16 nliveatts = 0;
768 desc = RelationGetDescr(rel);
770 for (i = 0; i < desc->natts; i++)
772 Form_pg_attribute att = TupleDescAttr(desc, i);
774 if (!logicalrep_should_publish_column(att, columns, include_gencols))
775 continue;
777 nliveatts++;
779 pq_sendint16(out, nliveatts);
781 slot_getallattrs(slot);
782 values = slot->tts_values;
783 isnull = slot->tts_isnull;
785 /* Write the values */
786 for (i = 0; i < desc->natts; i++)
788 HeapTuple typtup;
789 Form_pg_type typclass;
790 Form_pg_attribute att = TupleDescAttr(desc, i);
792 if (!logicalrep_should_publish_column(att, columns, include_gencols))
793 continue;
795 if (isnull[i])
797 pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
798 continue;
801 if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
804 * Unchanged toasted datum. (Note that we don't promise to detect
805 * unchanged data in general; this is just a cheap check to avoid
806 * sending large values unnecessarily.)
808 pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
809 continue;
812 typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
813 if (!HeapTupleIsValid(typtup))
814 elog(ERROR, "cache lookup failed for type %u", att->atttypid);
815 typclass = (Form_pg_type) GETSTRUCT(typtup);
818 * Send in binary if requested and type has suitable send function.
820 if (binary && OidIsValid(typclass->typsend))
822 bytea *outputbytes;
823 int len;
825 pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
826 outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
827 len = VARSIZE(outputbytes) - VARHDRSZ;
828 pq_sendint(out, len, 4); /* length */
829 pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
830 pfree(outputbytes);
832 else
834 char *outputstr;
836 pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
837 outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
838 pq_sendcountedtext(out, outputstr, strlen(outputstr));
839 pfree(outputstr);
842 ReleaseSysCache(typtup);
847 * Read tuple in logical replication format from stream.
849 static void
850 logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
852 int i;
853 int natts;
855 /* Get number of attributes */
856 natts = pq_getmsgint(in, 2);
858 /* Allocate space for per-column values; zero out unused StringInfoDatas */
859 tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
860 tuple->colstatus = (char *) palloc(natts * sizeof(char));
861 tuple->ncols = natts;
863 /* Read the data */
864 for (i = 0; i < natts; i++)
866 char *buff;
867 char kind;
868 int len;
869 StringInfo value = &tuple->colvalues[i];
871 kind = pq_getmsgbyte(in);
872 tuple->colstatus[i] = kind;
874 switch (kind)
876 case LOGICALREP_COLUMN_NULL:
877 /* nothing more to do */
878 break;
879 case LOGICALREP_COLUMN_UNCHANGED:
880 /* we don't receive the value of an unchanged column */
881 break;
882 case LOGICALREP_COLUMN_TEXT:
883 case LOGICALREP_COLUMN_BINARY:
884 len = pq_getmsgint(in, 4); /* read length */
886 /* and data */
887 buff = palloc(len + 1);
888 pq_copymsgbytes(in, buff, len);
891 * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
892 * as input functions require that. For
893 * LOGICALREP_COLUMN_BINARY it's not technically required, but
894 * it's harmless.
896 buff[len] = '\0';
898 initStringInfoFromString(value, buff, len);
899 break;
900 default:
901 elog(ERROR, "unrecognized data representation type '%c'", kind);
907 * Write relation attribute metadata to the stream.
909 static void
910 logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns,
911 bool include_gencols)
913 TupleDesc desc;
914 int i;
915 uint16 nliveatts = 0;
916 Bitmapset *idattrs = NULL;
917 bool replidentfull;
919 desc = RelationGetDescr(rel);
921 /* send number of live attributes */
922 for (i = 0; i < desc->natts; i++)
924 Form_pg_attribute att = TupleDescAttr(desc, i);
926 if (!logicalrep_should_publish_column(att, columns, include_gencols))
927 continue;
929 nliveatts++;
931 pq_sendint16(out, nliveatts);
933 /* fetch bitmap of REPLICATION IDENTITY attributes */
934 replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
935 if (!replidentfull)
936 idattrs = RelationGetIdentityKeyBitmap(rel);
938 /* send the attributes */
939 for (i = 0; i < desc->natts; i++)
941 Form_pg_attribute att = TupleDescAttr(desc, i);
942 uint8 flags = 0;
944 if (!logicalrep_should_publish_column(att, columns, include_gencols))
945 continue;
947 /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
948 if (replidentfull ||
949 bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
950 idattrs))
951 flags |= LOGICALREP_IS_REPLICA_IDENTITY;
953 pq_sendbyte(out, flags);
955 /* attribute name */
956 pq_sendstring(out, NameStr(att->attname));
958 /* attribute type id */
959 pq_sendint32(out, (int) att->atttypid);
961 /* attribute mode */
962 pq_sendint32(out, att->atttypmod);
965 bms_free(idattrs);
969 * Read relation attribute metadata from the stream.
971 static void
972 logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
974 int i;
975 int natts;
976 char **attnames;
977 Oid *atttyps;
978 Bitmapset *attkeys = NULL;
980 natts = pq_getmsgint(in, 2);
981 attnames = palloc(natts * sizeof(char *));
982 atttyps = palloc(natts * sizeof(Oid));
984 /* read the attributes */
985 for (i = 0; i < natts; i++)
987 uint8 flags;
989 /* Check for replica identity column */
990 flags = pq_getmsgbyte(in);
991 if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
992 attkeys = bms_add_member(attkeys, i);
994 /* attribute name */
995 attnames[i] = pstrdup(pq_getmsgstring(in));
997 /* attribute type id */
998 atttyps[i] = (Oid) pq_getmsgint(in, 4);
1000 /* we ignore attribute mode for now */
1001 (void) pq_getmsgint(in, 4);
1004 rel->attnames = attnames;
1005 rel->atttyps = atttyps;
1006 rel->attkeys = attkeys;
1007 rel->natts = natts;
1011 * Write the namespace name or empty string for pg_catalog (to save space).
1013 static void
1014 logicalrep_write_namespace(StringInfo out, Oid nspid)
1016 if (nspid == PG_CATALOG_NAMESPACE)
1017 pq_sendbyte(out, '\0');
1018 else
1020 char *nspname = get_namespace_name(nspid);
1022 if (nspname == NULL)
1023 elog(ERROR, "cache lookup failed for namespace %u",
1024 nspid);
1026 pq_sendstring(out, nspname);
1031 * Read the namespace name while treating empty string as pg_catalog.
1033 static const char *
1034 logicalrep_read_namespace(StringInfo in)
1036 const char *nspname = pq_getmsgstring(in);
1038 if (nspname[0] == '\0')
1039 nspname = "pg_catalog";
1041 return nspname;
1045 * Write the information for the start stream message to the output stream.
1047 void
1048 logicalrep_write_stream_start(StringInfo out,
1049 TransactionId xid, bool first_segment)
1051 pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
1053 Assert(TransactionIdIsValid(xid));
1055 /* transaction ID (we're starting to stream, so must be valid) */
1056 pq_sendint32(out, xid);
1058 /* 1 if this is the first streaming segment for this xid */
1059 pq_sendbyte(out, first_segment ? 1 : 0);
1063 * Read the information about the start stream message from output stream.
1065 TransactionId
1066 logicalrep_read_stream_start(StringInfo in, bool *first_segment)
1068 TransactionId xid;
1070 Assert(first_segment);
1072 xid = pq_getmsgint(in, 4);
1073 *first_segment = (pq_getmsgbyte(in) == 1);
1075 return xid;
1079 * Write the stop stream message to the output stream.
1081 void
1082 logicalrep_write_stream_stop(StringInfo out)
1084 pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
1088 * Write STREAM COMMIT to the output stream.
1090 void
1091 logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
1092 XLogRecPtr commit_lsn)
1094 uint8 flags = 0;
1096 pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
1098 Assert(TransactionIdIsValid(txn->xid));
1100 /* transaction ID */
1101 pq_sendint32(out, txn->xid);
1103 /* send the flags field (unused for now) */
1104 pq_sendbyte(out, flags);
1106 /* send fields */
1107 pq_sendint64(out, commit_lsn);
1108 pq_sendint64(out, txn->end_lsn);
1109 pq_sendint64(out, txn->xact_time.commit_time);
1113 * Read STREAM COMMIT from the output stream.
1115 TransactionId
1116 logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
1118 TransactionId xid;
1119 uint8 flags;
1121 xid = pq_getmsgint(in, 4);
1123 /* read flags (unused for now) */
1124 flags = pq_getmsgbyte(in);
1126 if (flags != 0)
1127 elog(ERROR, "unrecognized flags %u in commit message", flags);
1129 /* read fields */
1130 commit_data->commit_lsn = pq_getmsgint64(in);
1131 commit_data->end_lsn = pq_getmsgint64(in);
1132 commit_data->committime = pq_getmsgint64(in);
1134 return xid;
1138 * Write STREAM ABORT to the output stream. Note that xid and subxid will be
1139 * same for the top-level transaction abort.
1141 * If write_abort_info is true, send the abort_lsn and abort_time fields,
1142 * otherwise don't.
1144 void
1145 logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
1146 TransactionId subxid, XLogRecPtr abort_lsn,
1147 TimestampTz abort_time, bool write_abort_info)
1149 pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
1151 Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
1153 /* transaction ID */
1154 pq_sendint32(out, xid);
1155 pq_sendint32(out, subxid);
1157 if (write_abort_info)
1159 pq_sendint64(out, abort_lsn);
1160 pq_sendint64(out, abort_time);
1165 * Read STREAM ABORT from the output stream.
1167 * If read_abort_info is true, read the abort_lsn and abort_time fields,
1168 * otherwise don't.
1170 void
1171 logicalrep_read_stream_abort(StringInfo in,
1172 LogicalRepStreamAbortData *abort_data,
1173 bool read_abort_info)
1175 Assert(abort_data);
1177 abort_data->xid = pq_getmsgint(in, 4);
1178 abort_data->subxid = pq_getmsgint(in, 4);
1180 if (read_abort_info)
1182 abort_data->abort_lsn = pq_getmsgint64(in);
1183 abort_data->abort_time = pq_getmsgint64(in);
1185 else
1187 abort_data->abort_lsn = InvalidXLogRecPtr;
1188 abort_data->abort_time = 0;
1193 * Get string representing LogicalRepMsgType.
1195 const char *
1196 logicalrep_message_type(LogicalRepMsgType action)
1198 static char err_unknown[20];
1200 switch (action)
1202 case LOGICAL_REP_MSG_BEGIN:
1203 return "BEGIN";
1204 case LOGICAL_REP_MSG_COMMIT:
1205 return "COMMIT";
1206 case LOGICAL_REP_MSG_ORIGIN:
1207 return "ORIGIN";
1208 case LOGICAL_REP_MSG_INSERT:
1209 return "INSERT";
1210 case LOGICAL_REP_MSG_UPDATE:
1211 return "UPDATE";
1212 case LOGICAL_REP_MSG_DELETE:
1213 return "DELETE";
1214 case LOGICAL_REP_MSG_TRUNCATE:
1215 return "TRUNCATE";
1216 case LOGICAL_REP_MSG_RELATION:
1217 return "RELATION";
1218 case LOGICAL_REP_MSG_TYPE:
1219 return "TYPE";
1220 case LOGICAL_REP_MSG_MESSAGE:
1221 return "MESSAGE";
1222 case LOGICAL_REP_MSG_BEGIN_PREPARE:
1223 return "BEGIN PREPARE";
1224 case LOGICAL_REP_MSG_PREPARE:
1225 return "PREPARE";
1226 case LOGICAL_REP_MSG_COMMIT_PREPARED:
1227 return "COMMIT PREPARED";
1228 case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
1229 return "ROLLBACK PREPARED";
1230 case LOGICAL_REP_MSG_STREAM_START:
1231 return "STREAM START";
1232 case LOGICAL_REP_MSG_STREAM_STOP:
1233 return "STREAM STOP";
1234 case LOGICAL_REP_MSG_STREAM_COMMIT:
1235 return "STREAM COMMIT";
1236 case LOGICAL_REP_MSG_STREAM_ABORT:
1237 return "STREAM ABORT";
1238 case LOGICAL_REP_MSG_STREAM_PREPARE:
1239 return "STREAM PREPARE";
1243 * This message provides context in the error raised when applying a
1244 * logical message. So we can't throw an error here. Return an unknown
1245 * indicator value so that the original error is still reported.
1247 snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1249 return err_unknown;
1253 * Check if the column 'att' of a table should be published.
1255 * 'columns' represents the publication column list (if any) for that table.
1257 * 'include_gencols' flag indicates whether generated columns should be
1258 * published when there is no column list. Typically, this will have the same
1259 * value as the 'publish_generated_columns' publication parameter.
1261 * Note that generated columns can be published only when present in a
1262 * publication column list, or when include_gencols is true.
1264 bool
1265 logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns,
1266 bool include_gencols)
1268 if (att->attisdropped)
1269 return false;
1271 /* If a column list is provided, publish only the cols in that list. */
1272 if (columns)
1273 return bms_is_member(att->attnum, columns);
1275 /* All non-generated columns are always published. */
1276 return att->attgenerated ? include_gencols : true;