Expand PMF_FN_* macros.
[netbsd-mini2440.git] / external / gpl2 / lvm2 / dist / daemons / clvmd / clvmd.c
blob07f5ef619f4193e64cf84f24d925b3eeed87bb81
1 /* $NetBSD$ */
3 /*
4 * Copyright (C) 2002-2004 Sistina Software, Inc. All rights reserved.
5 * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
7 * This file is part of LVM2.
9 * This copyrighted material is made available to anyone wishing to use,
10 * modify, copy, or redistribute it subject to the terms and conditions
11 * of the GNU General Public License v.2.
13 * You should have received a copy of the GNU General Public License
14 * along with this program; if not, write to the Free Software Foundation,
15 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 * CLVMD: Cluster LVM daemon
22 #define _GNU_SOURCE
23 #define _FILE_OFFSET_BITS 64
25 #include <configure.h>
26 #include <libdevmapper.h>
28 #include <pthread.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <sys/socket.h>
32 #include <sys/uio.h>
33 #include <sys/un.h>
34 #include <sys/time.h>
35 #include <sys/ioctl.h>
36 #include <sys/utsname.h>
37 #include <netinet/in.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <stddef.h>
41 #include <stdarg.h>
42 #include <signal.h>
43 #include <unistd.h>
44 #include <fcntl.h>
45 #include <getopt.h>
46 #include <syslog.h>
47 #include <errno.h>
48 #include <limits.h>
49 #ifdef HAVE_COROSYNC_CONFDB_H
50 #include <corosync/confdb.h>
51 #endif
53 #include "clvmd-comms.h"
54 #include "lvm-functions.h"
55 #include "clvm.h"
56 #include "lvm-version.h"
57 #include "clvmd.h"
58 #include "refresh_clvmd.h"
59 #include "lvm-logging.h"
61 #ifndef TRUE
62 #define TRUE 1
63 #endif
64 #ifndef FALSE
65 #define FALSE 0
66 #endif
68 #define MAX_RETRIES 4
70 #define ISLOCAL_CSID(c) (memcmp(c, our_csid, max_csid_len) == 0)
72 /* Head of the fd list. Also contains
73 the cluster_socket details */
74 static struct local_client local_client_head;
76 static unsigned short global_xid = 0; /* Last transaction ID issued */
78 struct cluster_ops *clops = NULL;
80 static char our_csid[MAX_CSID_LEN];
81 static unsigned max_csid_len;
82 static unsigned max_cluster_message;
83 static unsigned max_cluster_member_name_len;
85 /* Structure of items on the LVM thread list */
86 struct lvm_thread_cmd {
87 struct dm_list list;
89 struct local_client *client;
90 struct clvm_header *msg;
91 char csid[MAX_CSID_LEN];
92 int remote; /* Flag */
93 int msglen;
94 unsigned short xid;
97 debug_t debug;
98 static pthread_t lvm_thread;
99 static pthread_mutex_t lvm_thread_mutex;
100 static pthread_cond_t lvm_thread_cond;
101 static pthread_mutex_t lvm_start_mutex;
102 static struct dm_list lvm_cmd_head;
103 static volatile sig_atomic_t quit = 0;
104 static volatile sig_atomic_t reread_config = 0;
105 static int child_pipe[2];
107 /* Reasons the daemon failed initialisation */
108 #define DFAIL_INIT 1
109 #define DFAIL_LOCAL_SOCK 2
110 #define DFAIL_CLUSTER_IF 3
111 #define DFAIL_MALLOC 4
112 #define DFAIL_TIMEOUT 5
113 #define SUCCESS 0
115 typedef enum {IF_AUTO, IF_CMAN, IF_GULM, IF_OPENAIS, IF_COROSYNC} if_type_t;
117 typedef void *(lvm_pthread_fn_t)(void*);
119 /* Prototypes for code further down */
120 static void sigusr2_handler(int sig);
121 static void sighup_handler(int sig);
122 static void sigterm_handler(int sig);
123 static void send_local_reply(struct local_client *client, int status,
124 int clientid);
125 static void free_reply(struct local_client *client);
126 static void send_version_message(void);
127 static void *pre_and_post_thread(void *arg);
128 static int send_message(void *buf, int msglen, const char *csid, int fd,
129 const char *errtext);
130 static int read_from_local_sock(struct local_client *thisfd);
131 static int process_local_command(struct clvm_header *msg, int msglen,
132 struct local_client *client,
133 unsigned short xid);
134 static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
135 const char *csid);
136 static int process_reply(const struct clvm_header *msg, int msglen,
137 const char *csid);
138 static int open_local_sock(void);
139 static int check_local_clvmd(void);
140 static struct local_client *find_client(int clientid);
141 static void main_loop(int local_sock, int cmd_timeout);
142 static void be_daemon(int start_timeout);
143 static int check_all_clvmds_running(struct local_client *client);
144 static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
145 int len, const char *csid,
146 struct local_client **new_client);
147 static void lvm_thread_fn(void *) __attribute__ ((noreturn));
148 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
149 int msglen, const char *csid);
150 static int distribute_command(struct local_client *thisfd);
151 static void hton_clvm(struct clvm_header *hdr);
152 static void ntoh_clvm(struct clvm_header *hdr);
153 static void add_reply_to_list(struct local_client *client, int status,
154 const char *csid, const char *buf, int len);
155 static if_type_t parse_cluster_interface(char *ifname);
156 static if_type_t get_cluster_type(void);
/*
 * Print command-line help for `prog` to `file`, followed by the list of
 * cluster managers this binary was built with (USE_* build options).
 */
static void usage(char *prog, FILE *file)
{
	static const char *const help_lines[] = {
		" -V Show version of clvmd\n",
		" -h Show this help information\n",
		" -d Set debug level\n",
		" If starting clvmd then don't fork, run in the foreground\n",
		" -R Tell all running clvmds in the cluster to reload their device cache\n",
		" -C Sets debug level (from -d) on all clvmd instances clusterwide\n",
		" -t<secs> Command timeout (default 60 seconds)\n",
		" -T<secs> Startup timeout (default none)\n",
		" -I<cmgr> Cluster manager (default: auto)\n",
		" Available cluster managers: ",
	};
	size_t idx;

	fputs("Usage:\n", file);
	fprintf(file, "%s [Vhd]\n", prog);
	fputs("\n", file);

	for (idx = 0; idx < sizeof(help_lines) / sizeof(help_lines[0]); idx++)
		fputs(help_lines[idx], file);

#ifdef USE_COROSYNC
	fputs("corosync ", file);
#endif
#ifdef USE_CMAN
	fputs("cman ", file);
#endif
#ifdef USE_OPENAIS
	fputs("openais ", file);
#endif
#ifdef USE_GULM
	fputs("gulm ", file);
#endif
	fputs("\n", file);
}
188 /* Called to signal the parent how well we got on during initialisation */
189 static void child_init_signal(int status)
191 if (child_pipe[1]) {
192 write(child_pipe[1], &status, sizeof(status));
193 close(child_pipe[1]);
195 if (status)
196 exit(status);
200 void debuglog(const char *fmt, ...)
202 time_t P;
203 va_list ap;
204 static int syslog_init = 0;
206 if (debug == DEBUG_STDERR) {
207 va_start(ap,fmt);
208 time(&P);
209 fprintf(stderr, "CLVMD[%x]: %.15s ", (int)pthread_self(), ctime(&P)+4 );
210 vfprintf(stderr, fmt, ap);
211 va_end(ap);
213 if (debug == DEBUG_SYSLOG) {
214 if (!syslog_init) {
215 openlog("clvmd", LOG_PID, LOG_DAEMON);
216 syslog_init = 1;
219 va_start(ap,fmt);
220 vsyslog(LOG_DEBUG, fmt, ap);
221 va_end(ap);
225 static const char *decode_cmd(unsigned char cmdl)
227 static char buf[128];
228 const char *command;
230 switch (cmdl) {
231 case CLVMD_CMD_TEST:
232 command = "TEST";
233 break;
234 case CLVMD_CMD_LOCK_VG:
235 command = "LOCK_VG";
236 break;
237 case CLVMD_CMD_LOCK_LV:
238 command = "LOCK_LV";
239 break;
240 case CLVMD_CMD_REFRESH:
241 command = "REFRESH";
242 break;
243 case CLVMD_CMD_SET_DEBUG:
244 command = "SET_DEBUG";
245 break;
246 case CLVMD_CMD_GET_CLUSTERNAME:
247 command = "GET_CLUSTERNAME";
248 break;
249 case CLVMD_CMD_VG_BACKUP:
250 command = "VG_BACKUP";
251 break;
252 case CLVMD_CMD_REPLY:
253 command = "REPLY";
254 break;
255 case CLVMD_CMD_VERSION:
256 command = "VERSION";
257 break;
258 case CLVMD_CMD_GOAWAY:
259 command = "GOAWAY";
260 break;
261 case CLVMD_CMD_LOCK:
262 command = "LOCK";
263 break;
264 case CLVMD_CMD_UNLOCK:
265 command = "UNLOCK";
266 break;
267 case CLVMD_CMD_LOCK_QUERY:
268 command = "LOCK_QUERY";
269 break;
270 default:
271 command = "unknown";
272 break;
275 sprintf(buf, "%s (0x%x)", command, cmdl);
277 return buf;
280 int main(int argc, char *argv[])
282 int local_sock;
283 struct local_client *newfd;
284 struct utsname nodeinfo;
285 signed char opt;
286 int cmd_timeout = DEFAULT_CMD_TIMEOUT;
287 int start_timeout = 0;
288 if_type_t cluster_iface = IF_AUTO;
289 sigset_t ss;
290 int using_gulm = 0;
291 int debug_opt = 0;
292 int clusterwide_opt = 0;
294 /* Deal with command-line arguments */
295 opterr = 0;
296 optind = 0;
297 while ((opt = getopt(argc, argv, "?vVhd::t:RT:CI:")) != EOF) {
298 switch (opt) {
299 case 'h':
300 usage(argv[0], stdout);
301 exit(0);
303 case '?':
304 usage(argv[0], stderr);
305 exit(0);
307 case 'R':
308 return refresh_clvmd()==1?0:1;
310 case 'C':
311 clusterwide_opt = 1;
312 break;
314 case 'd':
315 debug_opt = 1;
316 if (optarg)
317 debug = atoi(optarg);
318 else
319 debug = DEBUG_STDERR;
320 break;
322 case 't':
323 cmd_timeout = atoi(optarg);
324 if (!cmd_timeout) {
325 fprintf(stderr, "command timeout is invalid\n");
326 usage(argv[0], stderr);
327 exit(1);
329 break;
330 case 'I':
331 cluster_iface = parse_cluster_interface(optarg);
332 break;
333 case 'T':
334 start_timeout = atoi(optarg);
335 if (start_timeout <= 0) {
336 fprintf(stderr, "startup timeout is invalid\n");
337 usage(argv[0], stderr);
338 exit(1);
340 break;
342 case 'V':
343 printf("Cluster LVM daemon version: %s\n", LVM_VERSION);
344 printf("Protocol version: %d.%d.%d\n",
345 CLVMD_MAJOR_VERSION, CLVMD_MINOR_VERSION,
346 CLVMD_PATCH_VERSION);
347 exit(1);
348 break;
353 /* Setting debug options on an existing clvmd */
354 if (debug_opt && !check_local_clvmd()) {
356 /* Sending to stderr makes no sense for a detached daemon */
357 if (debug == DEBUG_STDERR)
358 debug = DEBUG_SYSLOG;
359 return debug_clvmd(debug, clusterwide_opt)==1?0:1;
362 /* Fork into the background (unless requested not to) */
363 if (debug != DEBUG_STDERR) {
364 be_daemon(start_timeout);
367 DEBUGLOG("CLVMD started\n");
369 /* Open the Unix socket we listen for commands on.
370 We do this before opening the cluster socket so that
371 potential clients will block rather than error if we are running
372 but the cluster is not ready yet */
373 local_sock = open_local_sock();
374 if (local_sock < 0)
375 child_init_signal(DFAIL_LOCAL_SOCK);
377 /* Set up signal handlers, USR1 is for cluster change notifications (in cman)
378 USR2 causes child threads to exit.
379 HUP causes gulm version to re-read nodes list from CCS.
380 PIPE should be ignored */
381 signal(SIGUSR2, sigusr2_handler);
382 signal(SIGHUP, sighup_handler);
383 signal(SIGPIPE, SIG_IGN);
385 /* Block SIGUSR2/SIGINT/SIGTERM in process */
386 sigemptyset(&ss);
387 sigaddset(&ss, SIGUSR2);
388 sigaddset(&ss, SIGINT);
389 sigaddset(&ss, SIGTERM);
390 sigprocmask(SIG_BLOCK, &ss, NULL);
392 /* Initialise the LVM thread variables */
393 dm_list_init(&lvm_cmd_head);
394 pthread_mutex_init(&lvm_thread_mutex, NULL);
395 pthread_cond_init(&lvm_thread_cond, NULL);
396 pthread_mutex_init(&lvm_start_mutex, NULL);
397 init_lvhash();
399 /* Start the cluster interface */
400 if (cluster_iface == IF_AUTO)
401 cluster_iface = get_cluster_type();
403 #ifdef USE_CMAN
404 if ((cluster_iface == IF_AUTO || cluster_iface == IF_CMAN) && (clops = init_cman_cluster())) {
405 max_csid_len = CMAN_MAX_CSID_LEN;
406 max_cluster_message = CMAN_MAX_CLUSTER_MESSAGE;
407 max_cluster_member_name_len = CMAN_MAX_NODENAME_LEN;
408 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to CMAN");
410 #endif
411 #ifdef USE_GULM
412 if (!clops)
413 if ((cluster_iface == IF_AUTO || cluster_iface == IF_GULM) && (clops = init_gulm_cluster())) {
414 max_csid_len = GULM_MAX_CSID_LEN;
415 max_cluster_message = GULM_MAX_CLUSTER_MESSAGE;
416 max_cluster_member_name_len = GULM_MAX_CLUSTER_MEMBER_NAME_LEN;
417 using_gulm = 1;
418 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to GULM");
420 #endif
421 #ifdef USE_COROSYNC
422 if (!clops)
423 if (((cluster_iface == IF_AUTO || cluster_iface == IF_COROSYNC) && (clops = init_corosync_cluster()))) {
424 max_csid_len = COROSYNC_CSID_LEN;
425 max_cluster_message = COROSYNC_MAX_CLUSTER_MESSAGE;
426 max_cluster_member_name_len = COROSYNC_MAX_CLUSTER_MEMBER_NAME_LEN;
427 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to Corosync");
429 #endif
430 #ifdef USE_OPENAIS
431 if (!clops)
432 if ((cluster_iface == IF_AUTO || cluster_iface == IF_OPENAIS) && (clops = init_openais_cluster())) {
433 max_csid_len = OPENAIS_CSID_LEN;
434 max_cluster_message = OPENAIS_MAX_CLUSTER_MESSAGE;
435 max_cluster_member_name_len = OPENAIS_MAX_CLUSTER_MEMBER_NAME_LEN;
436 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to OpenAIS");
438 #endif
440 if (!clops) {
441 DEBUGLOG("Can't initialise cluster interface\n");
442 log_error("Can't initialise cluster interface\n");
443 child_init_signal(DFAIL_CLUSTER_IF);
445 DEBUGLOG("Cluster ready, doing some more initialisation\n");
447 /* Save our CSID */
448 uname(&nodeinfo);
449 clops->get_our_csid(our_csid);
451 /* Initialise the FD list head */
452 local_client_head.fd = clops->get_main_cluster_fd();
453 local_client_head.type = CLUSTER_MAIN_SOCK;
454 local_client_head.callback = clops->cluster_fd_callback;
456 /* Add the local socket to the list */
457 newfd = malloc(sizeof(struct local_client));
458 if (!newfd)
459 child_init_signal(DFAIL_MALLOC);
461 newfd->fd = local_sock;
462 newfd->removeme = 0;
463 newfd->type = LOCAL_RENDEZVOUS;
464 newfd->callback = local_rendezvous_callback;
465 newfd->next = local_client_head.next;
466 local_client_head.next = newfd;
468 /* This needs to be started after cluster initialisation
469 as it may need to take out locks */
470 DEBUGLOG("starting LVM thread\n");
472 /* Don't let anyone else to do work until we are started */
473 pthread_mutex_lock(&lvm_start_mutex);
474 pthread_create(&lvm_thread, NULL, (lvm_pthread_fn_t*)lvm_thread_fn,
475 (void *)(long)using_gulm);
477 /* Tell the rest of the cluster our version number */
478 /* CMAN can do this immediately, gulm needs to wait until
479 the core initialisation has finished and the node list
480 has been gathered */
481 if (clops->cluster_init_completed)
482 clops->cluster_init_completed();
484 DEBUGLOG("clvmd ready for work\n");
485 child_init_signal(SUCCESS);
487 /* Try to shutdown neatly */
488 signal(SIGTERM, sigterm_handler);
489 signal(SIGINT, sigterm_handler);
491 /* Do some work */
492 main_loop(local_sock, cmd_timeout);
494 destroy_lvm();
496 return 0;
/*
 * Called when the GuLM cluster layer has completed initialisation.
 * Announces our protocol version to the rest of the cluster.
 * Declared with (void): an empty parameter list in a C definition
 * is an old-style declaration that disables argument checking.
 */
void clvmd_cluster_init_completed(void)
{
	send_version_message();
}
506 /* Data on a connected socket */
507 static int local_sock_callback(struct local_client *thisfd, char *buf, int len,
508 const char *csid,
509 struct local_client **new_client)
511 *new_client = NULL;
512 return read_from_local_sock(thisfd);
515 /* Data on a connected socket */
516 static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
517 int len, const char *csid,
518 struct local_client **new_client)
520 /* Someone connected to our local socket, accept it. */
522 struct sockaddr_un socka;
523 struct local_client *newfd;
524 socklen_t sl = sizeof(socka);
525 int client_fd = accept(thisfd->fd, (struct sockaddr *) &socka, &sl);
527 if (client_fd == -1 && errno == EINTR)
528 return 1;
530 if (client_fd >= 0) {
531 newfd = malloc(sizeof(struct local_client));
532 if (!newfd) {
533 close(client_fd);
534 return 1;
536 newfd->fd = client_fd;
537 newfd->type = LOCAL_SOCK;
538 newfd->xid = 0;
539 newfd->removeme = 0;
540 newfd->callback = local_sock_callback;
541 newfd->bits.localsock.replies = NULL;
542 newfd->bits.localsock.expected_replies = 0;
543 newfd->bits.localsock.cmd = NULL;
544 newfd->bits.localsock.in_progress = FALSE;
545 newfd->bits.localsock.sent_out = FALSE;
546 newfd->bits.localsock.threadid = 0;
547 newfd->bits.localsock.finished = 0;
548 newfd->bits.localsock.pipe_client = NULL;
549 newfd->bits.localsock.private = NULL;
550 newfd->bits.localsock.all_success = 1;
551 DEBUGLOG("Got new connection on fd %d\n", newfd->fd);
552 *new_client = newfd;
554 return 1;
557 static int local_pipe_callback(struct local_client *thisfd, char *buf,
558 int maxlen, const char *csid,
559 struct local_client **new_client)
561 int len;
562 char buffer[PIPE_BUF];
563 struct local_client *sock_client = thisfd->bits.pipe.client;
564 int status = -1; /* in error by default */
566 len = read(thisfd->fd, buffer, sizeof(int));
567 if (len == -1 && errno == EINTR)
568 return 1;
570 if (len == sizeof(int)) {
571 memcpy(&status, buffer, sizeof(int));
574 DEBUGLOG("read on PIPE %d: %d bytes: status: %d\n",
575 thisfd->fd, len, status);
577 /* EOF on pipe or an error, close it */
578 if (len <= 0) {
579 int jstat;
580 void *ret = &status;
581 close(thisfd->fd);
583 /* Clear out the cross-link */
584 if (thisfd->bits.pipe.client != NULL)
585 thisfd->bits.pipe.client->bits.localsock.pipe_client =
586 NULL;
588 /* Reap child thread */
589 if (thisfd->bits.pipe.threadid) {
590 jstat = pthread_join(thisfd->bits.pipe.threadid, &ret);
591 thisfd->bits.pipe.threadid = 0;
592 if (thisfd->bits.pipe.client != NULL)
593 thisfd->bits.pipe.client->bits.localsock.
594 threadid = 0;
596 return -1;
597 } else {
598 DEBUGLOG("background routine status was %d, sock_client=%p\n",
599 status, sock_client);
600 /* But has the client gone away ?? */
601 if (sock_client == NULL) {
602 DEBUGLOG
603 ("Got PIPE response for dead client, ignoring it\n");
604 } else {
605 /* If error then just return that code */
606 if (status)
607 send_local_reply(sock_client, status,
608 sock_client->fd);
609 else {
610 if (sock_client->bits.localsock.state ==
611 POST_COMMAND) {
612 send_local_reply(sock_client, 0,
613 sock_client->fd);
614 } else // PRE_COMMAND finished.
616 if (
617 (status =
618 distribute_command(sock_client)) !=
619 0) send_local_reply(sock_client,
620 EFBIG,
621 sock_client->
622 fd);
627 return len;
630 /* If a noed is up, look for it in the reply array, if it's not there then
631 add one with "ETIMEDOUT".
632 NOTE: This won't race with real replies because they happen in the same thread.
634 static void timedout_callback(struct local_client *client, const char *csid,
635 int node_up)
637 if (node_up) {
638 struct node_reply *reply;
639 char nodename[max_cluster_member_name_len];
641 clops->name_from_csid(csid, nodename);
642 DEBUGLOG("Checking for a reply from %s\n", nodename);
643 pthread_mutex_lock(&client->bits.localsock.reply_mutex);
645 reply = client->bits.localsock.replies;
646 while (reply && strcmp(reply->node, nodename) != 0) {
647 reply = reply->next;
650 pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
652 if (!reply) {
653 DEBUGLOG("Node %s timed-out\n", nodename);
654 add_reply_to_list(client, ETIMEDOUT, csid,
655 "Command timed out", 18);
660 /* Called when the request has timed out on at least one node. We fill in
661 the remaining node entries with ETIMEDOUT and return.
663 By the time we get here the node that caused
664 the timeout could have gone down, in which case we will never get the expected
665 number of replies that triggers the post command so we need to do it here
667 static void request_timed_out(struct local_client *client)
669 DEBUGLOG("Request timed-out. padding\n");
670 clops->cluster_do_node_callback(client, timedout_callback);
672 if (client->bits.localsock.num_replies !=
673 client->bits.localsock.expected_replies) {
674 /* Post-process the command */
675 if (client->bits.localsock.threadid) {
676 pthread_mutex_lock(&client->bits.localsock.mutex);
677 client->bits.localsock.state = POST_COMMAND;
678 pthread_cond_signal(&client->bits.localsock.cond);
679 pthread_mutex_unlock(&client->bits.localsock.mutex);
684 /* This is where the real work happens */
685 static void main_loop(int local_sock, int cmd_timeout)
687 DEBUGLOG("Using timeout of %d seconds\n", cmd_timeout);
689 sigset_t ss;
690 sigemptyset(&ss);
691 sigaddset(&ss, SIGINT);
692 sigaddset(&ss, SIGTERM);
693 pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
694 /* Main loop */
695 while (!quit) {
696 fd_set in;
697 int select_status;
698 struct local_client *thisfd;
699 struct timeval tv = { cmd_timeout, 0 };
700 int quorate = clops->is_quorate();
702 /* Wait on the cluster FD and all local sockets/pipes */
703 local_client_head.fd = clops->get_main_cluster_fd();
704 FD_ZERO(&in);
705 for (thisfd = &local_client_head; thisfd != NULL;
706 thisfd = thisfd->next) {
708 if (thisfd->removeme)
709 continue;
711 /* if the cluster is not quorate then don't listen for new requests */
712 if ((thisfd->type != LOCAL_RENDEZVOUS &&
713 thisfd->type != LOCAL_SOCK) || quorate)
714 FD_SET(thisfd->fd, &in);
717 select_status = select(FD_SETSIZE, &in, NULL, NULL, &tv);
719 if (reread_config) {
720 int saved_errno = errno;
722 reread_config = 0;
723 if (clops->reread_config)
724 clops->reread_config();
725 errno = saved_errno;
728 if (select_status > 0) {
729 struct local_client *lastfd = NULL;
730 char csid[MAX_CSID_LEN];
731 char buf[max_cluster_message];
733 for (thisfd = &local_client_head; thisfd != NULL;
734 thisfd = thisfd->next) {
736 if (thisfd->removeme) {
737 struct local_client *free_fd;
738 lastfd->next = thisfd->next;
739 free_fd = thisfd;
740 thisfd = lastfd;
742 DEBUGLOG("removeme set for fd %d\n", free_fd->fd);
744 /* Queue cleanup, this also frees the client struct */
745 add_to_lvmqueue(free_fd, NULL, 0, NULL);
746 break;
749 if (FD_ISSET(thisfd->fd, &in)) {
750 struct local_client *newfd = NULL;
751 int ret;
753 /* Do callback */
754 ret =
755 thisfd->callback(thisfd, buf,
756 sizeof(buf), csid,
757 &newfd);
758 /* Ignore EAGAIN */
759 if (ret < 0 && (errno == EAGAIN ||
760 errno == EINTR)) continue;
762 /* Got error or EOF: Remove it from the list safely */
763 if (ret <= 0) {
764 struct local_client *free_fd;
765 int type = thisfd->type;
767 /* If the cluster socket shuts down, so do we */
768 if (type == CLUSTER_MAIN_SOCK ||
769 type == CLUSTER_INTERNAL)
770 goto closedown;
772 DEBUGLOG("ret == %d, errno = %d. removing client\n",
773 ret, errno);
774 lastfd->next = thisfd->next;
775 free_fd = thisfd;
776 thisfd = lastfd;
777 close(free_fd->fd);
779 /* Queue cleanup, this also frees the client struct */
780 add_to_lvmqueue(free_fd, NULL, 0, NULL);
781 break;
784 /* New client...simply add it to the list */
785 if (newfd) {
786 newfd->next = thisfd->next;
787 thisfd->next = newfd;
788 break;
791 lastfd = thisfd;
795 /* Select timed out. Check for clients that have been waiting too long for a response */
796 if (select_status == 0) {
797 time_t the_time = time(NULL);
799 for (thisfd = &local_client_head; thisfd != NULL;
800 thisfd = thisfd->next) {
801 if (thisfd->type == LOCAL_SOCK
802 && thisfd->bits.localsock.sent_out
803 && thisfd->bits.localsock.sent_time +
804 cmd_timeout < the_time
805 && thisfd->bits.localsock.
806 expected_replies !=
807 thisfd->bits.localsock.num_replies) {
808 /* Send timed out message + replies we already have */
809 DEBUGLOG
810 ("Request timed-out (send: %ld, now: %ld)\n",
811 thisfd->bits.localsock.sent_time,
812 the_time);
814 thisfd->bits.localsock.all_success = 0;
816 request_timed_out(thisfd);
820 if (select_status < 0) {
821 if (errno == EINTR)
822 continue;
824 #ifdef DEBUG
825 perror("select error");
826 exit(-1);
827 #endif
831 closedown:
832 clops->cluster_closedown();
833 close(local_sock);
836 static __attribute__ ((noreturn)) void wait_for_child(int c_pipe, int timeout)
838 int child_status;
839 int sstat;
840 fd_set fds;
841 struct timeval tv = {timeout, 0};
843 FD_ZERO(&fds);
844 FD_SET(c_pipe, &fds);
846 sstat = select(c_pipe+1, &fds, NULL, NULL, timeout? &tv: NULL);
847 if (sstat == 0) {
848 fprintf(stderr, "clvmd startup timed out\n");
849 exit(DFAIL_TIMEOUT);
851 if (sstat == 1) {
852 if (read(c_pipe, &child_status, sizeof(child_status)) !=
853 sizeof(child_status)) {
855 fprintf(stderr, "clvmd failed in initialisation\n");
856 exit(DFAIL_INIT);
858 else {
859 switch (child_status) {
860 case SUCCESS:
861 break;
862 case DFAIL_INIT:
863 fprintf(stderr, "clvmd failed in initialisation\n");
864 break;
865 case DFAIL_LOCAL_SOCK:
866 fprintf(stderr, "clvmd could not create local socket\n");
867 fprintf(stderr, "Another clvmd is probably already running\n");
868 break;
869 case DFAIL_CLUSTER_IF:
870 fprintf(stderr, "clvmd could not connect to cluster manager\n");
871 fprintf(stderr, "Consult syslog for more information\n");
872 break;
873 case DFAIL_MALLOC:
874 fprintf(stderr, "clvmd failed, not enough memory\n");
875 break;
876 default:
877 fprintf(stderr, "clvmd failed, error was %d\n", child_status);
878 break;
880 exit(child_status);
883 fprintf(stderr, "clvmd startup, select failed: %s\n", strerror(errno));
884 exit(DFAIL_INIT);
888 * Fork into the background and detach from our parent process.
889 * In the interests of user-friendliness we wait for the daemon
890 * to complete initialisation before returning its status
891 * the the user.
893 static void be_daemon(int timeout)
895 pid_t pid;
896 int devnull = open("/dev/null", O_RDWR);
897 if (devnull == -1) {
898 perror("Can't open /dev/null");
899 exit(3);
902 pipe(child_pipe);
904 switch (pid = fork()) {
905 case -1:
906 perror("clvmd: can't fork");
907 exit(2);
909 case 0: /* Child */
910 close(child_pipe[0]);
911 break;
913 default: /* Parent */
914 close(child_pipe[1]);
915 wait_for_child(child_pipe[0], timeout);
918 /* Detach ourself from the calling environment */
919 if (close(0) || close(1) || close(2)) {
920 perror("Error closing terminal FDs");
921 exit(4);
923 setsid();
925 if (dup2(devnull, 0) < 0 || dup2(devnull, 1) < 0
926 || dup2(devnull, 2) < 0) {
927 perror("Error setting terminal FDs to /dev/null");
928 log_error("Error setting terminal FDs to /dev/null: %m");
929 exit(5);
931 if (chdir("/")) {
932 log_error("Error setting current directory to /: %m");
933 exit(6);
938 /* Called when we have a read from the local socket.
939 was in the main loop but it's grown up and is a big girl now */
940 static int read_from_local_sock(struct local_client *thisfd)
942 int len;
943 int argslen;
944 int missing_len;
945 char buffer[PIPE_BUF];
947 len = read(thisfd->fd, buffer, sizeof(buffer));
948 if (len == -1 && errno == EINTR)
949 return 1;
951 DEBUGLOG("Read on local socket %d, len = %d\n", thisfd->fd, len);
953 /* EOF or error on socket */
954 if (len <= 0) {
955 int *status;
956 int jstat;
958 DEBUGLOG("EOF on local socket: inprogress=%d\n",
959 thisfd->bits.localsock.in_progress);
961 thisfd->bits.localsock.finished = 1;
963 /* If the client went away in mid command then tidy up */
964 if (thisfd->bits.localsock.in_progress) {
965 pthread_kill(thisfd->bits.localsock.threadid, SIGUSR2);
966 pthread_mutex_lock(&thisfd->bits.localsock.mutex);
967 thisfd->bits.localsock.state = POST_COMMAND;
968 pthread_cond_signal(&thisfd->bits.localsock.cond);
969 pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
971 /* Free any unsent buffers */
972 free_reply(thisfd);
975 /* Kill the subthread & free resources */
976 if (thisfd->bits.localsock.threadid) {
977 DEBUGLOG("Waiting for child thread\n");
978 pthread_mutex_lock(&thisfd->bits.localsock.mutex);
979 thisfd->bits.localsock.state = PRE_COMMAND;
980 pthread_cond_signal(&thisfd->bits.localsock.cond);
981 pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
983 jstat =
984 pthread_join(thisfd->bits.localsock.threadid,
985 (void **) &status);
986 DEBUGLOG("Joined child thread\n");
988 thisfd->bits.localsock.threadid = 0;
989 pthread_cond_destroy(&thisfd->bits.localsock.cond);
990 pthread_mutex_destroy(&thisfd->bits.localsock.mutex);
992 /* Remove the pipe client */
993 if (thisfd->bits.localsock.pipe_client != NULL) {
994 struct local_client *newfd;
995 struct local_client *lastfd = NULL;
996 struct local_client *free_fd = NULL;
998 close(thisfd->bits.localsock.pipe_client->fd); /* Close pipe */
999 close(thisfd->bits.localsock.pipe);
1001 /* Remove pipe client */
1002 for (newfd = &local_client_head; newfd != NULL;
1003 newfd = newfd->next) {
1004 if (thisfd->bits.localsock.
1005 pipe_client == newfd) {
1006 thisfd->bits.localsock.
1007 pipe_client = NULL;
1009 lastfd->next = newfd->next;
1010 free_fd = newfd;
1011 newfd->next = lastfd;
1012 free(free_fd);
1013 break;
1015 lastfd = newfd;
1020 /* Free the command buffer */
1021 free(thisfd->bits.localsock.cmd);
1023 /* Clear out the cross-link */
1024 if (thisfd->bits.localsock.pipe_client != NULL)
1025 thisfd->bits.localsock.pipe_client->bits.pipe.client =
1026 NULL;
1028 close(thisfd->fd);
1029 return 0;
1030 } else {
1031 int comms_pipe[2];
1032 struct local_client *newfd;
1033 char csid[MAX_CSID_LEN];
1034 struct clvm_header *inheader;
1035 int status;
1037 inheader = (struct clvm_header *) buffer;
1039 /* Fill in the client ID */
1040 inheader->clientid = htonl(thisfd->fd);
1042 /* If we are already busy then return an error */
1043 if (thisfd->bits.localsock.in_progress) {
1044 struct clvm_header reply;
1045 reply.cmd = CLVMD_CMD_REPLY;
1046 reply.status = EBUSY;
1047 reply.arglen = 0;
1048 reply.flags = 0;
1049 send_message(&reply, sizeof(reply), our_csid,
1050 thisfd->fd,
1051 "Error sending EBUSY reply to local user");
1052 return len;
1055 /* Free any old buffer space */
1056 free(thisfd->bits.localsock.cmd);
1058 /* See if we have the whole message */
1059 argslen =
1060 len - strlen(inheader->node) - sizeof(struct clvm_header);
1061 missing_len = inheader->arglen - argslen;
1063 if (missing_len < 0)
1064 missing_len = 0;
1066 /* Save the message */
1067 thisfd->bits.localsock.cmd = malloc(len + missing_len);
1069 if (!thisfd->bits.localsock.cmd) {
1070 struct clvm_header reply;
1071 reply.cmd = CLVMD_CMD_REPLY;
1072 reply.status = ENOMEM;
1073 reply.arglen = 0;
1074 reply.flags = 0;
1075 send_message(&reply, sizeof(reply), our_csid,
1076 thisfd->fd,
1077 "Error sending ENOMEM reply to local user");
1078 return 0;
1080 memcpy(thisfd->bits.localsock.cmd, buffer, len);
1081 thisfd->bits.localsock.cmd_len = len + missing_len;
1082 inheader = (struct clvm_header *) thisfd->bits.localsock.cmd;
1084 /* If we don't have the full message then read the rest now */
1085 if (missing_len) {
1086 char *argptr =
1087 inheader->node + strlen(inheader->node) + 1;
1089 while (missing_len > 0 && len >= 0) {
1090 DEBUGLOG
1091 ("got %d bytes, need another %d (total %d)\n",
1092 argslen, missing_len, inheader->arglen);
1093 len = read(thisfd->fd, argptr + argslen,
1094 missing_len);
1095 if (len >= 0) {
1096 missing_len -= len;
1097 argslen += len;
1102 /* Initialise and lock the mutex so the subthread will wait after
1103 finishing the PRE routine */
1104 if (!thisfd->bits.localsock.threadid) {
1105 pthread_mutex_init(&thisfd->bits.localsock.mutex, NULL);
1106 pthread_cond_init(&thisfd->bits.localsock.cond, NULL);
1107 pthread_mutex_init(&thisfd->bits.localsock.reply_mutex, NULL);
1110 /* Only run the command if all the cluster nodes are running CLVMD */
1111 if (((inheader->flags & CLVMD_FLAG_LOCAL) == 0) &&
1112 (check_all_clvmds_running(thisfd) == -1)) {
1113 thisfd->bits.localsock.expected_replies = 0;
1114 thisfd->bits.localsock.num_replies = 0;
1115 send_local_reply(thisfd, EHOSTDOWN, thisfd->fd);
1116 return len;
1119 /* Check the node name for validity */
1120 if (inheader->node[0] && clops->csid_from_name(csid, inheader->node)) {
1121 /* Error, node is not in the cluster */
1122 struct clvm_header reply;
1123 DEBUGLOG("Unknown node: '%s'\n", inheader->node);
1125 reply.cmd = CLVMD_CMD_REPLY;
1126 reply.status = ENOENT;
1127 reply.flags = 0;
1128 reply.arglen = 0;
1129 send_message(&reply, sizeof(reply), our_csid,
1130 thisfd->fd,
1131 "Error sending ENOENT reply to local user");
1132 thisfd->bits.localsock.expected_replies = 0;
1133 thisfd->bits.localsock.num_replies = 0;
1134 thisfd->bits.localsock.in_progress = FALSE;
1135 thisfd->bits.localsock.sent_out = FALSE;
1136 return len;
1139 /* If we already have a subthread then just signal it to start */
1140 if (thisfd->bits.localsock.threadid) {
1141 pthread_mutex_lock(&thisfd->bits.localsock.mutex);
1142 thisfd->bits.localsock.state = PRE_COMMAND;
1143 pthread_cond_signal(&thisfd->bits.localsock.cond);
1144 pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
1145 return len;
1148 /* Create a pipe and add the reading end to our FD list */
1149 pipe(comms_pipe);
1150 newfd = malloc(sizeof(struct local_client));
1151 if (!newfd) {
1152 struct clvm_header reply;
1153 close(comms_pipe[0]);
1154 close(comms_pipe[1]);
1156 reply.cmd = CLVMD_CMD_REPLY;
1157 reply.status = ENOMEM;
1158 reply.arglen = 0;
1159 reply.flags = 0;
1160 send_message(&reply, sizeof(reply), our_csid,
1161 thisfd->fd,
1162 "Error sending ENOMEM reply to local user");
1163 return len;
1165 DEBUGLOG("creating pipe, [%d, %d]\n", comms_pipe[0],
1166 comms_pipe[1]);
1167 newfd->fd = comms_pipe[0];
1168 newfd->removeme = 0;
1169 newfd->type = THREAD_PIPE;
1170 newfd->callback = local_pipe_callback;
1171 newfd->next = thisfd->next;
1172 newfd->bits.pipe.client = thisfd;
1173 newfd->bits.pipe.threadid = 0;
1174 thisfd->next = newfd;
1176 /* Store a cross link to the pipe */
1177 thisfd->bits.localsock.pipe_client = newfd;
1179 thisfd->bits.localsock.pipe = comms_pipe[1];
1181 /* Make sure the thread has a copy of it's own ID */
1182 newfd->bits.pipe.threadid = thisfd->bits.localsock.threadid;
1184 /* Run the pre routine */
1185 thisfd->bits.localsock.in_progress = TRUE;
1186 thisfd->bits.localsock.state = PRE_COMMAND;
1187 DEBUGLOG("Creating pre&post thread\n");
1188 status = pthread_create(&thisfd->bits.localsock.threadid, NULL,
1189 pre_and_post_thread, thisfd);
1190 DEBUGLOG("Created pre&post thread, state = %d\n", status);
1192 return len;
1195 /* Add a file descriptor from the cluster or comms interface to
1196 our list of FDs for select
1198 int add_client(struct local_client *new_client)
1200 new_client->next = local_client_head.next;
1201 local_client_head.next = new_client;
1203 return 0;
1206 /* Called when the pre-command has completed successfully - we
1207 now execute the real command on all the requested nodes */
1208 static int distribute_command(struct local_client *thisfd)
1210 struct clvm_header *inheader =
1211 (struct clvm_header *) thisfd->bits.localsock.cmd;
1212 int len = thisfd->bits.localsock.cmd_len;
1214 thisfd->xid = global_xid++;
1215 DEBUGLOG("distribute command: XID = %d\n", thisfd->xid);
1217 /* Forward it to other nodes in the cluster if needed */
1218 if (!(inheader->flags & CLVMD_FLAG_LOCAL)) {
1219 /* if node is empty then do it on the whole cluster */
1220 if (inheader->node[0] == '\0') {
1221 thisfd->bits.localsock.expected_replies =
1222 clops->get_num_nodes();
1223 thisfd->bits.localsock.num_replies = 0;
1224 thisfd->bits.localsock.sent_time = time(NULL);
1225 thisfd->bits.localsock.in_progress = TRUE;
1226 thisfd->bits.localsock.sent_out = TRUE;
1228 /* Do it here first */
1229 add_to_lvmqueue(thisfd, inheader, len, NULL);
1231 DEBUGLOG("Sending message to all cluster nodes\n");
1232 inheader->xid = thisfd->xid;
1233 send_message(inheader, len, NULL, -1,
1234 "Error forwarding message to cluster");
1235 } else {
1236 /* Do it on a single node */
1237 char csid[MAX_CSID_LEN];
1239 if (clops->csid_from_name(csid, inheader->node)) {
1240 /* This has already been checked so should not happen */
1241 return 0;
1242 } else {
1243 /* OK, found a node... */
1244 thisfd->bits.localsock.expected_replies = 1;
1245 thisfd->bits.localsock.num_replies = 0;
1246 thisfd->bits.localsock.in_progress = TRUE;
1248 /* Are we the requested node ?? */
1249 if (memcmp(csid, our_csid, max_csid_len) == 0) {
1250 DEBUGLOG("Doing command on local node only\n");
1251 add_to_lvmqueue(thisfd, inheader, len, NULL);
1252 } else {
1253 DEBUGLOG("Sending message to single node: %s\n",
1254 inheader->node);
1255 inheader->xid = thisfd->xid;
1256 send_message(inheader, len,
1257 csid, -1,
1258 "Error forwarding message to cluster node");
1262 } else {
1263 /* Local explicitly requested, ignore nodes */
1264 thisfd->bits.localsock.in_progress = TRUE;
1265 thisfd->bits.localsock.expected_replies = 1;
1266 thisfd->bits.localsock.num_replies = 0;
1267 add_to_lvmqueue(thisfd, inheader, len, NULL);
1269 return 0;
1272 /* Process a command from a remote node and return the result */
1273 static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
1274 const char *csid)
1276 char *replyargs;
1277 char nodename[max_cluster_member_name_len];
1278 int replylen = 0;
1279 int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1280 int status;
1281 int msg_malloced = 0;
1283 /* Get the node name as we /may/ need it later */
1284 clops->name_from_csid(csid, nodename);
1286 DEBUGLOG("process_remote_command %s for clientid 0x%x XID %d on node %s\n",
1287 decode_cmd(msg->cmd), msg->clientid, msg->xid, nodename);
1289 /* Check for GOAWAY and sulk */
1290 if (msg->cmd == CLVMD_CMD_GOAWAY) {
1292 DEBUGLOG("Told to go away by %s\n", nodename);
1293 log_error("Told to go away by %s\n", nodename);
1294 exit(99);
1297 /* Version check is internal - don't bother exposing it in
1298 clvmd-command.c */
1299 if (msg->cmd == CLVMD_CMD_VERSION) {
1300 int version_nums[3];
1301 char node[256];
1303 memcpy(version_nums, msg->args, sizeof(version_nums));
1305 clops->name_from_csid(csid, node);
1306 DEBUGLOG("Remote node %s is version %d.%d.%d\n",
1307 node,
1308 ntohl(version_nums[0]),
1309 ntohl(version_nums[1]), ntohl(version_nums[2]));
1311 if (ntohl(version_nums[0]) != CLVMD_MAJOR_VERSION) {
1312 struct clvm_header byebyemsg;
1313 DEBUGLOG
1314 ("Telling node %s to go away because of incompatible version number\n",
1315 node);
1316 log_notice
1317 ("Telling node %s to go away because of incompatible version number %d.%d.%d\n",
1318 node, ntohl(version_nums[0]),
1319 ntohl(version_nums[1]), ntohl(version_nums[2]));
1321 byebyemsg.cmd = CLVMD_CMD_GOAWAY;
1322 byebyemsg.status = 0;
1323 byebyemsg.flags = 0;
1324 byebyemsg.arglen = 0;
1325 byebyemsg.clientid = 0;
1326 clops->cluster_send_message(&byebyemsg, sizeof(byebyemsg),
1327 our_csid,
1328 "Error Sending GOAWAY message");
1329 } else {
1330 clops->add_up_node(csid);
1332 return;
1335 /* Allocate a default reply buffer */
1336 replyargs = malloc(max_cluster_message - sizeof(struct clvm_header));
1338 if (replyargs != NULL) {
1339 /* Run the command */
1340 status =
1341 do_command(NULL, msg, msglen, &replyargs, buflen,
1342 &replylen);
1343 } else {
1344 status = ENOMEM;
1347 /* If it wasn't a reply, then reply */
1348 if (msg->cmd != CLVMD_CMD_REPLY) {
1349 char *aggreply;
1351 aggreply =
1352 realloc(replyargs, replylen + sizeof(struct clvm_header));
1353 if (aggreply) {
1354 struct clvm_header *agghead =
1355 (struct clvm_header *) aggreply;
1357 replyargs = aggreply;
1358 /* Move it up so there's room for a header in front of the data */
1359 memmove(aggreply + offsetof(struct clvm_header, args),
1360 replyargs, replylen);
1362 agghead->xid = msg->xid;
1363 agghead->cmd = CLVMD_CMD_REPLY;
1364 agghead->status = status;
1365 agghead->flags = 0;
1366 agghead->clientid = msg->clientid;
1367 agghead->arglen = replylen;
1368 agghead->node[0] = '\0';
1369 send_message(aggreply,
1370 sizeof(struct clvm_header) +
1371 replylen, csid, fd,
1372 "Error sending command reply");
1373 } else {
1374 struct clvm_header head;
1376 DEBUGLOG("Error attempting to realloc return buffer\n");
1377 /* Return a failure response */
1378 head.cmd = CLVMD_CMD_REPLY;
1379 head.status = ENOMEM;
1380 head.flags = 0;
1381 head.clientid = msg->clientid;
1382 head.arglen = 0;
1383 head.node[0] = '\0';
1384 send_message(&head, sizeof(struct clvm_header), csid,
1385 fd, "Error sending ENOMEM command reply");
1386 return;
1390 /* Free buffer if it was malloced */
1391 if (msg_malloced) {
1392 free(msg);
1394 free(replyargs);
1397 /* Add a reply to a command to the list of replies for this client.
1398 If we have got a full set then send them to the waiting client down the local
1399 socket */
1400 static void add_reply_to_list(struct local_client *client, int status,
1401 const char *csid, const char *buf, int len)
1403 struct node_reply *reply;
1405 pthread_mutex_lock(&client->bits.localsock.reply_mutex);
1407 /* Add it to the list of replies */
1408 reply = malloc(sizeof(struct node_reply));
1409 if (reply) {
1410 reply->status = status;
1411 clops->name_from_csid(csid, reply->node);
1412 DEBUGLOG("Reply from node %s: %d bytes\n", reply->node, len);
1414 if (len > 0) {
1415 reply->replymsg = malloc(len);
1416 if (!reply->replymsg) {
1417 reply->status = ENOMEM;
1418 } else {
1419 memcpy(reply->replymsg, buf, len);
1421 } else {
1422 reply->replymsg = NULL;
1424 /* Hook it onto the reply chain */
1425 reply->next = client->bits.localsock.replies;
1426 client->bits.localsock.replies = reply;
1427 } else {
1428 /* It's all gone horribly wrong... */
1429 pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1430 send_local_reply(client, ENOMEM, client->fd);
1431 return;
1433 DEBUGLOG("Got %d replies, expecting: %d\n",
1434 client->bits.localsock.num_replies + 1,
1435 client->bits.localsock.expected_replies);
1437 /* If we have the whole lot then do the post-process */
1438 if (++client->bits.localsock.num_replies ==
1439 client->bits.localsock.expected_replies) {
1440 /* Post-process the command */
1441 if (client->bits.localsock.threadid) {
1442 pthread_mutex_lock(&client->bits.localsock.mutex);
1443 client->bits.localsock.state = POST_COMMAND;
1444 pthread_cond_signal(&client->bits.localsock.cond);
1445 pthread_mutex_unlock(&client->bits.localsock.mutex);
1448 pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1451 /* This is the thread that runs the PRE and post commands for a particular connection */
1452 static __attribute__ ((noreturn)) void *pre_and_post_thread(void *arg)
1454 struct local_client *client = (struct local_client *) arg;
1455 int status;
1456 int write_status;
1457 sigset_t ss;
1458 int pipe_fd = client->bits.localsock.pipe;
1460 DEBUGLOG("in sub thread: client = %p\n", client);
1462 /* Don't start until the LVM thread is ready */
1463 pthread_mutex_lock(&lvm_start_mutex);
1464 pthread_mutex_unlock(&lvm_start_mutex);
1465 DEBUGLOG("Sub thread ready for work.\n");
1467 /* Ignore SIGUSR1 (handled by master process) but enable
1468 SIGUSR2 (kills subthreads) */
1469 sigemptyset(&ss);
1470 sigaddset(&ss, SIGUSR1);
1471 pthread_sigmask(SIG_BLOCK, &ss, NULL);
1473 sigdelset(&ss, SIGUSR1);
1474 sigaddset(&ss, SIGUSR2);
1475 pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
1477 /* Loop around doing PRE and POST functions until the client goes away */
1478 while (!client->bits.localsock.finished) {
1479 /* Execute the code */
1480 status = do_pre_command(client);
1482 if (status)
1483 client->bits.localsock.all_success = 0;
1485 DEBUGLOG("Writing status %d down pipe %d\n", status, pipe_fd);
1487 /* Tell the parent process we have finished this bit */
1488 do {
1489 write_status = write(pipe_fd, &status, sizeof(int));
1490 if (write_status == sizeof(int))
1491 break;
1492 if (write_status < 0 &&
1493 (errno == EINTR || errno == EAGAIN))
1494 continue;
1495 log_error("Error sending to pipe: %m\n");
1496 break;
1497 } while(1);
1499 if (status) {
1500 client->bits.localsock.state = POST_COMMAND;
1501 goto next_pre;
1504 /* We may need to wait for the condition variable before running the post command */
1505 pthread_mutex_lock(&client->bits.localsock.mutex);
1506 DEBUGLOG("Waiting to do post command - state = %d\n",
1507 client->bits.localsock.state);
1509 if (client->bits.localsock.state != POST_COMMAND) {
1510 pthread_cond_wait(&client->bits.localsock.cond,
1511 &client->bits.localsock.mutex);
1513 pthread_mutex_unlock(&client->bits.localsock.mutex);
1515 DEBUGLOG("Got post command condition...\n");
1517 /* POST function must always run, even if the client aborts */
1518 status = 0;
1519 do_post_command(client);
1521 do {
1522 write_status = write(pipe_fd, &status, sizeof(int));
1523 if (write_status == sizeof(int))
1524 break;
1525 if (write_status < 0 &&
1526 (errno == EINTR || errno == EAGAIN))
1527 continue;
1528 log_error("Error sending to pipe: %m\n");
1529 break;
1530 } while(1);
1531 next_pre:
1532 DEBUGLOG("Waiting for next pre command\n");
1534 pthread_mutex_lock(&client->bits.localsock.mutex);
1535 if (client->bits.localsock.state != PRE_COMMAND &&
1536 !client->bits.localsock.finished) {
1537 pthread_cond_wait(&client->bits.localsock.cond,
1538 &client->bits.localsock.mutex);
1540 pthread_mutex_unlock(&client->bits.localsock.mutex);
1542 DEBUGLOG("Got pre command condition...\n");
1544 DEBUGLOG("Subthread finished\n");
1545 pthread_exit((void *) 0);
1548 /* Process a command on the local node and store the result */
1549 static int process_local_command(struct clvm_header *msg, int msglen,
1550 struct local_client *client,
1551 unsigned short xid)
1553 char *replybuf = malloc(max_cluster_message);
1554 int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1555 int replylen = 0;
1556 int status;
1558 DEBUGLOG("process_local_command: %s msg=%p, msglen =%d, client=%p\n",
1559 decode_cmd(msg->cmd), msg, msglen, client);
1561 if (replybuf == NULL)
1562 return -1;
1564 status = do_command(client, msg, msglen, &replybuf, buflen, &replylen);
1566 if (status)
1567 client->bits.localsock.all_success = 0;
1569 /* If we took too long then discard the reply */
1570 if (xid == client->xid) {
1571 add_reply_to_list(client, status, our_csid, replybuf, replylen);
1572 } else {
1573 DEBUGLOG
1574 ("Local command took too long, discarding xid %d, current is %d\n",
1575 xid, client->xid);
1578 free(replybuf);
1579 return status;
1582 static int process_reply(const struct clvm_header *msg, int msglen, const char *csid)
1584 struct local_client *client = NULL;
1586 client = find_client(msg->clientid);
1587 if (!client) {
1588 DEBUGLOG("Got message for unknown client 0x%x\n",
1589 msg->clientid);
1590 log_error("Got message for unknown client 0x%x\n",
1591 msg->clientid);
1592 return -1;
1595 if (msg->status)
1596 client->bits.localsock.all_success = 0;
1598 /* Gather replies together for this client id */
1599 if (msg->xid == client->xid) {
1600 add_reply_to_list(client, msg->status, csid, msg->args,
1601 msg->arglen);
1602 } else {
1603 DEBUGLOG("Discarding reply with old XID %d, current = %d\n",
1604 msg->xid, client->xid);
1606 return 0;
1609 /* Send an aggregated reply back to the client */
1610 static void send_local_reply(struct local_client *client, int status, int fd)
1612 struct clvm_header *clientreply;
1613 struct node_reply *thisreply = client->bits.localsock.replies;
1614 char *replybuf;
1615 char *ptr;
1616 int message_len = 0;
1618 DEBUGLOG("Send local reply\n");
1620 /* Work out the total size of the reply */
1621 while (thisreply) {
1622 if (thisreply->replymsg)
1623 message_len += strlen(thisreply->replymsg) + 1;
1624 else
1625 message_len++;
1627 message_len += strlen(thisreply->node) + 1 + sizeof(int);
1629 thisreply = thisreply->next;
1632 /* Add in the size of our header */
1633 message_len = message_len + sizeof(struct clvm_header) + 1;
1634 replybuf = malloc(message_len);
1636 clientreply = (struct clvm_header *) replybuf;
1637 clientreply->status = status;
1638 clientreply->cmd = CLVMD_CMD_REPLY;
1639 clientreply->node[0] = '\0';
1640 clientreply->flags = 0;
1642 ptr = clientreply->args;
1644 /* Add in all the replies, and free them as we go */
1645 thisreply = client->bits.localsock.replies;
1646 while (thisreply) {
1647 struct node_reply *tempreply = thisreply;
1649 strcpy(ptr, thisreply->node);
1650 ptr += strlen(thisreply->node) + 1;
1652 if (thisreply->status)
1653 clientreply->flags |= CLVMD_FLAG_NODEERRS;
1655 memcpy(ptr, &thisreply->status, sizeof(int));
1656 ptr += sizeof(int);
1658 if (thisreply->replymsg) {
1659 strcpy(ptr, thisreply->replymsg);
1660 ptr += strlen(thisreply->replymsg) + 1;
1661 } else {
1662 ptr[0] = '\0';
1663 ptr++;
1665 thisreply = thisreply->next;
1667 free(tempreply->replymsg);
1668 free(tempreply);
1671 /* Terminate with an empty node name */
1672 *ptr = '\0';
1674 clientreply->arglen = ptr - clientreply->args + 1;
1676 /* And send it */
1677 send_message(replybuf, message_len, our_csid, fd,
1678 "Error sending REPLY to client");
1679 free(replybuf);
1681 /* Reset comms variables */
1682 client->bits.localsock.replies = NULL;
1683 client->bits.localsock.expected_replies = 0;
1684 client->bits.localsock.in_progress = FALSE;
1685 client->bits.localsock.sent_out = FALSE;
1688 /* Just free a reply chain baceuse it wasn't used. */
1689 static void free_reply(struct local_client *client)
1691 /* Add in all the replies, and free them as we go */
1692 struct node_reply *thisreply = client->bits.localsock.replies;
1693 while (thisreply) {
1694 struct node_reply *tempreply = thisreply;
1696 thisreply = thisreply->next;
1698 free(tempreply->replymsg);
1699 free(tempreply);
1701 client->bits.localsock.replies = NULL;
1704 /* Send our version number to the cluster */
1705 static void send_version_message()
1707 char message[sizeof(struct clvm_header) + sizeof(int) * 3];
1708 struct clvm_header *msg = (struct clvm_header *) message;
1709 int version_nums[3];
1711 msg->cmd = CLVMD_CMD_VERSION;
1712 msg->status = 0;
1713 msg->flags = 0;
1714 msg->clientid = 0;
1715 msg->arglen = sizeof(version_nums);
1717 version_nums[0] = htonl(CLVMD_MAJOR_VERSION);
1718 version_nums[1] = htonl(CLVMD_MINOR_VERSION);
1719 version_nums[2] = htonl(CLVMD_PATCH_VERSION);
1721 memcpy(&msg->args, version_nums, sizeof(version_nums));
1723 hton_clvm(msg);
1725 clops->cluster_send_message(message, sizeof(message), NULL,
1726 "Error Sending version number");
1729 /* Send a message to either a local client or another server */
1730 static int send_message(void *buf, int msglen, const char *csid, int fd,
1731 const char *errtext)
1733 int len = 0;
1734 int saved_errno = 0;
1735 struct timespec delay;
1736 struct timespec remtime;
1738 int retry_cnt = 0;
1740 /* Send remote messages down the cluster socket */
1741 if (csid == NULL || !ISLOCAL_CSID(csid)) {
1742 hton_clvm((struct clvm_header *) buf);
1743 return clops->cluster_send_message(buf, msglen, csid, errtext);
1744 } else {
1745 int ptr = 0;
1747 /* Make sure it all goes */
1748 do {
1749 if (retry_cnt > MAX_RETRIES)
1751 errno = saved_errno;
1752 log_error("%s", errtext);
1753 errno = saved_errno;
1754 break;
1757 len = write(fd, buf + ptr, msglen - ptr);
1759 if (len <= 0) {
1760 if (errno == EINTR)
1761 continue;
1762 if (errno == EAGAIN ||
1763 errno == EIO ||
1764 errno == ENOSPC) {
1765 saved_errno = errno;
1766 retry_cnt++;
1768 delay.tv_sec = 0;
1769 delay.tv_nsec = 100000;
1770 remtime.tv_sec = 0;
1771 remtime.tv_nsec = 0;
1772 (void) nanosleep (&delay, &remtime);
1774 continue;
1776 log_error("%s", errtext);
1777 break;
1779 ptr += len;
1780 } while (ptr < msglen);
1782 return len;
1785 static int process_work_item(struct lvm_thread_cmd *cmd)
1787 /* If msg is NULL then this is a cleanup request */
1788 if (cmd->msg == NULL) {
1789 DEBUGLOG("process_work_item: free fd %d\n", cmd->client->fd);
1790 cmd_client_cleanup(cmd->client);
1791 free(cmd->client);
1792 return 0;
1795 if (!cmd->remote) {
1796 DEBUGLOG("process_work_item: local\n");
1797 process_local_command(cmd->msg, cmd->msglen, cmd->client,
1798 cmd->xid);
1799 } else {
1800 DEBUGLOG("process_work_item: remote\n");
1801 process_remote_command(cmd->msg, cmd->msglen, cmd->client->fd,
1802 cmd->csid);
1804 return 0;
1808 * Routine that runs in the "LVM thread".
1810 static void lvm_thread_fn(void *arg)
1812 struct dm_list *cmdl, *tmp;
1813 sigset_t ss;
1814 int using_gulm = (int)(long)arg;
1816 DEBUGLOG("LVM thread function started\n");
1818 /* Ignore SIGUSR1 & 2 */
1819 sigemptyset(&ss);
1820 sigaddset(&ss, SIGUSR1);
1821 sigaddset(&ss, SIGUSR2);
1822 pthread_sigmask(SIG_BLOCK, &ss, NULL);
1824 /* Initialise the interface to liblvm */
1825 init_lvm(using_gulm);
1827 /* Allow others to get moving */
1828 pthread_mutex_unlock(&lvm_start_mutex);
1830 /* Now wait for some actual work */
1831 for (;;) {
1832 DEBUGLOG("LVM thread waiting for work\n");
1834 pthread_mutex_lock(&lvm_thread_mutex);
1835 if (dm_list_empty(&lvm_cmd_head))
1836 pthread_cond_wait(&lvm_thread_cond, &lvm_thread_mutex);
1838 dm_list_iterate_safe(cmdl, tmp, &lvm_cmd_head) {
1839 struct lvm_thread_cmd *cmd;
1841 cmd =
1842 dm_list_struct_base(cmdl, struct lvm_thread_cmd, list);
1843 dm_list_del(&cmd->list);
1844 pthread_mutex_unlock(&lvm_thread_mutex);
1846 process_work_item(cmd);
1847 free(cmd->msg);
1848 free(cmd);
1850 pthread_mutex_lock(&lvm_thread_mutex);
1852 pthread_mutex_unlock(&lvm_thread_mutex);
1856 /* Pass down some work to the LVM thread */
1857 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
1858 int msglen, const char *csid)
1860 struct lvm_thread_cmd *cmd;
1862 cmd = malloc(sizeof(struct lvm_thread_cmd));
1863 if (!cmd)
1864 return ENOMEM;
1866 if (msglen) {
1867 cmd->msg = malloc(msglen);
1868 if (!cmd->msg) {
1869 log_error("Unable to allocate buffer space\n");
1870 free(cmd);
1871 return -1;
1873 memcpy(cmd->msg, msg, msglen);
1875 else {
1876 cmd->msg = NULL;
1878 cmd->client = client;
1879 cmd->msglen = msglen;
1880 cmd->xid = client->xid;
1882 if (csid) {
1883 memcpy(cmd->csid, csid, max_csid_len);
1884 cmd->remote = 1;
1885 } else {
1886 cmd->remote = 0;
1889 DEBUGLOG
1890 ("add_to_lvmqueue: cmd=%p. client=%p, msg=%p, len=%d, csid=%p, xid=%d\n",
1891 cmd, client, msg, msglen, csid, cmd->xid);
1892 pthread_mutex_lock(&lvm_thread_mutex);
1893 dm_list_add(&lvm_cmd_head, &cmd->list);
1894 pthread_cond_signal(&lvm_thread_cond);
1895 pthread_mutex_unlock(&lvm_thread_mutex);
1897 return 0;
1900 /* Return 0 if we can talk to an existing clvmd */
1901 static int check_local_clvmd(void)
1903 int local_socket;
1904 struct sockaddr_un sockaddr;
1905 int ret = 0;
1907 /* Open local socket */
1908 if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
1909 return -1;
1912 memset(&sockaddr, 0, sizeof(sockaddr));
1913 memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
1914 sockaddr.sun_family = AF_UNIX;
1916 if (connect(local_socket,(struct sockaddr *) &sockaddr,
1917 sizeof(sockaddr))) {
1918 ret = -1;
1921 close(local_socket);
1922 return ret;
1926 /* Open the local socket, that's the one we talk to libclvm down */
1927 static int open_local_sock()
1929 int local_socket;
1930 struct sockaddr_un sockaddr;
1932 /* Open local socket */
1933 if (CLVMD_SOCKNAME[0] != '\0')
1934 unlink(CLVMD_SOCKNAME);
1935 local_socket = socket(PF_UNIX, SOCK_STREAM, 0);
1936 if (local_socket < 0) {
1937 log_error("Can't create local socket: %m");
1938 return -1;
1940 /* Set Close-on-exec & non-blocking */
1941 fcntl(local_socket, F_SETFD, 1);
1942 fcntl(local_socket, F_SETFL, fcntl(local_socket, F_GETFL, 0) | O_NONBLOCK);
1944 memset(&sockaddr, 0, sizeof(sockaddr));
1945 memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
1946 sockaddr.sun_family = AF_UNIX;
1947 if (bind(local_socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr))) {
1948 log_error("can't bind local socket: %m");
1949 close(local_socket);
1950 return -1;
1952 if (listen(local_socket, 1) != 0) {
1953 log_error("listen local: %m");
1954 close(local_socket);
1955 return -1;
1957 if (CLVMD_SOCKNAME[0] != '\0')
1958 chmod(CLVMD_SOCKNAME, 0600);
1960 return local_socket;
1963 void process_message(struct local_client *client, const char *buf, int len,
1964 const char *csid)
1966 struct clvm_header *inheader;
1968 inheader = (struct clvm_header *) buf;
1969 ntoh_clvm(inheader); /* Byteswap fields */
1970 if (inheader->cmd == CLVMD_CMD_REPLY)
1971 process_reply(inheader, len, csid);
1972 else
1973 add_to_lvmqueue(client, inheader, len, csid);
/*
 * Per-node callback for check_all_clvmds_running(). A node whose clvmd is
 * not up gets an EHOSTDOWN reply recorded against it, with the text
 * "CLVMD not running" (including its terminating NUL).
 */
static void check_all_callback(struct local_client *client, const char *csid,
			       int node_up)
{
	static const char not_running[] = "CLVMD not running";

	if (node_up)
		return;

	add_reply_to_list(client, EHOSTDOWN, csid, not_running,
			  sizeof(not_running));
}
1985 /* Check to see if all CLVMDs are running (ie one on
1986 every node in the cluster).
1987 If not, returns -1 and prints out a list of errant nodes */
1988 static int check_all_clvmds_running(struct local_client *client)
1990 DEBUGLOG("check_all_clvmds_running\n");
1991 return clops->cluster_do_node_callback(client, check_all_callback);
1994 /* Return a local_client struct given a client ID.
1995 client IDs are in network byte order */
1996 static struct local_client *find_client(int clientid)
1998 struct local_client *thisfd;
1999 for (thisfd = &local_client_head; thisfd != NULL; thisfd = thisfd->next) {
2000 if (thisfd->fd == ntohl(clientid))
2001 return thisfd;
2003 return NULL;
2006 /* Byte-swapping routines for the header so we
2007 work in a heterogeneous environment */
2008 static void hton_clvm(struct clvm_header *hdr)
2010 hdr->status = htonl(hdr->status);
2011 hdr->arglen = htonl(hdr->arglen);
2012 hdr->xid = htons(hdr->xid);
2013 /* Don't swap clientid as it's only a token as far as
2014 remote nodes are concerned */
2017 static void ntoh_clvm(struct clvm_header *hdr)
2019 hdr->status = ntohl(hdr->status);
2020 hdr->arglen = ntohl(hdr->arglen);
2021 hdr->xid = ntohs(hdr->xid);
/*
 * Handler for SIGUSR2, which is sent to kill subthreads. The handler only
 * logs. NOTE(review): presumably delivery exists to interrupt a blocking
 * call in the target thread; confirm.
 */
static void sigusr2_handler(int sig)
{
	DEBUGLOG("SIGUSR2 received\n");
}
2031 static void sigterm_handler(int sig)
2033 DEBUGLOG("SIGTERM received\n");
2034 quit = 1;
2035 return;
2038 static void sighup_handler(int sig)
2040 DEBUGLOG("got SIGHUP\n");
2041 reread_config = 1;
2044 int sync_lock(const char *resource, int mode, int flags, int *lockid)
2046 return clops->sync_lock(resource, mode, flags, lockid);
2049 int sync_unlock(const char *resource, int lockid)
2051 return clops->sync_unlock(resource, lockid);
2054 static if_type_t parse_cluster_interface(char *ifname)
2056 if_type_t iface = IF_AUTO;
2058 if (!strcmp(ifname, "auto"))
2059 iface = IF_AUTO;
2060 if (!strcmp(ifname, "cman"))
2061 iface = IF_CMAN;
2062 if (!strcmp(ifname, "gulm"))
2063 iface = IF_GULM;
2064 if (!strcmp(ifname, "openais"))
2065 iface = IF_OPENAIS;
2066 if (!strcmp(ifname, "corosync"))
2067 iface = IF_COROSYNC;
2069 return iface;
2073 * Try and find a cluster system in corosync's objdb, if it is running. This is
2074 * only called if the command-line option is not present, and if it fails
2075 * we still try the interfaces in order.
2077 static if_type_t get_cluster_type()
2079 #ifdef HAVE_COROSYNC_CONFDB_H
2080 confdb_handle_t handle;
2081 if_type_t type = IF_AUTO;
2082 int result;
2083 char buf[255];
2084 size_t namelen = sizeof(buf);
2085 hdb_handle_t cluster_handle;
2086 hdb_handle_t clvmd_handle;
2087 confdb_callbacks_t callbacks = {
2088 .confdb_key_change_notify_fn = NULL,
2089 .confdb_object_create_change_notify_fn = NULL,
2090 .confdb_object_delete_change_notify_fn = NULL
2093 result = confdb_initialize (&handle, &callbacks);
2094 if (result != CS_OK)
2095 return type;
2097 result = confdb_object_find_start(handle, OBJECT_PARENT_HANDLE);
2098 if (result != CS_OK)
2099 goto out;
2101 result = confdb_object_find(handle, OBJECT_PARENT_HANDLE, (void *)"cluster", strlen("cluster"), &cluster_handle);
2102 if (result != CS_OK)
2103 goto out;
2105 result = confdb_object_find_start(handle, cluster_handle);
2106 if (result != CS_OK)
2107 goto out;
2109 result = confdb_object_find(handle, cluster_handle, (void *)"clvmd", strlen("clvmd"), &clvmd_handle);
2110 if (result != CS_OK)
2111 goto out;
2113 result = confdb_key_get(handle, clvmd_handle, (void *)"interface", strlen("interface"), buf, &namelen);
2114 if (result != CS_OK)
2115 goto out;
2117 buf[namelen] = '\0';
2118 type = parse_cluster_interface(buf);
2119 DEBUGLOG("got interface type '%s' from confdb\n", buf);
2120 out:
2121 confdb_finalize(handle);
2122 return type;
2123 #else
2124 return IF_AUTO;
2125 #endif