Sync usage with man page.
[netbsd-mini2440.git] / external / gpl2 / lvm2 / dist / daemons / cmirrord / cluster.c
blobdf738371d178525042d07bb385e7a12e5de6b610
1 /* $NetBSD$ */
3 /*
4 * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
6 * This copyrighted material is made available to anyone wishing to use,
7 * modify, copy, or redistribute it subject to the terms and conditions
8 * of the GNU Lesser General Public License v.2.1.
10 * You should have received a copy of the GNU Lesser General Public License
11 * along with this program; if not, write to the Free Software Foundation,
12 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
14 #include <errno.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <unistd.h>
18 #include <stdint.h>
19 #include <stdlib.h>
20 #include <signal.h>
21 #include <sys/socket.h> /* These are for OpenAIS CPGs */
22 #include <sys/select.h>
23 #include <sys/un.h>
24 #include <netinet/in.h>
25 #include <arpa/inet.h>
26 #include <corosync/corotypes.h>
27 #include <corosync/cpg.h>
28 #include <openais/saAis.h>
29 #include <openais/saCkpt.h>
31 #include "dm-log-userspace.h"
32 #include "libdevmapper.h"
33 #include "functions.h"
34 #include "local.h"
35 #include "common.h"
36 #include "logging.h"
37 #include "link_mon.h"
38 #include "cluster.h"
/*
 * Open AIS error codes
 *
 * str_ais_error(x) expands to a string literal naming the OpenAIS error
 * code x, or "ais_error_unknown" for any value not listed below.  The
 * whole expansion is parenthesised so that it behaves as one operand
 * wherever it is used (indexed, compared, nested in another ?:).
 * x is evaluated once per comparison: do not pass an expression with
 * side effects.
 */
#define str_ais_error(x) \
	(((x) == SA_AIS_OK) ? "SA_AIS_OK" : \
	((x) == SA_AIS_ERR_LIBRARY) ? "SA_AIS_ERR_LIBRARY" : \
	((x) == SA_AIS_ERR_VERSION) ? "SA_AIS_ERR_VERSION" : \
	((x) == SA_AIS_ERR_INIT) ? "SA_AIS_ERR_INIT" : \
	((x) == SA_AIS_ERR_TIMEOUT) ? "SA_AIS_ERR_TIMEOUT" : \
	((x) == SA_AIS_ERR_TRY_AGAIN) ? "SA_AIS_ERR_TRY_AGAIN" : \
	((x) == SA_AIS_ERR_INVALID_PARAM) ? "SA_AIS_ERR_INVALID_PARAM" : \
	((x) == SA_AIS_ERR_NO_MEMORY) ? "SA_AIS_ERR_NO_MEMORY" : \
	((x) == SA_AIS_ERR_BAD_HANDLE) ? "SA_AIS_ERR_BAD_HANDLE" : \
	((x) == SA_AIS_ERR_BUSY) ? "SA_AIS_ERR_BUSY" : \
	((x) == SA_AIS_ERR_ACCESS) ? "SA_AIS_ERR_ACCESS" : \
	((x) == SA_AIS_ERR_NOT_EXIST) ? "SA_AIS_ERR_NOT_EXIST" : \
	((x) == SA_AIS_ERR_NAME_TOO_LONG) ? "SA_AIS_ERR_NAME_TOO_LONG" : \
	((x) == SA_AIS_ERR_EXIST) ? "SA_AIS_ERR_EXIST" : \
	((x) == SA_AIS_ERR_NO_SPACE) ? "SA_AIS_ERR_NO_SPACE" : \
	((x) == SA_AIS_ERR_INTERRUPT) ? "SA_AIS_ERR_INTERRUPT" : \
	((x) == SA_AIS_ERR_NAME_NOT_FOUND) ? "SA_AIS_ERR_NAME_NOT_FOUND" : \
	((x) == SA_AIS_ERR_NO_RESOURCES) ? "SA_AIS_ERR_NO_RESOURCES" : \
	((x) == SA_AIS_ERR_NOT_SUPPORTED) ? "SA_AIS_ERR_NOT_SUPPORTED" : \
	((x) == SA_AIS_ERR_BAD_OPERATION) ? "SA_AIS_ERR_BAD_OPERATION" : \
	((x) == SA_AIS_ERR_FAILED_OPERATION) ? "SA_AIS_ERR_FAILED_OPERATION" : \
	((x) == SA_AIS_ERR_MESSAGE_ERROR) ? "SA_AIS_ERR_MESSAGE_ERROR" : \
	((x) == SA_AIS_ERR_QUEUE_FULL) ? "SA_AIS_ERR_QUEUE_FULL" : \
	((x) == SA_AIS_ERR_QUEUE_NOT_AVAILABLE) ? "SA_AIS_ERR_QUEUE_NOT_AVAILABLE" : \
	((x) == SA_AIS_ERR_BAD_FLAGS) ? "SA_AIS_ERR_BAD_FLAGS" : \
	((x) == SA_AIS_ERR_TOO_BIG) ? "SA_AIS_ERR_TOO_BIG" : \
	((x) == SA_AIS_ERR_NO_SECTIONS) ? "SA_AIS_ERR_NO_SECTIONS" : \
	"ais_error_unknown")
/*
 * Flag (bit 12) OR'd into a request_type to mark the message as the
 * response to an earlier request.
 */
#define DM_ULOG_RESPONSE 0x1000

/*
 * Request types exchanged only between cluster members; _RQ_TYPE() names
 * them before falling back to the generic RQ_TYPE() lookup.
 */
#define DM_ULOG_CHECKPOINT_READY 21
#define DM_ULOG_MEMBER_JOIN 22

/*
 * _RQ_TYPE(x) - printable name of request type x.  The DM_ULOG_RESPONSE
 * flag is masked off before the generic RQ_TYPE() lookup.  The expansion
 * is parenthesised as a whole so it can be used as a single operand;
 * x is evaluated more than once.
 */
#define _RQ_TYPE(x) \
	(((x) == DM_ULOG_CHECKPOINT_READY) ? "DM_ULOG_CHECKPOINT_READY": \
	 ((x) == DM_ULOG_MEMBER_JOIN) ? "DM_ULOG_MEMBER_JOIN": \
	 RQ_TYPE((x) & ~DM_ULOG_RESPONSE))
80 static uint32_t my_cluster_id = 0xDEAD;
81 static SaCkptHandleT ckpt_handle = 0;
82 static SaCkptCallbacksT callbacks = { 0, 0 };
83 static SaVersionT version = { 'B', 1, 1 };
/* Number of slots in each clog_cpg's debugging history ring. */
#define DEBUGGING_HISTORY 100

/*
 * LOG_SPRINT
 * @cc: struct clog_cpg * whose history ring receives the message
 * @f:  printf-style format string, followed by its arguments
 *
 * Advances cc->idx (wrapping at DEBUGGING_HISTORY) and formats the
 * message into that slot.  The write is bounded by the slot size, so an
 * over-long message is truncated instead of overflowing into the next
 * slot.  @cc is evaluated several times; pass a plain variable.
 */
#define LOG_SPRINT(cc, f, arg...) do {					\
		(cc)->idx++;						\
		(cc)->idx = (cc)->idx % DEBUGGING_HISTORY;		\
		snprintf((cc)->debugging[(cc)->idx],			\
			 sizeof((cc)->debugging[(cc)->idx]), f, ## arg); \
	} while (0)

/*
 * While > 0, handle_cluster_response() logs each matched response and
 * decrements this counter.
 */
static int log_resp_rec = 0;
96 struct checkpoint_data {
97 uint32_t requester;
98 char uuid[CPG_MAX_NAME_LENGTH];
100 int bitmap_size; /* in bytes */
101 char *sync_bits;
102 char *clean_bits;
103 char *recovering_region;
104 struct checkpoint_data *next;
/* Values stored in clog_cpg.state and clog_cpg.cpg_state. */
#define INVALID		0	/* not usable yet (e.g. no checkpoint received) */
#define VALID		1	/* usable */
#define LEAVING		2	/* in the process of leaving the CPG */

/* Capacity of clog_cpg.checkpoint_requesters[]. */
#define MAX_CHECKPOINT_REQUESTERS 10
112 struct clog_cpg {
113 struct dm_list list;
115 uint32_t lowest_id;
116 cpg_handle_t handle;
117 struct cpg_name name;
118 uint64_t luid;
120 /* Are we the first, or have we received checkpoint? */
121 int state;
122 int cpg_state; /* FIXME: debugging */
123 int free_me;
124 int delay;
125 int resend_requests;
126 struct dm_list startup_list;
127 struct dm_list working_list;
129 int checkpoints_needed;
130 uint32_t checkpoint_requesters[MAX_CHECKPOINT_REQUESTERS];
131 struct checkpoint_data *checkpoint_list;
132 int idx;
133 char debugging[DEBUGGING_HISTORY][128];
136 static struct dm_list clog_cpg_list;
139 * cluster_send
140 * @rq
142 * Returns: 0 on success, -Exxx on error
144 int cluster_send(struct clog_request *rq)
146 int r;
147 int count=0;
148 int found;
149 struct iovec iov;
150 struct clog_cpg *entry;
152 dm_list_iterate_items(entry, &clog_cpg_list)
153 if (!strncmp(entry->name.value, rq->u_rq.uuid,
154 CPG_MAX_NAME_LENGTH)) {
155 found = 1;
156 break;
159 if (!found) {
160 rq->u_rq.error = -ENOENT;
161 return -ENOENT;
165 * Once the request heads for the cluster, the luid looses
166 * all its meaning.
168 rq->u_rq.luid = 0;
170 iov.iov_base = rq;
171 iov.iov_len = sizeof(struct clog_request) + rq->u_rq.data_size;
173 if (entry->cpg_state != VALID)
174 return -EINVAL;
176 do {
177 r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
178 if (r != SA_AIS_ERR_TRY_AGAIN)
179 break;
180 count++;
181 if (count < 10)
182 LOG_PRINT("[%s] Retry #%d of cpg_mcast_joined: %s",
183 SHORT_UUID(rq->u_rq.uuid), count,
184 str_ais_error(r));
185 else if ((count < 100) && !(count % 10))
186 LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s",
187 SHORT_UUID(rq->u_rq.uuid), count,
188 str_ais_error(r));
189 else if ((count < 1000) && !(count % 100))
190 LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s",
191 SHORT_UUID(rq->u_rq.uuid), count,
192 str_ais_error(r));
193 else if ((count < 10000) && !(count % 1000))
194 LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s - "
195 "OpenAIS not handling the load?",
196 SHORT_UUID(rq->u_rq.uuid), count,
197 str_ais_error(r));
198 usleep(1000);
199 } while (1);
201 if (r == CPG_OK)
202 return 0;
204 /* error codes found in openais/cpg.h */
205 LOG_ERROR("cpg_mcast_joined error: %s", str_ais_error(r));
207 rq->u_rq.error = -EBADE;
208 return -EBADE;
211 static struct clog_request *get_matching_rq(struct clog_request *rq,
212 struct dm_list *l)
214 struct clog_request *match, *n;
216 dm_list_iterate_items_safe(match, n, l)
217 if (match->u_rq.seq == rq->u_rq.seq) {
218 dm_list_del(&match->list);
219 return match;
222 return NULL;
225 static char rq_buffer[DM_ULOG_REQUEST_SIZE];
226 static int handle_cluster_request(struct clog_cpg *entry,
227 struct clog_request *rq, int server)
229 int r = 0;
230 struct clog_request *tmp = (struct clog_request *)rq_buffer;
233 * We need a separate dm_ulog_request struct, one that can carry
234 * a return payload. Otherwise, the memory address after
235 * rq will be altered - leading to problems
237 memset(rq_buffer, 0, sizeof(rq_buffer));
238 memcpy(tmp, rq, sizeof(struct clog_request) + rq->u_rq.data_size);
241 * With resumes, we only handle our own.
242 * Resume is a special case that requires
243 * local action (to set up CPG), followed by
244 * a cluster action to co-ordinate reading
245 * the disk and checkpointing
247 if (tmp->u_rq.request_type == DM_ULOG_RESUME) {
248 if (tmp->originator == my_cluster_id) {
249 r = do_request(tmp, server);
251 r = kernel_send(&tmp->u_rq);
252 if (r < 0)
253 LOG_ERROR("Failed to send resume response to kernel");
255 return r;
258 r = do_request(tmp, server);
260 if (server &&
261 (tmp->u_rq.request_type != DM_ULOG_CLEAR_REGION) &&
262 (tmp->u_rq.request_type != DM_ULOG_POSTSUSPEND)) {
263 tmp->u_rq.request_type |= DM_ULOG_RESPONSE;
266 * Errors from previous functions are in the rq struct.
268 r = cluster_send(tmp);
269 if (r < 0)
270 LOG_ERROR("cluster_send failed: %s", strerror(-r));
273 return r;
276 static int handle_cluster_response(struct clog_cpg *entry,
277 struct clog_request *rq)
279 int r = 0;
280 struct clog_request *orig_rq;
283 * If I didn't send it, then I don't care about the response
285 if (rq->originator != my_cluster_id)
286 return 0;
288 rq->u_rq.request_type &= ~DM_ULOG_RESPONSE;
289 orig_rq = get_matching_rq(rq, &entry->working_list);
291 if (!orig_rq) {
292 /* Unable to find match for response */
294 LOG_ERROR("[%s] No match for cluster response: %s:%u",
295 SHORT_UUID(rq->u_rq.uuid),
296 _RQ_TYPE(rq->u_rq.request_type),
297 rq->u_rq.seq);
299 LOG_ERROR("Current local list:");
300 if (dm_list_empty(&entry->working_list))
301 LOG_ERROR(" [none]");
303 dm_list_iterate_items(orig_rq, &entry->working_list)
304 LOG_ERROR(" [%s] %s:%u",
305 SHORT_UUID(orig_rq->u_rq.uuid),
306 _RQ_TYPE(orig_rq->u_rq.request_type),
307 orig_rq->u_rq.seq);
309 return -EINVAL;
312 if (log_resp_rec > 0) {
313 LOG_COND(log_resend_requests,
314 "[%s] Response received to %s/#%u",
315 SHORT_UUID(rq->u_rq.uuid),
316 _RQ_TYPE(rq->u_rq.request_type),
317 rq->u_rq.seq);
318 log_resp_rec--;
321 /* FIXME: Ensure memcpy cannot explode */
322 memcpy(orig_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
324 r = kernel_send(&orig_rq->u_rq);
325 if (r)
326 LOG_ERROR("Failed to send response to kernel");
328 free(orig_rq);
329 return r;
332 static struct clog_cpg *find_clog_cpg(cpg_handle_t handle)
334 struct clog_cpg *match;
336 dm_list_iterate_items(match, &clog_cpg_list)
337 if (match->handle == handle)
338 return match;
340 return NULL;
344 * prepare_checkpoint
345 * @entry: clog_cpg describing the log
346 * @cp_requester: nodeid requesting the checkpoint
348 * Creates and fills in a new checkpoint_data struct.
350 * Returns: checkpoint_data on success, NULL on error
352 static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
353 uint32_t cp_requester)
355 int r;
356 struct checkpoint_data *new;
358 if (entry->state != VALID) {
360 * We can't store bitmaps yet, because the log is not
361 * valid yet.
363 LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet",
364 cp_requester);
365 return NULL;
368 new = malloc(sizeof(*new));
369 if (!new) {
370 LOG_ERROR("Unable to create checkpoint data for %u",
371 cp_requester);
372 return NULL;
374 memset(new, 0, sizeof(*new));
375 new->requester = cp_requester;
376 strncpy(new->uuid, entry->name.value, entry->name.length);
378 new->bitmap_size = push_state(entry->name.value, entry->luid,
379 "clean_bits",
380 &new->clean_bits, cp_requester);
381 if (new->bitmap_size <= 0) {
382 LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
383 new->requester);
384 free(new);
385 return NULL;
388 new->bitmap_size = push_state(entry->name.value, entry->luid,
389 "sync_bits",
390 &new->sync_bits, cp_requester);
391 if (new->bitmap_size <= 0) {
392 LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
393 new->requester);
394 free(new->clean_bits);
395 free(new);
396 return NULL;
399 r = push_state(entry->name.value, entry->luid,
400 "recovering_region",
401 &new->recovering_region, cp_requester);
402 if (r <= 0) {
403 LOG_ERROR("Failed to store recovering_region to checkpoint for node %u",
404 new->requester);
405 free(new->sync_bits);
406 free(new->clean_bits);
407 free(new);
408 return NULL;
410 LOG_DBG("[%s] Checkpoint prepared for node %u:",
411 SHORT_UUID(new->uuid), new->requester);
412 LOG_DBG(" bitmap_size = %d", new->bitmap_size);
414 return new;
418 * free_checkpoint
419 * @cp: the checkpoint_data struct to free
422 static void free_checkpoint(struct checkpoint_data *cp)
424 free(cp->recovering_region);
425 free(cp->sync_bits);
426 free(cp->clean_bits);
427 free(cp);
430 static int export_checkpoint(struct checkpoint_data *cp)
432 SaCkptCheckpointCreationAttributesT attr;
433 SaCkptCheckpointHandleT h;
434 SaCkptSectionIdT section_id;
435 SaCkptSectionCreationAttributesT section_attr;
436 SaCkptCheckpointOpenFlagsT flags;
437 SaNameT name;
438 SaAisErrorT rv;
439 struct clog_request *rq;
440 int len, r = 0;
441 char buf[32];
443 LOG_DBG("Sending checkpointed data to %u", cp->requester);
445 len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH,
446 "bitmaps_%s_%u", SHORT_UUID(cp->uuid), cp->requester);
447 name.length = len;
449 len = strlen(cp->recovering_region) + 1;
451 attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
452 attr.checkpointSize = cp->bitmap_size * 2 + len;
454 attr.retentionDuration = SA_TIME_MAX;
455 attr.maxSections = 4; /* don't know why we need +1 */
457 attr.maxSectionSize = (cp->bitmap_size > len) ? cp->bitmap_size : len;
458 attr.maxSectionIdSize = 22;
460 flags = SA_CKPT_CHECKPOINT_READ |
461 SA_CKPT_CHECKPOINT_WRITE |
462 SA_CKPT_CHECKPOINT_CREATE;
464 open_retry:
465 rv = saCkptCheckpointOpen(ckpt_handle, &name, &attr, flags, 0, &h);
466 if (rv == SA_AIS_ERR_TRY_AGAIN) {
467 LOG_ERROR("export_checkpoint: ckpt open retry");
468 usleep(1000);
469 goto open_retry;
472 if (rv == SA_AIS_ERR_EXIST) {
473 LOG_DBG("export_checkpoint: checkpoint already exists");
474 return -EEXIST;
477 if (rv != SA_AIS_OK) {
478 LOG_ERROR("[%s] Failed to open checkpoint for %u: %s",
479 SHORT_UUID(cp->uuid), cp->requester,
480 str_ais_error(rv));
481 return -EIO; /* FIXME: better error */
485 * Add section for sync_bits
487 section_id.idLen = snprintf(buf, 32, "sync_bits");
488 section_id.id = (unsigned char *)buf;
489 section_attr.sectionId = &section_id;
490 section_attr.expirationTime = SA_TIME_END;
492 sync_create_retry:
493 rv = saCkptSectionCreate(h, &section_attr,
494 cp->sync_bits, cp->bitmap_size);
495 if (rv == SA_AIS_ERR_TRY_AGAIN) {
496 LOG_ERROR("Sync checkpoint section create retry");
497 usleep(1000);
498 goto sync_create_retry;
501 if (rv == SA_AIS_ERR_EXIST) {
502 LOG_DBG("Sync checkpoint section already exists");
503 saCkptCheckpointClose(h);
504 return -EEXIST;
507 if (rv != SA_AIS_OK) {
508 LOG_ERROR("Sync checkpoint section creation failed: %s",
509 str_ais_error(rv));
510 saCkptCheckpointClose(h);
511 return -EIO; /* FIXME: better error */
515 * Add section for clean_bits
517 section_id.idLen = snprintf(buf, 32, "clean_bits");
518 section_id.id = (unsigned char *)buf;
519 section_attr.sectionId = &section_id;
520 section_attr.expirationTime = SA_TIME_END;
522 clean_create_retry:
523 rv = saCkptSectionCreate(h, &section_attr, cp->clean_bits, cp->bitmap_size);
524 if (rv == SA_AIS_ERR_TRY_AGAIN) {
525 LOG_ERROR("Clean checkpoint section create retry");
526 usleep(1000);
527 goto clean_create_retry;
530 if (rv == SA_AIS_ERR_EXIST) {
531 LOG_DBG("Clean checkpoint section already exists");
532 saCkptCheckpointClose(h);
533 return -EEXIST;
536 if (rv != SA_AIS_OK) {
537 LOG_ERROR("Clean checkpoint section creation failed: %s",
538 str_ais_error(rv));
539 saCkptCheckpointClose(h);
540 return -EIO; /* FIXME: better error */
544 * Add section for recovering_region
546 section_id.idLen = snprintf(buf, 32, "recovering_region");
547 section_id.id = (unsigned char *)buf;
548 section_attr.sectionId = &section_id;
549 section_attr.expirationTime = SA_TIME_END;
551 rr_create_retry:
552 rv = saCkptSectionCreate(h, &section_attr, cp->recovering_region,
553 strlen(cp->recovering_region) + 1);
554 if (rv == SA_AIS_ERR_TRY_AGAIN) {
555 LOG_ERROR("RR checkpoint section create retry");
556 usleep(1000);
557 goto rr_create_retry;
560 if (rv == SA_AIS_ERR_EXIST) {
561 LOG_DBG("RR checkpoint section already exists");
562 saCkptCheckpointClose(h);
563 return -EEXIST;
566 if (rv != SA_AIS_OK) {
567 LOG_ERROR("RR checkpoint section creation failed: %s",
568 str_ais_error(rv));
569 saCkptCheckpointClose(h);
570 return -EIO; /* FIXME: better error */
573 LOG_DBG("export_checkpoint: closing checkpoint");
574 saCkptCheckpointClose(h);
576 rq = malloc(DM_ULOG_REQUEST_SIZE);
577 if (!rq) {
578 LOG_ERROR("export_checkpoint: Unable to allocate transfer structs");
579 return -ENOMEM;
581 memset(rq, 0, sizeof(*rq));
583 dm_list_init(&rq->list);
584 rq->u_rq.request_type = DM_ULOG_CHECKPOINT_READY;
585 rq->originator = cp->requester; /* FIXME: hack to overload meaning of originator */
586 strncpy(rq->u_rq.uuid, cp->uuid, CPG_MAX_NAME_LENGTH);
587 rq->u_rq.seq = my_cluster_id;
589 r = cluster_send(rq);
590 if (r)
591 LOG_ERROR("Failed to send checkpoint ready notice: %s",
592 strerror(-r));
594 free(rq);
595 return 0;
598 static int import_checkpoint(struct clog_cpg *entry, int no_read)
600 int rtn = 0;
601 SaCkptCheckpointHandleT h;
602 SaCkptSectionIterationHandleT itr;
603 SaCkptSectionDescriptorT desc;
604 SaCkptIOVectorElementT iov;
605 SaNameT name;
606 SaAisErrorT rv;
607 char *bitmap = NULL;
608 int len;
610 bitmap = malloc(1024*1024);
611 if (!bitmap)
612 return -ENOMEM;
614 len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u",
615 SHORT_UUID(entry->name.value), my_cluster_id);
616 name.length = len;
618 open_retry:
619 rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL,
620 SA_CKPT_CHECKPOINT_READ, 0, &h);
621 if (rv == SA_AIS_ERR_TRY_AGAIN) {
622 LOG_ERROR("import_checkpoint: ckpt open retry");
623 usleep(1000);
624 goto open_retry;
627 if (rv != SA_AIS_OK) {
628 LOG_ERROR("[%s] Failed to open checkpoint: %s",
629 SHORT_UUID(entry->name.value), str_ais_error(rv));
630 return -EIO; /* FIXME: better error */
633 unlink_retry:
634 rv = saCkptCheckpointUnlink(ckpt_handle, &name);
635 if (rv == SA_AIS_ERR_TRY_AGAIN) {
636 LOG_ERROR("import_checkpoint: ckpt unlink retry");
637 usleep(1000);
638 goto unlink_retry;
641 if (no_read) {
642 LOG_DBG("Checkpoint for this log already received");
643 goto no_read;
646 init_retry:
647 rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY,
648 SA_TIME_END, &itr);
649 if (rv == SA_AIS_ERR_TRY_AGAIN) {
650 LOG_ERROR("import_checkpoint: sync create retry");
651 usleep(1000);
652 goto init_retry;
655 if (rv != SA_AIS_OK) {
656 LOG_ERROR("[%s] Sync checkpoint section creation failed: %s",
657 SHORT_UUID(entry->name.value), str_ais_error(rv));
658 return -EIO; /* FIXME: better error */
661 len = 0;
662 while (1) {
663 rv = saCkptSectionIterationNext(itr, &desc);
664 if (rv == SA_AIS_OK)
665 len++;
666 else if ((rv == SA_AIS_ERR_NO_SECTIONS) && len)
667 break;
668 else if (rv != SA_AIS_ERR_TRY_AGAIN) {
669 LOG_ERROR("saCkptSectionIterationNext failure: %d", rv);
670 break;
673 saCkptSectionIterationFinalize(itr);
674 if (len != 3) {
675 LOG_ERROR("import_checkpoint: %d checkpoint sections found",
676 len);
677 usleep(1000);
678 goto init_retry;
680 saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY,
681 SA_TIME_END, &itr);
683 while (1) {
684 rv = saCkptSectionIterationNext(itr, &desc);
685 if (rv == SA_AIS_ERR_NO_SECTIONS)
686 break;
688 if (rv == SA_AIS_ERR_TRY_AGAIN) {
689 LOG_ERROR("import_checkpoint: ckpt iternext retry");
690 usleep(1000);
691 continue;
694 if (rv != SA_AIS_OK) {
695 LOG_ERROR("import_checkpoint: clean checkpoint section "
696 "creation failed: %s", str_ais_error(rv));
697 rtn = -EIO; /* FIXME: better error */
698 goto fail;
701 if (!desc.sectionSize) {
702 LOG_ERROR("Checkpoint section empty");
703 continue;
706 memset(bitmap, 0, sizeof(*bitmap));
707 iov.sectionId = desc.sectionId;
708 iov.dataBuffer = bitmap;
709 iov.dataSize = desc.sectionSize;
710 iov.dataOffset = 0;
712 read_retry:
713 rv = saCkptCheckpointRead(h, &iov, 1, NULL);
714 if (rv == SA_AIS_ERR_TRY_AGAIN) {
715 LOG_ERROR("ckpt read retry");
716 usleep(1000);
717 goto read_retry;
720 if (rv != SA_AIS_OK) {
721 LOG_ERROR("import_checkpoint: ckpt read error: %s",
722 str_ais_error(rv));
723 rtn = -EIO; /* FIXME: better error */
724 goto fail;
727 if (iov.readSize) {
728 if (pull_state(entry->name.value, entry->luid,
729 (char *)desc.sectionId.id, bitmap,
730 iov.readSize)) {
731 LOG_ERROR("Error loading state");
732 rtn = -EIO;
733 goto fail;
735 } else {
736 /* Need to request new checkpoint */
737 rtn = -EAGAIN;
738 goto fail;
742 fail:
743 saCkptSectionIterationFinalize(itr);
744 no_read:
745 saCkptCheckpointClose(h);
747 free(bitmap);
748 return rtn;
751 static void do_checkpoints(struct clog_cpg *entry, int leaving)
753 struct checkpoint_data *cp;
755 for (cp = entry->checkpoint_list; cp;) {
757 * FIXME: Check return code. Could send failure
758 * notice in rq in export_checkpoint function
759 * by setting rq->error
761 switch (export_checkpoint(cp)) {
762 case -EEXIST:
763 LOG_SPRINT(entry, "[%s] Checkpoint for %u already handled%s",
764 SHORT_UUID(entry->name.value), cp->requester,
765 (leaving) ? "(L)": "");
766 LOG_COND(log_checkpoint,
767 "[%s] Checkpoint for %u already handled%s",
768 SHORT_UUID(entry->name.value), cp->requester,
769 (leaving) ? "(L)": "");
770 entry->checkpoint_list = cp->next;
771 free_checkpoint(cp);
772 cp = entry->checkpoint_list;
773 break;
774 case 0:
775 LOG_SPRINT(entry, "[%s] Checkpoint data available for node %u%s",
776 SHORT_UUID(entry->name.value), cp->requester,
777 (leaving) ? "(L)": "");
778 LOG_COND(log_checkpoint,
779 "[%s] Checkpoint data available for node %u%s",
780 SHORT_UUID(entry->name.value), cp->requester,
781 (leaving) ? "(L)": "");
782 entry->checkpoint_list = cp->next;
783 free_checkpoint(cp);
784 cp = entry->checkpoint_list;
785 break;
786 default:
787 /* FIXME: Skipping will cause list corruption */
788 LOG_ERROR("[%s] Failed to export checkpoint for %u%s",
789 SHORT_UUID(entry->name.value), cp->requester,
790 (leaving) ? "(L)": "");
795 static int resend_requests(struct clog_cpg *entry)
797 int r = 0;
798 struct clog_request *rq, *n;
800 if (!entry->resend_requests || entry->delay)
801 return 0;
803 if (entry->state != VALID)
804 return 0;
806 entry->resend_requests = 0;
808 dm_list_iterate_items_safe(rq, n, &entry->working_list) {
809 dm_list_del(&rq->list);
811 if (strcmp(entry->name.value, rq->u_rq.uuid)) {
812 LOG_ERROR("[%s] Stray request from another log (%s)",
813 SHORT_UUID(entry->name.value),
814 SHORT_UUID(rq->u_rq.uuid));
815 free(rq);
816 continue;
819 switch (rq->u_rq.request_type) {
820 case DM_ULOG_SET_REGION_SYNC:
822 * Some requests simply do not need to be resent.
823 * If it is a request that just changes log state,
824 * then it doesn't need to be resent (everyone makes
825 * updates).
827 LOG_COND(log_resend_requests,
828 "[%s] Skipping resend of %s/#%u...",
829 SHORT_UUID(entry->name.value),
830 _RQ_TYPE(rq->u_rq.request_type),
831 rq->u_rq.seq);
832 LOG_SPRINT(entry, "### No resend: [%s] %s/%u ###",
833 SHORT_UUID(entry->name.value),
834 _RQ_TYPE(rq->u_rq.request_type),
835 rq->u_rq.seq);
837 rq->u_rq.data_size = 0;
838 kernel_send(&rq->u_rq);
840 break;
842 default:
844 * If an action or a response is required, then
845 * the request must be resent.
847 LOG_COND(log_resend_requests,
848 "[%s] Resending %s(#%u) due to new server(%u)",
849 SHORT_UUID(entry->name.value),
850 _RQ_TYPE(rq->u_rq.request_type),
851 rq->u_rq.seq, entry->lowest_id);
852 LOG_SPRINT(entry, "*** Resending: [%s] %s/%u ***",
853 SHORT_UUID(entry->name.value),
854 _RQ_TYPE(rq->u_rq.request_type),
855 rq->u_rq.seq);
856 r = cluster_send(rq);
857 if (r < 0)
858 LOG_ERROR("Failed resend");
860 free(rq);
863 return r;
866 static int do_cluster_work(void *data)
868 int r = SA_AIS_OK;
869 struct clog_cpg *entry;
871 dm_list_iterate_items(entry, &clog_cpg_list) {
872 r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL);
873 if (r != SA_AIS_OK)
874 LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r));
876 if (entry->free_me) {
877 free(entry);
878 continue;
880 do_checkpoints(entry, 0);
882 resend_requests(entry);
885 return (r == SA_AIS_OK) ? 0 : -1; /* FIXME: good error number? */
888 static int flush_startup_list(struct clog_cpg *entry)
890 int r = 0;
891 int i_was_server;
892 struct clog_request *rq, *n;
893 struct checkpoint_data *new;
895 dm_list_iterate_items_safe(rq, n, &entry->startup_list) {
896 dm_list_del(&rq->list);
898 if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) {
899 new = prepare_checkpoint(entry, rq->originator);
900 if (!new) {
902 * FIXME: Need better error handling. Other nodes
903 * will be trying to send the checkpoint too, and we
904 * must continue processing the list; so report error
905 * but continue.
907 LOG_ERROR("Failed to prepare checkpoint for %u!!!",
908 rq->originator);
909 free(rq);
910 continue;
912 LOG_SPRINT(entry, "[%s] Checkpoint prepared for %u",
913 SHORT_UUID(entry->name.value), rq->originator);
914 LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u",
915 SHORT_UUID(entry->name.value), rq->originator);
916 new->next = entry->checkpoint_list;
917 entry->checkpoint_list = new;
918 } else {
919 LOG_DBG("[%s] Processing delayed request: %s",
920 SHORT_UUID(rq->u_rq.uuid),
921 _RQ_TYPE(rq->u_rq.request_type));
922 i_was_server = (rq->pit_server == my_cluster_id) ? 1 : 0;
923 r = handle_cluster_request(entry, rq, i_was_server);
925 if (r)
927 * FIXME: If we error out here, we will never get
928 * another opportunity to retry these requests
930 LOG_ERROR("Error while processing delayed CPG message");
932 free(rq);
935 return 0;
938 static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gname,
939 uint32_t nodeid, uint32_t pid,
940 void *msg, size_t msg_len)
942 int i;
943 int r = 0;
944 int i_am_server;
945 int response = 0;
946 struct clog_request *rq = msg;
947 struct clog_request *tmp_rq;
948 struct clog_cpg *match;
950 match = find_clog_cpg(handle);
951 if (!match) {
952 LOG_ERROR("Unable to find clog_cpg for cluster message");
953 return;
956 if ((nodeid == my_cluster_id) &&
957 !(rq->u_rq.request_type & DM_ULOG_RESPONSE) &&
958 (rq->u_rq.request_type != DM_ULOG_RESUME) &&
959 (rq->u_rq.request_type != DM_ULOG_CLEAR_REGION) &&
960 (rq->u_rq.request_type != DM_ULOG_CHECKPOINT_READY)) {
961 tmp_rq = malloc(DM_ULOG_REQUEST_SIZE);
962 if (!tmp_rq) {
964 * FIXME: It may be possible to continue... but we
965 * would not be able to resend any messages that might
966 * be necessary during membership changes
968 LOG_ERROR("[%s] Unable to record request: -ENOMEM",
969 SHORT_UUID(rq->u_rq.uuid));
970 return;
972 memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
973 dm_list_init(&tmp_rq->list);
974 dm_list_add( &match->working_list, &tmp_rq->list);
977 if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) {
979 * If the server (lowest_id) indicates it is leaving,
980 * then we must resend any outstanding requests. However,
981 * we do not want to resend them if the next server in
982 * line is in the process of leaving.
984 if (nodeid == my_cluster_id) {
985 LOG_COND(log_resend_requests, "[%s] I am leaving.1.....",
986 SHORT_UUID(rq->u_rq.uuid));
987 } else {
988 if (nodeid < my_cluster_id) {
989 if (nodeid == match->lowest_id) {
990 match->resend_requests = 1;
991 LOG_COND(log_resend_requests, "[%s] %u is leaving, resend required%s",
992 SHORT_UUID(rq->u_rq.uuid), nodeid,
993 (dm_list_empty(&match->working_list)) ? " -- working_list empty": "");
995 dm_list_iterate_items(tmp_rq, &match->working_list)
996 LOG_COND(log_resend_requests,
997 "[%s] %s/%u",
998 SHORT_UUID(tmp_rq->u_rq.uuid),
999 _RQ_TYPE(tmp_rq->u_rq.request_type),
1000 tmp_rq->u_rq.seq);
1003 match->delay++;
1004 LOG_COND(log_resend_requests, "[%s] %u is leaving, delay = %d",
1005 SHORT_UUID(rq->u_rq.uuid), nodeid, match->delay);
1007 rq->originator = nodeid; /* don't really need this, but nice for debug */
1008 goto out;
1013 * We can receive messages after we do a cpg_leave but before we
1014 * get our config callback. However, since we can't respond after
1015 * leaving, we simply return.
1017 if (match->state == LEAVING)
1018 return;
1020 i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0;
1022 if (rq->u_rq.request_type == DM_ULOG_CHECKPOINT_READY) {
1023 if (my_cluster_id == rq->originator) {
1024 /* Redundant checkpoints ignored if match->valid */
1025 LOG_SPRINT(match, "[%s] CHECKPOINT_READY notification from %u",
1026 SHORT_UUID(rq->u_rq.uuid), nodeid);
1027 if (import_checkpoint(match, (match->state != INVALID))) {
1028 LOG_SPRINT(match,
1029 "[%s] Failed to import checkpoint from %u",
1030 SHORT_UUID(rq->u_rq.uuid), nodeid);
1031 LOG_ERROR("[%s] Failed to import checkpoint from %u",
1032 SHORT_UUID(rq->u_rq.uuid), nodeid);
1033 kill(getpid(), SIGUSR1);
1034 /* Could we retry? */
1035 goto out;
1036 } else if (match->state == INVALID) {
1037 LOG_SPRINT(match,
1038 "[%s] Checkpoint data received from %u. Log is now valid",
1039 SHORT_UUID(match->name.value), nodeid);
1040 LOG_COND(log_checkpoint,
1041 "[%s] Checkpoint data received from %u. Log is now valid",
1042 SHORT_UUID(match->name.value), nodeid);
1043 match->state = VALID;
1045 flush_startup_list(match);
1046 } else {
1047 LOG_SPRINT(match,
1048 "[%s] Redundant checkpoint from %u ignored.",
1049 SHORT_UUID(rq->u_rq.uuid), nodeid);
1052 goto out;
1055 if (rq->u_rq.request_type & DM_ULOG_RESPONSE) {
1056 response = 1;
1057 r = handle_cluster_response(match, rq);
1058 } else {
1059 rq->originator = nodeid;
1061 if (match->state == LEAVING) {
1062 LOG_ERROR("[%s] Ignoring %s from %u. Reason: I'm leaving",
1063 SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type),
1064 rq->originator);
1065 goto out;
1068 if (match->state == INVALID) {
1069 LOG_DBG("Log not valid yet, storing request");
1070 tmp_rq = malloc(DM_ULOG_REQUEST_SIZE);
1071 if (!tmp_rq) {
1072 LOG_ERROR("cpg_message_callback: Unable to"
1073 " allocate transfer structs");
1074 r = -ENOMEM; /* FIXME: Better error #? */
1075 goto out;
1078 memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
1079 tmp_rq->pit_server = match->lowest_id;
1080 dm_list_init(&tmp_rq->list);
1081 dm_list_add(&match->startup_list, &tmp_rq->list);
1082 goto out;
1085 r = handle_cluster_request(match, rq, i_am_server);
1089 * If the log is now valid, we can queue the checkpoints
1091 for (i = match->checkpoints_needed; i; ) {
1092 struct checkpoint_data *new;
1094 if (log_get_state(&rq->u_rq) != LOG_RESUMED) {
1095 LOG_DBG("[%s] Withholding checkpoints until log is valid (%s from %u)",
1096 SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type), nodeid);
1097 break;
1100 i--;
1101 new = prepare_checkpoint(match, match->checkpoint_requesters[i]);
1102 if (!new) {
1103 /* FIXME: Need better error handling */
1104 LOG_ERROR("[%s] Failed to prepare checkpoint for %u!!!",
1105 SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]);
1106 break;
1108 LOG_SPRINT(match, "[%s] Checkpoint prepared for %u* (%s)",
1109 SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i],
1110 (log_get_state(&rq->u_rq) != LOG_RESUMED)? "LOG_RESUMED": "LOG_SUSPENDED");
1111 LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u*",
1112 SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]);
1113 match->checkpoints_needed--;
1115 new->next = match->checkpoint_list;
1116 match->checkpoint_list = new;
1119 out:
1120 /* nothing happens after this point. It is just for debugging */
1121 if (r) {
1122 LOG_ERROR("[%s] Error while processing CPG message, %s: %s",
1123 SHORT_UUID(rq->u_rq.uuid),
1124 _RQ_TYPE(rq->u_rq.request_type & ~DM_ULOG_RESPONSE),
1125 strerror(-r));
1126 LOG_ERROR("[%s] Response : %s", SHORT_UUID(rq->u_rq.uuid),
1127 (response) ? "YES" : "NO");
1128 LOG_ERROR("[%s] Originator: %u",
1129 SHORT_UUID(rq->u_rq.uuid), rq->originator);
1130 if (response)
1131 LOG_ERROR("[%s] Responder : %u",
1132 SHORT_UUID(rq->u_rq.uuid), nodeid);
1134 LOG_ERROR("HISTORY::");
1135 for (i = 0; i < DEBUGGING_HISTORY; i++) {
1136 match->idx++;
1137 match->idx = match->idx % DEBUGGING_HISTORY;
1138 if (match->debugging[match->idx][0] == '\0')
1139 continue;
1140 LOG_ERROR("%d:%d) %s", i, match->idx,
1141 match->debugging[match->idx]);
1143 } else if (!(rq->u_rq.request_type & DM_ULOG_RESPONSE) ||
1144 (rq->originator == my_cluster_id)) {
1145 if (!response)
1146 LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s",
1147 rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid),
1148 _RQ_TYPE(rq->u_rq.request_type),
1149 rq->originator, (response) ? "YES" : "NO");
1150 else
1151 LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s, RSPR=%u",
1152 rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid),
1153 _RQ_TYPE(rq->u_rq.request_type),
1154 rq->originator, (response) ? "YES" : "NO",
1155 nodeid);
1159 static void cpg_join_callback(struct clog_cpg *match,
1160 const struct cpg_address *joined,
1161 const struct cpg_address *member_list,
1162 size_t member_list_entries)
1164 int i;
1165 int my_pid = getpid();
1166 uint32_t lowest = match->lowest_id;
1167 struct clog_request *rq;
1168 char dbuf[32];
1170 /* Assign my_cluster_id */
1171 if ((my_cluster_id == 0xDEAD) && (joined->pid == my_pid))
1172 my_cluster_id = joined->nodeid;
1174 /* Am I the very first to join? */
1175 if (member_list_entries == 1) {
1176 match->lowest_id = joined->nodeid;
1177 match->state = VALID;
1180 /* If I am part of the joining list, I do not send checkpoints */
1181 if (joined->nodeid == my_cluster_id)
1182 goto out;
1184 memset(dbuf, 0, sizeof(dbuf));
1185 for (i = 0; i < (member_list_entries-1); i++)
1186 sprintf(dbuf+strlen(dbuf), "%u-", member_list[i].nodeid);
1187 sprintf(dbuf+strlen(dbuf), "(%u)", joined->nodeid);
1188 LOG_COND(log_checkpoint, "[%s] Joining node, %u needs checkpoint [%s]",
1189 SHORT_UUID(match->name.value), joined->nodeid, dbuf);
1192 * FIXME: remove checkpoint_requesters/checkpoints_needed, and use
1193 * the startup_list interface exclusively
1195 if (dm_list_empty(&match->startup_list) && (match->state == VALID) &&
1196 (match->checkpoints_needed < MAX_CHECKPOINT_REQUESTERS)) {
1197 match->checkpoint_requesters[match->checkpoints_needed++] = joined->nodeid;
1198 goto out;
1201 rq = malloc(DM_ULOG_REQUEST_SIZE);
1202 if (!rq) {
1203 LOG_ERROR("cpg_config_callback: "
1204 "Unable to allocate transfer structs");
1205 LOG_ERROR("cpg_config_callback: "
1206 "Unable to perform checkpoint");
1207 goto out;
1209 rq->u_rq.request_type = DM_ULOG_MEMBER_JOIN;
1210 rq->originator = joined->nodeid;
1211 dm_list_init(&rq->list);
1212 dm_list_add(&match->startup_list, &rq->list);
1214 out:
1215 /* Find the lowest_id, i.e. the server */
1216 match->lowest_id = member_list[0].nodeid;
1217 for (i = 0; i < member_list_entries; i++)
1218 if (match->lowest_id > member_list[i].nodeid)
1219 match->lowest_id = member_list[i].nodeid;
1221 if (lowest == 0xDEAD)
1222 LOG_COND(log_membership_change, "[%s] Server change <none> -> %u (%u %s)",
1223 SHORT_UUID(match->name.value), match->lowest_id,
1224 joined->nodeid, (member_list_entries == 1) ?
1225 "is first to join" : "joined");
1226 else if (lowest != match->lowest_id)
1227 LOG_COND(log_membership_change, "[%s] Server change %u -> %u (%u joined)",
1228 SHORT_UUID(match->name.value), lowest,
1229 match->lowest_id, joined->nodeid);
1230 else
1231 LOG_COND(log_membership_change, "[%s] Server unchanged at %u (%u joined)",
1232 SHORT_UUID(match->name.value),
1233 lowest, joined->nodeid);
1234 LOG_SPRINT(match, "+++ UUID=%s %u join +++",
1235 SHORT_UUID(match->name.value), joined->nodeid);
1238 static void cpg_leave_callback(struct clog_cpg *match,
1239 const struct cpg_address *left,
1240 const struct cpg_address *member_list,
1241 size_t member_list_entries)
1243 int i, j, fd;
1244 uint32_t lowest = match->lowest_id;
1245 struct clog_request *rq, *n;
1246 struct checkpoint_data *p_cp, *c_cp;
1248 LOG_SPRINT(match, "--- UUID=%s %u left ---",
1249 SHORT_UUID(match->name.value), left->nodeid);
1251 /* Am I leaving? */
1252 if (my_cluster_id == left->nodeid) {
1253 LOG_DBG("Finalizing leave...");
1254 dm_list_del(&match->list);
1256 cpg_fd_get(match->handle, &fd);
1257 links_unregister(fd);
1259 cluster_postsuspend(match->name.value, match->luid);
1261 dm_list_iterate_items_safe(rq, n, &match->working_list) {
1262 dm_list_del(&rq->list);
1264 if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND)
1265 kernel_send(&rq->u_rq);
1266 free(rq);
1269 cpg_finalize(match->handle);
1271 match->free_me = 1;
1272 match->lowest_id = 0xDEAD;
1273 match->state = INVALID;
1276 /* Remove any pending checkpoints for the leaving node. */
1277 for (p_cp = NULL, c_cp = match->checkpoint_list;
1278 c_cp && (c_cp->requester != left->nodeid);
1279 p_cp = c_cp, c_cp = c_cp->next);
1280 if (c_cp) {
1281 if (p_cp)
1282 p_cp->next = c_cp->next;
1283 else
1284 match->checkpoint_list = c_cp->next;
1286 LOG_COND(log_checkpoint,
1287 "[%s] Removing pending checkpoint (%u is leaving)",
1288 SHORT_UUID(match->name.value), left->nodeid);
1289 free_checkpoint(c_cp);
1291 dm_list_iterate_items_safe(rq, n, &match->startup_list) {
1292 if ((rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) &&
1293 (rq->originator == left->nodeid)) {
1294 LOG_COND(log_checkpoint,
1295 "[%s] Removing pending ckpt from startup list (%u is leaving)",
1296 SHORT_UUID(match->name.value), left->nodeid);
1297 dm_list_del(&rq->list);
1298 free(rq);
1301 for (i = 0, j = 0; i < match->checkpoints_needed; i++, j++) {
1302 match->checkpoint_requesters[j] = match->checkpoint_requesters[i];
1303 if (match->checkpoint_requesters[i] == left->nodeid) {
1304 LOG_ERROR("[%s] Removing pending ckpt from needed list (%u is leaving)",
1305 SHORT_UUID(match->name.value), left->nodeid);
1306 j--;
1309 match->checkpoints_needed = j;
1311 if (left->nodeid < my_cluster_id) {
1312 match->delay = (match->delay > 0) ? match->delay - 1 : 0;
1313 if (!match->delay && dm_list_empty(&match->working_list))
1314 match->resend_requests = 0;
1315 LOG_COND(log_resend_requests, "[%s] %u has left, delay = %d%s",
1316 SHORT_UUID(match->name.value), left->nodeid,
1317 match->delay, (dm_list_empty(&match->working_list)) ?
1318 " -- working_list empty": "");
1321 /* Find the lowest_id, i.e. the server */
1322 if (!member_list_entries) {
1323 match->lowest_id = 0xDEAD;
1324 LOG_COND(log_membership_change, "[%s] Server change %u -> <none> "
1325 "(%u is last to leave)",
1326 SHORT_UUID(match->name.value), left->nodeid,
1327 left->nodeid);
1328 return;
1331 match->lowest_id = member_list[0].nodeid;
1332 for (i = 0; i < member_list_entries; i++)
1333 if (match->lowest_id > member_list[i].nodeid)
1334 match->lowest_id = member_list[i].nodeid;
1336 if (lowest != match->lowest_id) {
1337 LOG_COND(log_membership_change, "[%s] Server change %u -> %u (%u left)",
1338 SHORT_UUID(match->name.value), lowest,
1339 match->lowest_id, left->nodeid);
1340 } else
1341 LOG_COND(log_membership_change, "[%s] Server unchanged at %u (%u left)",
1342 SHORT_UUID(match->name.value), lowest, left->nodeid);
1344 if ((match->state == INVALID) && !match->free_me) {
1346 * If all CPG members are waiting for checkpoints and they
1347 * are all present in my startup_list, then I was the first to
1348 * join and I must assume control.
1350 * We do not normally end up here, but if there was a quick
1351 * 'resume -> suspend -> resume' across the cluster, we may
1352 * have initially thought we were not the first to join because
1353 * of the presence of out-going (and unable to respond) members.
1356 i = 1; /* We do not have a DM_ULOG_MEMBER_JOIN entry of our own */
1357 dm_list_iterate_items(rq, &match->startup_list)
1358 if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN)
1359 i++;
1361 if (i == member_list_entries) {
1363 * Last node who could have given me a checkpoint just left.
1364 * Setting log state to VALID and acting as 'first join'.
1366 match->state = VALID;
1367 flush_startup_list(match);
1372 static void cpg_config_callback(cpg_handle_t handle, const struct cpg_name *gname,
1373 const struct cpg_address *member_list,
1374 size_t member_list_entries,
1375 const struct cpg_address *left_list,
1376 size_t left_list_entries,
1377 const struct cpg_address *joined_list,
1378 size_t joined_list_entries)
1380 struct clog_cpg *match;
1381 int found = 0;
1383 dm_list_iterate_items(match, &clog_cpg_list)
1384 if (match->handle == handle) {
1385 found = 1;
1386 break;
1389 if (!found) {
1390 LOG_ERROR("Unable to find match for CPG config callback");
1391 return;
1394 if ((joined_list_entries + left_list_entries) > 1)
1395 LOG_ERROR("[%s] More than one node joining/leaving",
1396 SHORT_UUID(match->name.value));
1398 if (joined_list_entries)
1399 cpg_join_callback(match, joined_list,
1400 member_list, member_list_entries);
1401 else
1402 cpg_leave_callback(match, left_list,
1403 member_list, member_list_entries);
1406 cpg_callbacks_t cpg_callbacks = {
1407 .cpg_deliver_fn = cpg_message_callback,
1408 .cpg_confchg_fn = cpg_config_callback,
1412 * remove_checkpoint
1413 * @entry
1415 * Returns: 1 if checkpoint removed, 0 if no checkpoints, -EXXX on error
1417 int remove_checkpoint(struct clog_cpg *entry)
1419 int len;
1420 SaNameT name;
1421 SaAisErrorT rv;
1422 SaCkptCheckpointHandleT h;
1424 len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u",
1425 SHORT_UUID(entry->name.value), my_cluster_id);
1426 name.length = len;
1428 open_retry:
1429 rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL,
1430 SA_CKPT_CHECKPOINT_READ, 0, &h);
1431 if (rv == SA_AIS_ERR_TRY_AGAIN) {
1432 LOG_ERROR("abort_startup: ckpt open retry");
1433 usleep(1000);
1434 goto open_retry;
1437 if (rv != SA_AIS_OK)
1438 return 0;
1440 LOG_DBG("[%s] Removing checkpoint", SHORT_UUID(entry->name.value));
1441 unlink_retry:
1442 rv = saCkptCheckpointUnlink(ckpt_handle, &name);
1443 if (rv == SA_AIS_ERR_TRY_AGAIN) {
1444 LOG_ERROR("abort_startup: ckpt unlink retry");
1445 usleep(1000);
1446 goto unlink_retry;
1449 if (rv != SA_AIS_OK) {
1450 LOG_ERROR("[%s] Failed to unlink checkpoint: %s",
1451 SHORT_UUID(entry->name.value), str_ais_error(rv));
1452 return -EIO;
1455 saCkptCheckpointClose(h);
1457 return 1;
1460 int create_cluster_cpg(char *uuid, uint64_t luid)
1462 int r;
1463 int size;
1464 struct clog_cpg *new = NULL;
1465 struct clog_cpg *tmp;
1467 dm_list_iterate_items(tmp, &clog_cpg_list)
1468 if (!strncmp(tmp->name.value, uuid, CPG_MAX_NAME_LENGTH)) {
1469 LOG_ERROR("Log entry already exists: %s", uuid);
1470 return -EEXIST;
1473 new = malloc(sizeof(*new));
1474 if (!new) {
1475 LOG_ERROR("Unable to allocate memory for clog_cpg");
1476 return -ENOMEM;
1478 memset(new, 0, sizeof(*new));
1479 dm_list_init(&new->list);
1480 new->lowest_id = 0xDEAD;
1481 dm_list_init(&new->startup_list);
1482 dm_list_init(&new->working_list);
1484 size = ((strlen(uuid) + 1) > CPG_MAX_NAME_LENGTH) ?
1485 CPG_MAX_NAME_LENGTH : (strlen(uuid) + 1);
1486 strncpy(new->name.value, uuid, size);
1487 new->name.length = size;
1488 new->luid = luid;
1491 * Ensure there are no stale checkpoints around before we join
1493 if (remove_checkpoint(new) == 1)
1494 LOG_COND(log_checkpoint,
1495 "[%s] Removing checkpoints left from previous session",
1496 SHORT_UUID(new->name.value));
1498 r = cpg_initialize(&new->handle, &cpg_callbacks);
1499 if (r != SA_AIS_OK) {
1500 LOG_ERROR("cpg_initialize failed: Cannot join cluster");
1501 free(new);
1502 return -EPERM;
1505 r = cpg_join(new->handle, &new->name);
1506 if (r != SA_AIS_OK) {
1507 LOG_ERROR("cpg_join failed: Cannot join cluster");
1508 free(new);
1509 return -EPERM;
1512 new->cpg_state = VALID;
1513 dm_list_add(&clog_cpg_list, &new->list);
1514 LOG_DBG("New handle: %llu", (unsigned long long)new->handle);
1515 LOG_DBG("New name: %s", new->name.value);
1517 /* FIXME: better variable */
1518 cpg_fd_get(new->handle, &r);
1519 links_register(r, "cluster", do_cluster_work, NULL);
1521 return 0;
1524 static void abort_startup(struct clog_cpg *del)
1526 struct clog_request *rq, *n;
1528 LOG_DBG("[%s] CPG teardown before checkpoint received",
1529 SHORT_UUID(del->name.value));
1531 dm_list_iterate_items_safe(rq, n, &del->startup_list) {
1532 dm_list_del(&rq->list);
1534 LOG_DBG("[%s] Ignoring request from %u: %s",
1535 SHORT_UUID(del->name.value), rq->originator,
1536 _RQ_TYPE(rq->u_rq.request_type));
1537 free(rq);
1540 remove_checkpoint(del);
1543 static int _destroy_cluster_cpg(struct clog_cpg *del)
1545 int r;
1546 int state;
1548 LOG_COND(log_resend_requests, "[%s] I am leaving.2.....",
1549 SHORT_UUID(del->name.value));
1552 * We must send any left over checkpoints before
1553 * leaving. If we don't, an incoming node could
1554 * be stuck with no checkpoint and stall.
1555 do_checkpoints(del); --- THIS COULD BE CAUSING OUR PROBLEMS:
1557 - Incoming node deletes old checkpoints before joining
1558 - A stale checkpoint is issued here by leaving node
1559 - (leaving node leaves)
1560 - Incoming node joins cluster and finds stale checkpoint.
1561 - (leaving node leaves - option 2)
1563 do_checkpoints(del, 1);
1565 state = del->state;
1567 del->cpg_state = INVALID;
1568 del->state = LEAVING;
1571 * If the state is VALID, we might be processing the
1572 * startup list. If so, we certainly don't want to
1573 * clear the startup_list here by calling abort_startup
1575 if (!dm_list_empty(&del->startup_list) && (state != VALID))
1576 abort_startup(del);
1578 r = cpg_leave(del->handle, &del->name);
1579 if (r != CPG_OK)
1580 LOG_ERROR("Error leaving CPG!");
1581 return 0;
1584 int destroy_cluster_cpg(char *uuid)
1586 struct clog_cpg *del, *tmp;
1588 dm_list_iterate_items_safe(del, tmp, &clog_cpg_list)
1589 if (!strncmp(del->name.value, uuid, CPG_MAX_NAME_LENGTH))
1590 _destroy_cluster_cpg(del);
1592 return 0;
1595 int init_cluster(void)
1597 SaAisErrorT rv;
1599 dm_list_init(&clog_cpg_list);
1600 rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);
1602 if (rv != SA_AIS_OK)
1603 return EXIT_CLUSTER_CKPT_INIT;
1605 return 0;
1608 void cleanup_cluster(void)
1610 SaAisErrorT err;
1612 err = saCkptFinalize(ckpt_handle);
1613 if (err != SA_AIS_OK)
1614 LOG_ERROR("Failed to finalize checkpoint handle");
1617 void cluster_debug(void)
1619 struct checkpoint_data *cp;
1620 struct clog_cpg *entry;
1621 struct clog_request *rq;
1622 int i;
1624 LOG_ERROR("");
1625 LOG_ERROR("CLUSTER COMPONENT DEBUGGING::");
1626 dm_list_iterate_items(entry, &clog_cpg_list) {
1627 LOG_ERROR("%s::", SHORT_UUID(entry->name.value));
1628 LOG_ERROR(" lowest_id : %u", entry->lowest_id);
1629 LOG_ERROR(" state : %s", (entry->state == INVALID) ?
1630 "INVALID" : (entry->state == VALID) ? "VALID" :
1631 (entry->state == LEAVING) ? "LEAVING" : "UNKNOWN");
1632 LOG_ERROR(" cpg_state : %d", entry->cpg_state);
1633 LOG_ERROR(" free_me : %d", entry->free_me);
1634 LOG_ERROR(" delay : %d", entry->delay);
1635 LOG_ERROR(" resend_requests : %d", entry->resend_requests);
1636 LOG_ERROR(" checkpoints_needed: %d", entry->checkpoints_needed);
1637 for (i = 0, cp = entry->checkpoint_list;
1638 i < MAX_CHECKPOINT_REQUESTERS; i++)
1639 if (cp)
1640 cp = cp->next;
1641 else
1642 break;
1643 LOG_ERROR(" CKPTs waiting : %d", i);
1644 LOG_ERROR(" Working list:");
1645 dm_list_iterate_items(rq, &entry->working_list)
1646 LOG_ERROR(" %s/%u", _RQ_TYPE(rq->u_rq.request_type),
1647 rq->u_rq.seq);
1649 LOG_ERROR(" Startup list:");
1650 dm_list_iterate_items(rq, &entry->startup_list)
1651 LOG_ERROR(" %s/%u", _RQ_TYPE(rq->u_rq.request_type),
1652 rq->u_rq.seq);
1654 LOG_ERROR("Command History:");
1655 for (i = 0; i < DEBUGGING_HISTORY; i++) {
1656 entry->idx++;
1657 entry->idx = entry->idx % DEBUGGING_HISTORY;
1658 if (entry->debugging[entry->idx][0] == '\0')
1659 continue;
1660 LOG_ERROR("%d:%d) %s", i, entry->idx,
1661 entry->debugging[entry->idx]);