Expose confdb write to the library.
[openais.git] / exec / cpg.c
blob6a93d871a9a7856f5f6ad7fb6a5bd67b9127233a
/*
 * Copyright (c) 2006 Red Hat, Inc.
 * Copyright (c) 2006 Sun Microsystems, Inc.
 *
 * All rights reserved.
 *
 * Author: Patrick Caulfield (pcaulfie@redhat.com)
 *
 * This software licensed under BSD license, the text of which follows:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of the MontaVista Software, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived from this
 *   software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 * THE POSSIBILITY OF SUCH DAMAGE.
 */
35 #ifndef OPENAIS_BSD
36 #include <alloca.h>
37 #endif
38 #include <sys/types.h>
39 #include <sys/socket.h>
40 #include <sys/un.h>
41 #include <sys/ioctl.h>
42 #include <netinet/in.h>
43 #include <sys/uio.h>
44 #include <unistd.h>
45 #include <fcntl.h>
46 #include <stdlib.h>
47 #include <stdio.h>
48 #include <errno.h>
49 #include <signal.h>
50 #include <time.h>
51 #include <unistd.h>
52 #include <netinet/in.h>
53 #include <arpa/inet.h>
55 #include "../include/saAis.h"
56 #include "../include/saClm.h"
57 #include "../include/ipc_gen.h"
58 #include "../include/ipc_cpg.h"
59 #include "../include/mar_cpg.h"
60 #include "../include/list.h"
61 #include "../include/queue.h"
62 #include "../lcr/lcr_comp.h"
63 #include "totempg.h"
64 #include "totemip.h"
65 #include "main.h"
66 #include "flow.h"
67 #include "tlist.h"
68 #include "ipc.h"
69 #include "mempool.h"
70 #include "objdb.h"
71 #include "service.h"
72 #include "jhash.h"
73 #include "swab.h"
74 #include "ipc.h"
75 #include "flow.h"
76 #include "logsys.h"
78 LOGSYS_DECLARE_SUBSYS ("CPG", LOG_INFO);
80 #define GROUP_HASH_SIZE 32
82 #define PI_FLAG_MEMBER 1
/*
 * Function ids of the executive (node-to-node) CPG messages.  The values
 * follow the slot order of cpg_exec_service[] and are combined with
 * CPG_SERVICE through SERVICE_ID_MAKE() to build a message header id.
 */
enum cpg_message_req_types {
	MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0,
	MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1,
	MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
	MESSAGE_REQ_EXEC_CPG_MCAST = 3,
	MESSAGE_REQ_EXEC_CPG_DOWNLIST = 4
};
92 struct removed_group
94 struct group_info *gi;
95 struct list_head list; /* on removed_list */
96 int left_list_entries;
97 mar_cpg_address_t left_list[PROCESSOR_COUNT_MAX];
98 int left_list_size;
101 struct group_info {
102 mar_cpg_name_t group_name;
103 struct list_head members;
104 struct list_head list; /* on hash list */
105 struct removed_group *rg; /* when a node goes down */
108 struct process_info {
109 unsigned int nodeid;
110 uint32_t pid;
111 uint32_t flags;
112 void *conn;
113 void *trackerconn;
114 struct group_info *group;
115 enum openais_flow_control_state flow_control_state;
116 struct list_head list; /* on the group_info members list */
119 struct join_list_entry {
120 uint32_t pid;
121 mar_cpg_name_t group_name;
124 static struct list_head group_lists[GROUP_HASH_SIZE];
127 * Service Interfaces required by service_message_handler struct
129 static void cpg_confchg_fn (
130 enum totem_configuration_type configuration_type,
131 unsigned int *member_list, int member_list_entries,
132 unsigned int *left_list, int left_list_entries,
133 unsigned int *joined_list, int joined_list_entries,
134 struct memb_ring_id *ring_id);
136 static int cpg_exec_init_fn (struct objdb_iface_ver0 *objdb);
138 static int cpg_lib_init_fn (void *conn);
140 static int cpg_lib_exit_fn (void *conn);
142 static void message_handler_req_exec_cpg_procjoin (
143 void *message,
144 unsigned int nodeid);
146 static void message_handler_req_exec_cpg_procleave (
147 void *message,
148 unsigned int nodeid);
150 static void message_handler_req_exec_cpg_joinlist (
151 void *message,
152 unsigned int nodeid);
154 static void message_handler_req_exec_cpg_mcast (
155 void *message,
156 unsigned int nodeid);
158 static void message_handler_req_exec_cpg_downlist (
159 void *message,
160 unsigned int nodeid);
162 static void exec_cpg_procjoin_endian_convert (void *msg);
164 static void exec_cpg_joinlist_endian_convert (void *msg);
166 static void exec_cpg_mcast_endian_convert (void *msg);
168 static void exec_cpg_downlist_endian_convert (void *msg);
170 static void message_handler_req_lib_cpg_join (void *conn, void *message);
172 static void message_handler_req_lib_cpg_leave (void *conn, void *message);
174 static void message_handler_req_lib_cpg_mcast (void *conn, void *message);
176 static void message_handler_req_lib_cpg_membership (void *conn, void *message);
178 static void message_handler_req_lib_cpg_trackstart (void *conn, void *message);
180 static void message_handler_req_lib_cpg_trackstop (void *conn, void *message);
182 static void message_handler_req_lib_cpg_local_get (void *conn, void *message);
184 static int cpg_node_joinleave_send (struct group_info *gi, struct process_info *pi, int fn, int reason);
186 static int cpg_exec_send_joinlist(void);
188 static void cpg_sync_init (void);
189 static int cpg_sync_process (void);
190 static void cpg_sync_activate (void);
191 static void cpg_sync_abort (void);
193 * Library Handler Definition
195 static struct openais_lib_handler cpg_lib_service[] =
197 { /* 0 */
198 .lib_handler_fn = message_handler_req_lib_cpg_join,
199 .response_size = sizeof (struct res_lib_cpg_join),
200 .response_id = MESSAGE_RES_CPG_JOIN,
201 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
203 { /* 1 */
204 .lib_handler_fn = message_handler_req_lib_cpg_leave,
205 .response_size = sizeof (struct res_lib_cpg_leave),
206 .response_id = MESSAGE_RES_CPG_LEAVE,
207 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
209 { /* 2 */
210 .lib_handler_fn = message_handler_req_lib_cpg_mcast,
211 .response_size = sizeof (struct res_lib_cpg_mcast),
212 .response_id = MESSAGE_RES_CPG_MCAST,
213 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
215 { /* 3 */
216 .lib_handler_fn = message_handler_req_lib_cpg_membership,
217 .response_size = sizeof (mar_res_header_t),
218 .response_id = MESSAGE_RES_CPG_MEMBERSHIP,
219 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
221 { /* 4 */
222 .lib_handler_fn = message_handler_req_lib_cpg_trackstart,
223 .response_size = sizeof (struct res_lib_cpg_trackstart),
224 .response_id = MESSAGE_RES_CPG_TRACKSTART,
225 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
227 { /* 5 */
228 .lib_handler_fn = message_handler_req_lib_cpg_trackstop,
229 .response_size = sizeof (struct res_lib_cpg_trackstart),
230 .response_id = MESSAGE_RES_CPG_TRACKSTOP,
231 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
233 { /* 6 */
234 .lib_handler_fn = message_handler_req_lib_cpg_local_get,
235 .response_size = sizeof (struct res_lib_cpg_local_get),
236 .response_id = MESSAGE_RES_CPG_LOCAL_GET,
237 .flow_control = OPENAIS_FLOW_CONTROL_NOT_REQUIRED
241 static struct openais_exec_handler cpg_exec_service[] =
243 { /* 0 */
244 .exec_handler_fn = message_handler_req_exec_cpg_procjoin,
245 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
247 { /* 1 */
248 .exec_handler_fn = message_handler_req_exec_cpg_procleave,
249 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
251 { /* 2 */
252 .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
253 .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
255 { /* 3 */
256 .exec_handler_fn = message_handler_req_exec_cpg_mcast,
257 .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
259 { /* 4 */
260 .exec_handler_fn = message_handler_req_exec_cpg_downlist,
261 .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
265 struct openais_service_handler cpg_service_handler = {
266 .name = "openais cluster closed process group service v1.01",
267 .id = CPG_SERVICE,
268 .private_data_size = sizeof (struct process_info),
269 .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED,
270 .lib_init_fn = cpg_lib_init_fn,
271 .lib_exit_fn = cpg_lib_exit_fn,
272 .lib_service = cpg_lib_service,
273 .lib_service_count = sizeof (cpg_lib_service) / sizeof (struct openais_lib_handler),
274 .exec_init_fn = cpg_exec_init_fn,
275 .exec_dump_fn = NULL,
276 .exec_service = cpg_exec_service,
277 .exec_service_count = sizeof (cpg_exec_service) / sizeof (struct openais_exec_handler),
278 .confchg_fn = cpg_confchg_fn,
279 .sync_init = cpg_sync_init,
280 .sync_process = cpg_sync_process,
281 .sync_activate = cpg_sync_activate,
282 .sync_abort = cpg_sync_abort
286 * Dynamic loader definition
288 static struct openais_service_handler *cpg_get_service_handler_ver0 (void);
290 static struct openais_service_handler_iface_ver0 cpg_service_handler_iface = {
291 .openais_get_service_handler_ver0 = cpg_get_service_handler_ver0
294 static struct lcr_iface openais_cpg_ver0[1] = {
296 .name = "openais_cpg",
297 .version = 0,
298 .versions_replace = 0,
299 .versions_replace_count = 0,
300 .dependencies = 0,
301 .dependency_count = 0,
302 .constructor = NULL,
303 .destructor = NULL,
304 .interfaces = NULL
308 static struct lcr_comp cpg_comp_ver0 = {
309 .iface_count = 1,
310 .ifaces = openais_cpg_ver0
314 static struct openais_service_handler *cpg_get_service_handler_ver0 (void)
316 return (&cpg_service_handler);
319 __attribute__ ((constructor)) static void cpg_comp_register (void) {
320 lcr_interfaces_set (&openais_cpg_ver0[0], &cpg_service_handler_iface);
322 lcr_component_register (&cpg_comp_ver0);
325 struct req_exec_cpg_procjoin {
326 mar_req_header_t header __attribute__((aligned(8)));
327 mar_cpg_name_t group_name __attribute__((aligned(8)));
328 mar_uint32_t pid __attribute__((aligned(8)));
329 mar_uint32_t reason __attribute__((aligned(8)));
332 struct req_exec_cpg_mcast {
333 mar_req_header_t header __attribute__((aligned(8)));
334 mar_cpg_name_t group_name __attribute__((aligned(8)));
335 mar_uint32_t msglen __attribute__((aligned(8)));
336 mar_uint32_t pid __attribute__((aligned(8)));
337 mar_message_source_t source __attribute__((aligned(8)));
338 mar_uint8_t message[] __attribute__((aligned(8)));
341 struct req_exec_cpg_downlist {
342 mar_req_header_t header __attribute__((aligned(8)));
343 mar_uint32_t left_nodes __attribute__((aligned(8)));
344 mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
347 static struct req_exec_cpg_downlist req_exec_cpg_downlist;
/* Sync callback: nothing to prepare. */
static void cpg_sync_init (void)
{
}

/* Sync callback: multicast our local join list; returns the send result. */
static int cpg_sync_process (void)
{
	int res = cpg_exec_send_joinlist();

	return res;
}

/* Sync callback: nothing to do on activation. */
static void cpg_sync_activate (void)
{
}

/* Sync callback: nothing to undo on abort. */
static void cpg_sync_abort (void)
{
}
369 static int notify_lib_joinlist(
370 struct group_info *gi,
371 void *conn,
372 int joined_list_entries,
373 mar_cpg_address_t *joined_list,
374 int left_list_entries,
375 mar_cpg_address_t *left_list,
376 int id)
378 int count = 0;
379 char *buf;
380 struct res_lib_cpg_confchg_callback *res;
381 struct list_head *iter;
382 struct list_head *tmp;
383 mar_cpg_address_t *retgi;
384 int size;
386 /* First, we need to know how many nodes are in the list. While we're
387 traversing this list, look for the 'us' entry so we know which
388 connection to send back down */
389 for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
390 struct process_info *pi = list_entry(iter, struct process_info, list);
391 if (pi->pid)
392 count++;
395 log_printf(LOG_LEVEL_DEBUG, "Sending new joinlist (%d elements) to clients\n", count);
397 size = sizeof(struct res_lib_cpg_confchg_callback) +
398 sizeof(mar_cpg_address_t) * (count + left_list_entries + joined_list_entries);
399 buf = alloca(size);
400 if (!buf)
401 return SA_AIS_ERR_NO_SPACE;
403 res = (struct res_lib_cpg_confchg_callback *)buf;
404 res->joined_list_entries = joined_list_entries;
405 res->left_list_entries = left_list_entries;
406 retgi = res->member_list;
408 res->header.size = size;
409 res->header.id = id;
410 memcpy(&res->group_name, &gi->group_name, sizeof(mar_cpg_name_t));
412 /* Build up the message */
413 count = 0;
414 for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
415 struct process_info *pi = list_entry(iter, struct process_info, list);
416 if (pi->pid) {
417 /* Processes leaving will be removed AFTER this is done (so that they get their
418 own leave notifications), so exclude them from the members list here */
419 int i;
420 for (i=0; i<left_list_entries; i++) {
421 if (left_list[i].pid == pi->pid && left_list[i].nodeid == pi->nodeid)
422 goto next_member;
425 retgi->nodeid = pi->nodeid;
426 retgi->pid = pi->pid;
427 retgi++;
428 count++;
429 next_member: ;
432 res->member_list_entries = count;
434 if (left_list_entries) {
435 memcpy(retgi, left_list, left_list_entries * sizeof(mar_cpg_address_t));
436 retgi += left_list_entries;
439 if (joined_list_entries) {
440 memcpy(retgi, joined_list, joined_list_entries * sizeof(mar_cpg_address_t));
441 retgi += joined_list_entries;
444 if (conn) {
445 openais_conn_send_response(conn, buf, size);
447 else {
448 /* Send it to all listeners */
449 for (iter = gi->members.next, tmp=iter->next; iter != &gi->members; iter = tmp, tmp=iter->next) {
450 struct process_info *pi = list_entry(iter, struct process_info, list);
451 if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) {
452 if (openais_conn_send_response(pi->trackerconn, buf, size) == -1) {
453 // Error ??
459 return SA_AIS_OK;
462 static void remove_group(struct group_info *gi)
464 list_del(&gi->list);
465 free(gi);
469 static int cpg_exec_init_fn (struct objdb_iface_ver0 *objdb)
471 int i;
473 for (i=0; i<GROUP_HASH_SIZE; i++) {
474 list_init(&group_lists[i]);
477 return (0);
480 static int cpg_lib_exit_fn (void *conn)
482 struct process_info *pi = (struct process_info *)openais_conn_private_data_get (conn);
483 struct group_info *gi = pi->group;
484 mar_cpg_address_t notify_info;
486 log_printf(LOG_LEVEL_DEBUG, "exit_fn for conn=%p\n", conn);
488 if (gi) {
489 notify_info.pid = pi->pid;
490 notify_info.nodeid = totempg_my_nodeid_get();
491 notify_info.reason = CONFCHG_CPG_REASON_PROCDOWN;
492 cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_PROCDOWN);
493 list_del(&pi->list);
495 return (0);
498 static struct group_info *get_group(mar_cpg_name_t *name)
500 struct list_head *iter;
501 struct group_info *gi = NULL;
502 struct group_info *itergi;
503 uint32_t hash = jhash(name->value, name->length, 0) % GROUP_HASH_SIZE;
505 for (iter = group_lists[hash].next; iter != &group_lists[hash]; iter = iter->next) {
506 itergi = list_entry(iter, struct group_info, list);
507 if (memcmp(itergi->group_name.value, name->value, name->length) == 0) {
508 gi = itergi;
509 break;
513 if (!gi) {
514 gi = malloc(sizeof(struct group_info));
515 if (!gi) {
516 log_printf(LOG_LEVEL_WARNING, "Unable to allocate group_info struct");
517 return NULL;
519 memcpy(&gi->group_name, name, sizeof(mar_cpg_name_t));
520 gi->rg = NULL;
521 list_init(&gi->members);
522 list_add(&gi->list, &group_lists[hash]);
524 return gi;
527 static int cpg_node_joinleave_send (struct group_info *gi, struct process_info *pi, int fn, int reason)
529 struct req_exec_cpg_procjoin req_exec_cpg_procjoin;
530 struct iovec req_exec_cpg_iovec;
531 int result;
533 memcpy(&req_exec_cpg_procjoin.group_name, &gi->group_name, sizeof(mar_cpg_name_t));
534 req_exec_cpg_procjoin.pid = pi->pid;
535 req_exec_cpg_procjoin.reason = reason;
537 req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
538 req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn);
540 req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin;
541 req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin);
543 result = totempg_groups_mcast_joined (openais_group_handle, &req_exec_cpg_iovec, 1, TOTEMPG_AGREED);
545 return (result);
548 static void remove_node_from_groups(
549 unsigned int nodeid,
550 struct list_head *remlist)
552 int i;
553 struct list_head *iter, *iter2, *tmp;
554 struct process_info *pi;
555 struct group_info *gi;
557 for (i=0; i < GROUP_HASH_SIZE; i++) {
558 for (iter = group_lists[i].next; iter != &group_lists[i]; iter = iter->next) {
559 gi = list_entry(iter, struct group_info, list);
560 for (iter2 = gi->members.next, tmp = iter2->next; iter2 != &gi->members; iter2 = tmp, tmp = iter2->next) {
561 pi = list_entry(iter2, struct process_info, list);
563 if (pi->nodeid == nodeid) {
565 /* Add it to the list of nodes to send notifications for */
566 if (!gi->rg) {
567 gi->rg = malloc(sizeof(struct removed_group));
568 if (gi->rg) {
569 list_add(&gi->rg->list, remlist);
570 gi->rg->gi = gi;
571 gi->rg->left_list_entries = 0;
572 gi->rg->left_list_size = PROCESSOR_COUNT_MAX;
574 else {
575 log_printf(LOG_LEVEL_CRIT, "Unable to allocate removed group struct. CPG callbacks will be junk.");
576 return;
579 /* Do we need to increase the size ?
580 * Yes, I increase this exponentially. Generally, if you've got a lot of groups,
581 * you'll have a /lot/ of groups, and cgp_groupinfo is pretty small anyway
583 if (gi->rg->left_list_size == gi->rg->left_list_entries) {
584 int newsize;
585 struct removed_group *newrg;
587 list_del(&gi->rg->list);
588 newsize = gi->rg->left_list_size * 2;
589 newrg = realloc(gi->rg, sizeof(struct removed_group) + newsize*sizeof(mar_cpg_address_t));
590 if (!newrg) {
591 log_printf(LOG_LEVEL_CRIT, "Unable to realloc removed group struct. CPG callbacks will be junk.");
592 return;
594 newrg->left_list_size = newsize+PROCESSOR_COUNT_MAX;
595 gi->rg = newrg;
596 list_add(&gi->rg->list, remlist);
598 gi->rg->left_list[gi->rg->left_list_entries].pid = pi->pid;
599 gi->rg->left_list[gi->rg->left_list_entries].nodeid = pi->nodeid;
600 gi->rg->left_list[gi->rg->left_list_entries].reason = CONFCHG_CPG_REASON_NODEDOWN;
601 gi->rg->left_list_entries++;
603 /* Remove node info for dead node */
604 list_del(&pi->list);
605 free(pi);
613 static void cpg_confchg_fn (
614 enum totem_configuration_type configuration_type,
615 unsigned int *member_list, int member_list_entries,
616 unsigned int *left_list, int left_list_entries,
617 unsigned int *joined_list, int joined_list_entries,
618 struct memb_ring_id *ring_id)
620 int i;
621 uint32_t lowest_nodeid = 0xffffff;
622 struct iovec req_exec_cpg_iovec;
624 /* We don't send the library joinlist in here because it can end up
625 out of order with the rest of the messages (which are totem ordered).
626 So we get the lowest nodeid to send out a list of left nodes instead.
627 On receipt of that message, all nodes will then notify their local clients
628 of the new joinlist */
630 if (left_list_entries) {
631 for (i = 0; i < member_list_entries; i++) {
632 if (member_list[i] < lowest_nodeid)
633 lowest_nodeid = member_list[i];
636 log_printf(LOG_LEVEL_DEBUG, "confchg, low nodeid=%d, us = %d\n", lowest_nodeid, totempg_my_nodeid_get());
637 if (lowest_nodeid == totempg_my_nodeid_get()) {
639 req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST);
640 req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist);
642 req_exec_cpg_downlist.left_nodes = left_list_entries;
643 for (i = 0; i < left_list_entries; i++) {
644 req_exec_cpg_downlist.nodeids[i] = left_list[i];
646 log_printf(LOG_LEVEL_DEBUG, "confchg, build downlist: %d nodes\n", left_list_entries);
650 /* Don't send this message until we get the final configuration message */
651 if (configuration_type == TOTEM_CONFIGURATION_REGULAR && req_exec_cpg_downlist.left_nodes) {
652 req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_downlist;
653 req_exec_cpg_iovec.iov_len = req_exec_cpg_downlist.header.size;
655 totempg_groups_mcast_joined (openais_group_handle, &req_exec_cpg_iovec, 1, TOTEMPG_AGREED);
656 req_exec_cpg_downlist.left_nodes = 0;
657 log_printf(LOG_LEVEL_DEBUG, "confchg, sent downlist\n");
661 static void cpg_flow_control_state_set_fn (
662 void *context,
663 enum openais_flow_control_state flow_control_state)
665 struct process_info *process_info = (struct process_info *)context;
667 process_info->flow_control_state = flow_control_state;
670 /* Can byteswap join & leave messages */
671 static void exec_cpg_procjoin_endian_convert (void *msg)
673 struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = (struct req_exec_cpg_procjoin *)msg;
675 req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid);
676 swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
677 req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason);
680 static void exec_cpg_joinlist_endian_convert (void *msg)
682 mar_res_header_t *res = (mar_res_header_t *)msg;
683 struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(mar_res_header_t));
685 /* XXX shouldn't mar_res_header be swabbed? */
687 while ((void*)jle < msg + res->size) {
688 jle->pid = swab32(jle->pid);
689 swab_mar_cpg_name_t (&jle->group_name);
690 jle++;
694 static void exec_cpg_downlist_endian_convert (void *msg)
696 struct req_exec_cpg_downlist *req_exec_cpg_downlist = (struct req_exec_cpg_downlist *)msg;
697 unsigned int i;
699 req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes);
701 for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
702 req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]);
707 static void exec_cpg_mcast_endian_convert (void *msg)
709 struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct req_exec_cpg_mcast *)msg;
711 swab_mar_req_header_t (&req_exec_cpg_mcast->header);
712 swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
713 req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
714 req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
715 swab_mar_message_source_t (&req_exec_cpg_mcast->source);
718 static void do_proc_join(
719 mar_cpg_name_t *name,
720 uint32_t pid,
721 unsigned int nodeid,
722 int reason)
724 struct group_info *gi;
725 struct process_info *pi;
726 struct list_head *iter;
727 mar_cpg_address_t notify_info;
729 gi = get_group(name); /* this will always succeed ! */
730 assert(gi);
732 /* See if it already exists in this group */
733 for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
734 pi = list_entry(iter, struct process_info, list);
735 if (pi->pid == pid && pi->nodeid == nodeid) {
737 /* It could be a local join message */
738 if ((nodeid == totempg_my_nodeid_get()) &&
739 (!pi->flags & PI_FLAG_MEMBER)) {
740 goto local_join;
741 } else {
742 return;
747 pi = malloc(sizeof(struct process_info));
748 if (!pi) {
749 log_printf(LOG_LEVEL_WARNING, "Unable to allocate process_info struct");
750 return;
752 pi->nodeid = nodeid;
753 pi->pid = pid;
754 pi->group = gi;
755 pi->conn = NULL;
756 pi->trackerconn = NULL;
757 list_add_tail(&pi->list, &gi->members);
759 local_join:
761 pi->flags = PI_FLAG_MEMBER;
762 notify_info.pid = pi->pid;
763 notify_info.nodeid = nodeid;
764 notify_info.reason = reason;
766 notify_lib_joinlist(gi, NULL,
767 1, &notify_info,
768 0, NULL,
769 MESSAGE_RES_CPG_CONFCHG_CALLBACK);
772 static void message_handler_req_exec_cpg_downlist (
773 void *message,
774 unsigned int nodeid)
776 struct req_exec_cpg_downlist *req_exec_cpg_downlist = (struct req_exec_cpg_downlist *)message;
777 int i;
778 struct list_head removed_list;
780 log_printf(LOG_LEVEL_DEBUG, "downlist left_list: %d\n", req_exec_cpg_downlist->left_nodes);
782 list_init(&removed_list);
784 /* Remove nodes from joined groups and add removed groups to the list */
785 for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
786 remove_node_from_groups( req_exec_cpg_downlist->nodeids[i], &removed_list);
789 if (!list_empty(&removed_list)) {
790 struct list_head *iter, *tmp;
792 for (iter = removed_list.next, tmp=iter->next; iter != &removed_list; iter = tmp, tmp = iter->next) {
793 struct removed_group *rg = list_entry(iter, struct removed_group, list);
795 notify_lib_joinlist(rg->gi, NULL,
796 0, NULL,
797 rg->left_list_entries, rg->left_list,
798 MESSAGE_RES_CPG_CONFCHG_CALLBACK);
799 rg->gi->rg = NULL;
800 free(rg);
805 static void message_handler_req_exec_cpg_procjoin (
806 void *message,
807 unsigned int nodeid)
809 struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = (struct req_exec_cpg_procjoin *)message;
811 log_printf(LOG_LEVEL_DEBUG, "got procjoin message from cluster node %d\n", nodeid);
813 do_proc_join(&req_exec_cpg_procjoin->group_name,
814 req_exec_cpg_procjoin->pid, nodeid,
815 CONFCHG_CPG_REASON_JOIN);
818 static void message_handler_req_exec_cpg_procleave (
819 void *message,
820 unsigned int nodeid)
822 struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = (struct req_exec_cpg_procjoin *)message;
823 struct group_info *gi;
824 struct process_info *pi;
825 struct list_head *iter;
826 mar_cpg_address_t notify_info;
828 log_printf(LOG_LEVEL_DEBUG, "got procleave message from cluster node %d\n", nodeid);
830 gi = get_group(&req_exec_cpg_procjoin->group_name); /* this will always succeed ! */
831 assert(gi);
833 notify_info.pid = req_exec_cpg_procjoin->pid;
834 notify_info.nodeid = nodeid;
835 notify_info.reason = req_exec_cpg_procjoin->reason;
837 notify_lib_joinlist(gi, NULL,
838 0, NULL,
839 1, &notify_info,
840 MESSAGE_RES_CPG_CONFCHG_CALLBACK);
842 /* Find the node/PID to remove */
843 for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
844 pi = list_entry(iter, struct process_info, list);
845 if (pi->pid == req_exec_cpg_procjoin->pid &&
846 pi->nodeid == nodeid) {
848 list_del(&pi->list);
849 if (!pi->conn)
850 free(pi);
851 else
852 pi->pid = 0;
854 if (list_empty(&gi->members)) {
855 remove_group(gi);
857 break;
863 /* Got a proclist from another node */
864 static void message_handler_req_exec_cpg_joinlist (
865 void *message,
866 unsigned int nodeid)
868 mar_res_header_t *res = (mar_res_header_t *)message;
869 struct join_list_entry *jle = (struct join_list_entry *)(message + sizeof(mar_res_header_t));
871 log_printf(LOG_LEVEL_NOTICE, "got joinlist message from node %d\n",
872 nodeid);
874 /* Ignore our own messages */
875 if (nodeid == totempg_my_nodeid_get()) {
876 return;
879 while ((void*)jle < message + res->size) {
880 do_proc_join(&jle->group_name, jle->pid, nodeid,
881 CONFCHG_CPG_REASON_NODEUP);
882 jle++;
886 static void message_handler_req_exec_cpg_mcast (
887 void *message,
888 unsigned int nodeid)
890 struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct req_exec_cpg_mcast *)message;
891 struct res_lib_cpg_deliver_callback *res_lib_cpg_mcast;
892 struct process_info *process_info;
893 int msglen = req_exec_cpg_mcast->msglen;
894 char buf[sizeof(*res_lib_cpg_mcast) + msglen];
895 struct group_info *gi;
896 struct list_head *iter;
899 * Track local messages so that flow is controlled on the local node
901 gi = get_group(&req_exec_cpg_mcast->group_name); /* this will always succeed ! */
902 assert(gi);
904 res_lib_cpg_mcast = (struct res_lib_cpg_deliver_callback *)buf;
905 res_lib_cpg_mcast->header.id = MESSAGE_RES_CPG_DELIVER_CALLBACK;
906 res_lib_cpg_mcast->header.size = sizeof(*res_lib_cpg_mcast) + msglen;
907 res_lib_cpg_mcast->msglen = msglen;
908 res_lib_cpg_mcast->pid = req_exec_cpg_mcast->pid;
909 res_lib_cpg_mcast->nodeid = nodeid;
910 res_lib_cpg_mcast->flow_control_state = CPG_FLOW_CONTROL_DISABLED;
911 if (message_source_is_local (&req_exec_cpg_mcast->source)) {
912 openais_ipc_flow_control_local_decrement (req_exec_cpg_mcast->source.conn);
913 process_info = (struct process_info *)openais_conn_private_data_get (req_exec_cpg_mcast->source.conn);
914 res_lib_cpg_mcast->flow_control_state = process_info->flow_control_state;
916 memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name,
917 sizeof(mar_cpg_name_t));
918 memcpy(&res_lib_cpg_mcast->message, (char*)message+sizeof(*req_exec_cpg_mcast),
919 msglen);
921 /* Send to all interested members */
922 for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
923 struct process_info *pi = list_entry(iter, struct process_info, list);
924 if (pi->trackerconn) {
925 openais_conn_send_response(
926 pi->trackerconn,
927 buf,
928 res_lib_cpg_mcast->header.size);
934 static int cpg_exec_send_joinlist(void)
936 int count = 0;
937 char *buf;
938 int i;
939 struct list_head *iter;
940 struct list_head *iter2;
941 struct group_info *gi;
942 mar_res_header_t *res;
943 struct join_list_entry *jle;
944 struct iovec req_exec_cpg_iovec;
946 log_printf(LOG_LEVEL_DEBUG, "sending joinlist to cluster\n");
948 /* Count the number of groups we are a member of */
949 for (i=0; i<GROUP_HASH_SIZE; i++) {
950 for (iter = group_lists[i].next; iter != &group_lists[i]; iter = iter->next) {
951 gi = list_entry(iter, struct group_info, list);
952 for (iter2 = gi->members.next; iter2 != &gi->members; iter2 = iter2->next) {
953 struct process_info *pi = list_entry(iter2, struct process_info, list);
954 if (pi->pid && pi->nodeid == totempg_my_nodeid_get()) {
955 count++;
961 /* Nothing to send */
962 if (!count)
963 return 0;
965 buf = alloca(sizeof(mar_res_header_t) + sizeof(struct join_list_entry) * count);
966 if (!buf) {
967 log_printf(LOG_LEVEL_WARNING, "Unable to allocate joinlist buffer");
968 return -1;
971 jle = (struct join_list_entry *)(buf + sizeof(mar_res_header_t));
972 res = (mar_res_header_t *)buf;
974 for (i=0; i<GROUP_HASH_SIZE; i++) {
975 for (iter = group_lists[i].next; iter != &group_lists[i]; iter = iter->next) {
977 gi = list_entry(iter, struct group_info, list);
978 for (iter2 = gi->members.next; iter2 != &gi->members; iter2 = iter2->next) {
980 struct process_info *pi = list_entry(iter2, struct process_info, list);
981 if (pi->pid && pi->nodeid == totempg_my_nodeid_get()) {
982 memcpy(&jle->group_name, &gi->group_name, sizeof(mar_cpg_name_t));
983 jle->pid = pi->pid;
984 jle++;
990 res->id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_JOINLIST);
991 res->size = sizeof(mar_res_header_t)+sizeof(struct join_list_entry) * count;
993 req_exec_cpg_iovec.iov_base = buf;
994 req_exec_cpg_iovec.iov_len = res->size;
996 return totempg_groups_mcast_joined (openais_group_handle, &req_exec_cpg_iovec, 1, TOTEMPG_AGREED);
999 static int cpg_lib_init_fn (void *conn)
1001 struct process_info *pi = (struct process_info *)openais_conn_private_data_get (conn);
1002 pi->conn = conn;
1004 log_printf(LOG_LEVEL_DEBUG, "lib_init_fn: conn=%p, pi=%p\n", conn, pi);
1005 return (0);
1008 /* Join message from the library */
1009 static void message_handler_req_lib_cpg_join (void *conn, void *message)
1011 struct req_lib_cpg_join *req_lib_cpg_join = (struct req_lib_cpg_join *)message;
1012 struct process_info *pi = (struct process_info *)openais_conn_private_data_get (conn);
1013 struct res_lib_cpg_join res_lib_cpg_join;
1014 struct group_info *gi;
1015 SaAisErrorT error = SA_AIS_OK;
1017 log_printf(LOG_LEVEL_DEBUG, "got join request on %p, pi=%p, pi->pid=%d\n", conn, pi, pi->pid);
1019 /* Already joined on this conn */
1020 if (pi->pid) {
1021 error = SA_AIS_ERR_INVALID_PARAM;
1022 goto join_err;
1025 gi = get_group(&req_lib_cpg_join->group_name);
1026 if (!gi) {
1027 error = SA_AIS_ERR_NO_SPACE;
1028 goto join_err;
1031 openais_ipc_flow_control_create (
1032 conn,
1033 CPG_SERVICE,
1034 req_lib_cpg_join->group_name.value,
1035 req_lib_cpg_join->group_name.length,
1036 cpg_flow_control_state_set_fn,
1037 pi);
1039 /* Add a node entry for us */
1040 pi->nodeid = totempg_my_nodeid_get();
1041 pi->pid = req_lib_cpg_join->pid;
1042 pi->group = gi;
1043 list_add(&pi->list, &gi->members);
1045 /* Tell the rest of the cluster */
1046 cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCJOIN, CONFCHG_CPG_REASON_JOIN);
1048 join_err:
1049 res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join);
1050 res_lib_cpg_join.header.id = MESSAGE_RES_CPG_JOIN;
1051 res_lib_cpg_join.header.error = error;
1052 openais_conn_send_response(conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join));
1055 /* Leave message from the library */
1056 static void message_handler_req_lib_cpg_leave (void *conn, void *message)
1058 struct process_info *pi = (struct process_info *)openais_conn_private_data_get (conn);
1059 struct res_lib_cpg_leave res_lib_cpg_leave;
1060 struct group_info *gi;
1061 SaAisErrorT error = SA_AIS_OK;
1063 log_printf(LOG_LEVEL_DEBUG, "got leave request on %p\n", conn);
1065 if (!pi || !pi->pid || !pi->group) {
1066 error = SA_AIS_ERR_INVALID_PARAM;
1067 goto leave_ret;
1069 gi = pi->group;
1071 /* Tell other nodes we are leaving.
1072 When we get this message back we will leave too */
1073 cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_LEAVE);
1074 pi->group = NULL;
1076 openais_ipc_flow_control_destroy (
1077 conn,
1078 CPG_SERVICE,
1079 (unsigned char *)gi->group_name.value,
1080 (unsigned int)gi->group_name.length);
1082 leave_ret:
1083 /* send return */
1084 res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
1085 res_lib_cpg_leave.header.id = MESSAGE_RES_CPG_LEAVE;
1086 res_lib_cpg_leave.header.error = error;
1087 openais_conn_send_response(conn, &res_lib_cpg_leave, sizeof(res_lib_cpg_leave));
1090 /* Mcast message from the library */
1091 static void message_handler_req_lib_cpg_mcast (void *conn, void *message)
1093 struct req_lib_cpg_mcast *req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)message;
1094 struct process_info *pi = (struct process_info *)openais_conn_private_data_get (conn);
1095 struct group_info *gi = pi->group;
1096 struct iovec req_exec_cpg_iovec[2];
1097 struct req_exec_cpg_mcast req_exec_cpg_mcast;
1098 struct res_lib_cpg_mcast res_lib_cpg_mcast;
1099 int msglen = req_lib_cpg_mcast->msglen;
1100 int result;
1102 log_printf(LOG_LEVEL_DEBUG, "got mcast request on %p\n", conn);
1104 /* Can't send if we're not joined */
1105 if (!gi) {
1106 res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
1107 res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;
1108 res_lib_cpg_mcast.header.error = SA_AIS_ERR_ACCESS; /* TODO Better error code ?? */
1109 res_lib_cpg_mcast.flow_control_state = CPG_FLOW_CONTROL_DISABLED;
1110 openais_conn_send_response(conn, &res_lib_cpg_mcast,
1111 sizeof(res_lib_cpg_mcast));
1112 return;
1115 req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
1116 req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
1117 MESSAGE_REQ_EXEC_CPG_MCAST);
1118 req_exec_cpg_mcast.pid = pi->pid;
1119 req_exec_cpg_mcast.msglen = msglen;
1120 message_source_set (&req_exec_cpg_mcast.source, conn);
1121 memcpy(&req_exec_cpg_mcast.group_name, &gi->group_name,
1122 sizeof(mar_cpg_name_t));
1124 req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
1125 req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
1126 req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
1127 req_exec_cpg_iovec[1].iov_len = msglen;
1129 // TODO: guarantee type...
1130 result = totempg_groups_mcast_joined (openais_group_handle, req_exec_cpg_iovec, 2, TOTEMPG_AGREED);
1131 openais_ipc_flow_control_local_increment (conn);
1133 res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
1134 res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;
1135 res_lib_cpg_mcast.header.error = SA_AIS_OK;
1136 res_lib_cpg_mcast.flow_control_state = pi->flow_control_state;
1137 openais_conn_send_response(conn, &res_lib_cpg_mcast,
1138 sizeof(res_lib_cpg_mcast));
1141 static void message_handler_req_lib_cpg_membership (void *conn, void *message)
1143 struct process_info *pi = (struct process_info *)openais_conn_private_data_get (conn);
1145 log_printf(LOG_LEVEL_DEBUG, "got membership request on %p\n", conn);
1146 if (!pi->group) {
1147 mar_res_header_t res;
1148 res.size = sizeof(res);
1149 res.id = MESSAGE_RES_CPG_MEMBERSHIP;
1150 res.error = SA_AIS_ERR_ACCESS; /* TODO Better error code */
1151 openais_conn_send_response(conn, &res, sizeof(res));
1152 return;
1155 notify_lib_joinlist(pi->group, conn, 0, NULL, 0, NULL, MESSAGE_RES_CPG_MEMBERSHIP);
1159 static void message_handler_req_lib_cpg_trackstart (void *conn, void *message)
1161 struct req_lib_cpg_trackstart *req_lib_cpg_trackstart = (struct req_lib_cpg_trackstart *)message;
1162 struct res_lib_cpg_trackstart res_lib_cpg_trackstart;
1163 struct group_info *gi;
1164 struct process_info *otherpi;
1165 void *otherconn;
1166 SaAisErrorT error = SA_AIS_OK;
1168 log_printf(LOG_LEVEL_DEBUG, "got trackstart request on %p\n", conn);
1170 gi = get_group(&req_lib_cpg_trackstart->group_name);
1171 if (!gi) {
1172 error = SA_AIS_ERR_NO_SPACE;
1173 goto tstart_ret;
1176 /* Find the partner connection and add us to it's process_info struct */
1177 otherconn = openais_conn_partner_get (conn);
1178 otherpi = (struct process_info *)openais_conn_private_data_get (conn);
1179 otherpi->trackerconn = conn;
1181 tstart_ret:
1182 res_lib_cpg_trackstart.header.size = sizeof(res_lib_cpg_trackstart);
1183 res_lib_cpg_trackstart.header.id = MESSAGE_RES_CPG_TRACKSTART;
1184 res_lib_cpg_trackstart.header.error = SA_AIS_OK;
1185 openais_conn_send_response(conn, &res_lib_cpg_trackstart, sizeof(res_lib_cpg_trackstart));
1188 static void message_handler_req_lib_cpg_trackstop (void *conn, void *message)
1190 struct req_lib_cpg_trackstop *req_lib_cpg_trackstop = (struct req_lib_cpg_trackstop *)message;
1191 struct res_lib_cpg_trackstop res_lib_cpg_trackstop;
1192 struct process_info *otherpi;
1193 void *otherconn;
1194 struct group_info *gi;
1195 SaAisErrorT error = SA_AIS_OK;
1197 log_printf(LOG_LEVEL_DEBUG, "got trackstop request on %p\n", conn);
1199 gi = get_group(&req_lib_cpg_trackstop->group_name);
1200 if (!gi) {
1201 error = SA_AIS_ERR_NO_SPACE;
1202 goto tstop_ret;
1205 /* Find the partner connection and add us to it's process_info struct */
1206 otherconn = openais_conn_partner_get (conn);
1207 otherpi = (struct process_info *)openais_conn_private_data_get (conn);
1208 otherpi->trackerconn = NULL;
1210 tstop_ret:
1211 res_lib_cpg_trackstop.header.size = sizeof(res_lib_cpg_trackstop);
1212 res_lib_cpg_trackstop.header.id = MESSAGE_RES_CPG_TRACKSTOP;
1213 res_lib_cpg_trackstop.header.error = SA_AIS_OK;
1214 openais_conn_send_response(conn, &res_lib_cpg_trackstop.header, sizeof(res_lib_cpg_trackstop));
1217 static void message_handler_req_lib_cpg_local_get (void *conn, void *message)
1219 struct res_lib_cpg_local_get res_lib_cpg_local_get;
1221 res_lib_cpg_local_get.header.size = sizeof(res_lib_cpg_local_get);
1222 res_lib_cpg_local_get.header.id = MESSAGE_RES_CPG_LOCAL_GET;
1223 res_lib_cpg_local_get.header.error = SA_AIS_OK;
1224 res_lib_cpg_local_get.local_nodeid = totempg_my_nodeid_get ();
1226 openais_conn_send_response(conn, &res_lib_cpg_local_get,
1227 sizeof(res_lib_cpg_local_get));