1 /* Source: xio-posixmq.c */
2 /* Copyright Gerhard Rieger and contributors (see file CHANGES) */
3 /* Published under the GNU General Public License V.2, see file COPYING */
5 /* This file contains the source for opening addresses of POSIX MQ type */
7 #include "xiosysincludes.h"
10 #include "xio-socket.h"
11 #include "xio-listen.h"
12 #include "xio-posixmq.h"
13 #include "xio-named.h"
/* Forward declaration of the file-local queue removal helper; its definition
   near the end of this file calls mq_unlink() and reports through Msg2().
   NOTE(review): only the level parameter is visible here, yet the callers in
   this file pass the queue name first and the level second - a parameter line
   appears to be missing from this listing; confirm against the full source. */
18 static int _posixmq_unlink(
20 int level
); /* message level on error */
/* Forward declaration of the open function that every POSIXMQ address
   descriptor in this file registers as its handler.
   argc/argv: address name and parameters; opts: options to evaluate;
   xioflags: context flags (tested for XIO_MAYFORK in the definition);
   xfd: file object whose stream member is filled in;
   addrdesc: the descriptor through which the function was selected. */
22 static int xioopen_posixmq(int argc
, const char *argv
[], struct opt
*opts
, int xioflags
, xiofile_t
*xfd
, const struct addrdesc
*addrdesc
);
/* Address descriptor "POSIXMQ-BIDIRECTIONAL", handled by xioopen_posixmq().
   XIO_RDWR is the direction value and 0 the oneshot value; xioopen_posixmq()
   reads these from ->arg1 and ->arg2 (presumably the two initializers that
   follow the group mask - confirm against struct addrdesc).
   HELP() carries the syntax hint ":<mqname>". */
24 const struct addrdesc xioaddr_posixmq_bidir
= { "POSIXMQ-BIDIRECTIONAL", 1+XIO_RDWR
, xioopen_posixmq
, GROUP_FD
|GROUP_NAMED
|GROUP_POSIXMQ
|GROUP_RETRY
, XIO_RDWR
, 0, 0 HELP(":<mqname>") };
/* Address descriptor "POSIXMQ-READ", handled by xioopen_posixmq().
   XIO_RDONLY is the direction value and 0 the oneshot value; xioopen_posixmq()
   reads these from ->arg1 and ->arg2 (presumably the two initializers that
   follow the group mask - confirm against struct addrdesc).
   HELP() carries the syntax hint ":<mqname>". */
25 const struct addrdesc xioaddr_posixmq_read
= { "POSIXMQ-READ", 1+XIO_RDONLY
, xioopen_posixmq
, GROUP_FD
|GROUP_NAMED
|GROUP_POSIXMQ
|GROUP_RETRY
, XIO_RDONLY
, 0, 0 HELP(":<mqname>") };
/* Address descriptor "POSIXMQ-RECEIVE", handled by xioopen_posixmq().
   Differs from the plain read descriptor by adding GROUP_CHILD to the option
   groups and by passing XIOREAD_RECV_ONESHOT, which xioopen_posixmq() picks up
   as its oneshot value and ORs into sfd->dtype (presumably via ->arg2 -
   confirm against struct addrdesc).
   HELP() carries the syntax hint ":<mqname>". */
26 const struct addrdesc xioaddr_posixmq_receive
= { "POSIXMQ-RECEIVE", 1+XIO_RDONLY
, xioopen_posixmq
, GROUP_FD
|GROUP_NAMED
|GROUP_POSIXMQ
|GROUP_RETRY
|GROUP_CHILD
, XIO_RDONLY
, XIOREAD_RECV_ONESHOT
, 0 HELP(":<mqname>") };
/* Address descriptor "POSIXMQ-SEND", handled by xioopen_posixmq().
   XIO_WRONLY is the direction value and 0 the oneshot value; GROUP_CHILD is
   included in the option groups, as for the receive descriptor.
   xioopen_posixmq() reads direction and oneshot from ->arg1 and ->arg2
   (presumably the two initializers that follow the group mask - confirm
   against struct addrdesc).
   HELP() carries the syntax hint ":<mqname>". */
27 const struct addrdesc xioaddr_posixmq_send
= { "POSIXMQ-SEND", 1+XIO_WRONLY
, xioopen_posixmq
, GROUP_FD
|GROUP_NAMED
|GROUP_POSIXMQ
|GROUP_RETRY
|GROUP_CHILD
, XIO_WRONLY
, 0, 0 HELP(":<mqname>") };
/* Option descriptor "posixmq-priority" (short name "mq-pri"), member of
   GROUP_POSIXMQ and applied in phase PH_INIT. OFUNC_OFFSET together with the
   offset and size of para.posixmq.prio makes the option value land in that
   member of the stream structure.
   NOTE(review): the option is declared TYPE_BOOL, but para.posixmq.prio is
   logged with %u and handed to mq_send() as the message priority elsewhere in
   this file - confirm whether an unsigned integer type was intended. */
29 const struct optdesc opt_posixmq_priority
= { "posixmq-priority", "mq-pri", OPT_POSIXMQ_PRIORITY
, GROUP_POSIXMQ
, PH_INIT
, TYPE_BOOL
, OFUNC_OFFSET
, XIO_OFFSETOF(para
.posixmq
.prio
), XIO_SIZEOF(para
.posixmq
.prio
), 0 };
31 /* _read(): open immediately, stay in transfer loop
32 _recv(): wait until data is available (how do we know there is any??), oneshot, opt.fork
/* Opens a POSIX message queue for one of the POSIXMQ-* addresses.
   Visible steps: refuse to run without xioparms.experimental, check the
   argument count via xio_syntax(), evaluate the fork, max-children, o-excl,
   perm, unlink-early and unlink-close options, store a strdup() copy of the
   queue name in sfd->para.posixmq.name, derive the mq_open() flags from the
   descriptor's direction value (addrdesc->arg1) and call mq_open().
   After the open, the code tests (!dofork && !oneshot); the remaining part
   polls the queue, forks a sub process per message and throttles on
   max-children.
   Visible return values: STAT_NORETRY when applyopts_single() fails,
   STAT_RETRYLATER on the mq_open() and fork error paths, STAT_OK after the
   poll step when fork is not requested.
   NOTE(review): several lines of this function are not visible in this
   listing (among them the remaining parameters, the opening brace and the
   declarations of name, oflag, mqd, dofork, maxchildren, trigger and buf);
   confirm details against the full source. */
34 static int xioopen_posixmq(
40 const struct addrdesc
*addrdesc
)
42 /* We expect the form: /mqname */
43 xiosingle_t
*sfd
= &xfd
->stream
;
/* direction (compared with XIO_RDWR/XIO_RDONLY/XIO_WRONLY below) and oneshot
   flag are taken from the address descriptor */
45 int dirs
= addrdesc
->arg1
;
46 int oneshot
= addrdesc
->arg2
;
47 bool opt_unlink_early
= false;
49 bool opt_o_excl
= false;
/* default permission value handed to mq_open() unless OPT_PERM overrides it */
50 mode_t opt_mode
= 0666;
55 bool with_intv
= false;
/* the address is only usable when the experimental flag has been set */
58 if (!xioparms
.experimental
) {
59 Error1("%s: use option --experimental to acknowledge unmature state", argv
[0]);
63 xio_syntax(argv
[0], 1, argc
-1, addrdesc
->syntax
);
/* fork handling: the option is rejected when the context lacks XIO_MAYFORK */
69 retropt_bool(opts
, OPT_FORK
, &dofork
);
71 if (!(xioflags
& XIO_MAYFORK
)) {
72 Error1("%s: option fork not allowed in this context", argv
[0]);
75 sfd
->flags
|= XIO_DOESFORK
;
76 if (dirs
== XIO_WRONLY
) {
81 retropt_int(opts
, OPT_MAX_CHILDREN
, &maxchildren
);
82 if (! dofork
&& maxchildren
) {
83 Error("option max-children not allowed without option fork");
87 xiosetchilddied(); /* set SIGCHLD handler */
89 applyopts_offset(sfd
, opts
);
90 if (applyopts_single(sfd
, opts
, PH_INIT
) < 0) return STAT_NORETRY
;
91 applyopts(sfd
, -1, opts
, PH_INIT
);
/* keep a private copy of the queue name; xioclose_posixmq() uses it for the
   optional unlink and frees it */
93 if ((sfd
->para
.posixmq
.name
= strdup(name
)) == NULL
) {
94 Error1("strdup(\"%s\"): out of memory", name
);
97 retropt_bool(opts
, OPT_O_EXCL
, &opt_o_excl
);
98 retropt_mode(opts
, OPT_PERM
, &opt_mode
);
100 retropt_bool(opts
, OPT_UNLINK_EARLY
, &opt_unlink_early
);
/* unlink-early: remove the queue name before opening; a failure is reported
   at E_INFO level only */
101 if (opt_unlink_early
) {
102 _posixmq_unlink(sfd
->para
.posixmq
.name
, E_INFO
);
104 retropt_bool(opts
, OPT_UNLINK_CLOSE
, &sfd
->opt_unlink_close
);
105 if (sfd
->howtoend
== END_UNSPEC
)
106 sfd
->howtoend
= END_CLOSE
;
107 sfd
->dtype
= XIODATA_POSIXMQ
| oneshot
;
/* assemble the mq_open() flags from the options and the direction */
110 if (opt_o_excl
) oflag
|= O_EXCL
;
112 case XIO_RDWR
: oflag
|= O_RDWR
; break;
113 case XIO_RDONLY
: oflag
|= O_RDONLY
; break;
114 case XIO_WRONLY
: oflag
|= O_WRONLY
; break;
117 /* Now open the message queue */
118 Debug3("mq_open(\"%s\", %d, "F_mode
", NULL)", name
, oflag
, opt_mode
);
119 mqd
= mq_open(name
, oflag
, opt_mode
, NULL
);
121 Debug1("mq_open() -> %d", mqd
);
123 Error3("%s: mq_open(\"%s\"): %s", argv
[0], name
, strerror(errno
));
125 return STAT_RETRYLATER
;
/* NOTE(review): the body of this branch is not visible in this listing */
129 if (!dofork
&& !oneshot
) {
132 /* Continue with modes that open only when data available */
135 if (xioparms
.logopt
== 'm') {
136 Info("starting POSIX-MQ fork loop, switching to syslog");
137 diag_set('y', xioparms
.syslogfac
); xioparms
.logopt
= 'y';
139 Info("starting POSIX-MQ fork loop");
143 /* Wait until a message is available (or until interval has expired),
144 then fork a sub process that handles this single message. Here we
145 continue waiting for more.
146 The trigger mechanism is described with function
147 _xioopen_dgram_recvfrom()
151 pid_t pid
; /* mostly int; only used with fork */
152 sigset_t mask_sigchld
;
154 Info1("%s: waiting for data or interval", argv
[0]);
156 struct pollfd pollfd
;
/* readable is awaited for XIO_RDONLY, writable for the other directions */
159 pollfd
.events
= (dirs
==XIO_RDONLY
?POLLIN
:POLLOUT
);
160 if (xiopoll(&pollfd
, 1, NULL
) > 0) {
163 if (errno
== EINTR
) {
166 Warn2("poll({%d,,},,-1): %s", sfd
->fd
, strerror(errno
));
169 if (!dofork
) return STAT_OK
;
171 Info("generating pipe that triggers parent when packet has been consumed");
172 if (dirs
== XIO_RDONLY
) {
173 if (Pipe(trigger
) < 0) {
174 Error1("pipe(): %s", strerror(errno
));
178 /* Block SIGCHLD until parent is ready to react */
179 sigemptyset(&mask_sigchld
);
180 sigaddset(&mask_sigchld
, SIGCHLD
);
181 Sigprocmask(SIG_BLOCK
, &mask_sigchld
, NULL
);
/* fork failure: SIGCHLD is unblocked again, the queue is closed and the
   caller is told to retry later */
183 if ((pid
= xio_fork(false, E_ERROR
, xfd
->stream
.shutup
)) < 0) {
184 Sigprocmask(SIG_UNBLOCK
, &mask_sigchld
, NULL
);
185 if (dirs
==XIO_RDONLY
) {
189 xioclose_posixmq(sfd
);
190 return STAT_RETRYLATER
;
192 if (pid
== 0) { /* child */
193 pid_t cpid
= Getpid();
194 Sigprocmask(SIG_UNBLOCK
, &mask_sigchld
, NULL
);
195 xiosetenvulong("PID", cpid
, 1);
/* the reading child keeps the pipe's write end as triggerfd;
   xioread_posixmq() closes it after mq_receive() */
197 if (dirs
== XIO_RDONLY
) {
199 Fcntl_l(trigger
[1], F_SETFD
, FD_CLOEXEC
);
200 sfd
->triggerfd
= trigger
[1];
/* read on the pipe's read end, retried on EINTR; presumably this returns once
   the child has consumed the message and closed its trigger end - confirm */
206 if (dirs
== XIO_RDONLY
) {
209 while (Read(trigger
[0], buf
, 1) < 0 && errno
== EINTR
)
215 Nanosleep(&sfd
->intervall
, NULL
);
219 /* now we are ready to handle signals */
220 Sigprocmask(SIG_UNBLOCK
, &mask_sigchld
, NULL
);
/* with a max-children limit, sleep until the child count is below the limit */
221 while (maxchildren
) {
222 if (num_child
< maxchildren
) break;
223 Notice1("max of %d children is active, waiting", num_child
);
224 while (!Sleep(UINT_MAX
)) ; /* any signal lets us continue */
226 Info("continue listening");
229 _xio_openlate(sfd
, opts
);
/* Writes one message to the queue: hands buff, bufsiz and the priority stored
   in sfd->para.posixmq.prio to mq_send() on descriptor sfd->fd and returns
   bufsiz on success. An mq_send() error is reported with Error2().
   NOTE(review): the parameter list, the failure test and the failure return
   value are not visible in this listing; confirm against the full source. */
234 ssize_t
xiowrite_posixmq(
242 Debug4("mq_send(mqd=%d, %p, "F_Zu
", %u)", sfd
->fd
, buff
, bufsiz
, sfd
->para
.posixmq
.prio
);
243 res
= mq_send(sfd
->fd
, buff
, bufsiz
, sfd
->para
.posixmq
.prio
);
245 Debug1("mq_send() -> %d", res
);
248 Error2("mq_send(mqd=%d): %s", sfd
->fd
, strerror(errno
));
251 return bufsiz
; /* success */
/* Reads one message from the queue with mq_receive() into buff (at most bufsiz
   bytes); the message priority is stored in sfd->para.posixmq.prio, logged,
   and exported through xiosetenvulong() under the name "POSIXMQ_PRIO".
   A positive sfd->triggerfd is closed after the receive; xioopen_posixmq()
   sets it to the write end of the trigger pipe in a forked reading child.
   An mq_receive() error is reported with Error2().
   NOTE(review): the parameter list, the failure test and all return
   statements are not visible in this listing; confirm against the full
   source. */
254 ssize_t
xioread_posixmq(
262 Debug3("mq_receive(mqd=%d, %p, "F_Zu
", {} )", sfd
->fd
, buff
, bufsiz
);
263 res
= mq_receive(sfd
->fd
, buff
, bufsiz
, &sfd
->para
.posixmq
.prio
);
265 Debug1("mq_receive() -> "F_Zd
, res
);
268 Error2("mq_receive(mqd=%d): %s", sfd
->fd
, strerror(errno
));
/* NOTE(review): the test excludes descriptor 0; whether triggerfd is reset
   after Close() is not visible here */
271 if (sfd
->triggerfd
> 0) {
272 Close(sfd
->triggerfd
);
275 Info1("mq_receive() -> {prio=%u}", sfd
->para
.posixmq
.prio
);
276 xiosetenvulong("POSIXMQ_PRIO", (unsigned long)sfd
->para
.posixmq
.prio
, 1);
/* Declaration only; no definition of xiopending_posixmq() is visible in this
   listing. */
280 ssize_t
xiopending_posixmq(struct single
*sfd
);
/* Closes the queue descriptor sfd->fd with mq_close(); a failure is reported
   as a warning, success as a debug message. When sfd->opt_unlink_close is set
   the queue name is removed via _posixmq_unlink() with level E_WARN. Finally
   the name copy in sfd->para.posixmq.name is freed.
   NOTE(review): the parameter list, the test on the mq_close() result and the
   return statement are not visible in this listing; confirm against the full
   source. */
282 ssize_t
xioclose_posixmq(
286 Debug1("xioclose_posixmq(): mq_close(%d)", sfd
->fd
);
287 res
= mq_close(sfd
->fd
);
289 Warn2("xioclose_posixmq(): mq_close(%d) -> -1: %s", sfd
->fd
, strerror(errno
));
291 Debug("xioclose_posixmq(): mq_close() -> 0");
293 if (sfd
->opt_unlink_close
) {
294 _posixmq_unlink(sfd
->para
.posixmq
.name
, E_WARN
);
/* release the copy of the queue name held in the stream structure */
296 free((void *)sfd
->para
.posixmq
.name
);
/* Removes the queue called name with mq_unlink() and logs the call and its
   result; an error text built from errno is emitted through Msg2() at the
   severity given by level.
   NOTE(review): the name parameter, the test on the mq_unlink() result and
   the return statement are not visible in this listing; confirm against the
   full source. */
300 static int _posixmq_unlink(
302 int level
) /* message level on error */
307 Debug1("mq_unlink(\"%s\")", name
);
308 res
= mq_unlink(name
);
310 Debug1("mq_unlink() -> %d", res
);
313 Msg2(level
, "mq_unlink(\"%s\"): %s",name
, strerror(errno
));
318 #endif /* WITH_POSIXMQ */