Expose confdb write to the library.
[openais.git] / lib / cpg.c
blob2f4869457c2be8eecf14255e600cf51f4e375c52
1 /*
2 * vi: set autoindent tabstop=4 shiftwidth=4 :
4 * Copyright (c) 2006 Red Hat, Inc.
6 * All rights reserved.
8 * Author: Patrick Caulfield (pcaulfie@redhat.com)
10 * This software licensed under BSD license, the text of which follows:
12 * Redistribution and use in source and binary forms, with or without
13 * modification, are permitted provided that the following conditions are met:
15 * - Redistributions of source code must retain the above copyright notice,
16 * this list of conditions and the following disclaimer.
17 * - Redistributions in binary form must reproduce the above copyright notice,
18 * this list of conditions and the following disclaimer in the documentation
19 * and/or other materials provided with the distribution.
20 * - Neither the name of the MontaVista Software, Inc. nor the names of its
21 * contributors may be used to endorse or promote products derived from this
22 * software without specific prior written permission.
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
34 * THE POSSIBILITY OF SUCH DAMAGE.
37 * Provides a closed process group API using the openais executive
40 #include <stdlib.h>
41 #include <string.h>
42 #include <unistd.h>
43 #include <pthread.h>
44 #include <sys/types.h>
45 #include <sys/socket.h>
46 #include <errno.h>
48 #include <saAis.h>
49 #include <cpg.h>
50 #include <ipc_cpg.h>
51 #include <mar_cpg.h>
52 #include <ais_util.h>
54 struct cpg_inst {
55 int response_fd;
56 int dispatch_fd;
57 int finalize;
58 cpg_flow_control_state_t flow_control_state;
59 cpg_callbacks_t callbacks;
60 void *context;
61 pthread_mutex_t response_mutex;
62 pthread_mutex_t dispatch_mutex;
65 static void cpg_instance_destructor (void *instance);
67 static struct saHandleDatabase cpg_handle_t_db = {
68 .handleCount = 0,
69 .handles = 0,
70 .mutex = PTHREAD_MUTEX_INITIALIZER,
71 .handleInstanceDestructor = cpg_instance_destructor
75 * Clean up function for a cpg instance (cpg_nitialize) handle
77 static void cpg_instance_destructor (void *instance)
79 struct cpg_inst *cpg_inst = instance;
81 pthread_mutex_destroy (&cpg_inst->response_mutex);
82 pthread_mutex_destroy (&cpg_inst->dispatch_mutex);
/**
 * @defgroup cpg_openais The closed process group API
 * @ingroup openais
 *
 * @{
 */
93 cpg_error_t cpg_initialize (
94 cpg_handle_t *handle,
95 cpg_callbacks_t *callbacks)
97 SaAisErrorT error;
98 struct cpg_inst *cpg_inst;
100 error = saHandleCreate (&cpg_handle_t_db, sizeof (struct cpg_inst), handle);
101 if (error != SA_AIS_OK) {
102 goto error_no_destroy;
105 error = saHandleInstanceGet (&cpg_handle_t_db, *handle, (void *)&cpg_inst);
106 if (error != SA_AIS_OK) {
107 goto error_destroy;
110 error = saServiceConnect (&cpg_inst->dispatch_fd,
111 &cpg_inst->response_fd,
112 CPG_SERVICE);
113 if (error != SA_AIS_OK) {
114 goto error_put_destroy;
117 memcpy (&cpg_inst->callbacks, callbacks, sizeof (cpg_callbacks_t));
119 pthread_mutex_init (&cpg_inst->response_mutex, NULL);
121 pthread_mutex_init (&cpg_inst->dispatch_mutex, NULL);
123 saHandleInstancePut (&cpg_handle_t_db, *handle);
125 return (SA_AIS_OK);
127 error_put_destroy:
128 saHandleInstancePut (&cpg_handle_t_db, *handle);
129 error_destroy:
130 saHandleDestroy (&cpg_handle_t_db, *handle);
131 error_no_destroy:
132 return (error);
135 cpg_error_t cpg_finalize (
136 cpg_handle_t handle)
138 struct cpg_inst *cpg_inst;
139 SaAisErrorT error;
141 error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
142 if (error != SA_AIS_OK) {
143 return (error);
146 pthread_mutex_lock (&cpg_inst->response_mutex);
149 * Another thread has already started finalizing
151 if (cpg_inst->finalize) {
152 pthread_mutex_unlock (&cpg_inst->response_mutex);
153 saHandleInstancePut (&cpg_handle_t_db, handle);
154 return (CPG_ERR_BAD_HANDLE);
157 cpg_inst->finalize = 1;
159 pthread_mutex_unlock (&cpg_inst->response_mutex);
161 saHandleDestroy (&cpg_handle_t_db, handle);
164 * Disconnect from the server
166 if (cpg_inst->response_fd != -1) {
167 shutdown(cpg_inst->response_fd, 0);
168 close(cpg_inst->response_fd);
170 if (cpg_inst->dispatch_fd != -1) {
171 shutdown(cpg_inst->dispatch_fd, 0);
172 close(cpg_inst->dispatch_fd);
174 saHandleInstancePut (&cpg_handle_t_db, handle);
176 return (CPG_OK);
179 cpg_error_t cpg_fd_get (
180 cpg_handle_t handle,
181 int *fd)
183 SaAisErrorT error;
184 struct cpg_inst *cpg_inst;
186 error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
187 if (error != SA_AIS_OK) {
188 return (error);
191 *fd = cpg_inst->dispatch_fd;
193 saHandleInstancePut (&cpg_handle_t_db, handle);
195 return (SA_AIS_OK);
198 cpg_error_t cpg_context_get (
199 cpg_handle_t handle,
200 void **context)
202 SaAisErrorT error;
203 struct cpg_inst *cpg_inst;
205 error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
206 if (error != SA_AIS_OK) {
207 return (error);
210 *context = cpg_inst->context;
212 saHandleInstancePut (&cpg_handle_t_db, handle);
214 return (SA_AIS_OK);
217 cpg_error_t cpg_context_set (
218 cpg_handle_t handle,
219 void *context)
221 SaAisErrorT error;
222 struct cpg_inst *cpg_inst;
224 error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
225 if (error != SA_AIS_OK) {
226 return (error);
229 cpg_inst->context = context;
231 saHandleInstancePut (&cpg_handle_t_db, handle);
233 return (SA_AIS_OK);
236 struct res_overlay {
237 mar_res_header_t header __attribute__((aligned(8)));
238 char data[512000];
241 cpg_error_t cpg_dispatch (
242 cpg_handle_t handle,
243 cpg_dispatch_t dispatch_types)
245 struct pollfd ufds;
246 int timeout = -1;
247 SaAisErrorT error;
248 int cont = 1; /* always continue do loop except when set to 0 */
249 int dispatch_avail;
250 struct cpg_inst *cpg_inst;
251 struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
252 struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
253 cpg_callbacks_t callbacks;
254 struct res_overlay dispatch_data;
255 int ignore_dispatch = 0;
256 struct cpg_address member_list[CPG_MEMBERS_MAX];
257 struct cpg_address left_list[CPG_MEMBERS_MAX];
258 struct cpg_address joined_list[CPG_MEMBERS_MAX];
259 struct cpg_name group_name;
260 mar_cpg_address_t *left_list_start;
261 mar_cpg_address_t *joined_list_start;
262 unsigned int i;
264 error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
265 if (error != SA_AIS_OK) {
266 return (error);
270 * Timeout instantly for SA_DISPATCH_ONE or SA_DISPATCH_ALL and
271 * wait indefinately for SA_DISPATCH_BLOCKING
273 if (dispatch_types == CPG_DISPATCH_ALL) {
274 timeout = 0;
277 do {
278 ufds.fd = cpg_inst->dispatch_fd;
279 ufds.events = POLLIN;
280 ufds.revents = 0;
282 error = saPollRetry (&ufds, 1, timeout);
283 if (error != SA_AIS_OK) {
284 goto error_nounlock;
287 pthread_mutex_lock (&cpg_inst->dispatch_mutex);
290 * Regather poll data in case ufds has changed since taking lock
292 error = saPollRetry (&ufds, 1, timeout);
293 if (error != SA_AIS_OK) {
294 goto error_nounlock;
298 * Handle has been finalized in another thread
300 if (cpg_inst->finalize == 1) {
301 error = CPG_OK;
302 pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
303 goto error_unlock;
306 dispatch_avail = ufds.revents & POLLIN;
307 if (dispatch_avail == 0 && dispatch_types == CPG_DISPATCH_ALL) {
308 pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
309 break; /* exit do while cont is 1 loop */
310 } else
311 if (dispatch_avail == 0) {
312 pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
313 continue; /* next poll */
316 if (ufds.revents & POLLIN) {
318 * Queue empty, read response from socket
320 error = saRecvRetry (cpg_inst->dispatch_fd, &dispatch_data.header,
321 sizeof (mar_res_header_t));
322 if (error != SA_AIS_OK) {
323 goto error_unlock;
325 if (dispatch_data.header.size > sizeof (mar_res_header_t)) {
326 error = saRecvRetry (cpg_inst->dispatch_fd, &dispatch_data.data,
327 dispatch_data.header.size - sizeof (mar_res_header_t));
329 if (error != SA_AIS_OK) {
330 goto error_unlock;
333 } else {
334 pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
335 continue;
339 * Make copy of callbacks, message data, unlock instance, and call callback
340 * A risk of this dispatch method is that the callback routines may
341 * operate at the same time that cpgFinalize has been called.
343 memcpy (&callbacks, &cpg_inst->callbacks, sizeof (cpg_callbacks_t));
345 pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
347 * Dispatch incoming message
349 switch (dispatch_data.header.id) {
350 case MESSAGE_RES_CPG_DELIVER_CALLBACK:
351 res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)&dispatch_data;
353 cpg_inst->flow_control_state = res_cpg_deliver_callback->flow_control_state;
354 marshall_from_mar_cpg_name_t (
355 &group_name,
356 &res_cpg_deliver_callback->group_name);
358 callbacks.cpg_deliver_fn (handle,
359 &group_name,
360 res_cpg_deliver_callback->nodeid,
361 res_cpg_deliver_callback->pid,
362 &res_cpg_deliver_callback->message,
363 res_cpg_deliver_callback->msglen);
364 break;
366 case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
367 res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)&dispatch_data;
369 for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
370 marshall_from_mar_cpg_address_t (&member_list[i],
371 &res_cpg_confchg_callback->member_list[i]);
373 left_list_start = res_cpg_confchg_callback->member_list +
374 res_cpg_confchg_callback->member_list_entries;
375 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
376 marshall_from_mar_cpg_address_t (&left_list[i],
377 &left_list_start[i]);
379 joined_list_start = res_cpg_confchg_callback->member_list +
380 res_cpg_confchg_callback->member_list_entries +
381 res_cpg_confchg_callback->left_list_entries;
382 for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
383 marshall_from_mar_cpg_address_t (&joined_list[i],
384 &joined_list_start[i]);
386 marshall_from_mar_cpg_name_t (
387 &group_name,
388 &res_cpg_confchg_callback->group_name);
390 callbacks.cpg_confchg_fn (handle,
391 &group_name,
392 member_list,
393 res_cpg_confchg_callback->member_list_entries,
394 left_list,
395 res_cpg_confchg_callback->left_list_entries,
396 joined_list,
397 res_cpg_confchg_callback->joined_list_entries);
398 break;
400 default:
401 error = SA_AIS_ERR_LIBRARY;
402 goto error_nounlock;
403 break;
407 * Determine if more messages should be processed
408 * */
409 switch (dispatch_types) {
410 case CPG_DISPATCH_ONE:
411 if (ignore_dispatch) {
412 ignore_dispatch = 0;
413 } else {
414 cont = 0;
416 break;
417 case CPG_DISPATCH_ALL:
418 if (ignore_dispatch) {
419 ignore_dispatch = 0;
421 break;
422 case CPG_DISPATCH_BLOCKING:
423 break;
425 } while (cont);
427 error_unlock:
428 saHandleInstancePut (&cpg_handle_t_db, handle);
429 error_nounlock:
430 return (error);
433 cpg_error_t cpg_join (
434 cpg_handle_t handle,
435 struct cpg_name *group)
437 cpg_error_t error;
438 struct cpg_inst *cpg_inst;
439 struct iovec iov[2];
440 struct req_lib_cpg_join req_lib_cpg_join;
441 struct res_lib_cpg_join res_lib_cpg_join;
442 struct req_lib_cpg_trackstart req_lib_cpg_trackstart;
443 struct res_lib_cpg_trackstart res_lib_cpg_trackstart;
445 error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
446 if (error != SA_AIS_OK) {
447 return (error);
450 pthread_mutex_lock (&cpg_inst->response_mutex);
452 /* Automatically add a tracker */
453 req_lib_cpg_trackstart.header.size = sizeof (struct req_lib_cpg_trackstart);
454 req_lib_cpg_trackstart.header.id = MESSAGE_REQ_CPG_TRACKSTART;
455 marshall_to_mar_cpg_name_t (&req_lib_cpg_trackstart.group_name,
456 group);
458 iov[0].iov_base = (char *)&req_lib_cpg_trackstart;
459 iov[0].iov_len = sizeof (struct req_lib_cpg_trackstart);
461 error = saSendMsgReceiveReply (cpg_inst->dispatch_fd, iov, 1,
462 &res_lib_cpg_trackstart, sizeof (struct res_lib_cpg_trackstart));
464 if (error != SA_AIS_OK) {
465 pthread_mutex_unlock (&cpg_inst->response_mutex);
466 goto error_exit;
469 /* Now join */
470 req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
471 req_lib_cpg_join.header.id = MESSAGE_REQ_CPG_JOIN;
472 req_lib_cpg_join.pid = getpid();
473 marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
474 group);
476 iov[0].iov_base = (char *)&req_lib_cpg_join;
477 iov[0].iov_len = sizeof (struct req_lib_cpg_join);
479 error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, 1,
480 &res_lib_cpg_join, sizeof (struct res_lib_cpg_join));
482 pthread_mutex_unlock (&cpg_inst->response_mutex);
484 if (error != SA_AIS_OK) {
485 goto error_exit;
488 error = res_lib_cpg_join.header.error;
490 error_exit:
491 saHandleInstancePut (&cpg_handle_t_db, handle);
493 return (error);
496 cpg_error_t cpg_leave (
497 cpg_handle_t handle,
498 struct cpg_name *group)
500 cpg_error_t error;
501 struct cpg_inst *cpg_inst;
502 struct iovec iov[2];
503 struct req_lib_cpg_leave req_lib_cpg_leave;
504 struct res_lib_cpg_leave res_lib_cpg_leave;
506 error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
507 if (error != SA_AIS_OK) {
508 return (error);
511 req_lib_cpg_leave.header.size = sizeof (struct req_lib_cpg_leave);
512 req_lib_cpg_leave.header.id = MESSAGE_REQ_CPG_LEAVE;
513 req_lib_cpg_leave.pid = getpid();
514 marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name,
515 group);
517 iov[0].iov_base = (char *)&req_lib_cpg_leave;
518 iov[0].iov_len = sizeof (struct req_lib_cpg_leave);
520 pthread_mutex_lock (&cpg_inst->response_mutex);
522 error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, 1,
523 &res_lib_cpg_leave, sizeof (struct res_lib_cpg_leave));
525 pthread_mutex_unlock (&cpg_inst->response_mutex);
526 if (error != SA_AIS_OK) {
527 goto error_exit;
530 error = res_lib_cpg_leave.header.error;
532 error_exit:
533 saHandleInstancePut (&cpg_handle_t_db, handle);
535 return (error);
538 cpg_error_t cpg_mcast_joined (
539 cpg_handle_t handle,
540 cpg_guarantee_t guarantee,
541 struct iovec *iovec,
542 int iov_len)
544 int i;
545 cpg_error_t error;
546 struct cpg_inst *cpg_inst;
547 struct iovec iov[64];
548 struct req_lib_cpg_mcast req_lib_cpg_mcast;
549 struct res_lib_cpg_mcast res_lib_cpg_mcast;
550 int msg_len = 0;
552 error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
553 if (error != SA_AIS_OK) {
554 return (error);
557 for (i = 0; i < iov_len; i++ ) {
558 msg_len += iovec[i].iov_len;
561 req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
562 msg_len;
564 req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_MCAST;
565 req_lib_cpg_mcast.guarantee = guarantee;
566 req_lib_cpg_mcast.msglen = msg_len;
568 iov[0].iov_base = (char *)&req_lib_cpg_mcast;
569 iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
570 memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
572 pthread_mutex_lock (&cpg_inst->response_mutex);
574 error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, iov_len + 1,
575 &res_lib_cpg_mcast, sizeof (res_lib_cpg_mcast));
577 pthread_mutex_unlock (&cpg_inst->response_mutex);
579 if (error != SA_AIS_OK) {
580 goto error_exit;
583 cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state;
584 if (res_lib_cpg_mcast.header.error == CPG_ERR_TRY_AGAIN) {
585 cpg_inst->flow_control_state = CPG_FLOW_CONTROL_ENABLED;
587 error = res_lib_cpg_mcast.header.error;
589 error_exit:
590 saHandleInstancePut (&cpg_handle_t_db, handle);
592 return (error);
595 cpg_error_t cpg_membership_get (
596 cpg_handle_t handle,
597 struct cpg_name *group_name,
598 struct cpg_address *member_list,
599 int *member_list_entries)
601 cpg_error_t error;
602 struct cpg_inst *cpg_inst;
603 struct iovec iov;
604 struct req_lib_cpg_membership req_lib_cpg_membership_get;
605 struct res_lib_cpg_confchg_callback res_lib_cpg_membership_get;
606 unsigned int i;
608 error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
609 if (error != SA_AIS_OK) {
610 return (error);
613 req_lib_cpg_membership_get.header.size = sizeof (mar_req_header_t);
614 req_lib_cpg_membership_get.header.id = MESSAGE_REQ_CPG_MEMBERSHIP;
615 marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name,
616 group_name);
618 iov.iov_base = (char *)&req_lib_cpg_membership_get;
619 iov.iov_len = sizeof (mar_req_header_t);
621 pthread_mutex_lock (&cpg_inst->response_mutex);
623 error = saSendMsgReceiveReply (cpg_inst->response_fd, &iov, 1,
624 &res_lib_cpg_membership_get, sizeof (mar_res_header_t));
626 pthread_mutex_unlock (&cpg_inst->response_mutex);
628 if (error != SA_AIS_OK) {
629 goto error_exit;
632 error = res_lib_cpg_membership_get.header.error;
635 * Copy results to caller
637 *member_list_entries = res_lib_cpg_membership_get.member_list_entries;
638 if (member_list) {
639 for (i = 0; i < res_lib_cpg_membership_get.member_list_entries; i++) {
640 marshall_from_mar_cpg_address_t (&member_list[i],
641 &res_lib_cpg_membership_get.member_list[i]);
645 error_exit:
646 saHandleInstancePut (&cpg_handle_t_db, handle);
648 return (error);
651 cpg_error_t cpg_local_get (
652 cpg_handle_t handle,
653 unsigned int *local_nodeid)
655 cpg_error_t error;
656 struct cpg_inst *cpg_inst;
657 struct iovec iov;
658 struct req_lib_cpg_local_get req_lib_cpg_local_get;
659 struct res_lib_cpg_local_get res_lib_cpg_local_get;
661 error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
662 if (error != SA_AIS_OK) {
663 return (error);
666 req_lib_cpg_local_get.header.size = sizeof (mar_req_header_t);
667 req_lib_cpg_local_get.header.id = MESSAGE_REQ_CPG_LOCAL_GET;
669 iov.iov_base = &req_lib_cpg_local_get;
670 iov.iov_len = sizeof (struct req_lib_cpg_local_get);
672 pthread_mutex_lock (&cpg_inst->response_mutex);
674 error = saSendMsgReceiveReply (cpg_inst->response_fd, &iov, 1,
675 &res_lib_cpg_local_get, sizeof (res_lib_cpg_local_get));
677 pthread_mutex_unlock (&cpg_inst->response_mutex);
679 if (error != SA_AIS_OK) {
680 goto error_exit;
683 error = res_lib_cpg_local_get.header.error;
685 *local_nodeid = res_lib_cpg_local_get.local_nodeid;
687 error_exit:
688 saHandleInstancePut (&cpg_handle_t_db, handle);
690 return (error);
693 cpg_error_t cpg_flow_control_state_get (
694 cpg_handle_t handle,
695 cpg_flow_control_state_t *flow_control_state)
697 cpg_error_t error;
698 struct cpg_inst *cpg_inst;
700 error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
701 if (error != SA_AIS_OK) {
702 return (error);
705 *flow_control_state = cpg_inst->flow_control_state;
707 saHandleInstancePut (&cpg_handle_t_db, handle);
709 return (error);
711 /** @} */