Add writeback call to objdb
[openais.git] / test / subscription.c
blobaf0dbd120ddb99854084aca9299e782a43d3266d
1 /*
2 * Test program for event service subscriptions
3 */
5 #include <stdio.h>
6 #include <string.h>
7 #include <unistd.h>
8 #include <time.h>
9 #include <fcntl.h>
10 #include <sys/poll.h>
11 #ifndef OPENAIS_SOLARIS
12 #include <stdint.h>
13 #include <getopt.h>
14 #else
15 #include <sys/types.h>
16 #endif
17 #include <stdlib.h>
18 #include <sched.h>
19 #include "saAis.h"
20 #include "saEvt.h"
22 #ifdef OPENAIS_SOLARIS
23 char * strsep(char** str, const char* delims)
25 char* token;
27 if (*str == NULL) {
28 /* No more tokens */
29 return NULL;
32 token = *str;
33 while (**str != '\0') {
34 if (strchr(delims,**str)!=NULL) {
35 **str = '\0';
36 (*str)++;
37 return token;
39 (*str)++;
41 /* There is no other token */
42 *str = NULL;
43 return token;
45 #endif
47 #define TEST_EVENT_ORDER 1
48 #define EVT_FREQ 1000
49 #define TRY_WAIT 2
50 uint32_t evt_count = 0;
52 extern int get_sa_error(SaAisErrorT, char *, int);
53 char result_buf[256];
54 int result_buf_len = sizeof(result_buf);
56 int quiet = 0;
58 SaVersionT version = { 'B', 0x01, 0x01 };
60 void event_callback( SaEvtSubscriptionIdT subscriptionId,
61 const SaEvtEventHandleT eventHandle,
62 const SaSizeT eventDataSize);
64 SaEvtCallbacksT callbacks = {
66 event_callback
69 char channel[256] = "EVENT_TEST_CHANNEL";
71 #define MAX_NODES 256
72 SaEvtEventIdT last_event_id[MAX_NODES] = {0,};
74 #define MAX_SUB 100
76 uint32_t subscription_id[MAX_SUB] = {0xfedcba98};
78 int sub_next = 0;
80 char pubname[256] = "Test Pub Name";
82 #define patt1 "Filter pattern 1"
83 #define patt1_size sizeof(patt1)
85 SaEvtEventFilterT filters[MAX_SUB] = {
86 {SA_EVT_PASS_ALL_FILTER, {0, 0}}
89 SaEvtEventFilterArrayT subscribe_filters[MAX_SUB] = {
91 1, &filters[0]
96 #define PAT_SIZE 100
97 SaUint8T pat0[PAT_SIZE];
98 SaUint8T pat1[PAT_SIZE];
99 SaUint8T pat2[PAT_SIZE];
100 SaUint8T pat3[PAT_SIZE];
101 SaUint8T pat4[PAT_SIZE];
102 SaEvtEventPatternT evt_patts[5] = {
103 {PAT_SIZE, PAT_SIZE, pat0},
104 {PAT_SIZE, PAT_SIZE, pat1},
105 {PAT_SIZE, PAT_SIZE, pat2},
106 {PAT_SIZE, PAT_SIZE, pat3},
107 {PAT_SIZE, PAT_SIZE, pat4}};
108 SaEvtEventPatternArrayT evt_pat_get_array = { 5, 0, evt_patts };
110 SaNameT test_pub_name = {13, "Test Pub Name"};
113 char user_data_file[256];
114 char user_data[65536];
115 char event_data[65536];
116 int user_data_size = 0;
119 test_subscription()
121 SaEvtHandleT handle;
122 SaEvtChannelHandleT channel_handle;
123 SaEvtChannelOpenFlagsT flags;
124 SaNameT channel_name;
126 struct pollfd pfd;
127 int nfd;
128 SaSelectionObjectT fd;
129 int timeout = 60000;
130 int i;
134 SaAisErrorT result;
136 flags = SA_EVT_CHANNEL_SUBSCRIBER | SA_EVT_CHANNEL_CREATE;
137 strcpy((char *)channel_name.value, channel);
138 channel_name.length = strlen(channel);
140 printf("Test subscription:\n");
142 do {
143 result = saEvtInitialize (&handle, &callbacks, &version);
144 } while ((result == SA_AIS_ERR_TRY_AGAIN) && !sleep(TRY_WAIT));
145 if (result != SA_AIS_OK) {
146 get_sa_error(result, result_buf, result_buf_len);
147 printf("Event Initialize result: %s\n", result_buf);
148 return result;
150 do {
151 result = saEvtChannelOpen(handle, &channel_name, flags,
152 SA_TIME_MAX, &channel_handle);
153 } while ((result == SA_AIS_ERR_TRY_AGAIN) && !sleep(TRY_WAIT));
154 if (result != SA_AIS_OK) {
155 get_sa_error(result, result_buf, result_buf_len);
156 printf("channel open result: %s\n", result_buf);
157 goto init_fin;
160 if (sub_next == 0)
161 sub_next = 1;
163 for (i = 0; i < sub_next; i++) {
164 do {
165 result = saEvtEventSubscribe(channel_handle,
166 &subscribe_filters[i],
167 subscription_id[i]);
168 } while ((result == SA_AIS_ERR_TRY_AGAIN) && !sleep(TRY_WAIT));
170 if (result != SA_AIS_OK) {
171 get_sa_error(result, result_buf, result_buf_len);
172 printf("event subscribe result: %s\n", result_buf);
173 goto chan_fin;
178 * See if we got the event
180 do {
181 result = saEvtSelectionObjectGet(handle, &fd);
182 } while ((result == SA_AIS_ERR_TRY_AGAIN) && !sleep(TRY_WAIT));
183 if (result != SA_AIS_OK) {
184 get_sa_error(result, result_buf, result_buf_len);
185 printf("saEvtSelectionObject get %s\n", result_buf);
186 goto sub_fin;
189 while (1) {
190 pfd.fd = fd;
191 pfd.events = POLLIN;
192 nfd = poll(&pfd, 1, timeout);
193 if (nfd < 0) {
194 printf("poll fds %d\n", nfd);
195 perror("poll error");
196 goto sub_fin;
197 } else if (nfd == 0) {
198 printf("Still waiting\n");
199 continue;
202 if (pfd.revents & (POLLERR|POLLHUP)) {
203 printf("Error received on poll fd %llu\n",
204 (unsigned long long)fd);
205 result = SA_AIS_ERR_BAD_OPERATION;
206 goto sub_fin;
208 do {
209 result = saEvtDispatch(handle, SA_DISPATCH_ONE);
210 } while ((result == SA_AIS_ERR_TRY_AGAIN) && !sleep(TRY_WAIT));
211 if (result != SA_AIS_OK) {
212 get_sa_error(result, result_buf, result_buf_len);
213 printf("saEvtDispatch %s\n", result_buf);
214 goto sub_fin;
216 if (!quiet)
217 printf(" - - - - - - - - - - - - - - - -\n\n");
220 sub_fin:
221 #if 0
222 result = saEvtEventUnsubscribe(channel_handle, subscription_id);
223 if (result != SA_AIS_OK)
224 printf("Channel unsubscribe result: %d\n", result);
225 #endif
226 chan_fin:
227 result = saEvtChannelClose(channel_handle);
228 if (result != SA_AIS_OK)
229 get_sa_error(result, result_buf, result_buf_len);
230 printf("Channel close result: %s\n", result_buf);
231 init_fin:
232 result = saEvtFinalize(handle);
233 if (result != SA_AIS_OK) {
234 get_sa_error(result, result_buf, result_buf_len);
235 printf("Finalize result: %s\n", result_buf);
238 return result;
241 static char time_buf[1024];
243 char *ais_time_str(SaTimeT time)
245 time_t t;
246 if (time == SA_TIME_UNKNOWN) {
247 return "Unknown Time";
249 t = time / 1000000000ULL;
250 strcpy(time_buf, ctime(&t));
251 return time_buf;
254 #define dprintf(format, ...) \
256 if (did_dot) { \
257 printf("\n"); \
259 printf(format, ## __VA_ARGS__); \
260 did_dot = 0; \
263 void
264 event_callback( SaEvtSubscriptionIdT subscription_id,
265 const SaEvtEventHandleT event_handle,
266 const SaSizeT event_data_size)
268 static int did_dot = 0;
269 SaAisErrorT result;
270 SaUint8T priority;
271 SaTimeT retention_time;
272 SaNameT publisher_name = {0, {0}};
273 SaTimeT publish_time;
274 SaEvtEventIdT event_id;
275 SaSizeT received_size;
276 int i;
277 #ifdef TEST_EVENT_ORDER
278 int idx;
279 #endif
281 if (!quiet)
282 dprintf("event_callback called\n");
283 if (!quiet)
284 dprintf("sub ID: %x\n", subscription_id);
285 if (!quiet)
286 dprintf("event_handle %llx\n", (unsigned long long)event_handle);
287 if (!quiet)
288 dprintf("event data size %llu\n", (unsigned long long)event_data_size);
290 evt_pat_get_array.patterns[0].patternSize = PAT_SIZE;
291 evt_pat_get_array.patterns[1].patternSize = PAT_SIZE;
292 evt_pat_get_array.patterns[2].patternSize = PAT_SIZE;
293 evt_pat_get_array.patterns[3].patternSize = PAT_SIZE;
294 evt_pat_get_array.patternsNumber = 4;
295 result = saEvtEventAttributesGet(event_handle,
296 &evt_pat_get_array, /* patterns */
297 &priority, /* priority */
298 &retention_time, /* retention time */
299 &publisher_name, /* publisher name */
300 &publish_time, /* publish time */
301 &event_id /* event_id */
303 if (result != SA_AIS_OK) {
304 get_sa_error(result, result_buf, result_buf_len);
305 dprintf("event get attr result(2): %s\n", result_buf);
306 goto evt_free;
308 if (!quiet) {
309 dprintf("pattern array count: %llu\n",
310 (unsigned long long)evt_pat_get_array.patternsNumber);
311 for (i = 0; i < evt_pat_get_array.patternsNumber; i++) {
312 dprintf( "pattern %d =\"%s\"\n", i,
313 evt_pat_get_array.patterns[i].pattern);
316 dprintf("priority: 0x%x\n", priority);
317 dprintf("retention: 0x%llx\n", (unsigned long long)retention_time);
318 dprintf("publisher name content: \"%s\"\n",
319 publisher_name.value);
322 if (event_id == SA_EVT_EVENTID_LOST) {
323 dprintf("*** Events have been dropped at %s",
324 ais_time_str(publish_time));
326 if ((evt_pat_get_array.patternsNumber == 0)||
327 (strcmp((char *)evt_pat_get_array.patterns[0].pattern, SA_EVT_LOST_EVENT) != 0)) {
328 dprintf("*** Received SA_EVT_EVENTID_LOST but pattern is wrong: %s\n",
329 evt_pat_get_array.patterns[0].pattern);
332 if (quiet < 2) {
333 dprintf("event id: 0x%016llx\n", (unsigned long long)event_id);
335 if (quiet == 2) {
336 if ((++evt_count % EVT_FREQ) == 0) {
337 fprintf(stderr, ".");
338 did_dot = 1;
342 if (event_id == SA_EVT_EVENTID_LOST) {
343 goto evt_free;
346 #ifdef TEST_EVENT_ORDER
347 for (idx = 0; idx < MAX_NODES; idx++) {
348 if (last_event_id[idx] == 0) {
349 last_event_id[idx] = event_id;
350 break;
351 } else {
352 if ((last_event_id[idx] >> 32) == (event_id >> 32)) {
353 last_event_id[idx]++;
354 if (last_event_id[idx] != event_id) {
355 dprintf("*** expected %016llx got %016llx event_id\n",
356 (unsigned long long)last_event_id[idx],
357 (unsigned long long)event_id);
358 last_event_id[idx] = event_id;
360 break;
364 if (idx == MAX_NODES) {
365 dprintf("*** Too many nodes in cluster\n");
366 exit(1);
368 #endif
370 if (event_data_size != user_data_size) {
371 dprintf("unexpected data size: e=%d, a=%llu\n",
372 user_data_size, (unsigned long long)event_data_size);
373 goto evt_free;
376 received_size = user_data_size;
377 result = saEvtEventDataGet(event_handle, event_data,
378 &received_size);
379 if (result != SA_AIS_OK) {
380 get_sa_error(result, result_buf, result_buf_len);
381 dprintf("event get data result: %s\n", result_buf);
382 goto evt_free;
384 if (received_size != event_data_size) {
385 dprintf("event data mismatch e=%llu, a=%llu\n",
386 (unsigned long long)event_data_size,
387 (unsigned long long)received_size);
388 goto evt_free;
390 if (memcmp(user_data, event_data, user_data_size) != 0 ) {
391 dprintf("event data doesn't match specified file data\n");
392 goto evt_free;
394 if (!quiet) {
395 dprintf("Received %d bytes of data OK\n",
396 user_data_size);
399 evt_free:
400 result = saEvtEventFree(event_handle);
401 if (!quiet) {
402 get_sa_error(result, result_buf, result_buf_len);
403 dprintf("event free result: %s\n", result_buf);
407 static int err_wait_time = -1;
409 #if ! defined(TS_CLASS) && (defined(OPENAIS_BSD) || defined(OPENAIS_LINUX) || defined(OPENAIS_SOLARIS))
410 static struct sched_param sched_param = {
411 sched_priority: 1
413 #endif
415 int main (int argc, char **argv)
417 static const char opts[] = "c:s:n:qu:f:";
419 int option;
420 char *p;
422 #if ! defined(TS_CLASS) && (defined(OPENAIS_BSD) || defined(OPENAIS_LINUX) || defined(OPENAIS_SOLARIS))
423 sched_setscheduler (0, SCHED_RR, &sched_param);
424 #endif
426 while (1) {
427 option = getopt(argc, argv, opts);
428 if (option == -1)
429 break;
431 switch (option) {
432 case 'u': {
433 int fd;
434 int sz;
436 strcpy(user_data_file, optarg);
437 fd = open(user_data_file, O_RDONLY);
438 if (fd < 0) {
439 printf("Can't open user data file %s\n",
440 user_data_file);
441 exit(1);
443 sz = read(fd, user_data, 65536);
444 if (sz < 0) {
445 perror("subscription\n");
446 exit(1);
448 close(fd);
449 user_data_size = sz;
450 break;
452 case 'q':
453 quiet++;
454 break;
455 case 'c':
456 strcpy(channel, optarg);
457 break;
458 case 'f':
459 err_wait_time =
460 (unsigned int)strtoul(optarg, NULL, 0);
461 break;
462 case 'n':
463 strcpy(pubname, optarg);
464 break;
465 case 's':
466 p = strsep(&optarg, ",");
467 subscription_id[sub_next] =
468 (unsigned int)strtoul(p, NULL, 0);
469 p = strsep(&optarg, ",");
470 filters[sub_next].filter.pattern = malloc(strlen(p));
471 strcpy((char *)filters[sub_next].filter.pattern, p);
472 filters[sub_next].filter.patternSize = strlen(p);
473 p = strsep(&optarg, ",");
474 filters[sub_next++].filterType = strtoul(p,0, 0);
475 break;
476 default:
477 printf("invalid arg: \"%s\"\n", optarg);
478 return 1;
481 do {
482 if (test_subscription() != SA_AIS_OK) {
483 if (err_wait_time > 0) {
484 sleep(err_wait_time);
485 } else {
486 return 1;
489 } while (err_wait_time > 0);
491 return 0;