2 * Copyright (c) 2006 Red Hat, Inc.
3 * Copyright (c) 2006 Sun Microsystems, Inc.
7 * Author: Patrick Caulfield (pcaulfie@redhat.com)
9 * This software licensed under BSD license, the text of which follows:
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions are met:
14 * - Redistributions of source code must retain the above copyright notice,
15 * this list of conditions and the following disclaimer.
16 * - Redistributions in binary form must reproduce the above copyright notice,
17 * this list of conditions and the following disclaimer in the documentation
18 * and/or other materials provided with the distribution.
19 * - Neither the name of the MontaVista Software, Inc. nor the names of its
20 * contributors may be used to endorse or promote products derived from this
21 * software without specific prior written permission.
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33 * THE POSSIBILITY OF SUCH DAMAGE.
38 #include <sys/types.h>
39 #include <sys/socket.h>
41 #include <sys/ioctl.h>
42 #include <netinet/in.h>
52 #include <netinet/in.h>
53 #include <arpa/inet.h>
55 #include "../include/saAis.h"
56 #include "../include/saClm.h"
57 #include "../include/ipc_gen.h"
58 #include "../include/ipc_cpg.h"
59 #include "../include/mar_cpg.h"
60 #include "../include/list.h"
61 #include "../include/queue.h"
62 #include "../lcr/lcr_comp.h"
/* Declare this file's logging subsystem as "CPG", with LOG_INFO as the level argument. */
78 LOGSYS_DECLARE_SUBSYS ("CPG", LOG_INFO
);
/* Bucket count of the group_lists hash table; get_group reduces the name hash modulo this value. */
80 #define GROUP_HASH_SIZE 32
/* Bit in process_info.flags: set by do_proc_join, and required by notify_lib_joinlist
   before a tracker connection is sent membership callbacks. */
82 #define PI_FLAG_MEMBER 1
/*
 * Identifiers of this service's executive (cluster-wide) messages.
 * They are combined with CPG_SERVICE through SERVICE_ID_MAKE when a message
 * is multicast. Their order matches the entries of cpg_exec_service[].
 * NOTE(review): presumably the dispatcher indexes that table by this id;
 * confirm before reordering either list.
 */
84 enum cpg_message_req_types
{
85 MESSAGE_REQ_EXEC_CPG_PROCJOIN
= 0,
86 MESSAGE_REQ_EXEC_CPG_PROCLEAVE
= 1,
87 MESSAGE_REQ_EXEC_CPG_JOINLIST
= 2,
88 MESSAGE_REQ_EXEC_CPG_MCAST
= 3,
89 MESSAGE_REQ_EXEC_CPG_DOWNLIST
= 4
/*
 * Members of the per-group "removed processes" record. This is presumably the
 * body of struct removed_group; its opening line is not visible in this extract.
 * remove_node_from_groups fills one of these per affected group when a node
 * goes down. message_handler_req_exec_cpg_downlist then passes gi, left_list
 * and left_list_entries to notify_lib_joinlist.
 * NOTE(review): remove_node_from_groups also uses a left_list_size member and
 * grows the record with realloc beyond left_list[]. Neither that member nor the
 * guarantee that left_list is the last member is visible here; confirm both.
 */
94 struct group_info
*gi
;
95 struct list_head list
; /* on removed_list */
96 int left_list_entries
;
97 mar_cpg_address_t left_list
[PROCESSOR_COUNT_MAX
];
/*
 * Members of the per-group record. This is presumably struct group_info; its
 * opening line is not visible in this extract. One record exists per group
 * name and is linked into a group_lists bucket through `list`.
 * `members` holds the struct process_info entries of the group.
 * `rg` points at the pending removed-process record built by
 * remove_node_from_groups.
 */
102 mar_cpg_name_t group_name
;
103 struct list_head members
;
104 struct list_head list
; /* on hash list */
105 struct removed_group
*rg
; /* when a node goes down */
/*
 * One process in a group. The same structure is also the per-connection
 * private data of a library client (see cpg_service_handler.private_data_size).
 * NOTE(review): the nodeid, pid, flags and trackerconn members used elsewhere
 * in this file are declared on lines not visible in this extract.
 */
108 struct process_info
{
114 struct group_info
*group
;
115 enum openais_flow_control_state flow_control_state
;
116 struct list_head list
; /* on the group_info members list */
/*
 * One (group, pid) record of a JOINLIST executive message.
 * cpg_exec_send_joinlist appends these after a mar_res_header_t, and
 * message_handler_req_exec_cpg_joinlist walks them. The pid member is on a
 * line not visible in this extract.
 */
119 struct join_list_entry
{
121 mar_cpg_name_t group_name
;
/* Hash table of every known group: GROUP_HASH_SIZE list heads, initialised in
   cpg_exec_init_fn and indexed by jhash(group name) in get_group. */
124 static struct list_head group_lists
[GROUP_HASH_SIZE
];
/* Forward declarations of the service callbacks, executive and library message
   handlers, and helpers referenced by the handler tables below. */
127 * Service Interfaces required by service_message_handler struct
129 static void cpg_confchg_fn (
130 enum totem_configuration_type configuration_type
,
131 unsigned int *member_list
, int member_list_entries
,
132 unsigned int *left_list
, int left_list_entries
,
133 unsigned int *joined_list
, int joined_list_entries
,
134 struct memb_ring_id
*ring_id
);
136 static int cpg_exec_init_fn (struct objdb_iface_ver0
*objdb
);
138 static int cpg_lib_init_fn (void *conn
);
140 static int cpg_lib_exit_fn (void *conn
);
/* Executive message handlers; nodeid identifies the sending cluster node. */
142 static void message_handler_req_exec_cpg_procjoin (
144 unsigned int nodeid
);
146 static void message_handler_req_exec_cpg_procleave (
148 unsigned int nodeid
);
150 static void message_handler_req_exec_cpg_joinlist (
152 unsigned int nodeid
);
154 static void message_handler_req_exec_cpg_mcast (
156 unsigned int nodeid
);
158 static void message_handler_req_exec_cpg_downlist (
160 unsigned int nodeid
);
/* Byte-order converters for executive messages (registered in cpg_exec_service). */
162 static void exec_cpg_procjoin_endian_convert (void *msg
);
164 static void exec_cpg_joinlist_endian_convert (void *msg
);
166 static void exec_cpg_mcast_endian_convert (void *msg
);
168 static void exec_cpg_downlist_endian_convert (void *msg
);
/* Library (IPC) request handlers (registered in cpg_lib_service). */
170 static void message_handler_req_lib_cpg_join (void *conn
, void *message
);
172 static void message_handler_req_lib_cpg_leave (void *conn
, void *message
);
174 static void message_handler_req_lib_cpg_mcast (void *conn
, void *message
);
176 static void message_handler_req_lib_cpg_membership (void *conn
, void *message
);
178 static void message_handler_req_lib_cpg_trackstart (void *conn
, void *message
);
180 static void message_handler_req_lib_cpg_trackstop (void *conn
, void *message
);
182 static void message_handler_req_lib_cpg_local_get (void *conn
, void *message
);
184 static int cpg_node_joinleave_send (struct group_info
*gi
, struct process_info
*pi
, int fn
, int reason
);
186 static int cpg_exec_send_joinlist(void);
/* Synchronization callbacks (registered in cpg_service_handler). */
188 static void cpg_sync_init (void);
189 static int cpg_sync_process (void);
190 static void cpg_sync_activate (void);
191 static void cpg_sync_abort (void);
/*
 * Library (IPC) request dispatch table. Entries, in order: join, leave, mcast,
 * membership, trackstart, trackstop, local_get. Join, leave and mcast require
 * flow control; the others do not.
 */
193 * Library Handler Definition
195 static struct openais_lib_handler cpg_lib_service
[] =
198 .lib_handler_fn
= message_handler_req_lib_cpg_join
,
199 .response_size
= sizeof (struct res_lib_cpg_join
),
200 .response_id
= MESSAGE_RES_CPG_JOIN
,
201 .flow_control
= OPENAIS_FLOW_CONTROL_REQUIRED
204 .lib_handler_fn
= message_handler_req_lib_cpg_leave
,
205 .response_size
= sizeof (struct res_lib_cpg_leave
),
206 .response_id
= MESSAGE_RES_CPG_LEAVE
,
207 .flow_control
= OPENAIS_FLOW_CONTROL_REQUIRED
210 .lib_handler_fn
= message_handler_req_lib_cpg_mcast
,
211 .response_size
= sizeof (struct res_lib_cpg_mcast
),
212 .response_id
= MESSAGE_RES_CPG_MCAST
,
213 .flow_control
= OPENAIS_FLOW_CONTROL_REQUIRED
216 .lib_handler_fn
= message_handler_req_lib_cpg_membership
,
217 .response_size
= sizeof (mar_res_header_t
),
218 .response_id
= MESSAGE_RES_CPG_MEMBERSHIP
,
219 .flow_control
= OPENAIS_FLOW_CONTROL_NOT_REQUIRED
222 .lib_handler_fn
= message_handler_req_lib_cpg_trackstart
,
223 .response_size
= sizeof (struct res_lib_cpg_trackstart
),
224 .response_id
= MESSAGE_RES_CPG_TRACKSTART
,
225 .flow_control
= OPENAIS_FLOW_CONTROL_NOT_REQUIRED
/* NOTE(review): the trackstop entry below sizes its response with
   struct res_lib_cpg_trackstart. Confirm this is intended, or that the two
   response structures have the same size. */
228 .lib_handler_fn
= message_handler_req_lib_cpg_trackstop
,
229 .response_size
= sizeof (struct res_lib_cpg_trackstart
),
230 .response_id
= MESSAGE_RES_CPG_TRACKSTOP
,
231 .flow_control
= OPENAIS_FLOW_CONTROL_NOT_REQUIRED
234 .lib_handler_fn
= message_handler_req_lib_cpg_local_get
,
235 .response_size
= sizeof (struct res_lib_cpg_local_get
),
236 .response_id
= MESSAGE_RES_CPG_LOCAL_GET
,
237 .flow_control
= OPENAIS_FLOW_CONTROL_NOT_REQUIRED
/*
 * Executive message dispatch table, in the order of enum cpg_message_req_types.
 * PROCLEAVE shares the PROCJOIN byte-order converter because both messages are
 * a struct req_exec_cpg_procjoin (see cpg_node_joinleave_send).
 */
241 static struct openais_exec_handler cpg_exec_service
[] =
244 .exec_handler_fn
= message_handler_req_exec_cpg_procjoin
,
245 .exec_endian_convert_fn
= exec_cpg_procjoin_endian_convert
248 .exec_handler_fn
= message_handler_req_exec_cpg_procleave
,
249 .exec_endian_convert_fn
= exec_cpg_procjoin_endian_convert
252 .exec_handler_fn
= message_handler_req_exec_cpg_joinlist
,
253 .exec_endian_convert_fn
= exec_cpg_joinlist_endian_convert
256 .exec_handler_fn
= message_handler_req_exec_cpg_mcast
,
257 .exec_endian_convert_fn
= exec_cpg_mcast_endian_convert
260 .exec_handler_fn
= message_handler_req_exec_cpg_downlist
,
261 .exec_endian_convert_fn
= exec_cpg_downlist_endian_convert
/*
 * Service descriptor for the CPG service. It wires together:
 * the library and executive tables (with element counts computed via sizeof),
 * the init and exit callbacks, the configuration-change callback, and the sync
 * callbacks. Each library connection's private data is a struct process_info.
 */
265 struct openais_service_handler cpg_service_handler
= {
266 .name
= "openais cluster closed process group service v1.01",
268 .private_data_size
= sizeof (struct process_info
),
269 .flow_control
= OPENAIS_FLOW_CONTROL_REQUIRED
,
270 .lib_init_fn
= cpg_lib_init_fn
,
271 .lib_exit_fn
= cpg_lib_exit_fn
,
272 .lib_service
= cpg_lib_service
,
273 .lib_service_count
= sizeof (cpg_lib_service
) / sizeof (struct openais_lib_handler
),
274 .exec_init_fn
= cpg_exec_init_fn
,
275 .exec_dump_fn
= NULL
,
276 .exec_service
= cpg_exec_service
,
277 .exec_service_count
= sizeof (cpg_exec_service
) / sizeof (struct openais_exec_handler
),
278 .confchg_fn
= cpg_confchg_fn
,
279 .sync_init
= cpg_sync_init
,
280 .sync_process
= cpg_sync_process
,
281 .sync_activate
= cpg_sync_activate
,
282 .sync_abort
= cpg_sync_abort
/* Versioned interface exported through the lcr loader:
   cpg_service_handler_iface hands out cpg_get_service_handler_ver0. */
286 * Dynamic loader definition
288 static struct openais_service_handler
*cpg_get_service_handler_ver0 (void);
290 static struct openais_service_handler_iface_ver0 cpg_service_handler_iface
= {
291 .openais_get_service_handler_ver0
= cpg_get_service_handler_ver0
/* lcr interface descriptor named "openais_cpg", with no replaced versions and
   no dependencies. cpg_comp_register passes it to lcr_interfaces_set together
   with cpg_service_handler_iface. */
294 static struct lcr_iface openais_cpg_ver0
[1] = {
296 .name
= "openais_cpg",
298 .versions_replace
= 0,
299 .versions_replace_count
= 0,
301 .dependency_count
= 0,
/* lcr component descriptor whose interface list is openais_cpg_ver0;
   registered by cpg_comp_register. */
308 static struct lcr_comp cpg_comp_ver0
= {
310 .ifaces
= openais_cpg_ver0
/* Return the CPG service descriptor; this is the ver0 accessor published by
   cpg_service_handler_iface. */
314 static struct openais_service_handler
*cpg_get_service_handler_ver0 (void)
316 return (&cpg_service_handler
);
/* Runs as a constructor (__attribute__ ((constructor))). It attaches
   cpg_service_handler_iface to openais_cpg_ver0[0] and registers
   cpg_comp_ver0 with the lcr loader. */
319 __attribute__ ((constructor
)) static void cpg_comp_register (void) {
320 lcr_interfaces_set (&openais_cpg_ver0
[0], &cpg_service_handler_iface
);
322 lcr_component_register (&cpg_comp_ver0
);
/*
 * Wire format of PROCJOIN and PROCLEAVE executive messages, built by
 * cpg_node_joinleave_send. Every field is 8-byte aligned (presumably so the
 * layout does not depend on the sender's ABI). exec_cpg_procjoin_endian_convert
 * swaps pid, group_name and reason for senders of the other byte order.
 */
325 struct req_exec_cpg_procjoin
{
326 mar_req_header_t header
__attribute__((aligned(8)));
327 mar_cpg_name_t group_name
__attribute__((aligned(8)));
328 mar_uint32_t pid
__attribute__((aligned(8)));
329 mar_uint32_t reason
__attribute__((aligned(8)));
/*
 * Wire format of an MCAST executive message. The fixed part holds the header,
 * group, payload length, sender pid and message source. It is followed by
 * msglen payload bytes in the flexible array `message`;
 * message_handler_req_lib_cpg_mcast sends those bytes as a second iovec.
 */
332 struct req_exec_cpg_mcast
{
333 mar_req_header_t header
__attribute__((aligned(8)));
334 mar_cpg_name_t group_name
__attribute__((aligned(8)));
335 mar_uint32_t msglen
__attribute__((aligned(8)));
336 mar_uint32_t pid
__attribute__((aligned(8)));
337 mar_message_source_t source
__attribute__((aligned(8)));
338 mar_uint8_t message
[] __attribute__((aligned(8)));
/*
 * Wire format of a DOWNLIST executive message: the nodeids of left_nodes
 * departed nodes. nodeids[] holds at most PROCESSOR_COUNT_MAX entries.
 * The whole fixed-size structure is sent; cpg_confchg_fn sets header.size to
 * sizeof(struct req_exec_cpg_downlist).
 */
341 struct req_exec_cpg_downlist
{
342 mar_req_header_t header
__attribute__((aligned(8)));
343 mar_uint32_t left_nodes
__attribute__((aligned(8)));
344 mar_uint32_t nodeids
[PROCESSOR_COUNT_MAX
] __attribute__((aligned(8)));
/* DOWNLIST message under construction. cpg_confchg_fn fills it on the
   lowest-nodeid member but multicasts it only on a TOTEM_CONFIGURATION_REGULAR
   change, so it is kept at file scope between calls. left_nodes is reset to 0
   once the message has been sent. */
347 static struct req_exec_cpg_downlist req_exec_cpg_downlist
;
/* Sync-init callback (registered in cpg_service_handler); its body is not visible in this extract. */
349 static void cpg_sync_init (void)
/* Sync-process callback: multicasts this node's local memberships via
   cpg_exec_send_joinlist and returns that call's result. */
353 static int cpg_sync_process (void)
355 return cpg_exec_send_joinlist();
/* Sync-activate callback (registered in cpg_service_handler); its body is not visible in this extract. */
358 static void cpg_sync_activate (void)
/* Sync-abort callback (registered in cpg_service_handler); its body is not visible in this extract. */
362 static void cpg_sync_abort (void)
/*
 * Build a struct res_lib_cpg_confchg_callback for group gi and send it to
 * library clients. The callback holds:
 *   - the current members, excluding any process also listed in left_list;
 *   - then left_list;
 *   - then joined_list.
 * All entries are mar_cpg_address_t.
 *
 * The buffer is sent with openais_conn_send_response to the given connection
 * and/or, in the "Send it to all listeners" loop, to every member that has a
 * trackerconn and PI_FLAG_MEMBER set. The branching between the two cases is
 * not visible here.
 *
 * Returns SA_AIS_ERR_NO_SPACE on an error path whose condition is not visible
 * (presumably buffer allocation failure).
 *
 * NOTE(review): the connection (second) and response-id (last) parameters, the
 * member counting and the return on success are on lines not visible in this
 * extract. Call sites pass a connection or NULL as the second argument and a
 * MESSAGE_RES_CPG_* id as the last.
 */
369 static int notify_lib_joinlist(
370 struct group_info
*gi
,
372 int joined_list_entries
,
373 mar_cpg_address_t
*joined_list
,
374 int left_list_entries
,
375 mar_cpg_address_t
*left_list
,
380 struct res_lib_cpg_confchg_callback
*res
;
381 struct list_head
*iter
;
382 struct list_head
*tmp
;
383 mar_cpg_address_t
*retgi
;
386 /* First, we need to know how many nodes are in the list. While we're
387 traversing this list, look for the 'us' entry so we know which
388 connection to send back down */
389 for (iter
= gi
->members
.next
; iter
!= &gi
->members
; iter
= iter
->next
) {
390 struct process_info
*pi
= list_entry(iter
, struct process_info
, list
);
395 log_printf(LOG_LEVEL_DEBUG
, "Sending new joinlist (%d elements) to clients\n", count
);
/* Fixed callback header followed by the member, left and joined address arrays. */
397 size
= sizeof(struct res_lib_cpg_confchg_callback
) +
398 sizeof(mar_cpg_address_t
) * (count
+ left_list_entries
+ joined_list_entries
);
401 return SA_AIS_ERR_NO_SPACE
;
403 res
= (struct res_lib_cpg_confchg_callback
*)buf
;
404 res
->joined_list_entries
= joined_list_entries
;
405 res
->left_list_entries
= left_list_entries
;
406 retgi
= res
->member_list
;
408 res
->header
.size
= size
;
410 memcpy(&res
->group_name
, &gi
->group_name
, sizeof(mar_cpg_name_t
));
412 /* Build up the message */
414 for (iter
= gi
->members
.next
; iter
!= &gi
->members
; iter
= iter
->next
) {
415 struct process_info
*pi
= list_entry(iter
, struct process_info
, list
);
417 /* Processes leaving will be removed AFTER this is done (so that they get their
418 own leave notifications), so exclude them from the members list here */
420 for (i
=0; i
<left_list_entries
; i
++) {
421 if (left_list
[i
].pid
== pi
->pid
&& left_list
[i
].nodeid
== pi
->nodeid
)
425 retgi
->nodeid
= pi
->nodeid
;
426 retgi
->pid
= pi
->pid
;
432 res
->member_list_entries
= count
;
/* The left and joined arrays follow the member array directly. */
434 if (left_list_entries
) {
435 memcpy(retgi
, left_list
, left_list_entries
* sizeof(mar_cpg_address_t
));
436 retgi
+= left_list_entries
;
439 if (joined_list_entries
) {
440 memcpy(retgi
, joined_list
, joined_list_entries
* sizeof(mar_cpg_address_t
));
441 retgi
+= joined_list_entries
;
445 openais_conn_send_response(conn
, buf
, size
);
448 /* Send it to all listeners */
449 for (iter
= gi
->members
.next
, tmp
=iter
->next
; iter
!= &gi
->members
; iter
= tmp
, tmp
=iter
->next
) {
450 struct process_info
*pi
= list_entry(iter
, struct process_info
, list
);
451 if (pi
->trackerconn
&& (pi
->flags
& PI_FLAG_MEMBER
)) {
452 if (openais_conn_send_response(pi
->trackerconn
, buf
, size
) == -1) {
/* The body of remove_group is not visible in this extract.
   NOTE(review): by its name it presumably unlinks and frees gi; confirm. */
462 static void remove_group(struct group_info
*gi
)
/* Executive init: initialises each of the GROUP_HASH_SIZE group_lists buckets.
   The objdb argument is not referenced in the visible lines. */
469 static int cpg_exec_init_fn (struct objdb_iface_ver0
*objdb
)
473 for (i
=0; i
<GROUP_HASH_SIZE
; i
++) {
474 list_init(&group_lists
[i
]);
/*
 * Library exit callback. For the closing connection's process, it multicasts a
 * PROCLEAVE with reason CONFCHG_CPG_REASON_PROCDOWN.
 * NOTE(review): cpg_node_joinleave_send dereferences gi (pi->group). Any guard
 * for a connection that never joined a group is on lines not visible here;
 * confirm it exists. notify_info is filled in but not used in the visible lines.
 */
480 static int cpg_lib_exit_fn (void *conn
)
482 struct process_info
*pi
= (struct process_info
*)openais_conn_private_data_get (conn
);
483 struct group_info
*gi
= pi
->group
;
484 mar_cpg_address_t notify_info
;
486 log_printf(LOG_LEVEL_DEBUG
, "exit_fn for conn=%p\n", conn
);
489 notify_info
.pid
= pi
->pid
;
490 notify_info
.nodeid
= totempg_my_nodeid_get();
491 notify_info
.reason
= CONFCHG_CPG_REASON_PROCDOWN
;
492 cpg_node_joinleave_send(gi
, pi
, MESSAGE_REQ_EXEC_CPG_PROCLEAVE
, CONFCHG_CPG_REASON_PROCDOWN
);
/*
 * Look up the group named `name` in group_lists, creating it if it is absent.
 * The name is hashed with jhash over name->length bytes into one of
 * GROUP_HASH_SIZE buckets, and only that bucket is searched.
 * A new group is malloc'd with a copy of the name and an empty member list,
 * then added to its bucket. On allocation failure a warning is logged.
 * NOTE(review): the return value on that failure path is not visible in this
 * extract, and several callers ("this will always succeed !") do not check it.
 */
498 static struct group_info
*get_group(mar_cpg_name_t
*name
)
500 struct list_head
*iter
;
501 struct group_info
*gi
= NULL
;
502 struct group_info
*itergi
;
503 uint32_t hash
= jhash(name
->value
, name
->length
, 0) % GROUP_HASH_SIZE
;
505 for (iter
= group_lists
[hash
].next
; iter
!= &group_lists
[hash
]; iter
= iter
->next
) {
506 itergi
= list_entry(iter
, struct group_info
, list
);
/* Compare lengths first. A memcmp over name->length bytes alone would let a
   shorter name match any longer group name that begins with it, whenever the
   two names land in the same bucket. */
507 if (itergi
->group_name
.length
== name
->length
&& memcmp(itergi
->group_name
.value
, name
->value
, name
->length
) == 0) {
514 gi
= malloc(sizeof(struct group_info
));
516 log_printf(LOG_LEVEL_WARNING
, "Unable to allocate group_info struct");
519 memcpy(&gi
->group_name
, name
, sizeof(mar_cpg_name_t
));
521 list_init(&gi
->members
);
522 list_add(&gi
->list
, &group_lists
[hash
]);
/*
 * Multicast a struct req_exec_cpg_procjoin for process pi in group gi to the
 * cluster, with TOTEMPG_AGREED ordering.
 *   fn:     executive message id. The visible callers pass
 *           MESSAGE_REQ_EXEC_CPG_PROCJOIN or MESSAGE_REQ_EXEC_CPG_PROCLEAVE.
 *   reason: CONFCHG_CPG_REASON_* value carried in the message.
 * NOTE(review): the return statement is not visible; presumably it returns
 * `result` from totempg_groups_mcast_joined.
 */
527 static int cpg_node_joinleave_send (struct group_info
*gi
, struct process_info
*pi
, int fn
, int reason
)
529 struct req_exec_cpg_procjoin req_exec_cpg_procjoin
;
530 struct iovec req_exec_cpg_iovec
;
533 memcpy(&req_exec_cpg_procjoin
.group_name
, &gi
->group_name
, sizeof(mar_cpg_name_t
));
534 req_exec_cpg_procjoin
.pid
= pi
->pid
;
535 req_exec_cpg_procjoin
.reason
= reason
;
537 req_exec_cpg_procjoin
.header
.size
= sizeof(req_exec_cpg_procjoin
);
538 req_exec_cpg_procjoin
.header
.id
= SERVICE_ID_MAKE(CPG_SERVICE
, fn
);
540 req_exec_cpg_iovec
.iov_base
= (char *)&req_exec_cpg_procjoin
;
541 req_exec_cpg_iovec
.iov_len
= sizeof(req_exec_cpg_procjoin
);
543 result
= totempg_groups_mcast_joined (openais_group_handle
, &req_exec_cpg_iovec
, 1, TOTEMPG_AGREED
);
/*
 * For every group in group_lists, find the members whose nodeid equals the
 * departed node. For each affected group:
 *   - record the member (pid, nodeid, reason CONFCHG_CPG_REASON_NODEDOWN) in
 *     gi->rg, a struct removed_group;
 *   - link gi->rg onto remlist.
 * The member list is walked with a saved next pointer (tmp), so entries can be
 * unlinked during the walk.
 *
 * When gi->rg is full it is grown with realloc. The new left_list_size
 * (newsize + PROCESSOR_COUNT_MAX) counts the PROCESSOR_COUNT_MAX entries
 * already embedded in struct removed_group.
 *
 * NOTE(review): the nodeid parameter and the allocation-failure checks are on
 * lines not visible here. gi->rg is linked onto remlist right after malloc,
 * before any visible NULL check.
 * NOTE(review): in this extract, the comment that starts "Do we need to
 * increase the size ?" has no visible closing delimiter (that line is missing).
 * As shown, the resize and record code up to "Remove node info for dead node"
 * is therefore commented out.
 */
548 static void remove_node_from_groups(
550 struct list_head
*remlist
)
553 struct list_head
*iter
, *iter2
, *tmp
;
554 struct process_info
*pi
;
555 struct group_info
*gi
;
557 for (i
=0; i
< GROUP_HASH_SIZE
; i
++) {
558 for (iter
= group_lists
[i
].next
; iter
!= &group_lists
[i
]; iter
= iter
->next
) {
559 gi
= list_entry(iter
, struct group_info
, list
);
560 for (iter2
= gi
->members
.next
, tmp
= iter2
->next
; iter2
!= &gi
->members
; iter2
= tmp
, tmp
= iter2
->next
) {
561 pi
= list_entry(iter2
, struct process_info
, list
);
563 if (pi
->nodeid
== nodeid
) {
565 /* Add it to the list of nodes to send notifications for */
567 gi
->rg
= malloc(sizeof(struct removed_group
));
569 list_add(&gi
->rg
->list
, remlist
);
571 gi
->rg
->left_list_entries
= 0;
572 gi
->rg
->left_list_size
= PROCESSOR_COUNT_MAX
;
575 log_printf(LOG_LEVEL_CRIT
, "Unable to allocate removed group struct. CPG callbacks will be junk.");
579 /* Do we need to increase the size ?
580 * Yes, I increase this exponentially. Generally, if you've got a lot of groups,
581 * you'll have a /lot/ of groups, and cgp_groupinfo is pretty small anyway
583 if (gi
->rg
->left_list_size
== gi
->rg
->left_list_entries
) {
585 struct removed_group
*newrg
;
587 list_del(&gi
->rg
->list
);
588 newsize
= gi
->rg
->left_list_size
* 2;
589 newrg
= realloc(gi
->rg
, sizeof(struct removed_group
) + newsize
*sizeof(mar_cpg_address_t
));
591 log_printf(LOG_LEVEL_CRIT
, "Unable to realloc removed group struct. CPG callbacks will be junk.");
594 newrg
->left_list_size
= newsize
+PROCESSOR_COUNT_MAX
;
596 list_add(&gi
->rg
->list
, remlist
);
598 gi
->rg
->left_list
[gi
->rg
->left_list_entries
].pid
= pi
->pid
;
599 gi
->rg
->left_list
[gi
->rg
->left_list_entries
].nodeid
= pi
->nodeid
;
600 gi
->rg
->left_list
[gi
->rg
->left_list_entries
].reason
= CONFCHG_CPG_REASON_NODEDOWN
;
601 gi
->rg
->left_list_entries
++;
603 /* Remove node info for dead node */
/*
 * Totem configuration-change callback.
 * When nodes have left, the member with the lowest nodeid in member_list builds
 * a DOWNLIST message (in the file-scope req_exec_cpg_downlist) listing the
 * departed nodeids. The message is multicast with TOTEMPG_AGREED ordering only
 * on a TOTEM_CONFIGURATION_REGULAR change. left_nodes is then reset to 0, so a
 * given downlist is sent once.
 */
613 static void cpg_confchg_fn (
614 enum totem_configuration_type configuration_type
,
615 unsigned int *member_list
, int member_list_entries
,
616 unsigned int *left_list
, int left_list_entries
,
617 unsigned int *joined_list
, int joined_list_entries
,
618 struct memb_ring_id
*ring_id
)
/* Sentinel must be the largest 32-bit nodeid. With the previous 24-bit value
   (0xffffff), a cluster whose nodeids all exceed it left lowest_nodeid
   unchanged. No node then matched it, and the downlist was never sent. */
621 uint32_t lowest_nodeid
= 0xffffffff;
622 struct iovec req_exec_cpg_iovec
;
624 /* We don't send the library joinlist in here because it can end up
625 out of order with the rest of the messages (which are totem ordered).
626 So we get the lowest nodeid to send out a list of left nodes instead.
627 On receipt of that message, all nodes will then notify their local clients
628 of the new joinlist */
630 if (left_list_entries
) {
631 for (i
= 0; i
< member_list_entries
; i
++) {
632 if (member_list
[i
] < lowest_nodeid
)
633 lowest_nodeid
= member_list
[i
];
636 log_printf(LOG_LEVEL_DEBUG
, "confchg, low nodeid=%d, us = %d\n", lowest_nodeid
, totempg_my_nodeid_get());
637 if (lowest_nodeid
== totempg_my_nodeid_get()) {
639 req_exec_cpg_downlist
.header
.id
= SERVICE_ID_MAKE(CPG_SERVICE
, MESSAGE_REQ_EXEC_CPG_DOWNLIST
);
640 req_exec_cpg_downlist
.header
.size
= sizeof(struct req_exec_cpg_downlist
);
642 req_exec_cpg_downlist
.left_nodes
= left_list_entries
;
643 for (i
= 0; i
< left_list_entries
; i
++) {
644 req_exec_cpg_downlist
.nodeids
[i
] = left_list
[i
];
646 log_printf(LOG_LEVEL_DEBUG
, "confchg, build downlist: %d nodes\n", left_list_entries
);
650 /* Don't send this message until we get the final configuration message */
651 if (configuration_type
== TOTEM_CONFIGURATION_REGULAR
&& req_exec_cpg_downlist
.left_nodes
) {
652 req_exec_cpg_iovec
.iov_base
= (char *)&req_exec_cpg_downlist
;
653 req_exec_cpg_iovec
.iov_len
= req_exec_cpg_downlist
.header
.size
;
655 totempg_groups_mcast_joined (openais_group_handle
, &req_exec_cpg_iovec
, 1, TOTEMPG_AGREED
);
656 req_exec_cpg_downlist
.left_nodes
= 0;
657 log_printf(LOG_LEVEL_DEBUG
, "confchg, sent downlist\n");
/* Flow-control callback, registered with openais_ipc_flow_control_create in
   message_handler_req_lib_cpg_join. It stores the new state in the
   process_info passed as context (declared on a line not visible here). The
   mcast paths later copy that state into their responses. */
661 static void cpg_flow_control_state_set_fn (
663 enum openais_flow_control_state flow_control_state
)
665 struct process_info
*process_info
= (struct process_info
*)context
;
667 process_info
->flow_control_state
= flow_control_state
;
670 /* Can byteswap join & leave messages */
/* Swap pid, group_name and reason of a req_exec_cpg_procjoin from the sender's
   byte order. NOTE(review): unlike exec_cpg_mcast_endian_convert, this does not
   swap the header; confirm whether the dispatcher already converts it. */
671 static void exec_cpg_procjoin_endian_convert (void *msg
)
673 struct req_exec_cpg_procjoin
*req_exec_cpg_procjoin
= (struct req_exec_cpg_procjoin
*)msg
;
675 req_exec_cpg_procjoin
->pid
= swab32(req_exec_cpg_procjoin
->pid
);
676 swab_mar_cpg_name_t (&req_exec_cpg_procjoin
->group_name
);
677 req_exec_cpg_procjoin
->reason
= swab32(req_exec_cpg_procjoin
->reason
);
/* Swap every join_list_entry (pid and group_name) of a JOINLIST message. The
   walk runs from just after the mar_res_header_t up to msg + res->size.
   NOTE(review): res->size is used as the bound without being swapped here (see
   the XXX below). The arithmetic on a void * relies on a GNU C extension. */
680 static void exec_cpg_joinlist_endian_convert (void *msg
)
682 mar_res_header_t
*res
= (mar_res_header_t
*)msg
;
683 struct join_list_entry
*jle
= (struct join_list_entry
*)(msg
+ sizeof(mar_res_header_t
));
685 /* XXX shouldn't mar_res_header be swabbed? */
687 while ((void*)jle
< msg
+ res
->size
) {
688 jle
->pid
= swab32(jle
->pid
);
689 swab_mar_cpg_name_t (&jle
->group_name
);
/* Swap left_nodes, then each of the first left_nodes entries of nodeids[].
   NOTE(review): left_nodes comes from the network and is not checked against
   PROCESSOR_COUNT_MAX (the capacity of nodeids[]) in the visible lines. A
   malformed message would index past the array. */
694 static void exec_cpg_downlist_endian_convert (void *msg
)
696 struct req_exec_cpg_downlist
*req_exec_cpg_downlist
= (struct req_exec_cpg_downlist
*)msg
;
699 req_exec_cpg_downlist
->left_nodes
= swab32(req_exec_cpg_downlist
->left_nodes
);
701 for (i
= 0; i
< req_exec_cpg_downlist
->left_nodes
; i
++) {
702 req_exec_cpg_downlist
->nodeids
[i
] = swab32(req_exec_cpg_downlist
->nodeids
[i
]);
/* Swap every fixed field of an MCAST message (header, group_name, pid, msglen,
   source); the payload bytes are left untouched. This is the only converter in
   this file that also swaps the header. */
707 static void exec_cpg_mcast_endian_convert (void *msg
)
709 struct req_exec_cpg_mcast
*req_exec_cpg_mcast
= (struct req_exec_cpg_mcast
*)msg
;
711 swab_mar_req_header_t (&req_exec_cpg_mcast
->header
);
712 swab_mar_cpg_name_t (&req_exec_cpg_mcast
->group_name
);
713 req_exec_cpg_mcast
->pid
= swab32(req_exec_cpg_mcast
->pid
);
714 req_exec_cpg_mcast
->msglen
= swab32(req_exec_cpg_mcast
->msglen
);
715 swab_mar_message_source_t (&req_exec_cpg_mcast
->source
);
/*
 * Record that process `pid` on node `nodeid` is in group `name`, then send the
 * group's clients a MESSAGE_RES_CPG_CONFCHG_CALLBACK. The pid, nodeid and
 * reason parameters are on lines not visible in this extract.
 *
 * The member list is first searched for an existing (pid, nodeid) entry. A
 * local join message whose entry does not yet have PI_FLAG_MEMBER set is
 * handled specially; that handling is on lines not visible here. Otherwise a
 * new process_info is allocated with no tracker connection and appended to the
 * member list. The entry is marked PI_FLAG_MEMBER before clients are notified.
 *
 * NOTE(review): get_group's result is used without a visible NULL check,
 * although get_group logs an allocation failure.
 */
718 static void do_proc_join(
719 mar_cpg_name_t
*name
,
724 struct group_info
*gi
;
725 struct process_info
*pi
;
726 struct list_head
*iter
;
727 mar_cpg_address_t notify_info
;
729 gi
= get_group(name
); /* this will always succeed ! */
732 /* See if it already exists in this group */
733 for (iter
= gi
->members
.next
; iter
!= &gi
->members
; iter
= iter
->next
) {
734 pi
= list_entry(iter
, struct process_info
, list
);
735 if (pi
->pid
== pid
&& pi
->nodeid
== nodeid
) {
737 /* It could be a local join message */
/* Test "PI_FLAG_MEMBER not set". The previous (!pi->flags & PI_FLAG_MEMBER)
   applied ! to the whole flags word before masking, because ! binds more
   tightly than &. */
738 if ((nodeid
== totempg_my_nodeid_get()) &&
739 (!(pi
->flags
& PI_FLAG_MEMBER
))) {
747 pi
= malloc(sizeof(struct process_info
));
749 log_printf(LOG_LEVEL_WARNING
, "Unable to allocate process_info struct");
756 pi
->trackerconn
= NULL
;
757 list_add_tail(&pi
->list
, &gi
->members
);
761 pi
->flags
= PI_FLAG_MEMBER
;
762 notify_info
.pid
= pi
->pid
;
763 notify_info
.nodeid
= nodeid
;
764 notify_info
.reason
= reason
;
766 notify_lib_joinlist(gi
, NULL
,
769 MESSAGE_RES_CPG_CONFCHG_CALLBACK
);
/*
 * DOWNLIST handler. For each departed nodeid in the message,
 * remove_node_from_groups collects the affected groups on removed_list.
 * notify_lib_joinlist is then called for each collected group, with that
 * group's left_list of removed processes.
 * NOTE(review): left_nodes is trusted as the loop bound over nodeids[]
 * (capacity PROCESSOR_COUNT_MAX) without a visible range check.
 */
772 static void message_handler_req_exec_cpg_downlist (
776 struct req_exec_cpg_downlist
*req_exec_cpg_downlist
= (struct req_exec_cpg_downlist
*)message
;
778 struct list_head removed_list
;
780 log_printf(LOG_LEVEL_DEBUG
, "downlist left_list: %d\n", req_exec_cpg_downlist
->left_nodes
);
782 list_init(&removed_list
);
784 /* Remove nodes from joined groups and add removed groups to the list */
785 for (i
= 0; i
< req_exec_cpg_downlist
->left_nodes
; i
++) {
786 remove_node_from_groups( req_exec_cpg_downlist
->nodeids
[i
], &removed_list
);
789 if (!list_empty(&removed_list
)) {
790 struct list_head
*iter
, *tmp
;
792 for (iter
= removed_list
.next
, tmp
=iter
->next
; iter
!= &removed_list
; iter
= tmp
, tmp
= iter
->next
) {
793 struct removed_group
*rg
= list_entry(iter
, struct removed_group
, list
);
795 notify_lib_joinlist(rg
->gi
, NULL
,
797 rg
->left_list_entries
, rg
->left_list
,
798 MESSAGE_RES_CPG_CONFCHG_CALLBACK
);
/* PROCJOIN handler: records the joining (pid, sending nodeid) in the named
   group via do_proc_join, with reason CONFCHG_CPG_REASON_JOIN. */
805 static void message_handler_req_exec_cpg_procjoin (
809 struct req_exec_cpg_procjoin
*req_exec_cpg_procjoin
= (struct req_exec_cpg_procjoin
*)message
;
811 log_printf(LOG_LEVEL_DEBUG
, "got procjoin message from cluster node %d\n", nodeid
);
813 do_proc_join(&req_exec_cpg_procjoin
->group_name
,
814 req_exec_cpg_procjoin
->pid
, nodeid
,
815 CONFCHG_CPG_REASON_JOIN
);
/*
 * PROCLEAVE handler.
 * 1. Notify the group's clients of the departure (pid, sending nodeid, reason
 *    from the message) before the process is removed.
 * 2. Search the member list for the matching (pid, nodeid) entry.
 * 3. Run a further branch when the member list becomes empty (not visible here).
 * NOTE(review): the list is walked with iter = iter->next. If the matching
 * entry is unlinked and freed inside the loop (on lines not visible here), the
 * loop must stop immediately or use a saved next pointer; confirm.
 */
818 static void message_handler_req_exec_cpg_procleave (
822 struct req_exec_cpg_procjoin
*req_exec_cpg_procjoin
= (struct req_exec_cpg_procjoin
*)message
;
823 struct group_info
*gi
;
824 struct process_info
*pi
;
825 struct list_head
*iter
;
826 mar_cpg_address_t notify_info
;
828 log_printf(LOG_LEVEL_DEBUG
, "got procleave message from cluster node %d\n", nodeid
);
830 gi
= get_group(&req_exec_cpg_procjoin
->group_name
); /* this will always succeed ! */
833 notify_info
.pid
= req_exec_cpg_procjoin
->pid
;
834 notify_info
.nodeid
= nodeid
;
835 notify_info
.reason
= req_exec_cpg_procjoin
->reason
;
837 notify_lib_joinlist(gi
, NULL
,
840 MESSAGE_RES_CPG_CONFCHG_CALLBACK
);
842 /* Find the node/PID to remove */
843 for (iter
= gi
->members
.next
; iter
!= &gi
->members
; iter
= iter
->next
) {
844 pi
= list_entry(iter
, struct process_info
, list
);
845 if (pi
->pid
== req_exec_cpg_procjoin
->pid
&&
846 pi
->nodeid
== nodeid
) {
854 if (list_empty(&gi
->members
)) {
/*
 * JOINLIST handler. Messages sent by this node are ignored. Otherwise every
 * (group, pid) entry is registered for the sending node through do_proc_join,
 * with reason CONFCHG_CPG_REASON_NODEUP. Entries follow the mar_res_header_t,
 * and the walk stops at message + res->size.
 */
863 /* Got a proclist from another node */
864 static void message_handler_req_exec_cpg_joinlist (
868 mar_res_header_t
*res
= (mar_res_header_t
*)message
;
869 struct join_list_entry
*jle
= (struct join_list_entry
*)(message
+ sizeof(mar_res_header_t
));
871 log_printf(LOG_LEVEL_NOTICE
, "got joinlist message from node %d\n",
874 /* Ignore our own messages */
875 if (nodeid
== totempg_my_nodeid_get()) {
879 while ((void*)jle
< message
+ res
->size
) {
880 do_proc_join(&jle
->group_name
, jle
->pid
, nodeid
,
881 CONFCHG_CPG_REASON_NODEUP
);
/*
 * MCAST handler. It builds a MESSAGE_RES_CPG_DELIVER_CALLBACK (header, msglen,
 * sender pid and nodeid, group name, payload) in a stack buffer. The callback
 * is sent to every member of the group that has a trackerconn.
 * For messages that originated on this node, the sender's local flow-control
 * count is decremented and its current flow_control_state is reported in the
 * callback.
 * NOTE(review): msglen comes from the network and sizes the variable-length
 * stack array buf with no visible bound. get_group's result is used without a
 * NULL check.
 */
886 static void message_handler_req_exec_cpg_mcast (
890 struct req_exec_cpg_mcast
*req_exec_cpg_mcast
= (struct req_exec_cpg_mcast
*)message
;
891 struct res_lib_cpg_deliver_callback
*res_lib_cpg_mcast
;
892 struct process_info
*process_info
;
893 int msglen
= req_exec_cpg_mcast
->msglen
;
894 char buf
[sizeof(*res_lib_cpg_mcast
) + msglen
];
895 struct group_info
*gi
;
896 struct list_head
*iter
;
899 * Track local messages so that flow is controlled on the local node
901 gi
= get_group(&req_exec_cpg_mcast
->group_name
); /* this will always succeed ! */
904 res_lib_cpg_mcast
= (struct res_lib_cpg_deliver_callback
*)buf
;
905 res_lib_cpg_mcast
->header
.id
= MESSAGE_RES_CPG_DELIVER_CALLBACK
;
906 res_lib_cpg_mcast
->header
.size
= sizeof(*res_lib_cpg_mcast
) + msglen
;
907 res_lib_cpg_mcast
->msglen
= msglen
;
908 res_lib_cpg_mcast
->pid
= req_exec_cpg_mcast
->pid
;
909 res_lib_cpg_mcast
->nodeid
= nodeid
;
910 res_lib_cpg_mcast
->flow_control_state
= CPG_FLOW_CONTROL_DISABLED
;
911 if (message_source_is_local (&req_exec_cpg_mcast
->source
)) {
912 openais_ipc_flow_control_local_decrement (req_exec_cpg_mcast
->source
.conn
);
913 process_info
= (struct process_info
*)openais_conn_private_data_get (req_exec_cpg_mcast
->source
.conn
);
914 res_lib_cpg_mcast
->flow_control_state
= process_info
->flow_control_state
;
916 memcpy(&res_lib_cpg_mcast
->group_name
, &gi
->group_name
,
917 sizeof(mar_cpg_name_t
));
918 memcpy(&res_lib_cpg_mcast
->message
, (char*)message
+sizeof(*req_exec_cpg_mcast
),
921 /* Send to all interested members */
922 for (iter
= gi
->members
.next
; iter
!= &gi
->members
; iter
= iter
->next
) {
923 struct process_info
*pi
= list_entry(iter
, struct process_info
, list
);
924 if (pi
->trackerconn
) {
925 openais_conn_send_response(
928 res_lib_cpg_mcast
->header
.size
);
/*
 * Multicast this node's memberships as a JOINLIST message: a mar_res_header_t
 * followed by one join_list_entry per local process (pid != 0 and nodeid ==
 * this node), across all groups.
 * A first pass over group_lists determines `count`; the increment is on a line
 * not visible here. That count sizes the alloca'd buffer, and a second pass
 * fills it. Returns the result of totempg_groups_mcast_joined (TOTEMPG_AGREED).
 * NOTE(review): alloca does not report failure by returning NULL on common
 * platforms, so the "Unable to allocate joinlist buffer" path is unlikely to
 * trigger. The header type is mar_res_header_t even though this is an
 * executive request.
 */
934 static int cpg_exec_send_joinlist(void)
939 struct list_head
*iter
;
940 struct list_head
*iter2
;
941 struct group_info
*gi
;
942 mar_res_header_t
*res
;
943 struct join_list_entry
*jle
;
944 struct iovec req_exec_cpg_iovec
;
946 log_printf(LOG_LEVEL_DEBUG
, "sending joinlist to cluster\n");
948 /* Count the number of groups we are a member of */
949 for (i
=0; i
<GROUP_HASH_SIZE
; i
++) {
950 for (iter
= group_lists
[i
].next
; iter
!= &group_lists
[i
]; iter
= iter
->next
) {
951 gi
= list_entry(iter
, struct group_info
, list
);
952 for (iter2
= gi
->members
.next
; iter2
!= &gi
->members
; iter2
= iter2
->next
) {
953 struct process_info
*pi
= list_entry(iter2
, struct process_info
, list
);
954 if (pi
->pid
&& pi
->nodeid
== totempg_my_nodeid_get()) {
961 /* Nothing to send */
965 buf
= alloca(sizeof(mar_res_header_t
) + sizeof(struct join_list_entry
) * count
);
967 log_printf(LOG_LEVEL_WARNING
, "Unable to allocate joinlist buffer");
971 jle
= (struct join_list_entry
*)(buf
+ sizeof(mar_res_header_t
));
972 res
= (mar_res_header_t
*)buf
;
974 for (i
=0; i
<GROUP_HASH_SIZE
; i
++) {
975 for (iter
= group_lists
[i
].next
; iter
!= &group_lists
[i
]; iter
= iter
->next
) {
977 gi
= list_entry(iter
, struct group_info
, list
);
978 for (iter2
= gi
->members
.next
; iter2
!= &gi
->members
; iter2
= iter2
->next
) {
980 struct process_info
*pi
= list_entry(iter2
, struct process_info
, list
);
981 if (pi
->pid
&& pi
->nodeid
== totempg_my_nodeid_get()) {
982 memcpy(&jle
->group_name
, &gi
->group_name
, sizeof(mar_cpg_name_t
));
990 res
->id
= SERVICE_ID_MAKE(CPG_SERVICE
, MESSAGE_REQ_EXEC_CPG_JOINLIST
);
991 res
->size
= sizeof(mar_res_header_t
)+sizeof(struct join_list_entry
) * count
;
993 req_exec_cpg_iovec
.iov_base
= buf
;
994 req_exec_cpg_iovec
.iov_len
= res
->size
;
996 return totempg_groups_mcast_joined (openais_group_handle
, &req_exec_cpg_iovec
, 1, TOTEMPG_AGREED
);
/* Library init callback. The visible lines fetch the new connection's
   process_info private data and log both pointers; the rest of the body is not
   visible in this extract. */
999 static int cpg_lib_init_fn (void *conn
)
1001 struct process_info
*pi
= (struct process_info
*)openais_conn_private_data_get (conn
);
1004 log_printf(LOG_LEVEL_DEBUG
, "lib_init_fn: conn=%p, pi=%p\n", conn
, pi
);
1008 /* Join message from the library */
/*
 * Library JOIN request. Steps:
 *   - reject a connection that has already joined with SA_AIS_ERR_INVALID_PARAM;
 *   - look up or create the group (SA_AIS_ERR_NO_SPACE is set on a path after
 *     get_group; presumably when it fails);
 *   - create IPC flow control keyed by the group name, with
 *     cpg_flow_control_state_set_fn as the callback;
 *   - add this process (this node's nodeid, the client's pid) to the front of
 *     the member list;
 *   - multicast PROCJOIN to the cluster;
 *   - reply MESSAGE_RES_CPG_JOIN with the status.
 * The conditions guarding the error paths are on lines not visible here.
 */
1009 static void message_handler_req_lib_cpg_join (void *conn
, void *message
)
1011 struct req_lib_cpg_join
*req_lib_cpg_join
= (struct req_lib_cpg_join
*)message
;
1012 struct process_info
*pi
= (struct process_info
*)openais_conn_private_data_get (conn
);
1013 struct res_lib_cpg_join res_lib_cpg_join
;
1014 struct group_info
*gi
;
1015 SaAisErrorT error
= SA_AIS_OK
;
1017 log_printf(LOG_LEVEL_DEBUG
, "got join request on %p, pi=%p, pi->pid=%d\n", conn
, pi
, pi
->pid
);
1019 /* Already joined on this conn */
1021 error
= SA_AIS_ERR_INVALID_PARAM
;
1025 gi
= get_group(&req_lib_cpg_join
->group_name
);
1027 error
= SA_AIS_ERR_NO_SPACE
;
1031 openais_ipc_flow_control_create (
1034 req_lib_cpg_join
->group_name
.value
,
1035 req_lib_cpg_join
->group_name
.length
,
1036 cpg_flow_control_state_set_fn
,
1039 /* Add a node entry for us */
1040 pi
->nodeid
= totempg_my_nodeid_get();
1041 pi
->pid
= req_lib_cpg_join
->pid
;
1043 list_add(&pi
->list
, &gi
->members
);
1045 /* Tell the rest of the cluster */
1046 cpg_node_joinleave_send(gi
, pi
, MESSAGE_REQ_EXEC_CPG_PROCJOIN
, CONFCHG_CPG_REASON_JOIN
);
1049 res_lib_cpg_join
.header
.size
= sizeof(res_lib_cpg_join
);
1050 res_lib_cpg_join
.header
.id
= MESSAGE_RES_CPG_JOIN
;
1051 res_lib_cpg_join
.header
.error
= error
;
1052 openais_conn_send_response(conn
, &res_lib_cpg_join
, sizeof(res_lib_cpg_join
));
1055 /* Leave message from the library */
/*
 * Library LEAVE request.
 * If the connection has no process_info, no pid or no group, the error is set
 * to SA_AIS_ERR_INVALID_PARAM. Otherwise the handler:
 *   - multicasts PROCLEAVE (local removal happens when that message is
 *     delivered back, per the comment below);
 *   - destroys the group's IPC flow control;
 *   - replies MESSAGE_RES_CPG_LEAVE with the status.
 * NOTE(review): the assignment of gi is on a line not visible here
 * (presumably gi = pi->group).
 */
1056 static void message_handler_req_lib_cpg_leave (void *conn
, void *message
)
1058 struct process_info
*pi
= (struct process_info
*)openais_conn_private_data_get (conn
);
1059 struct res_lib_cpg_leave res_lib_cpg_leave
;
1060 struct group_info
*gi
;
1061 SaAisErrorT error
= SA_AIS_OK
;
1063 log_printf(LOG_LEVEL_DEBUG
, "got leave request on %p\n", conn
);
1065 if (!pi
|| !pi
->pid
|| !pi
->group
) {
1066 error
= SA_AIS_ERR_INVALID_PARAM
;
1071 /* Tell other nodes we are leaving.
1072 When we get this message back we will leave too */
1073 cpg_node_joinleave_send(gi
, pi
, MESSAGE_REQ_EXEC_CPG_PROCLEAVE
, CONFCHG_CPG_REASON_LEAVE
);
1076 openais_ipc_flow_control_destroy (
1079 (unsigned char *)gi
->group_name
.value
,
1080 (unsigned int)gi
->group_name
.length
);
1084 res_lib_cpg_leave
.header
.size
= sizeof(res_lib_cpg_leave
);
1085 res_lib_cpg_leave
.header
.id
= MESSAGE_RES_CPG_LEAVE
;
1086 res_lib_cpg_leave
.header
.error
= error
;
1087 openais_conn_send_response(conn
, &res_lib_cpg_leave
, sizeof(res_lib_cpg_leave
));
1090 /* Mcast message from the library */
/*
 * Library MCAST request.
 * If the connection has not joined a group, reply SA_AIS_ERR_ACCESS with flow
 * control reported as disabled. Otherwise:
 *   - multicast a req_exec_cpg_mcast header plus the client's msglen payload
 *     bytes as two iovecs (TOTEMPG_AGREED);
 *   - increment the connection's local flow-control count;
 *   - reply SA_AIS_OK with the connection's current flow_control_state.
 * NOTE(review): `result` from totempg_groups_mcast_joined is not examined in
 * the visible lines, so the client is told SA_AIS_OK even if the send failed.
 */
1091 static void message_handler_req_lib_cpg_mcast (void *conn
, void *message
)
1093 struct req_lib_cpg_mcast
*req_lib_cpg_mcast
= (struct req_lib_cpg_mcast
*)message
;
1094 struct process_info
*pi
= (struct process_info
*)openais_conn_private_data_get (conn
);
1095 struct group_info
*gi
= pi
->group
;
1096 struct iovec req_exec_cpg_iovec
[2];
1097 struct req_exec_cpg_mcast req_exec_cpg_mcast
;
1098 struct res_lib_cpg_mcast res_lib_cpg_mcast
;
1099 int msglen
= req_lib_cpg_mcast
->msglen
;
1102 log_printf(LOG_LEVEL_DEBUG
, "got mcast request on %p\n", conn
);
1104 /* Can't send if we're not joined */
1106 res_lib_cpg_mcast
.header
.size
= sizeof(res_lib_cpg_mcast
);
1107 res_lib_cpg_mcast
.header
.id
= MESSAGE_RES_CPG_MCAST
;
1108 res_lib_cpg_mcast
.header
.error
= SA_AIS_ERR_ACCESS
; /* TODO Better error code ?? */
1109 res_lib_cpg_mcast
.flow_control_state
= CPG_FLOW_CONTROL_DISABLED
;
1110 openais_conn_send_response(conn
, &res_lib_cpg_mcast
,
1111 sizeof(res_lib_cpg_mcast
));
1115 req_exec_cpg_mcast
.header
.size
= sizeof(req_exec_cpg_mcast
) + msglen
;
1116 req_exec_cpg_mcast
.header
.id
= SERVICE_ID_MAKE(CPG_SERVICE
,
1117 MESSAGE_REQ_EXEC_CPG_MCAST
);
1118 req_exec_cpg_mcast
.pid
= pi
->pid
;
1119 req_exec_cpg_mcast
.msglen
= msglen
;
1120 message_source_set (&req_exec_cpg_mcast
.source
, conn
);
1121 memcpy(&req_exec_cpg_mcast
.group_name
, &gi
->group_name
,
1122 sizeof(mar_cpg_name_t
));
1124 req_exec_cpg_iovec
[0].iov_base
= (char *)&req_exec_cpg_mcast
;
1125 req_exec_cpg_iovec
[0].iov_len
= sizeof(req_exec_cpg_mcast
);
1126 req_exec_cpg_iovec
[1].iov_base
= (char *)&req_lib_cpg_mcast
->message
;
1127 req_exec_cpg_iovec
[1].iov_len
= msglen
;
1129 // TODO: guarantee type...
1130 result
= totempg_groups_mcast_joined (openais_group_handle
, req_exec_cpg_iovec
, 2, TOTEMPG_AGREED
);
1131 openais_ipc_flow_control_local_increment (conn
);
1133 res_lib_cpg_mcast
.header
.size
= sizeof(res_lib_cpg_mcast
);
1134 res_lib_cpg_mcast
.header
.id
= MESSAGE_RES_CPG_MCAST
;
1135 res_lib_cpg_mcast
.header
.error
= SA_AIS_OK
;
1136 res_lib_cpg_mcast
.flow_control_state
= pi
->flow_control_state
;
1137 openais_conn_send_response(conn
, &res_lib_cpg_mcast
,
1138 sizeof(res_lib_cpg_mcast
));
1141 static void message_handler_req_lib_cpg_membership (void *conn
, void *message
)
1143 struct process_info
*pi
= (struct process_info
*)openais_conn_private_data_get (conn
);
1145 log_printf(LOG_LEVEL_DEBUG
, "got membership request on %p\n", conn
);
1147 mar_res_header_t res
;
1148 res
.size
= sizeof(res
);
1149 res
.id
= MESSAGE_RES_CPG_MEMBERSHIP
;
1150 res
.error
= SA_AIS_ERR_ACCESS
; /* TODO Better error code */
1151 openais_conn_send_response(conn
, &res
, sizeof(res
));
1155 notify_lib_joinlist(pi
->group
, conn
, 0, NULL
, 0, NULL
, MESSAGE_RES_CPG_MEMBERSHIP
);
1159 static void message_handler_req_lib_cpg_trackstart (void *conn
, void *message
)
1161 struct req_lib_cpg_trackstart
*req_lib_cpg_trackstart
= (struct req_lib_cpg_trackstart
*)message
;
1162 struct res_lib_cpg_trackstart res_lib_cpg_trackstart
;
1163 struct group_info
*gi
;
1164 struct process_info
*otherpi
;
1166 SaAisErrorT error
= SA_AIS_OK
;
1168 log_printf(LOG_LEVEL_DEBUG
, "got trackstart request on %p\n", conn
);
1170 gi
= get_group(&req_lib_cpg_trackstart
->group_name
);
1172 error
= SA_AIS_ERR_NO_SPACE
;
1176 /* Find the partner connection and add us to it's process_info struct */
1177 otherconn
= openais_conn_partner_get (conn
);
1178 otherpi
= (struct process_info
*)openais_conn_private_data_get (conn
);
1179 otherpi
->trackerconn
= conn
;
1182 res_lib_cpg_trackstart
.header
.size
= sizeof(res_lib_cpg_trackstart
);
1183 res_lib_cpg_trackstart
.header
.id
= MESSAGE_RES_CPG_TRACKSTART
;
1184 res_lib_cpg_trackstart
.header
.error
= SA_AIS_OK
;
1185 openais_conn_send_response(conn
, &res_lib_cpg_trackstart
, sizeof(res_lib_cpg_trackstart
));
1188 static void message_handler_req_lib_cpg_trackstop (void *conn
, void *message
)
1190 struct req_lib_cpg_trackstop
*req_lib_cpg_trackstop
= (struct req_lib_cpg_trackstop
*)message
;
1191 struct res_lib_cpg_trackstop res_lib_cpg_trackstop
;
1192 struct process_info
*otherpi
;
1194 struct group_info
*gi
;
1195 SaAisErrorT error
= SA_AIS_OK
;
1197 log_printf(LOG_LEVEL_DEBUG
, "got trackstop request on %p\n", conn
);
1199 gi
= get_group(&req_lib_cpg_trackstop
->group_name
);
1201 error
= SA_AIS_ERR_NO_SPACE
;
1205 /* Find the partner connection and add us to it's process_info struct */
1206 otherconn
= openais_conn_partner_get (conn
);
1207 otherpi
= (struct process_info
*)openais_conn_private_data_get (conn
);
1208 otherpi
->trackerconn
= NULL
;
1211 res_lib_cpg_trackstop
.header
.size
= sizeof(res_lib_cpg_trackstop
);
1212 res_lib_cpg_trackstop
.header
.id
= MESSAGE_RES_CPG_TRACKSTOP
;
1213 res_lib_cpg_trackstop
.header
.error
= SA_AIS_OK
;
1214 openais_conn_send_response(conn
, &res_lib_cpg_trackstop
.header
, sizeof(res_lib_cpg_trackstop
));
1217 static void message_handler_req_lib_cpg_local_get (void *conn
, void *message
)
1219 struct res_lib_cpg_local_get res_lib_cpg_local_get
;
1221 res_lib_cpg_local_get
.header
.size
= sizeof(res_lib_cpg_local_get
);
1222 res_lib_cpg_local_get
.header
.id
= MESSAGE_RES_CPG_LOCAL_GET
;
1223 res_lib_cpg_local_get
.header
.error
= SA_AIS_OK
;
1224 res_lib_cpg_local_get
.local_nodeid
= totempg_my_nodeid_get ();
1226 openais_conn_send_response(conn
, &res_lib_cpg_local_get
,
1227 sizeof(res_lib_cpg_local_get
));