db_updater: Put parentheses back
[merlin.git] / ipc.c
blob8afad52db78f32e7c7b37cc795dc0b386111693b
1 #include "shared.h"
3 static int listen_sock = -1; /* for bind() and such */
4 static char *ipc_sock_path;
5 merlin_node ipc; /* the ipc node */
7 /*
8 * this lives here since both daemon and module needs it, but
9 * none of the apps should have it
11 int dump_nodeinfo(merlin_node *n, int sd, int instance_id)
13 merlin_nodeinfo *i;
14 merlin_node_stats *s = &n->stats;
15 struct merlin_assigned_objects aso;
16 merlin_peer_group *pg;
18 i = &n->info;
19 pg = n->pgroup;
20 aso.hosts = n->assigned.current.hosts + n->assigned.extra.hosts;
21 aso.services = n->assigned.current.services + n->assigned.extra.services;
23 nsock_printf(sd, "instance_id=%d;name=%s;source_name=%s;socket=%d;type=%s;"
24 "state=%s;peer_id=%u;flags=%d;"
25 "address=%s;port=%u;"
26 "data_timeout=%u;last_recv=%lu;last_sent=%lu;"
27 "last_conn_attempt=%lu;last_action=%d;latency=%d;"
28 "binlog_size=%u;iocache_available=%lu;"
29 "events_sent=%llu;events_read=%llu;"
30 "events_logged=%llu;events_dropped=%llu;"
31 "bytes_sent=%llu;bytes_read=%llu;"
32 "bytes_logged=%llu;bytes_dropped=%llu;"
33 "version=%u;word_size=%u;byte_order=%u;"
34 "object_structure_version=%u;start=%lu.%lu;"
35 "last_cfg_change=%lu;config_hash=%s;"
36 "self_assigned_peer_id=%u;warn_flags=%u;"
37 "active_peers=%u;configured_peers=%u;"
38 "active_pollers=%u;configured_pollers=%u;"
39 "active_masters=%u;configured_masters=%u;"
40 "host_checks_handled=%u;service_checks_handled=%u;"
41 "host_checks_executed=%u;service_checks_executed=%u;"
42 "monitored_object_state_size=%u;connect_time=%lu;"
43 "assigned_hosts=%u;assigned_services=%u;"
44 "expired_hosts=%u;expired_services=%u;"
45 "pgroup_active_nodes=%u;pgroup_total_nodes=%u;"
46 "pgroup_hosts=%u;pgroup_services=%u;"
47 "pgroup_id=%d;pgroup_hostgroups=%s"
48 "\n",
49 instance_id,
50 n->name, n->source_name, n->sock, node_type(n),
51 node_state_name(n->state), n->peer_id, n->flags,
52 inet_ntoa(n->sain.sin_addr), ntohs(n->sain.sin_port),
53 n->data_timeout, n->last_recv, n->last_sent,
54 n->last_conn_attempt, n->last_action, n->latency,
55 binlog_size(n->binlog), iocache_available(n->ioc),
56 s->events.sent, s->events.read,
57 s->events.logged, s->events.dropped,
58 s->bytes.sent, s->bytes.read,
59 s->bytes.logged, s->bytes.dropped,
60 i->version, i->word_size, i->byte_order,
61 i->object_structure_version, i->start.tv_sec, i->start.tv_usec,
62 i->last_cfg_change, tohex(i->config_hash, 20),
63 i->peer_id, n->warn_flags,
64 i->active_peers, i->configured_peers,
65 i->active_pollers, i->configured_pollers,
66 i->active_masters, i->configured_masters,
67 i->host_checks_handled, i->service_checks_handled,
68 n->host_checks, n->service_checks,
69 i->monitored_object_state_size, n->connect_time,
70 aso.hosts, aso.services,
71 n->assigned.expired.hosts, n->assigned.expired.services,
72 n->pgroup ? n->pgroup->active_nodes : 0,
73 n->pgroup ? n->pgroup->total_nodes : 0,
74 pg ? pg->assigned.hosts : 0, pg ? pg->assigned.services : 0,
75 pg ? pg->id : -1, pg ? pg->hostgroups : "");
76 return 0;
79 void ipc_init_struct(void)
81 memset(&ipc, 0, sizeof(ipc));
82 ipc.sock = -1;
83 ipc.state = STATE_NONE;
84 ipc.id = CTRL_GENERIC;
85 ipc.type = MODE_LOCAL;
86 ipc.name = "ipc";
87 ipc.flags = MERLIN_NODE_DEFAULT_IPC_FLAGS;
88 ipc.ioc = iocache_create(MERLIN_IOC_BUFSIZE);
89 if (ipc.ioc == NULL) {
90 lerr("Failed to malloc() %d bytes for ipc io cache: %s",
91 MERLIN_IOC_BUFSIZE, strerror(errno));
93 * failing to create this buffer means we can't communicate
94 * with the daemon in any sensible fashion, so we must bail
95 * out noisily when it happens
97 exit(1);
101 void ipc_log_event_count(void)
103 node_log_event_count(&ipc, 0);
/*
 * Tears the ipc connection down with ipc_deinit() and sets it up
 * again with ipc_init(). Returns whatever ipc_init() returns.
 */
int ipc_reinit(void)
{
	int result;

	ipc_deinit();
	result = ipc_init();

	return result;
}
113 int ipc_accept(void)
115 struct sockaddr_un saun;
116 socklen_t slen = sizeof(struct sockaddr_un);
118 if (ipc.sock != -1) {
119 lwarn("New connection inbound when one already exists. Dropping old");
120 close(ipc.sock);
123 ipc.sock = accept(listen_sock, (struct sockaddr *)&saun, &slen);
124 if (ipc.sock < 0) {
125 lerr("Failed to accept() from listen_sock (%d): %s",
126 listen_sock, strerror(errno));
127 return -1;
130 node_set_state(&ipc, STATE_NEGOTIATING, "Accepted");
132 return ipc.sock;
135 static int ipc_set_sock_path(const char *path)
137 int result;
138 struct stat st;
140 /* the sock-path will be set both from module and daemon,
141 * so path must be absolute */
142 if (*path != '/')
143 return -1;
145 if (strlen(path) > UNIX_PATH_MAX)
146 return -1;
148 safe_free(ipc_sock_path);
150 ipc_sock_path = strdup(path);
151 if (!ipc_sock_path)
152 return -1;
154 result = stat(path, &st);
155 if (result < 0 && errno != ENOENT)
156 return -1;
158 if (!result && !(st.st_mode & S_IFSOCK))
159 return -2;
161 return 0;
/*
 * Handles one ipc-related configuration variable.
 *
 * "ipc_socket" sets the socket path through ipc_set_sock_path().
 * "ipc_binlog", "ipc_binlog_dir" and "ipc_backlog_dir" are accepted
 * but only produce deprecation warnings.
 *
 * Returns 1 if the variable was recognized (and, for "ipc_socket",
 * the path was accepted). Returns 0 if 'val' is NULL, the variable
 * is unknown, or the socket path was rejected.
 */
int ipc_grok_var(char *var, char *val)
{
	if (!val)
		return 0;

	if (strcmp(var, "ipc_socket") == 0)
		return ipc_set_sock_path(val) == 0;

	if (strcmp(var, "ipc_binlog") == 0) {
		lwarn("%s is deprecated. The name will always be computed.", var);
		lwarn(" Set binlog_dir to control where the file will be created");
		return 1;
	}

	if (strcmp(var, "ipc_binlog_dir") == 0 ||
	    strcmp(var, "ipc_backlog_dir") == 0) {
		lwarn("%s is deprecated. Use binlog_dir instead", var);
		return 1;
	}

	return 0;
}
186 int ipc_init(void)
188 struct sockaddr_un saun;
189 struct sockaddr *sa = (struct sockaddr *)&saun;
190 socklen_t slen;
191 int quiet = 0;
193 /* don't spam the logs */
194 if (ipc.last_conn_attempt_logged + 30 >= time(NULL)) {
195 quiet = 1;
196 } else {
197 ipc.last_conn_attempt_logged = time(NULL);
200 if (!ipc_sock_path) {
201 lerr("Attempting to initialize ipc socket, but no socket path has been set\n");
202 return -1;
205 slen = strlen(ipc_sock_path);
207 if (!quiet) {
208 linfo("Initializing IPC socket '%s' for %s", ipc_sock_path,
209 is_module ? "module" : "daemon");
212 memset(&saun, 0, sizeof(saun));
213 saun.sun_family = AF_UNIX;
214 memcpy(saun.sun_path, ipc_sock_path, slen);
215 slen += sizeof(struct sockaddr);
217 if (listen_sock == -1 || (is_module && ipc.sock == -1)) {
218 listen_sock = socket(AF_UNIX, SOCK_STREAM, 0);
219 if (listen_sock < 0) {
220 lerr("Failed to obtain ipc socket: %s", strerror(errno));
221 return -1;
223 merlin_set_socket_options(listen_sock, 0);
226 if (!is_module) {
227 mode_t old_umask;
228 int result;
230 if (unlink(ipc_sock_path) && errno != ENOENT) {
231 lerr("Failed to unlink(%s)", ipc_sock_path);
232 return -1;
235 slen += offsetof(struct sockaddr_un, sun_path);
236 /* Socket is made world writable for now */
237 old_umask = umask(0);
238 result = bind(listen_sock, sa, slen);
239 umask(old_umask);
240 if (result < 0) {
241 if (!quiet) {
242 lerr("Failed to bind ipc socket %d to path '%s' with len %d: %s",
243 listen_sock, ipc_sock_path, slen, strerror(errno));
245 close(listen_sock);
246 listen_sock = -1;
247 return -1;
250 if (listen(listen_sock, 1) < 0) {
251 lerr("listen(%d, 1) failed: %s", listen_sock, strerror(errno));
252 close(listen_sock);
253 listen_sock = -1;
254 return -1;
257 return 0;
260 /* working with the module here */
261 if (connect(listen_sock, sa, slen) < 0) {
262 if (errno == EISCONN)
263 return 0;
264 if (!quiet) {
265 lerr("Failed to connect to ipc socket '%s': %s", ipc_sock_path, strerror(errno));
267 ipc_deinit();
268 return -1;
270 ipc.last_conn_attempt_logged = 0;
272 /* module connected successfully */
273 ipc.sock = listen_sock;
274 node_set_state(&ipc, STATE_CONNECTED, "Connected");
276 return 0;
280 void ipc_deinit(void)
282 node_disconnect(&ipc, "Deinitializing");
284 /* avoid spurious valgrind/strace warnings */
285 if (listen_sock >= 0)
286 close(listen_sock);
288 listen_sock = -1;
290 if (!is_module)
291 unlink(ipc_sock_path);
295 int ipc_is_connected(int msec)
297 if (is_module) {
298 if (ipc.sock < 0)
299 return ipc_reinit() == 0;
301 node_set_state(&ipc, STATE_CONNECTED, "Connected");
302 return 1;
305 if (io_read_ok(listen_sock, msec) > 0) {
306 int i;
307 ipc.sock = ipc_accept();
308 if (ipc.sock < 0) {
309 lerr("ipc: accept() failed: %s", strerror(errno));
310 return 0;
312 node_set_state(&ipc, STATE_CONNECTED, "Connected");
313 for (i = 0; i < num_nodes; i++) {
314 node_table[i]->csync_num_attempts = 0;
318 return ipc.sock != -1;
321 int ipc_listen_sock_desc(void)
323 return listen_sock;
327 * Sends a control packet to ipc, making sure it's connected
328 * first. If data isn't null, len bytes is copied from it to
329 * pkt.body
331 int ipc_ctrl(int code, uint sel, void *data, uint32_t len)
333 ipc_is_connected(0);
334 return node_ctrl(&ipc, code, sel, data, len, 100);
337 int ipc_send_event(merlin_event *pkt)
339 ipc_is_connected(0);
341 pkt->hdr.sig.id = MERLIN_SIGNATURE;
342 pkt->hdr.protocol = MERLIN_PROTOCOL_VERSION;
343 /* Only modules get to say when packets are sent */
344 if (is_module)
345 gettimeofday(&pkt->hdr.sent, NULL);
347 if (node_send_event(&ipc, pkt, 0) < 0) {
348 ipc_reinit();
349 return -1;
352 return 0;