2 * vi: set autoindent tabstop=4 shiftwidth=4 :
4 * Copyright (c) 2004-2005 MontaVista Software, Inc.
5 * Copyright (c) 2006 Sun Microsystems, Inc.
9 * Author: Steven Dake (sdake@mvista.com)
11 * This software licensed under BSD license, the text of which follows:
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions are met:
16 * - Redistributions of source code must retain the above copyright notice,
17 * this list of conditions and the following disclaimer.
18 * - Redistributions in binary form must reproduce the above copyright notice,
19 * this list of conditions and the following disclaimer in the documentation
20 * and/or other materials provided with the distribution.
21 * - Neither the name of the MontaVista Software, Inc. nor the names of its
22 * contributors may be used to endorse or promote products derived from this
23 * software without specific prior written permission.
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
35 * THE POSSIBILITY OF SUCH DAMAGE.
38 * Provides an extended virtual synchrony API using the openais executive
45 #include <sys/types.h>
46 #include <sys/socket.h>
50 #include "../exec/totem.h"
60 evs_callbacks_t callbacks
;
61 pthread_mutex_t response_mutex
;
62 pthread_mutex_t dispatch_mutex
;
66 mar_res_header_t header
__attribute__((aligned(8)));
70 static void evs_instance_destructor (void *instance
);
72 static struct saHandleDatabase evs_handle_t_db
= {
75 .mutex
= PTHREAD_MUTEX_INITIALIZER
,
76 .handleInstanceDestructor
= evs_instance_destructor
80 * Clean up function for an evt instance (saEvtInitialize) handle
82 static void evs_instance_destructor (void *instance
)
84 struct evs_inst
*evs_inst
= instance
;
86 pthread_mutex_destroy (&evs_inst
->response_mutex
);
87 pthread_mutex_destroy (&evs_inst
->dispatch_mutex
);
92 * @defgroup evs_openais The extended virtual synchrony passthrough API
99 * @param handle The handle of evs initialize
100 * @param callbacks The callbacks for evs_initialize
103 evs_error_t
evs_initialize (
104 evs_handle_t
*handle
,
105 evs_callbacks_t
*callbacks
)
108 struct evs_inst
*evs_inst
;
110 error
= saHandleCreate (&evs_handle_t_db
, sizeof (struct evs_inst
), handle
);
111 if (error
!= SA_AIS_OK
) {
112 goto error_no_destroy
;
115 error
= saHandleInstanceGet (&evs_handle_t_db
, *handle
, (void *)&evs_inst
);
116 if (error
!= SA_AIS_OK
) {
120 error
= saServiceConnect (&evs_inst
->response_fd
,
121 &evs_inst
->dispatch_fd
,
123 if (error
!= SA_AIS_OK
) {
124 goto error_put_destroy
;
127 memcpy (&evs_inst
->callbacks
, callbacks
, sizeof (evs_callbacks_t
));
129 pthread_mutex_init (&evs_inst
->response_mutex
, NULL
);
131 pthread_mutex_init (&evs_inst
->dispatch_mutex
, NULL
);
133 saHandleInstancePut (&evs_handle_t_db
, *handle
);
138 saHandleInstancePut (&evs_handle_t_db
, *handle
);
140 saHandleDestroy (&evs_handle_t_db
, *handle
);
145 evs_error_t
evs_finalize (
148 struct evs_inst
*evs_inst
;
151 error
= saHandleInstanceGet (&evs_handle_t_db
, handle
, (void *)&evs_inst
);
152 if (error
!= SA_AIS_OK
) {
155 // TODO is the locking right here
156 pthread_mutex_lock (&evs_inst
->response_mutex
);
159 * Another thread has already started finalizing
161 if (evs_inst
->finalize
) {
162 pthread_mutex_unlock (&evs_inst
->response_mutex
);
163 saHandleInstancePut (&evs_handle_t_db
, handle
);
164 return (EVS_ERR_BAD_HANDLE
);
167 evs_inst
->finalize
= 1;
169 pthread_mutex_unlock (&evs_inst
->response_mutex
);
171 saHandleDestroy (&evs_handle_t_db
, handle
);
173 * Disconnect from the server
175 if (evs_inst
->response_fd
!= -1) {
176 shutdown(evs_inst
->response_fd
, 0);
177 close(evs_inst
->response_fd
);
179 if (evs_inst
->dispatch_fd
!= -1) {
180 shutdown(evs_inst
->dispatch_fd
, 0);
181 close(evs_inst
->dispatch_fd
);
183 saHandleInstancePut (&evs_handle_t_db
, handle
);
189 evs_error_t
evs_fd_get (
194 struct evs_inst
*evs_inst
;
196 error
= saHandleInstanceGet (&evs_handle_t_db
, handle
, (void *)&evs_inst
);
197 if (error
!= SA_AIS_OK
) {
201 *fd
= evs_inst
->dispatch_fd
;
203 saHandleInstancePut (&evs_handle_t_db
, handle
);
208 evs_error_t
evs_dispatch (
210 evs_dispatch_t dispatch_types
)
215 int cont
= 1; /* always continue do loop except when set to 0 */
217 struct evs_inst
*evs_inst
;
218 struct res_evs_confchg_callback
*res_evs_confchg_callback
;
219 struct res_evs_deliver_callback
*res_evs_deliver_callback
;
220 evs_callbacks_t callbacks
;
221 struct res_overlay dispatch_data
;
222 int ignore_dispatch
= 0;
224 error
= saHandleInstanceGet (&evs_handle_t_db
, handle
, (void *)&evs_inst
);
225 if (error
!= SA_AIS_OK
) {
230 * Timeout instantly for SA_DISPATCH_ONE or SA_DISPATCH_ALL and
231 * wait indefinately for SA_DISPATCH_BLOCKING
233 if (dispatch_types
== EVS_DISPATCH_ALL
) {
238 ufds
.fd
= evs_inst
->dispatch_fd
;
239 ufds
.events
= POLLIN
;
242 error
= saPollRetry (&ufds
, 1, timeout
);
243 if (error
!= SA_AIS_OK
) {
247 pthread_mutex_lock (&evs_inst
->dispatch_mutex
);
250 * Regather poll data in case ufds has changed since taking lock
252 error
= saPollRetry (&ufds
, 1, 0);
253 if (error
!= SA_AIS_OK
) {
258 * Handle has been finalized in another thread
260 if (evs_inst
->finalize
== 1) {
262 pthread_mutex_unlock (&evs_inst
->dispatch_mutex
);
266 dispatch_avail
= ufds
.revents
& POLLIN
;
267 if (dispatch_avail
== 0 && dispatch_types
== EVS_DISPATCH_ALL
) {
268 pthread_mutex_unlock (&evs_inst
->dispatch_mutex
);
269 break; /* exit do while cont is 1 loop */
271 if (dispatch_avail
== 0) {
272 pthread_mutex_unlock (&evs_inst
->dispatch_mutex
);
273 continue; /* next poll */
276 if (ufds
.revents
& POLLIN
) {
278 * Queue empty, read response from socket
280 error
= saRecvRetry (evs_inst
->dispatch_fd
, &dispatch_data
.header
,
281 sizeof (mar_res_header_t
));
282 if (error
!= SA_AIS_OK
) {
285 if (dispatch_data
.header
.size
> sizeof (mar_res_header_t
)) {
286 error
= saRecvRetry (evs_inst
->dispatch_fd
, &dispatch_data
.data
,
287 dispatch_data
.header
.size
- sizeof (mar_res_header_t
));
289 if (error
!= SA_AIS_OK
) {
294 pthread_mutex_unlock (&evs_inst
->dispatch_mutex
);
299 * Make copy of callbacks, message data, unlock instance, and call callback
300 * A risk of this dispatch method is that the callback routines may
301 * operate at the same time that evsFinalize has been called.
303 memcpy (&callbacks
, &evs_inst
->callbacks
, sizeof (evs_callbacks_t
));
305 pthread_mutex_unlock (&evs_inst
->dispatch_mutex
);
307 * Dispatch incoming message
309 switch (dispatch_data
.header
.id
) {
310 case MESSAGE_RES_EVS_DELIVER_CALLBACK
:
311 res_evs_deliver_callback
= (struct res_evs_deliver_callback
*)&dispatch_data
;
312 callbacks
.evs_deliver_fn (
313 res_evs_deliver_callback
->local_nodeid
,
314 &res_evs_deliver_callback
->msg
,
315 res_evs_deliver_callback
->msglen
);
318 case MESSAGE_RES_EVS_CONFCHG_CALLBACK
:
319 res_evs_confchg_callback
= (struct res_evs_confchg_callback
*)&dispatch_data
;
320 callbacks
.evs_confchg_fn (
321 res_evs_confchg_callback
->member_list
,
322 res_evs_confchg_callback
->member_list_entries
,
323 res_evs_confchg_callback
->left_list
,
324 res_evs_confchg_callback
->left_list_entries
,
325 res_evs_confchg_callback
->joined_list
,
326 res_evs_confchg_callback
->joined_list_entries
);
330 error
= SA_AIS_ERR_LIBRARY
;
336 * Determine if more messages should be processed
338 switch (dispatch_types
) {
339 case EVS_DISPATCH_ONE
:
340 if (ignore_dispatch
) {
346 case EVS_DISPATCH_ALL
:
347 if (ignore_dispatch
) {
351 case EVS_DISPATCH_BLOCKING
:
357 saHandleInstancePut (&evs_handle_t_db
, handle
);
362 evs_error_t
evs_join (
364 struct evs_group
*groups
,
368 struct evs_inst
*evs_inst
;
370 struct req_lib_evs_join req_lib_evs_join
;
371 struct res_lib_evs_join res_lib_evs_join
;
373 error
= saHandleInstanceGet (&evs_handle_t_db
, handle
, (void *)&evs_inst
);
374 if (error
!= SA_AIS_OK
) {
378 req_lib_evs_join
.header
.size
= sizeof (struct req_lib_evs_join
) +
379 (group_entries
* sizeof (struct evs_group
));
380 req_lib_evs_join
.header
.id
= MESSAGE_REQ_EVS_JOIN
;
381 req_lib_evs_join
.group_entries
= group_entries
;
383 iov
[0].iov_base
= (char *)&req_lib_evs_join
;
384 iov
[0].iov_len
= sizeof (struct req_lib_evs_join
);
385 iov
[1].iov_base
= (char *)groups
;
386 iov
[1].iov_len
= (group_entries
* sizeof (struct evs_group
));
388 pthread_mutex_lock (&evs_inst
->response_mutex
);
390 error
= saSendMsgReceiveReply (evs_inst
->response_fd
, iov
, 2,
391 &res_lib_evs_join
, sizeof (struct res_lib_evs_join
));
393 pthread_mutex_unlock (&evs_inst
->response_mutex
);
395 if (error
!= SA_AIS_OK
) {
399 error
= res_lib_evs_join
.header
.error
;
402 saHandleInstancePut (&evs_handle_t_db
, handle
);
407 evs_error_t
evs_leave (
409 struct evs_group
*groups
,
413 struct evs_inst
*evs_inst
;
415 struct req_lib_evs_leave req_lib_evs_leave
;
416 struct res_lib_evs_leave res_lib_evs_leave
;
418 error
= saHandleInstanceGet (&evs_handle_t_db
, handle
, (void *)&evs_inst
);
419 if (error
!= SA_AIS_OK
) {
423 req_lib_evs_leave
.header
.size
= sizeof (struct req_lib_evs_leave
) +
424 (group_entries
* sizeof (struct evs_group
));
425 req_lib_evs_leave
.header
.id
= MESSAGE_REQ_EVS_LEAVE
;
426 req_lib_evs_leave
.group_entries
= group_entries
;
428 iov
[0].iov_base
= (char *)&req_lib_evs_leave
;
429 iov
[0].iov_len
= sizeof (struct req_lib_evs_leave
);
430 iov
[1].iov_base
= (char *)groups
;
431 iov
[1].iov_len
= (group_entries
* sizeof (struct evs_group
));
433 pthread_mutex_lock (&evs_inst
->response_mutex
);
435 error
= saSendMsgReceiveReply (evs_inst
->response_fd
, iov
, 2,
436 &res_lib_evs_leave
, sizeof (struct res_lib_evs_leave
));
438 pthread_mutex_unlock (&evs_inst
->response_mutex
);
440 if (error
!= SA_AIS_OK
) {
444 error
= res_lib_evs_leave
.header
.error
;
447 saHandleInstancePut (&evs_handle_t_db
, handle
);
452 evs_error_t
evs_mcast_joined (
454 evs_guarantee_t guarantee
,
460 struct evs_inst
*evs_inst
;
461 struct iovec iov
[64];
462 struct req_lib_evs_mcast_joined req_lib_evs_mcast_joined
;
463 struct res_lib_evs_mcast_joined res_lib_evs_mcast_joined
;
466 error
= saHandleInstanceGet (&evs_handle_t_db
, handle
, (void *)&evs_inst
);
467 if (error
!= SA_AIS_OK
) {
471 for (i
= 0; i
< iov_len
; i
++ ) {
472 msg_len
+= iovec
[i
].iov_len
;
475 req_lib_evs_mcast_joined
.header
.size
= sizeof (struct req_lib_evs_mcast_joined
) +
478 req_lib_evs_mcast_joined
.header
.id
= MESSAGE_REQ_EVS_MCAST_JOINED
;
479 req_lib_evs_mcast_joined
.guarantee
= guarantee
;
480 req_lib_evs_mcast_joined
.msg_len
= msg_len
;
482 iov
[0].iov_base
= (char *)&req_lib_evs_mcast_joined
;
483 iov
[0].iov_len
= sizeof (struct req_lib_evs_mcast_joined
);
484 memcpy (&iov
[1], iovec
, iov_len
* sizeof (struct iovec
));
486 pthread_mutex_lock (&evs_inst
->response_mutex
);
488 error
= saSendMsgReceiveReply (evs_inst
->response_fd
, iov
, iov_len
+ 1,
489 &res_lib_evs_mcast_joined
, sizeof (struct res_lib_evs_mcast_joined
));
491 pthread_mutex_unlock (&evs_inst
->response_mutex
);
493 if (error
!= SA_AIS_OK
) {
497 error
= res_lib_evs_mcast_joined
.header
.error
;
500 saHandleInstancePut (&evs_handle_t_db
, handle
);
505 evs_error_t
evs_mcast_groups (
507 evs_guarantee_t guarantee
,
508 struct evs_group
*groups
,
515 struct evs_inst
*evs_inst
;
516 struct iovec iov
[64];
517 struct req_lib_evs_mcast_groups req_lib_evs_mcast_groups
;
518 struct res_lib_evs_mcast_groups res_lib_evs_mcast_groups
;
521 error
= saHandleInstanceGet (&evs_handle_t_db
, handle
, (void *)&evs_inst
);
522 if (error
!= SA_AIS_OK
) {
525 for (i
= 0; i
< iov_len
; i
++) {
526 msg_len
+= iovec
[i
].iov_len
;
528 req_lib_evs_mcast_groups
.header
.size
= sizeof (struct req_lib_evs_mcast_groups
) +
529 (group_entries
* sizeof (struct evs_group
)) + msg_len
;
530 req_lib_evs_mcast_groups
.header
.id
= MESSAGE_REQ_EVS_MCAST_GROUPS
;
531 req_lib_evs_mcast_groups
.guarantee
= guarantee
;
532 req_lib_evs_mcast_groups
.msg_len
= msg_len
;
533 req_lib_evs_mcast_groups
.group_entries
= group_entries
;
535 iov
[0].iov_base
= (char *)&req_lib_evs_mcast_groups
;
536 iov
[0].iov_len
= sizeof (struct req_lib_evs_mcast_groups
);
537 iov
[1].iov_base
= (char *)groups
;
538 iov
[1].iov_len
= (group_entries
* sizeof (struct evs_group
));
539 memcpy (&iov
[2], iovec
, iov_len
* sizeof (struct iovec
));
541 pthread_mutex_lock (&evs_inst
->response_mutex
);
543 error
= saSendMsgReceiveReply (evs_inst
->response_fd
, iov
, iov_len
+ 2,
544 &res_lib_evs_mcast_groups
, sizeof (struct res_lib_evs_mcast_groups
));
546 pthread_mutex_unlock (&evs_inst
->response_mutex
);
547 if (error
!= SA_AIS_OK
) {
551 error
= res_lib_evs_mcast_groups
.header
.error
;
554 saHandleInstancePut (&evs_handle_t_db
, handle
);
559 evs_error_t
evs_membership_get (
561 unsigned int *local_nodeid
,
562 unsigned int *member_list
,
563 unsigned int *member_list_entries
)
566 struct evs_inst
*evs_inst
;
568 struct req_lib_evs_membership_get req_lib_evs_membership_get
;
569 struct res_lib_evs_membership_get res_lib_evs_membership_get
;
571 error
= saHandleInstanceGet (&evs_handle_t_db
, handle
, (void *)&evs_inst
);
572 if (error
!= SA_AIS_OK
) {
576 req_lib_evs_membership_get
.header
.size
= sizeof (struct req_lib_evs_membership_get
);
577 req_lib_evs_membership_get
.header
.id
= MESSAGE_REQ_EVS_MEMBERSHIP_GET
;
579 iov
.iov_base
= (char *)&req_lib_evs_membership_get
;
580 iov
.iov_len
= sizeof (struct req_lib_evs_membership_get
);
582 pthread_mutex_lock (&evs_inst
->response_mutex
);
584 error
= saSendMsgReceiveReply (evs_inst
->response_fd
, &iov
, 1,
585 &res_lib_evs_membership_get
, sizeof (struct res_lib_evs_membership_get
));
587 pthread_mutex_unlock (&evs_inst
->response_mutex
);
589 if (error
!= SA_AIS_OK
) {
593 error
= res_lib_evs_membership_get
.header
.error
;
596 * Copy results to caller
599 *local_nodeid
= res_lib_evs_membership_get
.local_nodeid
;
601 *member_list_entries
= *member_list_entries
< res_lib_evs_membership_get
.member_list_entries
?
602 *member_list_entries
: res_lib_evs_membership_get
.member_list_entries
;
604 memcpy (member_list
, &res_lib_evs_membership_get
.member_list
,
605 *member_list_entries
* sizeof (struct in_addr
));
609 saHandleInstancePut (&evs_handle_t_db
, handle
);