Seed a slot for multilateral involvement, even if only Herokai now
[pg_logfebe.git] / pg_logfebe.c
blob99d9ee88a8702a8e3545b3298b08f9075c55448e
1 /*
2 * pg_logfebe.c
4 * Implements a module to be loaded via shared_preload_libraries that,
5 * should "logfebe.unix_socket" be set in postgresql.conf will cause
6 * log traffic to be written to the unix socket in question on a
7 * best-effort basis.
9 * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
10 * Portions Copyright (c) 1994, Regents of the University of California
11 * Portions Copyright (c) 2012, Heroku
14 #include "postgres.h"
16 #include <stdint.h>
17 #include <sys/un.h>
18 #include <unistd.h>
20 #include "access/xact.h"
21 #include "funcapi.h"
22 #include "lib/stringinfo.h"
23 #include "libpq/libpq-be.h"
24 #include "miscadmin.h"
25 #include "pg_config.h"
26 #include "pgtime.h"
27 #include "storage/proc.h"
28 #include "tcop/tcopprot.h"
29 #include "utils/elog.h"
30 #include "utils/guc.h"
31 #include "utils/ps_status.h"
34 * 64-bit byte-swapping, as per
35 * http://stackoverflow.com/questions/809902/64-bit-ntohl-in-c
37 #if defined(__linux__)
38 #include <endian.h>
40 #elif defined(__FreeBSD__) || defined(__NetBSD__)
41 #include <sys/endian.h>
43 #elif defined(__OpenBSD__)
44 #include <sys/types.h>
45 #define be16toh(x) betoh16(x)
46 #define be32toh(x) betoh32(x)
47 #define be64toh(x) betoh64(x)
49 #elif defined(__darwin__)
50 #include <libkern/OSByteOrder.h>
51 #define htobe32(x) __DARWIN_OSSwapInt32(x)
52 #define htobe64(x) __DARWIN_OSSwapInt64(x)
53 #endif
55 #if defined(__darwin__)
57 * On Macintosh, MSG_NOSIGNAL is not defined, but it has a moral
58 * equivalent, SO_NOSIGPIPE.
60 #define MSG_NOSIGNAL SO_NOSIGPIPE
61 #endif
63 PG_MODULE_MAGIC;
65 #define FORMATTED_TS_LEN 128
68 * Startup version string, e.g. "PG-9.2.4/logfebe-1", where the
69 * "logfebe-1" indicates the pg_logfebe protocol version.
71 #define PROTO_VERSION ("PG-" PG_VERSION "/logfebe-1")
73 /* GUC-configured destination of the log pages */
74 static char *logUnixSocketPath = "";
75 static char *ident = "";
77 /* Old hook storage for loading/unloading of the extension */
78 static emit_log_hook_type prev_emit_log_hook = NULL;
80 /* Used to detect if values inherited over fork need resetting. */
81 static int savedPid = 0;
83 /* Caches the formatted start time */
84 static char cachedBackendStartTime[FORMATTED_TS_LEN];
86 /* Counter for log sequence number. */
87 static long seqNum = 0;
90 * File descriptor that log records are written to.
92 * Is re-set if a write fails.
94 static int outSockFd = -1;
96 /* Dynamic linking hooks for Postgres */
97 void _PG_init(void);
98 void _PG_fini(void);
100 /* Internal function definitions*/
101 static bool formAddr(struct sockaddr_un *dst, char *path);
102 static bool isLogLevelOutput(int elevel, int log_min_level);
103 static void appendStringInfoPtr(StringInfo dst, const char *s);
104 static void closeSocket(int *fd);
105 static void fmtLogMsg(StringInfo dst, ErrorData *edata);
106 static void formatLogTime(char *dst, size_t dstSz, struct timeval tv);
107 static void formatNow(char *dst, size_t dstSz);
108 static void gucOnAssignCloseInvalidate(const char *newval, void *extra);
109 static void logfebe_emit_log_hook(ErrorData *edata);
110 static void openSocket(int *dst, char *path);
111 static void optionalGucGet(char **dest, const char *name,
112 const char *shortDesc);
113 static void reCacheBackendStartTime(void);
114 static void sendOrInval(int *fd, char *payload, size_t payloadSz);
118 * Useful for HUP triggered reassignment: invalidate the socket, which will
119 * cause path information to be evaluated when reconnection and identification
120 * to be re-exchanged.
122 static void
123 gucOnAssignCloseInvalidate(const char *newval, void *extra)
125 closeSocket(&outSockFd);
129 * Procedure that wraps a bunch of boilerplate GUC options appropriate for all
130 * the options used in this extension.
132 static void
133 optionalGucGet(char **dest, const char *name,
134 const char *shortDesc)
136 DefineCustomStringVariable(
137 name,
138 shortDesc,
140 dest,
142 PGC_SIGHUP,
143 GUC_NOT_IN_SAMPLE,
144 NULL,
145 gucOnAssignCloseInvalidate,
146 NULL);
/*
 * Form a sockaddr_un for communication.
 *
 * dst  - address structure to fill in; it is zeroed first
 * path - NUL-terminated filesystem path of the unix socket
 *
 * Returns true when the whole path, including its terminator, fits in
 * dst->sun_path and was copied.  Returns false when the path is too long; in
 * that case no part of the path is copied, so dst never names a truncated
 * (and therefore different) socket.
 */
static bool
formAddr(struct sockaddr_un *dst, char *path)
{
	const size_t len = strlen(path);

	memset(dst, 0, sizeof *dst);
	dst->sun_family = AF_UNIX;

	/*
	 * A path of exactly sizeof sun_path characters leaves no room for the
	 * terminator, so it must be rejected as well: hence ">=" and not ">".
	 */
	if (len >= sizeof dst->sun_path)
		return false;

	memcpy(dst->sun_path, path, len + 1);
	return true;
}
172 * _PG_init() - library load-time initialization
174 * DO NOT make this static nor change its name!
176 * Init the module, all we have to do here is getting our GUC
178 void
179 _PG_init(void)
181 /* Set up GUCs */
182 optionalGucGet(&logUnixSocketPath, "logfebe.unix_socket",
183 "Unix socket to send logs to in FEBE frames.");
184 optionalGucGet(&ident, "logfebe.identity",
185 "The identity of the installation of PostgreSQL.");
187 EmitWarningsOnPlaceholders("logfebe");
189 /* Install hook */
190 prev_emit_log_hook = emit_log_hook;
191 emit_log_hook = logfebe_emit_log_hook;
196 * Given an invalid Fd in *dst, try to open a unix socket connection to the
197 * given path.
199 static void
200 openSocket(int *dst, char *path)
202 const int save_errno = errno;
203 struct sockaddr_un addr;
204 bool formed;
205 int fd = -1;
206 StringInfoData startup;
209 * This procedure is only defined on the domain of invalidated file
210 * descriptors
212 Assert(*dst < 0);
214 /* Begin attempting connection, first by forming the address */
215 formed = formAddr(&addr, path);
216 if (!formed)
218 /* Didn't work, give up */
219 goto err;
222 /* Get socket fd, or die */
223 fd = socket(AF_LOCAL, SOCK_STREAM, 0);
224 if (fd < 0)
225 goto err;
227 /* Connect socket to server. Or die. */
228 Assert(formed);
231 int res;
233 errno = 0;
234 res = connect(fd, (void *) &addr, sizeof addr);
235 if (res < 0 || (errno != EINTR && errno != 0))
236 goto err;
237 } while (errno == EINTR);
240 * Connection established.
242 * Try to send start-up information as a service to the caller. Should
243 * this fail, sendOrInval will close and invalidate the socket, though.
245 Assert(fd >= 0);
246 initStringInfo(&startup);
248 /* Prepare startup: protocol version ('V') frame */
250 const uint32_t nVlen = htobe32((sizeof PROTO_VERSION) +
251 sizeof(u_int32_t));
253 appendStringInfoChar(&startup, 'V');
254 appendBinaryStringInfo(&startup, (void *) &nVlen, sizeof nVlen);
255 appendBinaryStringInfo(&startup, PROTO_VERSION, sizeof PROTO_VERSION);
258 /* Prepare startup: system identification ('I') frame */
260 char *payload;
261 int payloadLen;
262 uint32_t nPayloadLen;
264 if (ident == NULL)
265 payload = "";
266 else
267 payload = ident;
269 payloadLen = strlen(payload) + sizeof '\0';
270 nPayloadLen = htobe32(payloadLen + sizeof nPayloadLen);
272 appendStringInfoChar(&startup, 'I');
273 appendBinaryStringInfo(&startup, (void *) &nPayloadLen,
274 sizeof nPayloadLen);
275 appendBinaryStringInfo(&startup, (void *) payload, payloadLen);
279 * Try to send the prepared startup packet, invaliding fd if things go
280 * awry.
282 sendOrInval(&fd, startup.data, startup.len);
283 pfree(startup.data);
284 *dst = fd;
285 goto exit;
287 err:
288 /* Close and invalidate 'fd' if it got made */
289 if (fd >= 0)
291 closeSocket(&fd);
292 Assert(fd < 0);
295 Assert(*dst < 0);
296 goto exit;
298 exit:
299 /* Universal post-condition */
300 errno = save_errno;
301 return;
/*
 * Best-effort close of *fd, after which *fd is set to -1.
 *
 * The result of close() is deliberately ignored, EINTR included, on advice
 * from libusual's "safe_close" function:
 *
 * POSIX says close() can return EINTR but fd state is "undefined" later.
 * Seems Linux and BSDs close the fd anyway and EINTR is simply informative.
 * Thus retry is dangerous.
 *
 * errno is left exactly as it was on entry.
 */
static void
closeSocket(int *fd)
{
	const int	errnoOnEntry = errno;

	(void) close(*fd);
	*fd = -1;

	errno = errnoOnEntry;
}
328 static void
329 formatLogTime(char *dst, size_t dstSz, struct timeval tv)
331 char msbuf[8];
332 struct pg_tm *tm;
333 pg_time_t stamp_time;
335 stamp_time = (pg_time_t) tv.tv_sec;
336 tm = pg_localtime(&stamp_time, log_timezone);
338 Assert(dstSz >= FORMATTED_TS_LEN);
339 pg_strftime(dst, dstSz,
340 /* leave room for milliseconds... */
341 "%Y-%m-%d %H:%M:%S %Z", tm);
343 /* 'paste' milliseconds into place... */
344 sprintf(msbuf, ".%03d", (int) (tv.tv_usec / 1000));
345 strncpy(dst + 19, msbuf, 4);
348 static void
349 reCacheBackendStartTime(void)
351 pg_time_t stampTime = (pg_time_t) MyStartTime;
354 * Note: we expect that guc.c will ensure that log_timezone is set up (at
355 * least with a minimal GMT value) before Log_line_prefix can become
356 * nonempty or CSV mode can be selected.
358 pg_strftime(cachedBackendStartTime, FORMATTED_TS_LEN,
359 "%Y-%m-%d %H:%M:%S %Z",
360 pg_localtime(&stampTime, log_timezone));
363 static void
364 formatNow(char *dst, size_t dstSz)
366 struct timeval tv;
368 gettimeofday(&tv, NULL);
370 Assert(dstSz >= FORMATTED_TS_LEN);
371 formatLogTime(dst, dstSz, tv);
375 * isLogLevelOutput -- is elevel logically >= log_min_level?
377 * We use this for tests that should consider LOG to sort out-of-order,
378 * between ERROR and FATAL. Generally this is the right thing for testing
379 * whether a message should go to the postmaster log, whereas a simple >=
380 * test is correct for testing whether the message should go to the client.
382 static bool
383 isLogLevelOutput(int elevel, int log_min_level)
385 if (elevel == LOG || elevel == COMMERROR)
387 if (log_min_level == LOG || log_min_level <= ERROR)
388 return true;
390 else if (log_min_level == LOG)
392 /* elevel != LOG */
393 if (elevel >= FATAL)
394 return true;
396 /* Neither is LOG */
397 else if (elevel >= log_min_level)
398 return true;
400 return false;
404 * Append a string in a special format that prepends information about
405 * its NULL-ity, should it be NULL.
407 static void
408 appendStringInfoPtr(StringInfo dst, const char *s)
410 /* 'N' for NULL, 'P' for "Present" */
411 if (s == NULL)
412 appendStringInfoChar(dst, 'N');
413 else
415 appendStringInfoChar(dst, 'P');
416 appendStringInfoString(dst, s);
419 appendStringInfoChar(dst, '\0');
422 static void
423 fmtLogMsg(StringInfo dst, ErrorData *edata)
426 char formattedLogTime[FORMATTED_TS_LEN];
428 /* timestamp with milliseconds */
429 formatNow(formattedLogTime, sizeof formattedLogTime);
432 * Always present, non-nullable; don't need to write the N/P
433 * header.
435 appendStringInfoString(dst, formattedLogTime);
436 appendStringInfoChar(dst, '\0');
439 /* username */
440 if (MyProcPort)
441 appendStringInfoPtr(dst, MyProcPort->user_name);
442 else
443 appendStringInfoPtr(dst, NULL);
445 /* database name */
446 if (MyProcPort)
447 appendStringInfoPtr(dst, MyProcPort->database_name);
448 else
449 appendStringInfoPtr(dst, NULL);
451 /* Process id */
453 uint32_t nPid = htobe32(savedPid);
455 appendBinaryStringInfo(dst, (void *) &nPid, sizeof nPid);
458 /* Remote host and port */
459 if (MyProcPort && MyProcPort->remote_host)
461 /* 'present' string header, since this string is nullable */
462 appendStringInfoChar(dst, 'P');
464 appendStringInfoString(dst, MyProcPort->remote_host);
465 if (MyProcPort->remote_port && MyProcPort->remote_port[0] != '\0')
467 appendStringInfoChar(dst, ':');
468 appendStringInfoString(dst, MyProcPort->remote_port);
471 appendStringInfoChar(dst, '\0');
473 else
474 appendStringInfoPtr(dst, NULL);
476 /* session id; non-nullable */
477 appendStringInfo(dst, "%lx.%x", (long) MyStartTime, MyProcPid);
478 appendStringInfoChar(dst, '\0');
480 /* Line number */
482 uint64_t nSeqNum = htobe64(seqNum);
483 appendBinaryStringInfo(dst, (void *) &nSeqNum, sizeof nSeqNum);
486 /* PS display */
487 if (MyProcPort)
489 StringInfoData msgbuf;
490 const char *psdisp;
491 int displen;
493 initStringInfo(&msgbuf);
495 psdisp = get_ps_display(&displen);
496 appendBinaryStringInfo(&msgbuf, psdisp, displen);
498 appendStringInfoChar(dst, 'P');
499 appendStringInfoString(dst, msgbuf.data);
500 appendStringInfoChar(dst, '\0');
502 pfree(msgbuf.data);
504 else
505 appendStringInfoPtr(dst, NULL);
507 /* session start timestamp */
508 if (cachedBackendStartTime[0] == '\0')
510 /* Rebuild the cache if it was blown */
511 reCacheBackendStartTime();
514 /* backend start time; non-nullable string */
515 appendStringInfoString(dst, cachedBackendStartTime);
516 appendStringInfoChar(dst, '\0');
519 * Virtual transaction id
521 * keep VXID format in sync with lockfuncs.c
523 if (MyProc != NULL && MyProc->backendId != InvalidBackendId)
525 appendStringInfoChar(dst, 'P');
526 appendStringInfo(dst, "%d/%u", MyProc->backendId, MyProc->lxid);
527 appendStringInfoChar(dst, '\0');
529 else
530 appendStringInfoPtr(dst, NULL);
533 * Transaction id
535 * This seems to be a mistake both here and in elog.c; in particular, it's
536 * not clear how the epoch would get added here. However, leave room in
537 * the protocol to fix this later by upcasting.
540 uint64_t nTxid = htobe64((uint64) GetTopTransactionIdIfAny());
542 appendBinaryStringInfo(dst, (void *) &nTxid, sizeof nTxid);
545 /* Error severity */
547 uint32_t nelevel = htobe32(edata->elevel);
549 appendBinaryStringInfo(dst, (void *) &nelevel, sizeof nelevel);
552 /* SQL state code */
553 appendStringInfoPtr(dst, unpack_sql_state(edata->sqlerrcode));
555 /* errmessage */
556 appendStringInfoPtr(dst, edata->message);
558 /* errdetail or errdetail_log */
559 if (edata->detail_log)
560 appendStringInfoPtr(dst, edata->detail_log);
561 else
562 appendStringInfoPtr(dst, edata->detail);
564 /* errhint */
565 appendStringInfoPtr(dst, edata->hint);
567 /* internal query */
568 appendStringInfoPtr(dst, edata->internalquery);
570 /* if printed internal query, print internal pos too */
571 if (edata->internalpos > 0 && edata->internalquery != NULL)
573 uint32_t ninternalpos = htobe32(edata->internalpos);
575 appendBinaryStringInfo(dst, (void *) &ninternalpos,
576 sizeof ninternalpos);
578 else
580 uint32_t ninternalpos = htobe32(-1);
582 appendBinaryStringInfo(dst, (void *) &ninternalpos,
583 sizeof ninternalpos);
586 /* errcontext */
587 appendStringInfoPtr(dst, edata->context);
590 * user query --- only reported if not disabled by the caller.
592 * Also include query position.
594 if (isLogLevelOutput(edata->elevel, log_min_error_statement) &&
595 debug_query_string != NULL && !edata->hide_stmt)
597 uint32_t nCursorPos = htobe32(edata->cursorpos);
599 appendStringInfoPtr(dst, debug_query_string);
600 appendBinaryStringInfo(dst, (void *) &nCursorPos, sizeof nCursorPos);
602 else
604 uint32_t nCursorPos = htobe32(-1);
606 appendStringInfoPtr(dst, NULL);
607 appendBinaryStringInfo(dst, (void *) &nCursorPos, sizeof nCursorPos);
610 /* file error location */
611 if (Log_error_verbosity >= PGERROR_VERBOSE)
613 StringInfoData msgbuf;
615 initStringInfo(&msgbuf);
617 if (edata->funcname && edata->filename)
618 appendStringInfo(&msgbuf, "%s, %s:%d",
619 edata->funcname, edata->filename,
620 edata->lineno);
621 else if (edata->filename)
622 appendStringInfo(&msgbuf, "%s:%d",
623 edata->filename, edata->lineno);
625 appendStringInfoChar(dst, 'P');
626 appendStringInfoString(dst, msgbuf.data);
627 appendStringInfoChar(dst, '\0');
629 pfree(msgbuf.data);
631 else
632 appendStringInfoPtr(dst, NULL);
634 /* application name */
635 appendStringInfoPtr(dst, application_name);
/*
 * Send the whole payload or invalidate *fd.
 *
 * fd        - descriptor to write to; closed and set to -1 on failure
 * payload   - bytes to send
 * payloadSz - number of bytes in payload
 *
 * No confirmation of success or failure is delivered other than the state of
 * *fd.  errno is restored to its value on entry.
 */
static void
sendOrInval(int *fd, char *payload, size_t payloadSz)
{
	const int	saved_errno = errno;
	size_t		sent = 0;

	while (sent < payloadSz)
	{
		ssize_t		n;

		/*
		 * Send, and carefully suppress SIGPIPE: otherwise a failure would
		 * arrive as a signal rather than as an error code, bypassing the
		 * error handling below.
		 *
		 * This is required to allow re-connection in event the server closes
		 * the connection.
		 */
		n = send(*fd, payload + sent, payloadSz - sent, MSG_NOSIGNAL);

		if (n > 0)
		{
			/*
			 * Possibly a short write, e.g. when a signal arrived after some
			 * of the data was transferred.  That is not an error and leaves
			 * errno untouched, so continue with the remainder instead of
			 * re-sending from the start or dropping the connection.
			 */
			sent += (size_t) n;
			continue;
		}

		/* Interrupted before anything was sent: harmless, just try again */
		if (n < 0 && errno == EINTR)
			continue;

		/*
		 * Something went wrong, or no progress can be made.  The record being
		 * sent goes un-logged in this case.
		 *
		 * Close and invalidate the socket fd; a new attempt to get a valid
		 * fd must come the next time the hook is called.
		 */
		closeSocket(fd);
		break;
	}

	errno = saved_errno;
}
694 static void
695 logfebe_emit_log_hook(ErrorData *edata)
697 int save_errno;
698 StringInfoData buf;
701 * This is one of the few places where we'd rather not inherit a static
702 * variable's value from the postmaster. But since we will, reset it when
703 * MyProcPid changes.
705 if (savedPid != MyProcPid)
707 savedPid = MyProcPid;
709 /* Invalidate all inherited values */
710 seqNum = 0;
711 cachedBackendStartTime[0] = '\0';
713 if (outSockFd >= 0)
715 closeSocket(&outSockFd);
720 * Increment log sequence number
722 * Done early on so this happens regardless if there are problems emitting
723 * the log.
725 seqNum += 1;
728 * Early exit if the socket path is not set and isn't in the format of
729 * an absolute path.
731 * The empty identity ("ident") is a valid one, so it is not rejected in
732 * the same way an empty logUnixSocketPath is.
734 if (logUnixSocketPath == NULL ||
735 strlen(logUnixSocketPath) <= 0 || logUnixSocketPath[0] != '/')
738 * Unsetting the GUCs via SIGHUP would leave a connection
739 * dangling, if it exists, close it.
741 if (outSockFd >= 0)
743 closeSocket(&outSockFd);
746 goto quickExit;
749 save_errno = errno;
752 * Initialize StringInfoDatas early, because pfree is called
753 * unconditionally at exit.
755 initStringInfo(&buf);
757 if (outSockFd < 0)
759 openSocket(&outSockFd, logUnixSocketPath);
761 /* Couldn't get a valid socket; give up */
762 if (outSockFd < 0)
763 goto exit;
767 * Make room for message type byte and length header. The length header
768 * must be overwritten to the correct value at the end.
771 const char logHdr[5] = {'L', '\0', '\0', '\0', '\0'};
773 appendBinaryStringInfo(&buf, logHdr, sizeof logHdr);
777 * Format the output, and figure out how long it is, and frame it
778 * for the protocol.
781 uint32_t *msgSize;
783 fmtLogMsg(&buf, edata);
786 * Check that buf is prepared properly, with enough space and
787 * the placeholder length expected.
789 Assert(buf.len > 5);
790 Assert(buf.data[0] == 'L');
792 msgSize = (uint32_t *)(buf.data + 1);
793 Assert(*msgSize == 0);
796 * Fill in *msgSize: buf contains the msg header, which is not
797 * included in length; subract and byte-swap to paste the
798 * right length into place.
800 *msgSize = htobe32(buf.len - 1);
803 /* Finally: try to send the constructed message */
804 sendOrInval(&outSockFd, buf.data, buf.len);
806 exit:
807 pfree(buf.data);
808 errno = save_errno;
810 quickExit:
811 /* Call a previous hook, should it exist */
812 if (prev_emit_log_hook != NULL)
813 prev_emit_log_hook(edata);
817 * Module unload callback
819 void
820 _PG_fini(void)
822 /* Uninstall hook */
823 emit_log_hook = prev_emit_log_hook;