2 * vi: set autoindent tabstop=4 shiftwidth=4 :
4 * Copyright (c) 2006 Red Hat, Inc.
8 * Author: Patrick Caulfield (pcaulfie@redhat.com)
10 * This software licensed under BSD license, the text of which follows:
12 * Redistribution and use in source and binary forms, with or without
13 * modification, are permitted provided that the following conditions are met:
15 * - Redistributions of source code must retain the above copyright notice,
16 * this list of conditions and the following disclaimer.
17 * - Redistributions in binary form must reproduce the above copyright notice,
18 * this list of conditions and the following disclaimer in the documentation
19 * and/or other materials provided with the distribution.
20 * - Neither the name of the MontaVista Software, Inc. nor the names of its
21 * contributors may be used to endorse or promote products derived from this
22 * software without specific prior written permission.
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
34 * THE POSSIBILITY OF SUCH DAMAGE.
37 * Provides a closed process group API using the openais executive
44 #include <sys/types.h>
45 #include <sys/socket.h>
58 cpg_flow_control_state_t flow_control_state
;
59 cpg_callbacks_t callbacks
;
61 pthread_mutex_t response_mutex
;
62 pthread_mutex_t dispatch_mutex
;
65 static void cpg_instance_destructor (void *instance
);
67 static struct saHandleDatabase cpg_handle_t_db
= {
70 .mutex
= PTHREAD_MUTEX_INITIALIZER
,
71 .handleInstanceDestructor
= cpg_instance_destructor
75 * Clean up function for a cpg instance (cpg_nitialize) handle
77 static void cpg_instance_destructor (void *instance
)
79 struct cpg_inst
*cpg_inst
= instance
;
81 pthread_mutex_destroy (&cpg_inst
->response_mutex
);
82 pthread_mutex_destroy (&cpg_inst
->dispatch_mutex
);
87 * @defgroup cpg_openais The closed process group API
93 cpg_error_t
cpg_initialize (
95 cpg_callbacks_t
*callbacks
)
98 struct cpg_inst
*cpg_inst
;
100 error
= saHandleCreate (&cpg_handle_t_db
, sizeof (struct cpg_inst
), handle
);
101 if (error
!= SA_AIS_OK
) {
102 goto error_no_destroy
;
105 error
= saHandleInstanceGet (&cpg_handle_t_db
, *handle
, (void *)&cpg_inst
);
106 if (error
!= SA_AIS_OK
) {
110 error
= saServiceConnect (&cpg_inst
->dispatch_fd
,
111 &cpg_inst
->response_fd
,
113 if (error
!= SA_AIS_OK
) {
114 goto error_put_destroy
;
117 memcpy (&cpg_inst
->callbacks
, callbacks
, sizeof (cpg_callbacks_t
));
119 pthread_mutex_init (&cpg_inst
->response_mutex
, NULL
);
121 pthread_mutex_init (&cpg_inst
->dispatch_mutex
, NULL
);
123 saHandleInstancePut (&cpg_handle_t_db
, *handle
);
128 saHandleInstancePut (&cpg_handle_t_db
, *handle
);
130 saHandleDestroy (&cpg_handle_t_db
, *handle
);
135 cpg_error_t
cpg_finalize (
138 struct cpg_inst
*cpg_inst
;
141 error
= saHandleInstanceGet (&cpg_handle_t_db
, handle
, (void *)&cpg_inst
);
142 if (error
!= SA_AIS_OK
) {
146 pthread_mutex_lock (&cpg_inst
->response_mutex
);
149 * Another thread has already started finalizing
151 if (cpg_inst
->finalize
) {
152 pthread_mutex_unlock (&cpg_inst
->response_mutex
);
153 saHandleInstancePut (&cpg_handle_t_db
, handle
);
154 return (CPG_ERR_BAD_HANDLE
);
157 cpg_inst
->finalize
= 1;
159 pthread_mutex_unlock (&cpg_inst
->response_mutex
);
161 saHandleDestroy (&cpg_handle_t_db
, handle
);
164 * Disconnect from the server
166 if (cpg_inst
->response_fd
!= -1) {
167 shutdown(cpg_inst
->response_fd
, 0);
168 close(cpg_inst
->response_fd
);
170 if (cpg_inst
->dispatch_fd
!= -1) {
171 shutdown(cpg_inst
->dispatch_fd
, 0);
172 close(cpg_inst
->dispatch_fd
);
174 saHandleInstancePut (&cpg_handle_t_db
, handle
);
179 cpg_error_t
cpg_fd_get (
184 struct cpg_inst
*cpg_inst
;
186 error
= saHandleInstanceGet (&cpg_handle_t_db
, handle
, (void *)&cpg_inst
);
187 if (error
!= SA_AIS_OK
) {
191 *fd
= cpg_inst
->dispatch_fd
;
193 saHandleInstancePut (&cpg_handle_t_db
, handle
);
198 cpg_error_t
cpg_context_get (
203 struct cpg_inst
*cpg_inst
;
205 error
= saHandleInstanceGet (&cpg_handle_t_db
, handle
, (void *)&cpg_inst
);
206 if (error
!= SA_AIS_OK
) {
210 *context
= cpg_inst
->context
;
212 saHandleInstancePut (&cpg_handle_t_db
, handle
);
217 cpg_error_t
cpg_context_set (
222 struct cpg_inst
*cpg_inst
;
224 error
= saHandleInstanceGet (&cpg_handle_t_db
, handle
, (void *)&cpg_inst
);
225 if (error
!= SA_AIS_OK
) {
229 cpg_inst
->context
= context
;
231 saHandleInstancePut (&cpg_handle_t_db
, handle
);
237 mar_res_header_t header
__attribute__((aligned(8)));
241 cpg_error_t
cpg_dispatch (
243 cpg_dispatch_t dispatch_types
)
248 int cont
= 1; /* always continue do loop except when set to 0 */
250 struct cpg_inst
*cpg_inst
;
251 struct res_lib_cpg_confchg_callback
*res_cpg_confchg_callback
;
252 struct res_lib_cpg_deliver_callback
*res_cpg_deliver_callback
;
253 cpg_callbacks_t callbacks
;
254 struct res_overlay dispatch_data
;
255 int ignore_dispatch
= 0;
256 struct cpg_address member_list
[CPG_MEMBERS_MAX
];
257 struct cpg_address left_list
[CPG_MEMBERS_MAX
];
258 struct cpg_address joined_list
[CPG_MEMBERS_MAX
];
259 struct cpg_name group_name
;
260 mar_cpg_address_t
*left_list_start
;
261 mar_cpg_address_t
*joined_list_start
;
264 error
= saHandleInstanceGet (&cpg_handle_t_db
, handle
, (void *)&cpg_inst
);
265 if (error
!= SA_AIS_OK
) {
270 * Timeout instantly for SA_DISPATCH_ONE or SA_DISPATCH_ALL and
271 * wait indefinately for SA_DISPATCH_BLOCKING
273 if (dispatch_types
== CPG_DISPATCH_ALL
) {
278 ufds
.fd
= cpg_inst
->dispatch_fd
;
279 ufds
.events
= POLLIN
;
282 error
= saPollRetry (&ufds
, 1, timeout
);
283 if (error
!= SA_AIS_OK
) {
287 pthread_mutex_lock (&cpg_inst
->dispatch_mutex
);
290 * Regather poll data in case ufds has changed since taking lock
292 error
= saPollRetry (&ufds
, 1, timeout
);
293 if (error
!= SA_AIS_OK
) {
298 * Handle has been finalized in another thread
300 if (cpg_inst
->finalize
== 1) {
302 pthread_mutex_unlock (&cpg_inst
->dispatch_mutex
);
306 dispatch_avail
= ufds
.revents
& POLLIN
;
307 if (dispatch_avail
== 0 && dispatch_types
== CPG_DISPATCH_ALL
) {
308 pthread_mutex_unlock (&cpg_inst
->dispatch_mutex
);
309 break; /* exit do while cont is 1 loop */
311 if (dispatch_avail
== 0) {
312 pthread_mutex_unlock (&cpg_inst
->dispatch_mutex
);
313 continue; /* next poll */
316 if (ufds
.revents
& POLLIN
) {
318 * Queue empty, read response from socket
320 error
= saRecvRetry (cpg_inst
->dispatch_fd
, &dispatch_data
.header
,
321 sizeof (mar_res_header_t
));
322 if (error
!= SA_AIS_OK
) {
325 if (dispatch_data
.header
.size
> sizeof (mar_res_header_t
)) {
326 error
= saRecvRetry (cpg_inst
->dispatch_fd
, &dispatch_data
.data
,
327 dispatch_data
.header
.size
- sizeof (mar_res_header_t
));
329 if (error
!= SA_AIS_OK
) {
334 pthread_mutex_unlock (&cpg_inst
->dispatch_mutex
);
339 * Make copy of callbacks, message data, unlock instance, and call callback
340 * A risk of this dispatch method is that the callback routines may
341 * operate at the same time that cpgFinalize has been called.
343 memcpy (&callbacks
, &cpg_inst
->callbacks
, sizeof (cpg_callbacks_t
));
345 pthread_mutex_unlock (&cpg_inst
->dispatch_mutex
);
347 * Dispatch incoming message
349 switch (dispatch_data
.header
.id
) {
350 case MESSAGE_RES_CPG_DELIVER_CALLBACK
:
351 res_cpg_deliver_callback
= (struct res_lib_cpg_deliver_callback
*)&dispatch_data
;
353 cpg_inst
->flow_control_state
= res_cpg_deliver_callback
->flow_control_state
;
354 marshall_from_mar_cpg_name_t (
356 &res_cpg_deliver_callback
->group_name
);
358 callbacks
.cpg_deliver_fn (handle
,
360 res_cpg_deliver_callback
->nodeid
,
361 res_cpg_deliver_callback
->pid
,
362 &res_cpg_deliver_callback
->message
,
363 res_cpg_deliver_callback
->msglen
);
366 case MESSAGE_RES_CPG_CONFCHG_CALLBACK
:
367 res_cpg_confchg_callback
= (struct res_lib_cpg_confchg_callback
*)&dispatch_data
;
369 for (i
= 0; i
< res_cpg_confchg_callback
->member_list_entries
; i
++) {
370 marshall_from_mar_cpg_address_t (&member_list
[i
],
371 &res_cpg_confchg_callback
->member_list
[i
]);
373 left_list_start
= res_cpg_confchg_callback
->member_list
+
374 res_cpg_confchg_callback
->member_list_entries
;
375 for (i
= 0; i
< res_cpg_confchg_callback
->left_list_entries
; i
++) {
376 marshall_from_mar_cpg_address_t (&left_list
[i
],
377 &left_list_start
[i
]);
379 joined_list_start
= res_cpg_confchg_callback
->member_list
+
380 res_cpg_confchg_callback
->member_list_entries
+
381 res_cpg_confchg_callback
->left_list_entries
;
382 for (i
= 0; i
< res_cpg_confchg_callback
->joined_list_entries
; i
++) {
383 marshall_from_mar_cpg_address_t (&joined_list
[i
],
384 &joined_list_start
[i
]);
386 marshall_from_mar_cpg_name_t (
388 &res_cpg_confchg_callback
->group_name
);
390 callbacks
.cpg_confchg_fn (handle
,
393 res_cpg_confchg_callback
->member_list_entries
,
395 res_cpg_confchg_callback
->left_list_entries
,
397 res_cpg_confchg_callback
->joined_list_entries
);
401 error
= SA_AIS_ERR_LIBRARY
;
407 * Determine if more messages should be processed
409 switch (dispatch_types
) {
410 case CPG_DISPATCH_ONE
:
411 if (ignore_dispatch
) {
417 case CPG_DISPATCH_ALL
:
418 if (ignore_dispatch
) {
422 case CPG_DISPATCH_BLOCKING
:
428 saHandleInstancePut (&cpg_handle_t_db
, handle
);
433 cpg_error_t
cpg_join (
435 struct cpg_name
*group
)
438 struct cpg_inst
*cpg_inst
;
440 struct req_lib_cpg_join req_lib_cpg_join
;
441 struct res_lib_cpg_join res_lib_cpg_join
;
442 struct req_lib_cpg_trackstart req_lib_cpg_trackstart
;
443 struct res_lib_cpg_trackstart res_lib_cpg_trackstart
;
445 error
= saHandleInstanceGet (&cpg_handle_t_db
, handle
, (void *)&cpg_inst
);
446 if (error
!= SA_AIS_OK
) {
450 pthread_mutex_lock (&cpg_inst
->response_mutex
);
452 /* Automatically add a tracker */
453 req_lib_cpg_trackstart
.header
.size
= sizeof (struct req_lib_cpg_trackstart
);
454 req_lib_cpg_trackstart
.header
.id
= MESSAGE_REQ_CPG_TRACKSTART
;
455 marshall_to_mar_cpg_name_t (&req_lib_cpg_trackstart
.group_name
,
458 iov
[0].iov_base
= (char *)&req_lib_cpg_trackstart
;
459 iov
[0].iov_len
= sizeof (struct req_lib_cpg_trackstart
);
461 error
= saSendMsgReceiveReply (cpg_inst
->dispatch_fd
, iov
, 1,
462 &res_lib_cpg_trackstart
, sizeof (struct res_lib_cpg_trackstart
));
464 if (error
!= SA_AIS_OK
) {
465 pthread_mutex_unlock (&cpg_inst
->response_mutex
);
470 req_lib_cpg_join
.header
.size
= sizeof (struct req_lib_cpg_join
);
471 req_lib_cpg_join
.header
.id
= MESSAGE_REQ_CPG_JOIN
;
472 req_lib_cpg_join
.pid
= getpid();
473 marshall_to_mar_cpg_name_t (&req_lib_cpg_join
.group_name
,
476 iov
[0].iov_base
= (char *)&req_lib_cpg_join
;
477 iov
[0].iov_len
= sizeof (struct req_lib_cpg_join
);
479 error
= saSendMsgReceiveReply (cpg_inst
->response_fd
, iov
, 1,
480 &res_lib_cpg_join
, sizeof (struct res_lib_cpg_join
));
482 pthread_mutex_unlock (&cpg_inst
->response_mutex
);
484 if (error
!= SA_AIS_OK
) {
488 error
= res_lib_cpg_join
.header
.error
;
491 saHandleInstancePut (&cpg_handle_t_db
, handle
);
496 cpg_error_t
cpg_leave (
498 struct cpg_name
*group
)
501 struct cpg_inst
*cpg_inst
;
503 struct req_lib_cpg_leave req_lib_cpg_leave
;
504 struct res_lib_cpg_leave res_lib_cpg_leave
;
506 error
= saHandleInstanceGet (&cpg_handle_t_db
, handle
, (void *)&cpg_inst
);
507 if (error
!= SA_AIS_OK
) {
511 req_lib_cpg_leave
.header
.size
= sizeof (struct req_lib_cpg_leave
);
512 req_lib_cpg_leave
.header
.id
= MESSAGE_REQ_CPG_LEAVE
;
513 req_lib_cpg_leave
.pid
= getpid();
514 marshall_to_mar_cpg_name_t (&req_lib_cpg_leave
.group_name
,
517 iov
[0].iov_base
= (char *)&req_lib_cpg_leave
;
518 iov
[0].iov_len
= sizeof (struct req_lib_cpg_leave
);
520 pthread_mutex_lock (&cpg_inst
->response_mutex
);
522 error
= saSendMsgReceiveReply (cpg_inst
->response_fd
, iov
, 1,
523 &res_lib_cpg_leave
, sizeof (struct res_lib_cpg_leave
));
525 pthread_mutex_unlock (&cpg_inst
->response_mutex
);
526 if (error
!= SA_AIS_OK
) {
530 error
= res_lib_cpg_leave
.header
.error
;
533 saHandleInstancePut (&cpg_handle_t_db
, handle
);
538 cpg_error_t
cpg_mcast_joined (
540 cpg_guarantee_t guarantee
,
546 struct cpg_inst
*cpg_inst
;
547 struct iovec iov
[64];
548 struct req_lib_cpg_mcast req_lib_cpg_mcast
;
549 struct res_lib_cpg_mcast res_lib_cpg_mcast
;
552 error
= saHandleInstanceGet (&cpg_handle_t_db
, handle
, (void *)&cpg_inst
);
553 if (error
!= SA_AIS_OK
) {
557 for (i
= 0; i
< iov_len
; i
++ ) {
558 msg_len
+= iovec
[i
].iov_len
;
561 req_lib_cpg_mcast
.header
.size
= sizeof (struct req_lib_cpg_mcast
) +
564 req_lib_cpg_mcast
.header
.id
= MESSAGE_REQ_CPG_MCAST
;
565 req_lib_cpg_mcast
.guarantee
= guarantee
;
566 req_lib_cpg_mcast
.msglen
= msg_len
;
568 iov
[0].iov_base
= (char *)&req_lib_cpg_mcast
;
569 iov
[0].iov_len
= sizeof (struct req_lib_cpg_mcast
);
570 memcpy (&iov
[1], iovec
, iov_len
* sizeof (struct iovec
));
572 pthread_mutex_lock (&cpg_inst
->response_mutex
);
574 error
= saSendMsgReceiveReply (cpg_inst
->response_fd
, iov
, iov_len
+ 1,
575 &res_lib_cpg_mcast
, sizeof (res_lib_cpg_mcast
));
577 pthread_mutex_unlock (&cpg_inst
->response_mutex
);
579 if (error
!= SA_AIS_OK
) {
583 cpg_inst
->flow_control_state
= res_lib_cpg_mcast
.flow_control_state
;
584 if (res_lib_cpg_mcast
.header
.error
== CPG_ERR_TRY_AGAIN
) {
585 cpg_inst
->flow_control_state
= CPG_FLOW_CONTROL_ENABLED
;
587 error
= res_lib_cpg_mcast
.header
.error
;
590 saHandleInstancePut (&cpg_handle_t_db
, handle
);
595 cpg_error_t
cpg_membership_get (
597 struct cpg_name
*group_name
,
598 struct cpg_address
*member_list
,
599 int *member_list_entries
)
602 struct cpg_inst
*cpg_inst
;
604 struct req_lib_cpg_membership req_lib_cpg_membership_get
;
605 struct res_lib_cpg_confchg_callback res_lib_cpg_membership_get
;
608 error
= saHandleInstanceGet (&cpg_handle_t_db
, handle
, (void *)&cpg_inst
);
609 if (error
!= SA_AIS_OK
) {
613 req_lib_cpg_membership_get
.header
.size
= sizeof (mar_req_header_t
);
614 req_lib_cpg_membership_get
.header
.id
= MESSAGE_REQ_CPG_MEMBERSHIP
;
615 marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get
.group_name
,
618 iov
.iov_base
= (char *)&req_lib_cpg_membership_get
;
619 iov
.iov_len
= sizeof (mar_req_header_t
);
621 pthread_mutex_lock (&cpg_inst
->response_mutex
);
623 error
= saSendMsgReceiveReply (cpg_inst
->response_fd
, &iov
, 1,
624 &res_lib_cpg_membership_get
, sizeof (mar_res_header_t
));
626 pthread_mutex_unlock (&cpg_inst
->response_mutex
);
628 if (error
!= SA_AIS_OK
) {
632 error
= res_lib_cpg_membership_get
.header
.error
;
635 * Copy results to caller
637 *member_list_entries
= res_lib_cpg_membership_get
.member_list_entries
;
639 for (i
= 0; i
< res_lib_cpg_membership_get
.member_list_entries
; i
++) {
640 marshall_from_mar_cpg_address_t (&member_list
[i
],
641 &res_lib_cpg_membership_get
.member_list
[i
]);
646 saHandleInstancePut (&cpg_handle_t_db
, handle
);
651 cpg_error_t
cpg_local_get (
653 unsigned int *local_nodeid
)
656 struct cpg_inst
*cpg_inst
;
658 struct req_lib_cpg_local_get req_lib_cpg_local_get
;
659 struct res_lib_cpg_local_get res_lib_cpg_local_get
;
661 error
= saHandleInstanceGet (&cpg_handle_t_db
, handle
, (void *)&cpg_inst
);
662 if (error
!= SA_AIS_OK
) {
666 req_lib_cpg_local_get
.header
.size
= sizeof (mar_req_header_t
);
667 req_lib_cpg_local_get
.header
.id
= MESSAGE_REQ_CPG_LOCAL_GET
;
669 iov
.iov_base
= &req_lib_cpg_local_get
;
670 iov
.iov_len
= sizeof (struct req_lib_cpg_local_get
);
672 pthread_mutex_lock (&cpg_inst
->response_mutex
);
674 error
= saSendMsgReceiveReply (cpg_inst
->response_fd
, &iov
, 1,
675 &res_lib_cpg_local_get
, sizeof (res_lib_cpg_local_get
));
677 pthread_mutex_unlock (&cpg_inst
->response_mutex
);
679 if (error
!= SA_AIS_OK
) {
683 error
= res_lib_cpg_local_get
.header
.error
;
685 *local_nodeid
= res_lib_cpg_local_get
.local_nodeid
;
688 saHandleInstancePut (&cpg_handle_t_db
, handle
);
693 cpg_error_t
cpg_flow_control_state_get (
695 cpg_flow_control_state_t
*flow_control_state
)
698 struct cpg_inst
*cpg_inst
;
700 error
= saHandleInstanceGet (&cpg_handle_t_db
, handle
, (void *)&cpg_inst
);
701 if (error
!= SA_AIS_OK
) {
705 *flow_control_state
= cpg_inst
->flow_control_state
;
707 saHandleInstancePut (&cpg_handle_t_db
, handle
);