2 * Copyright (c) 2005 MontaVista Software, Inc.
6 * Author: Steven Dake (sdake@mvista.com)
8 * This software licensed under BSD license, the text of which follows:
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions are met:
13 * - Redistributions of source code must retain the above copyright notice,
14 * this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 * - Neither the name of the MontaVista Software, Inc. nor the names of its
19 * contributors may be used to endorse or promote products derived from this
20 * software without specific prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
26 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
32 * THE POSSIBILITY OF SUCH DAMAGE.
42 #include <sys/types.h>
44 #include <sys/socket.h>
45 #include <sys/select.h>
/*
 * Receive buffer for one message arriving on the dispatch socket.
 * saMsgDispatch reads sizeof (mar_res_header_t) bytes into `header` and
 * then reads any remaining payload into the `data` member, which is
 * declared outside this excerpt.
 */
56 struct message_overlay
{
/* Response header; carries the size, id and error fields consulted by the dispatcher */
57 mar_res_header_t header
__attribute__((aligned(8)));
62 * Data structure for instance data
/* Callback table: copied from the caller in saMsgInitialize, or zero-filled when none is supplied */
67 SaMsgCallbacksT callbacks
;
/* Handle under which this instance is registered in msgHandleDatabase */
69 SaMsgHandleT msgHandle
;
/* Held around every request/reply exchange on response_fd */
70 pthread_mutex_t response_mutex
;
/* Held by saMsgDispatch while it reads a message from dispatch_fd */
71 pthread_mutex_t dispatch_mutex
;
/* Head of the list of msgQueueInstance entries opened through this handle */
72 struct list_head queue_list
;
/*
 * Per-queue library state stored in queueHandleDatabase.  The instance is
 * created by saMsgQueueOpen / saMsgQueueOpenAsync.
 */
75 struct msgQueueInstance
{
/* Message Service handle the queue was opened through */
77 SaMsgHandleT msgHandle
;
/* This instance's own handle in queueHandleDatabase */
78 SaMsgQueueHandleT queueHandle
;
/* Flags passed by the caller when the queue was opened */
79 SaMsgQueueOpenFlagsT openFlags
;
/* Link into the owning msgInstance's queue_list */
81 struct list_head list
;
/* Head of the list walked by msgQueueInstanceFinalize */
82 struct list_head section_iteration_list_head
;
/* Points at the owning msgInstance's response_mutex (shared, not owned here) */
83 pthread_mutex_t
*response_mutex
;
/*
 * Forward declarations of the destructors referenced by the handle
 * database initializers below.
 */
87 void msgHandleInstanceDestructor (void *instance
);
88 void queueHandleInstanceDestructor (void *instance
);
91 * All MSG instances in this database
/* Handle database that holds one struct msgInstance per saMsgInitialize call */
93 static struct saHandleDatabase msgHandleDatabase
= {
96 .mutex
= PTHREAD_MUTEX_INITIALIZER
,
/* Destroys the instance's two mutexes */
97 .handleInstanceDestructor
= msgHandleInstanceDestructor
101 * All Queue instances in this database
/* Handle database that holds one struct msgQueueInstance per opened queue */
103 static struct saHandleDatabase queueHandleDatabase
= {
106 .mutex
= PTHREAD_MUTEX_INITIALIZER
,
107 .handleInstanceDestructor
= queueHandleInstanceDestructor
/* Table of API versions this library accepts (entries are outside this excerpt) */
113 static SaVersionT msgVersionsSupported
[] = {
/* Version database handed to saVersionVerify by saMsgInitialize */
117 static struct saVersionDatabase msgVersionDatabase
= {
/* Number of entries in msgVersionsSupported */
118 sizeof (msgVersionsSupported
) / sizeof (SaVersionT
),
122 struct iteratorSectionIdListEntry
{
123 struct list_head list
;
124 unsigned char data
[0];
131 void msgHandleInstanceDestructor (void *instance
)
133 struct msgInstance
*msgInstance
= instance
;
135 pthread_mutex_destroy (&msgInstance
->response_mutex
);
136 pthread_mutex_destroy (&msgInstance
->dispatch_mutex
);
/*
 * Destructor registered with queueHandleDatabase for struct msgQueueInstance
 * entries.  NOTE(review): the body is not visible in this excerpt - confirm
 * what, if anything, it releases.
 */
139 void queueHandleInstanceDestructor (void *instance
)
145 static void msgQueueInstanceFinalize (struct msgQueueInstance
*msgQueueInstance
)
147 struct msgSectionIterationInstance
*sectionIterationInstance
;
148 struct list_head
*sectionIterationList
;
149 struct list_head
*sectionIterationListNext
;
151 for (sectionIterationList
= msgQueueInstance
->section_iteration_list_head
.next
,
152 sectionIterationListNext
= sectionIterationList
->next
;
153 sectionIterationList
!= &msgQueueInstance
->section_iteration_list_head
;
154 sectionIterationList
= sectionIterationListNext
,
155 sectionIterationListNext
= sectionIterationList
->next
) {
157 sectionIterationInstance
= list_entry (sectionIterationList
,
158 struct msgSectionIterationInstance
, list
);
160 msgSectionIterationInstanceFinalize (sectionIterationInstance
);
163 list_del (&msgQueueInstance
->list
);
165 saHandleDestroy (&queueHandleDatabase
, msgQueueInstance
->queueHandle
);
168 static void msgInstanceFinalize (struct msgInstance
*msgInstance
)
170 struct msgQueueInstance
*msgQueueInstance
;
171 struct list_head
*queueInstanceList
;
172 struct list_head
*queueInstanceListNext
;
174 for (queueInstanceList
= msgInstance
->queue_list
.next
,
175 queueInstanceListNext
= queueInstanceList
->next
;
176 queueInstanceList
!= &msgInstance
->queue_list
;
177 queueInstanceList
= queueInstanceListNext
,
178 queueInstanceListNext
= queueInstanceList
->next
) {
180 msgQueueInstance
= list_entry (queueInstanceList
,
181 struct msgQueueInstance
, list
);
183 msgQueueInstanceFinalize (msgQueueInstance
);
186 saHandleDestroy (&msgHandleDatabase
, msgInstance
->msgHandle
);
193 SaMsgHandleT
*msgHandle
,
194 const SaMsgCallbacksT
*callbacks
,
197 struct msgInstance
*msgInstance
;
198 SaAisErrorT error
= SA_AIS_OK
;
200 if (msgHandle
== NULL
) {
201 return (SA_AIS_ERR_INVALID_PARAM
);
204 error
= saVersionVerify (&msgVersionDatabase
, version
);
205 if (error
!= SA_AIS_OK
) {
206 goto error_no_destroy
;
209 error
= saHandleCreate (&msgHandleDatabase
, sizeof (struct msgInstance
),
211 if (error
!= SA_AIS_OK
) {
212 goto error_no_destroy
;
215 error
= saHandleInstanceGet (&msgHandleDatabase
, *msgHandle
,
216 (void *)&msgInstance
);
217 if (error
!= SA_AIS_OK
) {
221 msgInstance
->response_fd
= -1;
223 error
= saServiceConnect (&msgInstance
->response_fd
,
224 &msgInstance
->dispatch_fd
, MSG_SERVICE
);
225 if (error
!= SA_AIS_OK
) {
226 goto error_put_destroy
;
230 memcpy (&msgInstance
->callbacks
, callbacks
, sizeof (SaMsgCallbacksT
));
232 memset (&msgInstance
->callbacks
, 0, sizeof (SaMsgCallbacksT
));
235 list_init (&msgInstance
->queue_list
);
237 msgInstance
->msgHandle
= *msgHandle
;
239 pthread_mutex_init (&msgInstance
->response_mutex
, NULL
);
241 saHandleInstancePut (&msgHandleDatabase
, *msgHandle
);
246 saHandleInstancePut (&msgHandleDatabase
, *msgHandle
);
248 saHandleDestroy (&msgHandleDatabase
, *msgHandle
);
254 saMsgSelectionObjectGet (
255 const SaMsgHandleT msgHandle
,
256 SaSelectionObjectT
*selectionObject
)
258 struct msgInstance
*msgInstance
;
261 if (selectionObject
== NULL
) {
262 return (SA_AIS_ERR_INVALID_PARAM
);
264 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
, (void *)&msgInstance
);
265 if (error
!= SA_AIS_OK
) {
269 *selectionObject
= msgInstance
->dispatch_fd
;
271 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
278 const SaMsgHandleT msgHandle
,
279 SaDispatchFlagsT dispatchFlags
)
284 SaMsgCallbacksT callbacks
;
287 struct msgInstance
*msgInstance
;
288 int cont
= 1; /* always continue do loop except when set to 0 */
289 struct message_overlay dispatch_data
;
291 struct res_lib_msg_queueopenasync *res_lib_msg_queueopenasync;
292 struct res_lib_msg_queuesynchronizeasync *res_lib_msg_queuesynchronizeasync;
293 struct msgQueueInstance *msgQueueInstance;
296 if (dispatchFlags
!= SA_DISPATCH_ONE
&&
297 dispatchFlags
!= SA_DISPATCH_ALL
&&
298 dispatchFlags
!= SA_DISPATCH_BLOCKING
) {
300 return (SA_AIS_ERR_INVALID_PARAM
);
303 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
,
304 (void *)&msgInstance
);
305 if (error
!= SA_AIS_OK
) {
310 * Timeout instantly for SA_DISPATCH_ALL
312 if (dispatchFlags
== SA_DISPATCH_ALL
) {
318 * Read data directly from socket
320 poll_fd
= msgInstance
->dispatch_fd
;
322 ufds
.events
= POLLIN
;
325 error
= saPollRetry(&ufds
, 1, timeout
);
326 if (error
!= SA_AIS_OK
) {
329 pthread_mutex_lock(&msgInstance
->dispatch_mutex
);
331 if (msgInstance
->finalize
== 1) {
336 if ((ufds
.revents
& (POLLERR
|POLLHUP
|POLLNVAL
)) != 0) {
337 error
= SA_AIS_ERR_BAD_HANDLE
;
341 dispatch_avail
= (ufds
.revents
& POLLIN
);
343 if (dispatch_avail
== 0 && dispatchFlags
== SA_DISPATCH_ALL
) {
344 pthread_mutex_unlock(&msgInstance
->dispatch_mutex
);
345 break; /* exit do while cont is 1 loop */
347 if (dispatch_avail
== 0) {
348 pthread_mutex_unlock(&msgInstance
->dispatch_mutex
);
352 memset(&dispatch_data
,0, sizeof(struct message_overlay
));
353 error
= saRecvRetry (msgInstance
->dispatch_fd
, &dispatch_data
.header
, sizeof (mar_res_header_t
));
354 if (error
!= SA_AIS_OK
) {
357 if (dispatch_data
.header
.size
> sizeof (mar_res_header_t
)) {
358 error
= saRecvRetry (msgInstance
->dispatch_fd
, &dispatch_data
.data
,
359 dispatch_data
.header
.size
- sizeof (mar_res_header_t
));
360 if (error
!= SA_AIS_OK
) {
366 * Make copy of callbacks, message data, unlock instance,
367 * and call callback. A risk of this dispatch method is that
368 * the callback routines may operate at the same time that
369 * MsgFinalize has been called in another thread.
371 memcpy(&callbacks
,&msgInstance
->callbacks
, sizeof(msgInstance
->callbacks
));
372 pthread_mutex_unlock(&msgInstance
->dispatch_mutex
);
374 * Dispatch incoming response
376 switch (dispatch_data
.header
.id
) {
378 case MESSAGE_RES_MSG_QUEUE_QUEUEOPENASYNC
:
379 if (callbacks
.saMsgQueueOpenCallback
== NULL
) {
382 res_lib_msg_queueopenasync
= (struct res_lib_msg_queueopenasync
*) &dispatch_data
;
385 * This instance get/listadd/put required so that close
386 * later has the proper list of queues
388 if (res_lib_msg_queueopenasync
->header
.error
== SA_AIS_OK
) {
389 error
= saHandleInstanceGet (&queueHandleDatabase
,
390 res_lib_msg_queueopenasync
->queueHandle
,
391 (void *)&msgQueueInstance
);
393 assert (error
== SA_AIS_OK
); /* should only be valid handles here */
395 * open succeeded without error
397 list_init (&msgQueueInstance
->list
);
398 list_init (&msgQueueInstance
->section_iteration_list_head
);
399 list_add (&msgQueueInstance
->list
,
400 &msgInstance
->queue_list
);
402 callbacks
.saMsgQueueOpenCallback(
403 res_lib_msg_queueopenasync
->invocation
,
404 res_lib_msg_queueopenasync
->queueHandle
,
405 res_lib_msg_queueopenasync
->header
.error
);
406 saHandleInstancePut (&queueHandleDatabase
,
407 res_lib_msg_queueopenasync
->queueHandle
);
410 * open failed with error
412 callbacks
.saMsgQueueOpenCallback(
413 res_lib_msg_queueopenasync
->invocation
,
415 res_lib_msg_queueopenasync
->header
.error
);
419 case MESSAGE_RES_MSG_QUEUE_QUEUESYNCHRONIZEASYNC
:
420 if (callbacks
.saMsgQueueSynchronizeCallback
== NULL
) {
424 res_lib_msg_queuesynchronizeasync
= (struct res_lib_msg_queuesynchronizeasync
*) &dispatch_data
;
426 callbacks
.saMsgQueueSynchronizeCallback(
427 res_lib_msg_queuesynchronizeasync
->invocation
,
428 res_lib_msg_queuesynchronizeasync
->header
.error
);
436 * Determine if more messages should be processed
438 switch (dispatchFlags
) {
439 case SA_DISPATCH_ONE
:
442 case SA_DISPATCH_ALL
:
444 case SA_DISPATCH_BLOCKING
:
449 pthread_mutex_unlock(&msgInstance
->dispatch_mutex
);
451 saHandleInstancePut(&msgHandleDatabase
, msgHandle
);
458 const SaMsgHandleT msgHandle
)
460 struct msgInstance
*msgInstance
;
463 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
,
464 (void *)&msgInstance
);
465 if (error
!= SA_AIS_OK
) {
469 pthread_mutex_lock (&msgInstance
->response_mutex
);
472 * Another thread has already started finalizing
474 if (msgInstance
->finalize
) {
475 pthread_mutex_unlock (&msgInstance
->response_mutex
);
476 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
477 return (SA_AIS_ERR_BAD_HANDLE
);
480 msgInstance
->finalize
= 1;
482 pthread_mutex_unlock (&msgInstance
->response_mutex
);
484 // TODO msgInstanceFinalize (msgInstance);
486 if (msgInstance
->response_fd
!= -1) {
487 shutdown (msgInstance
->response_fd
, 0);
488 close (msgInstance
->response_fd
);
491 if (msgInstance
->dispatch_fd
!= -1) {
492 shutdown (msgInstance
->dispatch_fd
, 0);
493 close (msgInstance
->dispatch_fd
);
496 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
503 SaMsgHandleT msgHandle
,
504 const SaNameT
*queueName
,
505 const SaMsgQueueCreationAttributesT
*creationAttributes
,
506 SaMsgQueueOpenFlagsT openFlags
,
508 SaMsgQueueHandleT
*queueHandle
)
511 struct msgQueueInstance
*msgQueueInstance
;
512 struct msgInstance
*msgInstance
;
513 struct req_lib_msg_queueopen req_lib_msg_queueopen
;
514 struct res_lib_msg_queueopen res_lib_msg_queueopen
;
516 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
,
517 (void *)&msgInstance
);
518 if (error
!= SA_AIS_OK
) {
522 error
= saHandleCreate (&queueHandleDatabase
,
523 sizeof (struct msgQueueInstance
), queueHandle
);
524 if (error
!= SA_AIS_OK
) {
528 error
= saHandleInstanceGet (&queueHandleDatabase
,
529 *queueHandle
, (void *)&msgQueueInstance
);
530 if (error
!= SA_AIS_OK
) {
534 msgQueueInstance
->response_fd
= msgInstance
->response_fd
;
535 msgQueueInstance
->response_mutex
= &msgInstance
->response_mutex
;
537 msgQueueInstance
->msgHandle
= msgHandle
;
538 msgQueueInstance
->queueHandle
= *queueHandle
;
539 msgQueueInstance
->openFlags
= openFlags
;
541 req_lib_msg_queueopen
.header
.size
= sizeof (struct req_lib_msg_queueopen
);
542 req_lib_msg_queueopen
.header
.id
= MESSAGE_REQ_MSG_QUEUEOPEN
;
543 memcpy (&req_lib_msg_queueopen
.queueName
, queueName
, sizeof (SaNameT
));
544 memcpy (&msgQueueInstance
->queueName
, queueName
, sizeof (SaNameT
));
545 req_lib_msg_queueopen
.creationAttributesSet
= 0;
546 if (creationAttributes
) {
547 memcpy (&req_lib_msg_queueopen
.creationAttributes
,
549 sizeof (SaMsgQueueCreationAttributesT
));
550 req_lib_msg_queueopen
.creationAttributesSet
= 1;
552 req_lib_msg_queueopen
.openFlags
= openFlags
;
553 req_lib_msg_queueopen
.timeout
= timeout
;
555 pthread_mutex_lock (msgQueueInstance
->response_mutex
);
557 error
= saSendReceiveReply (msgQueueInstance
->response_fd
,
558 &req_lib_msg_queueopen
,
559 sizeof (struct req_lib_msg_queueopen
),
560 &res_lib_msg_queueopen
,
561 sizeof (struct res_lib_msg_queueopen
));
563 pthread_mutex_unlock (msgQueueInstance
->response_mutex
);
565 if (res_lib_msg_queueopen
.header
.error
!= SA_AIS_OK
) {
566 error
= res_lib_msg_queueopen
.header
.error
;
567 goto error_put_destroy
;
570 saHandleInstancePut (&queueHandleDatabase
, *queueHandle
);
572 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
577 saHandleInstancePut (&queueHandleDatabase
, *queueHandle
);
579 saHandleDestroy (&queueHandleDatabase
, *queueHandle
);
581 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
587 saMsgQueueOpenAsync (
588 SaMsgHandleT msgHandle
,
589 SaInvocationT invocation
,
590 const SaNameT
*queueName
,
591 const SaMsgQueueCreationAttributesT
*creationAttributes
,
592 SaMsgQueueOpenFlagsT openFlags
)
594 struct msgQueueInstance
*msgQueueInstance
;
595 struct msgInstance
*msgInstance
;
596 SaMsgQueueHandleT queueHandle
;
598 struct req_lib_msg_queueopen req_lib_msg_queueopen
;
599 struct res_lib_msg_queueopenasync res_lib_msg_queueopenasync
;
601 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
,
602 (void *)&msgInstance
);
603 if (error
!= SA_AIS_OK
) {
607 if (msgInstance
->callbacks
.saMsgQueueOpenCallback
== NULL
) {
608 error
= SA_AIS_ERR_INIT
;
612 error
= saHandleCreate (&queueHandleDatabase
,
613 sizeof (struct msgQueueInstance
), &queueHandle
);
614 if (error
!= SA_AIS_OK
) {
618 error
= saHandleInstanceGet (&queueHandleDatabase
, queueHandle
,
619 (void *)&msgQueueInstance
);
620 if (error
!= SA_AIS_OK
) {
624 msgQueueInstance
->response_fd
= msgInstance
->response_fd
;
625 msgQueueInstance
->response_mutex
= &msgInstance
->response_mutex
;
627 msgQueueInstance
->msgHandle
= msgHandle
;
628 msgQueueInstance
->queueHandle
= queueHandle
;
629 msgQueueInstance
->openFlags
= openFlags
;
630 req_lib_msg_queueopen
.header
.size
= sizeof (struct req_lib_msg_queueopen
);
631 req_lib_msg_queueopen
.header
.id
= MESSAGE_REQ_MSG_QUEUEOPEN
;
632 req_lib_msg_queueopen
.invocation
= invocation
;
633 req_lib_msg_queueopen
.creationAttributesSet
= 0;
634 if (creationAttributes
) {
635 memcpy (&req_lib_msg_queueopen
.creationAttributes
,
637 sizeof (SaMsgQueueCreationAttributesT
));
638 req_lib_msg_queueopen
.creationAttributesSet
= 1;
641 req_lib_msg_queueopen
.openFlags
= openFlags
;
642 req_lib_msg_queueopen
.queueHandle
= queueHandle
;
644 pthread_mutex_unlock (msgQueueInstance
->response_mutex
);
646 error
= saSendReceiveReply (msgQueueInstance
->response_fd
,
647 &req_lib_msg_queueopen
,
648 sizeof (struct req_lib_msg_queueopen
),
649 &res_lib_msg_queueopenasync
,
650 sizeof (struct res_lib_msg_queueopenasync
));
652 pthread_mutex_unlock (msgQueueInstance
->response_mutex
);
654 if (res_lib_msg_queueopenasync
.header
.error
!= SA_AIS_OK
) {
655 error
= res_lib_msg_queueopenasync
.header
.error
;
656 goto error_put_destroy
;
659 saHandleInstancePut (&queueHandleDatabase
, queueHandle
);
661 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
666 saHandleInstancePut (&queueHandleDatabase
, queueHandle
);
668 saHandleDestroy (&queueHandleDatabase
, queueHandle
);
670 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
677 SaMsgQueueHandleT queueHandle
)
679 struct req_lib_msg_queueclose req_lib_msg_queueclose
;
680 struct res_lib_msg_queueclose res_lib_msg_queueclose
;
682 struct msgQueueInstance
*msgQueueInstance
;
684 error
= saHandleInstanceGet (&queueHandleDatabase
, queueHandle
,
685 (void *)&msgQueueInstance
);
686 if (error
!= SA_AIS_OK
) {
690 req_lib_msg_queueclose
.header
.size
= sizeof (struct req_lib_msg_queueclose
);
691 req_lib_msg_queueclose
.header
.id
= MESSAGE_REQ_MSG_QUEUECLOSE
;
692 memcpy (&req_lib_msg_queueclose
.queueName
,
693 &msgQueueInstance
->queueName
, sizeof (SaNameT
));
696 pthread_mutex_lock (msgQueueInstance
->response_mutex
);
698 error
= saSendReceiveReply (msgQueueInstance
->response_fd
,
699 &req_lib_msg_queueclose
,
700 sizeof (struct req_lib_msg_queueclose
),
701 &res_lib_msg_queueclose
,
702 sizeof (struct res_lib_msg_queueclose
));
704 pthread_mutex_unlock (msgQueueInstance
->response_mutex
);
706 if (error
== SA_AIS_OK
) {
707 error
= res_lib_msg_queueclose
.header
.error
;
710 if (error
== SA_AIS_OK
) {
711 // TODO msgQueueInstanceFinalize (msgQueueInstance);
714 saHandleInstancePut (&queueHandleDatabase
, queueHandle
);
719 saMsgQueueStatusGet (
720 SaMsgHandleT msgHandle
,
721 const SaNameT
*queueName
,
722 SaMsgQueueStatusT
*queueStatus
)
724 struct msgInstance
*msgInstance
;
725 struct req_lib_msg_queuestatusget req_lib_msg_queuestatusget
;
726 struct res_lib_msg_queuestatusget res_lib_msg_queuestatusget
;
729 if (queueName
== NULL
) {
730 return (SA_AIS_ERR_INVALID_PARAM
);
732 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
, (void *)&msgInstance
);
733 if (error
!= SA_AIS_OK
) {
737 req_lib_msg_queuestatusget
.header
.size
= sizeof (struct req_lib_msg_queuestatusget
);
738 req_lib_msg_queuestatusget
.header
.id
= MESSAGE_REQ_MSG_QUEUESTATUSGET
;
739 memcpy (&req_lib_msg_queuestatusget
.queueName
, queueName
, sizeof (SaNameT
));
741 pthread_mutex_lock (&msgInstance
->response_mutex
);
743 error
= saSendReceiveReply (msgInstance
->response_fd
,
744 &req_lib_msg_queuestatusget
,
745 sizeof (struct req_lib_msg_queuestatusget
),
746 &res_lib_msg_queuestatusget
,
747 sizeof (struct res_lib_msg_queuestatusget
));
749 pthread_mutex_unlock (&msgInstance
->response_mutex
);
751 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
753 if (error
== SA_AIS_OK
)
754 error
= res_lib_msg_queuestatusget
.header
.error
;
755 if (error
== SA_AIS_OK
) {
756 memcpy (queueStatus
, &res_lib_msg_queuestatusget
.queueStatus
,
757 sizeof (SaMsgQueueStatusT
));
765 SaMsgHandleT msgHandle
,
766 const SaNameT
*queueName
)
769 struct msgInstance
*msgInstance
;
770 struct req_lib_msg_queueunlink req_lib_msg_queueunlink
;
771 struct res_lib_msg_queueunlink res_lib_msg_queueunlink
;
773 if (queueName
== NULL
) {
774 return (SA_AIS_ERR_INVALID_PARAM
);
776 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
, (void *)&msgInstance
);
777 if (error
!= SA_AIS_OK
) {
781 req_lib_msg_queueunlink
.header
.size
= sizeof (struct req_lib_msg_queueunlink
);
782 req_lib_msg_queueunlink
.header
.id
= MESSAGE_REQ_MSG_QUEUEUNLINK
;
783 memcpy (&req_lib_msg_queueunlink
.queueName
, queueName
, sizeof (SaNameT
));
785 pthread_mutex_lock (&msgInstance
->response_mutex
);
787 error
= saSendReceiveReply (msgInstance
->response_fd
,
788 &req_lib_msg_queueunlink
,
789 sizeof (struct req_lib_msg_queueunlink
),
790 &res_lib_msg_queueunlink
,
791 sizeof (struct res_lib_msg_queueunlink
));
793 pthread_mutex_unlock (&msgInstance
->response_mutex
);
795 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
797 return (error
== SA_AIS_OK
? res_lib_msg_queueunlink
.header
.error
: error
);
801 saMsgQueueGroupCreate (
802 SaMsgHandleT msgHandle
,
803 const SaNameT
*queueGroupName
,
804 SaMsgQueueGroupPolicyT queueGroupPolicy
)
807 struct msgInstance
*msgInstance
;
808 struct req_lib_msg_queuegroupcreate req_lib_msg_queuegroupcreate
;
809 struct res_lib_msg_queuegroupcreate res_lib_msg_queuegroupcreate
;
811 if (queueGroupName
== NULL
) {
812 return (SA_AIS_ERR_INVALID_PARAM
);
814 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
, (void *)&msgInstance
);
815 if (error
!= SA_AIS_OK
) {
819 req_lib_msg_queuegroupcreate
.header
.size
= sizeof (struct req_lib_msg_queuegroupcreate
);
820 req_lib_msg_queuegroupcreate
.header
.id
= MESSAGE_REQ_MSG_QUEUEGROUPCREATE
;
821 memcpy (&req_lib_msg_queuegroupcreate
.queueGroupName
, queueGroupName
,
823 req_lib_msg_queuegroupcreate
.queueGroupPolicy
= queueGroupPolicy
;
825 pthread_mutex_lock (&msgInstance
->response_mutex
);
827 error
= saSendReceiveReply (msgInstance
->response_fd
,
828 &req_lib_msg_queuegroupcreate
,
829 sizeof (struct req_lib_msg_queuegroupcreate
),
830 &res_lib_msg_queuegroupcreate
,
831 sizeof (struct res_lib_msg_queuegroupcreate
));
833 pthread_mutex_unlock (&msgInstance
->response_mutex
);
835 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
837 return (error
== SA_AIS_OK
? res_lib_msg_queuegroupcreate
.header
.error
: error
);
841 saMsgQueueGroupInsert (
842 SaMsgHandleT msgHandle
,
843 const SaNameT
*queueGroupName
,
844 const SaNameT
*queueName
)
847 struct msgInstance
*msgInstance
;
848 struct req_lib_msg_queuegroupinsert req_lib_msg_queuegroupinsert
;
849 struct res_lib_msg_queuegroupinsert res_lib_msg_queuegroupinsert
;
851 if (queueName
== NULL
) {
852 return (SA_AIS_ERR_INVALID_PARAM
);
854 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
, (void *)&msgInstance
);
855 if (error
!= SA_AIS_OK
) {
859 req_lib_msg_queuegroupinsert
.header
.size
= sizeof (struct req_lib_msg_queuegroupinsert
);
860 req_lib_msg_queuegroupinsert
.header
.id
= MESSAGE_REQ_MSG_QUEUEGROUPINSERT
;
861 memcpy (&req_lib_msg_queuegroupinsert
.queueName
, queueName
, sizeof (SaNameT
));
862 memcpy (&req_lib_msg_queuegroupinsert
.queueGroupName
, queueGroupName
,
865 pthread_mutex_lock (&msgInstance
->response_mutex
);
867 error
= saSendReceiveReply (msgInstance
->response_fd
,
868 &req_lib_msg_queuegroupinsert
,
869 sizeof (struct req_lib_msg_queuegroupinsert
),
870 &res_lib_msg_queuegroupinsert
,
871 sizeof (struct res_lib_msg_queuegroupinsert
));
873 pthread_mutex_unlock (&msgInstance
->response_mutex
);
875 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
877 return (error
== SA_AIS_OK
? res_lib_msg_queuegroupinsert
.header
.error
: error
);
881 saMsgQueueGroupRemove (
882 SaMsgHandleT msgHandle
,
883 const SaNameT
*queueGroupName
,
884 const SaNameT
*queueName
)
887 struct msgInstance
*msgInstance
;
888 struct req_lib_msg_queuegroupremove req_lib_msg_queuegroupremove
;
889 struct res_lib_msg_queuegroupremove res_lib_msg_queuegroupremove
;
891 if (queueName
== NULL
) {
892 return (SA_AIS_ERR_INVALID_PARAM
);
894 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
, (void *)&msgInstance
);
895 if (error
!= SA_AIS_OK
) {
899 req_lib_msg_queuegroupremove
.header
.size
= sizeof (struct req_lib_msg_queuegroupremove
);
900 req_lib_msg_queuegroupremove
.header
.id
= MESSAGE_REQ_MSG_QUEUEGROUPREMOVE
;
901 memcpy (&req_lib_msg_queuegroupremove
.queueName
, queueName
, sizeof (SaNameT
));
902 memcpy (&req_lib_msg_queuegroupremove
.queueGroupName
, queueGroupName
,
905 pthread_mutex_lock (&msgInstance
->response_mutex
);
907 error
= saSendReceiveReply (msgInstance
->response_fd
,
908 &req_lib_msg_queuegroupremove
,
909 sizeof (struct req_lib_msg_queuegroupremove
),
910 &res_lib_msg_queuegroupremove
,
911 sizeof (struct res_lib_msg_queuegroupremove
));
913 pthread_mutex_unlock (&msgInstance
->response_mutex
);
915 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
917 return (error
== SA_AIS_OK
? res_lib_msg_queuegroupremove
.header
.error
: error
);
921 saMsgQueueGroupDelete (
922 SaMsgHandleT msgHandle
,
923 const SaNameT
*queueGroupName
)
926 struct msgInstance
*msgInstance
;
927 struct req_lib_msg_queuegroupdelete req_lib_msg_queuegroupdelete
;
928 struct res_lib_msg_queuegroupdelete res_lib_msg_queuegroupdelete
;
930 if (queueGroupName
== NULL
) {
931 return (SA_AIS_ERR_INVALID_PARAM
);
933 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
, (void *)&msgInstance
);
934 if (error
!= SA_AIS_OK
) {
938 req_lib_msg_queuegroupdelete
.header
.size
= sizeof (struct req_lib_msg_queuegroupdelete
);
939 req_lib_msg_queuegroupdelete
.header
.id
= MESSAGE_REQ_MSG_QUEUEGROUPDELETE
;
940 memcpy (&req_lib_msg_queuegroupdelete
.queueGroupName
, queueGroupName
,
943 pthread_mutex_lock (&msgInstance
->response_mutex
);
945 error
= saSendReceiveReply (msgInstance
->response_fd
,
946 &req_lib_msg_queuegroupdelete
,
947 sizeof (struct req_lib_msg_queuegroupdelete
),
948 &res_lib_msg_queuegroupdelete
,
949 sizeof (struct res_lib_msg_queuegroupdelete
));
951 pthread_mutex_unlock (&msgInstance
->response_mutex
);
953 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
955 return (error
== SA_AIS_OK
? res_lib_msg_queuegroupdelete
.header
.error
: error
);
959 saMsgQueueGroupTrack (
960 SaMsgHandleT msgHandle
,
961 const SaNameT
*queueGroupName
,
963 SaMsgQueueGroupNotificationBufferT
*notificationBuffer
)
966 struct msgInstance
*msgInstance
;
967 struct req_lib_msg_queuegrouptrack req_lib_msg_queuegrouptrack
;
968 struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack
;
970 if (queueGroupName
== NULL
) {
971 return (SA_AIS_ERR_INVALID_PARAM
);
973 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
, (void *)&msgInstance
);
974 if (error
!= SA_AIS_OK
) {
978 req_lib_msg_queuegrouptrack
.header
.size
= sizeof (struct req_lib_msg_queuegrouptrack
);
979 req_lib_msg_queuegrouptrack
.header
.id
= MESSAGE_REQ_MSG_QUEUEGROUPTRACK
;
980 req_lib_msg_queuegrouptrack
.trackFlags
= trackFlags
;
981 memcpy (&req_lib_msg_queuegrouptrack
.queueGroupName
, queueGroupName
,
984 pthread_mutex_lock (&msgInstance
->response_mutex
);
986 error
= saSendReceiveReply (msgInstance
->response_fd
,
987 &req_lib_msg_queuegrouptrack
,
988 sizeof (struct req_lib_msg_queuegrouptrack
),
989 &res_lib_msg_queuegrouptrack
,
990 sizeof (struct res_lib_msg_queuegrouptrack
));
992 pthread_mutex_unlock (&msgInstance
->response_mutex
);
994 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
996 return (error
== SA_AIS_OK
? res_lib_msg_queuegrouptrack
.header
.error
: error
);
1000 saMsgQueueGroupTrackStop (
1001 SaMsgHandleT msgHandle
,
1002 const SaNameT
*queueGroupName
)
1005 struct msgInstance
*msgInstance
;
1006 struct req_lib_msg_queuegrouptrackstop req_lib_msg_queuegrouptrackstop
;
1007 struct res_lib_msg_queuegrouptrackstop res_lib_msg_queuegrouptrackstop
;
1009 if (queueGroupName
== NULL
) {
1010 return (SA_AIS_ERR_INVALID_PARAM
);
1012 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
, (void *)&msgInstance
);
1013 if (error
!= SA_AIS_OK
) {
1017 req_lib_msg_queuegrouptrackstop
.header
.size
= sizeof (struct req_lib_msg_queuegrouptrackstop
);
1018 req_lib_msg_queuegrouptrackstop
.header
.id
= MESSAGE_REQ_MSG_QUEUEGROUPTRACKSTOP
;
1019 memcpy (&req_lib_msg_queuegrouptrackstop
.queueGroupName
, queueGroupName
,
1022 pthread_mutex_lock (&msgInstance
->response_mutex
);
1024 error
= saSendReceiveReply (msgInstance
->response_fd
,
1025 &req_lib_msg_queuegrouptrackstop
,
1026 sizeof (struct req_lib_msg_queuegrouptrackstop
),
1027 &res_lib_msg_queuegrouptrackstop
,
1028 sizeof (struct res_lib_msg_queuegrouptrackstop
));
1030 pthread_mutex_unlock (&msgInstance
->response_mutex
);
1032 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
1034 return (error
== SA_AIS_OK
? res_lib_msg_queuegrouptrackstop
.header
.error
: error
);
1039 SaMsgHandleT msgHandle
,
1040 const SaNameT
*destination
,
1041 const SaMsgMessageT
*message
,
1045 struct msgInstance
*msgInstance
;
1046 struct req_lib_msg_messagesend req_lib_msg_messagesend
;
1047 struct res_lib_msg_messagesend res_lib_msg_messagesend
;
1049 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
, (void *)&msgInstance
);
1050 if (error
!= SA_AIS_OK
) {
1054 req_lib_msg_messagesend
.header
.size
= sizeof (struct req_lib_msg_messagesend
);
1055 req_lib_msg_messagesend
.header
.id
= MESSAGE_REQ_MSG_MESSAGESEND
;
1056 memcpy (&req_lib_msg_messagesend
.destination
, destination
, sizeof (SaNameT
));
1058 pthread_mutex_lock (&msgInstance
->response_mutex
);
1060 error
= saSendReceiveReply (msgInstance
->response_fd
,
1061 &req_lib_msg_messagesend
,
1062 sizeof (struct req_lib_msg_messagesend
),
1063 &res_lib_msg_messagesend
,
1064 sizeof (struct res_lib_msg_messagesend
));
1066 pthread_mutex_unlock (&msgInstance
->response_mutex
);
1068 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
1070 return (error
== SA_AIS_OK
? res_lib_msg_messagesend
.header
.error
: error
);
1074 saMsgMessageSendAsync (
1075 SaMsgHandleT msgHandle
,
1076 SaInvocationT invocation
,
1077 const SaNameT
*destination
,
1078 const SaMsgMessageT
*message
,
1079 SaMsgAckFlagsT ackFlags
)
1082 struct msgInstance
*msgInstance
;
1083 struct req_lib_msg_messagesend req_lib_msg_messagesend
;
1084 struct res_lib_msg_messagesendasync res_lib_msg_messagesendasync
;
1086 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
, (void *)&msgInstance
);
1087 if (error
!= SA_AIS_OK
) {
1091 req_lib_msg_messagesend
.header
.size
= sizeof (struct req_lib_msg_messagesend
);
1092 req_lib_msg_messagesend
.header
.id
= MESSAGE_REQ_MSG_MESSAGESEND
;
1093 memcpy (&req_lib_msg_messagesend
.destination
, destination
, sizeof (SaNameT
));
1095 pthread_mutex_lock (&msgInstance
->response_mutex
);
1097 error
= saSendReceiveReply (msgInstance
->response_fd
,
1098 &req_lib_msg_messagesend
,
1099 sizeof (struct req_lib_msg_messagesend
),
1100 &res_lib_msg_messagesendasync
,
1101 sizeof (struct res_lib_msg_messagesendasync
));
1103 pthread_mutex_unlock (&msgInstance
->response_mutex
);
1105 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
1107 return (error
== SA_AIS_OK
? res_lib_msg_messagesendasync
.header
.error
: error
);
1112 SaMsgQueueHandleT queueHandle
,
1113 SaMsgMessageT
*message
,
1115 SaMsgSenderIdT
*senderId
,
1119 struct msgQueueInstance
*msgQueueInstance
;
1120 struct req_lib_msg_messageget req_lib_msg_messageget
;
1121 struct res_lib_msg_messageget res_lib_msg_messageget
;
1123 error
= saHandleInstanceGet (&queueHandleDatabase
, queueHandle
, (void *)&msgQueueInstance
);
1124 if (error
!= SA_AIS_OK
) {
1128 req_lib_msg_messageget
.header
.size
= sizeof (struct req_lib_msg_messageget
);
1129 req_lib_msg_messageget
.header
.id
= MESSAGE_REQ_MSG_MESSAGEGET
;
1130 req_lib_msg_messageget
.timeout
= timeout
;
1132 pthread_mutex_lock (msgQueueInstance
->response_mutex
);
1134 error
= saSendReceiveReply (msgQueueInstance
->response_fd
,
1135 &req_lib_msg_messageget
,
1136 sizeof (struct req_lib_msg_messageget
),
1137 &res_lib_msg_messageget
,
1138 sizeof (struct res_lib_msg_messageget
));
1140 pthread_mutex_unlock (msgQueueInstance
->response_mutex
);
1142 saHandleInstancePut (&queueHandleDatabase
, queueHandle
);
1144 if (error
== SA_AIS_OK
)
1145 error
= res_lib_msg_messageget
.header
.error
;
1146 if (error
== SA_AIS_OK
) {
1147 *sendTime
= res_lib_msg_messageget
.sendTime
;
1148 memcpy (senderId
, &res_lib_msg_messageget
.senderId
,
1149 sizeof (SaMsgSenderIdT
));
1155 saMsgMessageCancel (
1156 SaMsgQueueHandleT queueHandle
)
1159 struct msgQueueInstance
*msgQueueInstance
;
1160 struct req_lib_msg_messagecancel req_lib_msg_messagecancel
;
1161 struct res_lib_msg_messagecancel res_lib_msg_messagecancel
;
1163 error
= saHandleInstanceGet (&msgHandleDatabase
, queueHandle
, (void *)&msgQueueInstance
);
1164 if (error
!= SA_AIS_OK
) {
1168 req_lib_msg_messagecancel
.header
.size
= sizeof (struct req_lib_msg_messagecancel
);
1169 req_lib_msg_messagecancel
.header
.id
= MESSAGE_REQ_MSG_MESSAGECANCEL
;
1171 pthread_mutex_lock (msgQueueInstance
->response_mutex
);
1173 error
= saSendReceiveReply (msgQueueInstance
->response_fd
,
1174 &req_lib_msg_messagecancel
,
1175 sizeof (struct req_lib_msg_messagecancel
),
1176 &res_lib_msg_messagecancel
,
1177 sizeof (struct res_lib_msg_messagecancel
));
1179 pthread_mutex_unlock (msgQueueInstance
->response_mutex
);
1181 saHandleInstancePut (&queueHandleDatabase
, queueHandle
);
1183 return (error
== SA_AIS_OK
? res_lib_msg_messagecancel
.header
.error
: error
);
1187 saMsgMessageSendReceive (
1188 SaMsgHandleT msgHandle
,
1189 const SaNameT
*destination
,
1190 const SaMsgMessageT
*sendMessage
,
1191 SaMsgMessageT
*receiveMessage
,
1192 SaTimeT
*replySendTime
,
1196 struct msgInstance
*msgInstance
;
1197 struct req_lib_msg_messagesendreceive req_lib_msg_messagesendreceive
;
1198 struct res_lib_msg_messagesendreceive res_lib_msg_messagesendreceive
;
1200 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
, (void *)&msgInstance
);
1201 if (error
!= SA_AIS_OK
) {
1205 req_lib_msg_messagesendreceive
.header
.size
= sizeof (struct req_lib_msg_messagesendreceive
);
1206 req_lib_msg_messagesendreceive
.header
.id
= MESSAGE_REQ_MSG_MESSAGEREPLY
;
1207 memcpy (&req_lib_msg_messagesendreceive
.destination
, destination
,
1209 req_lib_msg_messagesendreceive
.timeout
= timeout
;
1211 pthread_mutex_lock (&msgInstance
->response_mutex
);
1213 error
= saSendReceiveReply (msgInstance
->response_fd
,
1214 &req_lib_msg_messagesendreceive
,
1215 sizeof (struct req_lib_msg_messagesendreceive
),
1216 &res_lib_msg_messagesendreceive
,
1217 sizeof (struct res_lib_msg_messagesendreceive
));
1219 pthread_mutex_unlock (&msgInstance
->response_mutex
);
1221 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
1223 if (error
== SA_AIS_OK
)
1224 error
= res_lib_msg_messagesendreceive
.header
.error
;
1225 if (error
== SA_AIS_OK
) {
1226 *replySendTime
= res_lib_msg_messagesendreceive
.replySendTime
;
1234 SaMsgHandleT msgHandle
,
1235 const SaMsgMessageT
*replyMessage
,
1236 const SaMsgSenderIdT
*senderId
,
1240 struct msgInstance
*msgInstance
;
1241 struct req_lib_msg_messagereply req_lib_msg_messagereply
;
1242 struct res_lib_msg_messagereply res_lib_msg_messagereply
;
1244 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
, (void *)&msgInstance
);
1245 if (error
!= SA_AIS_OK
) {
1249 req_lib_msg_messagereply
.header
.size
= sizeof (struct req_lib_msg_messagereply
);
1250 req_lib_msg_messagereply
.header
.id
= MESSAGE_REQ_MSG_MESSAGEREPLY
;
1251 memcpy (&req_lib_msg_messagereply
.senderId
, senderId
, sizeof (SaMsgSenderIdT
));
1253 pthread_mutex_lock (&msgInstance
->response_mutex
);
1255 error
= saSendReceiveReply (msgInstance
->response_fd
,
1256 &req_lib_msg_messagereply
,
1257 sizeof (struct req_lib_msg_messagereply
),
1258 &res_lib_msg_messagereply
,
1259 sizeof (struct res_lib_msg_messagereply
));
1261 pthread_mutex_unlock (&msgInstance
->response_mutex
);
1263 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
1265 return (error
== SA_AIS_OK
? res_lib_msg_messagereply
.header
.error
: error
);
1268 SaAisErrorT
saMsgMessageReplyAsync (
1269 SaMsgHandleT msgHandle
,
1270 SaInvocationT invocation
,
1271 const SaMsgMessageT
*replyMessage
,
1272 const SaMsgSenderIdT
*senderId
,
1273 SaMsgAckFlagsT ackFlags
)
1276 struct msgInstance
*msgInstance
;
1277 struct req_lib_msg_messagereply req_lib_msg_messagereply
;
1278 struct res_lib_msg_messagereplyasync res_lib_msg_messagereplyasync
;
1280 error
= saHandleInstanceGet (&msgHandleDatabase
, msgHandle
, (void *)&msgInstance
);
1281 if (error
!= SA_AIS_OK
) {
1285 req_lib_msg_messagereply
.header
.size
= sizeof (struct req_lib_msg_messagereply
);
1286 req_lib_msg_messagereply
.header
.id
= MESSAGE_REQ_MSG_MESSAGEREPLY
;
1287 memcpy (&req_lib_msg_messagereply
.senderId
, senderId
, sizeof (SaMsgSenderIdT
));
1289 pthread_mutex_lock (&msgInstance
->response_mutex
);
1291 error
= saSendReceiveReply (msgInstance
->response_fd
,
1292 &req_lib_msg_messagereply
,
1293 sizeof (struct req_lib_msg_messagereply
),
1294 &res_lib_msg_messagereplyasync
,
1295 sizeof (struct res_lib_msg_messagereplyasync
));
1297 pthread_mutex_unlock (&msgInstance
->response_mutex
);
1299 saHandleInstancePut (&msgHandleDatabase
, msgHandle
);
1301 return (error
== SA_AIS_OK
? res_lib_msg_messagereplyasync
.header
.error
: error
);