4 * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
6 * This copyrighted material is made available to anyone wishing to use,
7 * modify, copy, or redistribute it subject to the terms and conditions
8 * of the GNU Lesser General Public License v.2.1.
10 * You should have received a copy of the GNU Lesser General Public License
11 * along with this program; if not, write to the Free Software Foundation,
12 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 #include <sys/types.h>
21 #include <sys/socket.h> /* These are for OpenAIS CPGs */
22 #include <sys/select.h>
24 #include <netinet/in.h>
25 #include <arpa/inet.h>
26 #include <corosync/corotypes.h>
27 #include <corosync/cpg.h>
28 #include <openais/saAis.h>
29 #include <openais/saCkpt.h>
31 #include "dm-log-userspace.h"
32 #include "libdevmapper.h"
33 #include "functions.h"
40 /* Open AIS error codes */
41 #define str_ais_error(x) \
42 ((x) == SA_AIS_OK) ? "SA_AIS_OK" : \
43 ((x) == SA_AIS_ERR_LIBRARY) ? "SA_AIS_ERR_LIBRARY" : \
44 ((x) == SA_AIS_ERR_VERSION) ? "SA_AIS_ERR_VERSION" : \
45 ((x) == SA_AIS_ERR_INIT) ? "SA_AIS_ERR_INIT" : \
46 ((x) == SA_AIS_ERR_TIMEOUT) ? "SA_AIS_ERR_TIMEOUT" : \
47 ((x) == SA_AIS_ERR_TRY_AGAIN) ? "SA_AIS_ERR_TRY_AGAIN" : \
48 ((x) == SA_AIS_ERR_INVALID_PARAM) ? "SA_AIS_ERR_INVALID_PARAM" : \
49 ((x) == SA_AIS_ERR_NO_MEMORY) ? "SA_AIS_ERR_NO_MEMORY" : \
50 ((x) == SA_AIS_ERR_BAD_HANDLE) ? "SA_AIS_ERR_BAD_HANDLE" : \
51 ((x) == SA_AIS_ERR_BUSY) ? "SA_AIS_ERR_BUSY" : \
52 ((x) == SA_AIS_ERR_ACCESS) ? "SA_AIS_ERR_ACCESS" : \
53 ((x) == SA_AIS_ERR_NOT_EXIST) ? "SA_AIS_ERR_NOT_EXIST" : \
54 ((x) == SA_AIS_ERR_NAME_TOO_LONG) ? "SA_AIS_ERR_NAME_TOO_LONG" : \
55 ((x) == SA_AIS_ERR_EXIST) ? "SA_AIS_ERR_EXIST" : \
56 ((x) == SA_AIS_ERR_NO_SPACE) ? "SA_AIS_ERR_NO_SPACE" : \
57 ((x) == SA_AIS_ERR_INTERRUPT) ? "SA_AIS_ERR_INTERRUPT" : \
58 ((x) == SA_AIS_ERR_NAME_NOT_FOUND) ? "SA_AIS_ERR_NAME_NOT_FOUND" : \
59 ((x) == SA_AIS_ERR_NO_RESOURCES) ? "SA_AIS_ERR_NO_RESOURCES" : \
60 ((x) == SA_AIS_ERR_NOT_SUPPORTED) ? "SA_AIS_ERR_NOT_SUPPORTED" : \
61 ((x) == SA_AIS_ERR_BAD_OPERATION) ? "SA_AIS_ERR_BAD_OPERATION" : \
62 ((x) == SA_AIS_ERR_FAILED_OPERATION) ? "SA_AIS_ERR_FAILED_OPERATION" : \
63 ((x) == SA_AIS_ERR_MESSAGE_ERROR) ? "SA_AIS_ERR_MESSAGE_ERROR" : \
64 ((x) == SA_AIS_ERR_QUEUE_FULL) ? "SA_AIS_ERR_QUEUE_FULL" : \
65 ((x) == SA_AIS_ERR_QUEUE_NOT_AVAILABLE) ? "SA_AIS_ERR_QUEUE_NOT_AVAILABLE" : \
66 ((x) == SA_AIS_ERR_BAD_FLAGS) ? "SA_AIS_ERR_BAD_FLAGS" : \
67 ((x) == SA_AIS_ERR_TOO_BIG) ? "SA_AIS_ERR_TOO_BIG" : \
68 ((x) == SA_AIS_ERR_NO_SECTIONS) ? "SA_AIS_ERR_NO_SECTIONS" : \
71 #define DM_ULOG_RESPONSE 0x1000 /* in last byte of 32-bit value */
72 #define DM_ULOG_CHECKPOINT_READY 21
73 #define DM_ULOG_MEMBER_JOIN 22
76 ((x) == DM_ULOG_CHECKPOINT_READY) ? "DM_ULOG_CHECKPOINT_READY": \
77 ((x) == DM_ULOG_MEMBER_JOIN) ? "DM_ULOG_MEMBER_JOIN": \
78 RQ_TYPE((x) & ~DM_ULOG_RESPONSE)
80 static uint32_t my_cluster_id
= 0xDEAD;
81 static SaCkptHandleT ckpt_handle
= 0;
82 static SaCkptCallbacksT callbacks
= { 0, 0 };
83 static SaVersionT version
= { 'B', 1, 1 };
85 #define DEBUGGING_HISTORY 100
86 //static char debugging[DEBUGGING_HISTORY][128];
88 #define LOG_SPRINT(cc, f, arg...) do { \
90 cc->idx = cc->idx % DEBUGGING_HISTORY; \
91 sprintf(cc->debugging[cc->idx], f, ## arg); \
94 static int log_resp_rec
= 0;
96 struct checkpoint_data
{
98 char uuid
[CPG_MAX_NAME_LENGTH
];
100 int bitmap_size
; /* in bytes */
103 char *recovering_region
;
104 struct checkpoint_data
*next
;
111 #define MAX_CHECKPOINT_REQUESTERS 10
117 struct cpg_name name
;
120 /* Are we the first, or have we received checkpoint? */
122 int cpg_state
; /* FIXME: debugging */
126 struct dm_list startup_list
;
127 struct dm_list working_list
;
129 int checkpoints_needed
;
130 uint32_t checkpoint_requesters
[MAX_CHECKPOINT_REQUESTERS
];
131 struct checkpoint_data
*checkpoint_list
;
133 char debugging
[DEBUGGING_HISTORY
][128];
136 static struct dm_list clog_cpg_list
;
142 * Returns: 0 on success, -Exxx on error
144 int cluster_send(struct clog_request
*rq
)
150 struct clog_cpg
*entry
;
152 dm_list_iterate_items(entry
, &clog_cpg_list
)
153 if (!strncmp(entry
->name
.value
, rq
->u_rq
.uuid
,
154 CPG_MAX_NAME_LENGTH
)) {
160 rq
->u_rq
.error
= -ENOENT
;
165 * Once the request heads for the cluster, the luid looses
171 iov
.iov_len
= sizeof(struct clog_request
) + rq
->u_rq
.data_size
;
173 if (entry
->cpg_state
!= VALID
)
177 r
= cpg_mcast_joined(entry
->handle
, CPG_TYPE_AGREED
, &iov
, 1);
178 if (r
!= SA_AIS_ERR_TRY_AGAIN
)
182 LOG_PRINT("[%s] Retry #%d of cpg_mcast_joined: %s",
183 SHORT_UUID(rq
->u_rq
.uuid
), count
,
185 else if ((count
< 100) && !(count
% 10))
186 LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s",
187 SHORT_UUID(rq
->u_rq
.uuid
), count
,
189 else if ((count
< 1000) && !(count
% 100))
190 LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s",
191 SHORT_UUID(rq
->u_rq
.uuid
), count
,
193 else if ((count
< 10000) && !(count
% 1000))
194 LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s - "
195 "OpenAIS not handling the load?",
196 SHORT_UUID(rq
->u_rq
.uuid
), count
,
204 /* error codes found in openais/cpg.h */
205 LOG_ERROR("cpg_mcast_joined error: %s", str_ais_error(r
));
207 rq
->u_rq
.error
= -EBADE
;
211 static struct clog_request
*get_matching_rq(struct clog_request
*rq
,
214 struct clog_request
*match
, *n
;
216 dm_list_iterate_items_safe(match
, n
, l
)
217 if (match
->u_rq
.seq
== rq
->u_rq
.seq
) {
218 dm_list_del(&match
->list
);
225 static char rq_buffer
[DM_ULOG_REQUEST_SIZE
];
226 static int handle_cluster_request(struct clog_cpg
*entry
,
227 struct clog_request
*rq
, int server
)
230 struct clog_request
*tmp
= (struct clog_request
*)rq_buffer
;
233 * We need a separate dm_ulog_request struct, one that can carry
234 * a return payload. Otherwise, the memory address after
235 * rq will be altered - leading to problems
237 memset(rq_buffer
, 0, sizeof(rq_buffer
));
238 memcpy(tmp
, rq
, sizeof(struct clog_request
) + rq
->u_rq
.data_size
);
241 * With resumes, we only handle our own.
242 * Resume is a special case that requires
243 * local action (to set up CPG), followed by
244 * a cluster action to co-ordinate reading
245 * the disk and checkpointing
247 if (tmp
->u_rq
.request_type
== DM_ULOG_RESUME
) {
248 if (tmp
->originator
== my_cluster_id
) {
249 r
= do_request(tmp
, server
);
251 r
= kernel_send(&tmp
->u_rq
);
253 LOG_ERROR("Failed to send resume response to kernel");
258 r
= do_request(tmp
, server
);
261 (tmp
->u_rq
.request_type
!= DM_ULOG_CLEAR_REGION
) &&
262 (tmp
->u_rq
.request_type
!= DM_ULOG_POSTSUSPEND
)) {
263 tmp
->u_rq
.request_type
|= DM_ULOG_RESPONSE
;
266 * Errors from previous functions are in the rq struct.
268 r
= cluster_send(tmp
);
270 LOG_ERROR("cluster_send failed: %s", strerror(-r
));
276 static int handle_cluster_response(struct clog_cpg
*entry
,
277 struct clog_request
*rq
)
280 struct clog_request
*orig_rq
;
283 * If I didn't send it, then I don't care about the response
285 if (rq
->originator
!= my_cluster_id
)
288 rq
->u_rq
.request_type
&= ~DM_ULOG_RESPONSE
;
289 orig_rq
= get_matching_rq(rq
, &entry
->working_list
);
292 /* Unable to find match for response */
294 LOG_ERROR("[%s] No match for cluster response: %s:%u",
295 SHORT_UUID(rq
->u_rq
.uuid
),
296 _RQ_TYPE(rq
->u_rq
.request_type
),
299 LOG_ERROR("Current local list:");
300 if (dm_list_empty(&entry
->working_list
))
301 LOG_ERROR(" [none]");
303 dm_list_iterate_items(orig_rq
, &entry
->working_list
)
304 LOG_ERROR(" [%s] %s:%u",
305 SHORT_UUID(orig_rq
->u_rq
.uuid
),
306 _RQ_TYPE(orig_rq
->u_rq
.request_type
),
312 if (log_resp_rec
> 0) {
313 LOG_COND(log_resend_requests
,
314 "[%s] Response received to %s/#%u",
315 SHORT_UUID(rq
->u_rq
.uuid
),
316 _RQ_TYPE(rq
->u_rq
.request_type
),
321 /* FIXME: Ensure memcpy cannot explode */
322 memcpy(orig_rq
, rq
, sizeof(*rq
) + rq
->u_rq
.data_size
);
324 r
= kernel_send(&orig_rq
->u_rq
);
326 LOG_ERROR("Failed to send response to kernel");
332 static struct clog_cpg
*find_clog_cpg(cpg_handle_t handle
)
334 struct clog_cpg
*match
;
336 dm_list_iterate_items(match
, &clog_cpg_list
)
337 if (match
->handle
== handle
)
345 * @entry: clog_cpg describing the log
346 * @cp_requester: nodeid requesting the checkpoint
348 * Creates and fills in a new checkpoint_data struct.
350 * Returns: checkpoint_data on success, NULL on error
352 static struct checkpoint_data
*prepare_checkpoint(struct clog_cpg
*entry
,
353 uint32_t cp_requester
)
356 struct checkpoint_data
*new;
358 if (entry
->state
!= VALID
) {
360 * We can't store bitmaps yet, because the log is not
363 LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet",
368 new = malloc(sizeof(*new));
370 LOG_ERROR("Unable to create checkpoint data for %u",
374 memset(new, 0, sizeof(*new));
375 new->requester
= cp_requester
;
376 strncpy(new->uuid
, entry
->name
.value
, entry
->name
.length
);
378 new->bitmap_size
= push_state(entry
->name
.value
, entry
->luid
,
380 &new->clean_bits
, cp_requester
);
381 if (new->bitmap_size
<= 0) {
382 LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
388 new->bitmap_size
= push_state(entry
->name
.value
, entry
->luid
,
390 &new->sync_bits
, cp_requester
);
391 if (new->bitmap_size
<= 0) {
392 LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
394 free(new->clean_bits
);
399 r
= push_state(entry
->name
.value
, entry
->luid
,
401 &new->recovering_region
, cp_requester
);
403 LOG_ERROR("Failed to store recovering_region to checkpoint for node %u",
405 free(new->sync_bits
);
406 free(new->clean_bits
);
410 LOG_DBG("[%s] Checkpoint prepared for node %u:",
411 SHORT_UUID(new->uuid
), new->requester
);
412 LOG_DBG(" bitmap_size = %d", new->bitmap_size
);
419 * @cp: the checkpoint_data struct to free
422 static void free_checkpoint(struct checkpoint_data
*cp
)
424 free(cp
->recovering_region
);
426 free(cp
->clean_bits
);
430 static int export_checkpoint(struct checkpoint_data
*cp
)
432 SaCkptCheckpointCreationAttributesT attr
;
433 SaCkptCheckpointHandleT h
;
434 SaCkptSectionIdT section_id
;
435 SaCkptSectionCreationAttributesT section_attr
;
436 SaCkptCheckpointOpenFlagsT flags
;
439 struct clog_request
*rq
;
443 LOG_DBG("Sending checkpointed data to %u", cp
->requester
);
445 len
= snprintf((char *)(name
.value
), SA_MAX_NAME_LENGTH
,
446 "bitmaps_%s_%u", SHORT_UUID(cp
->uuid
), cp
->requester
);
449 len
= strlen(cp
->recovering_region
) + 1;
451 attr
.creationFlags
= SA_CKPT_WR_ALL_REPLICAS
;
452 attr
.checkpointSize
= cp
->bitmap_size
* 2 + len
;
454 attr
.retentionDuration
= SA_TIME_MAX
;
455 attr
.maxSections
= 4; /* don't know why we need +1 */
457 attr
.maxSectionSize
= (cp
->bitmap_size
> len
) ? cp
->bitmap_size
: len
;
458 attr
.maxSectionIdSize
= 22;
460 flags
= SA_CKPT_CHECKPOINT_READ
|
461 SA_CKPT_CHECKPOINT_WRITE
|
462 SA_CKPT_CHECKPOINT_CREATE
;
465 rv
= saCkptCheckpointOpen(ckpt_handle
, &name
, &attr
, flags
, 0, &h
);
466 if (rv
== SA_AIS_ERR_TRY_AGAIN
) {
467 LOG_ERROR("export_checkpoint: ckpt open retry");
472 if (rv
== SA_AIS_ERR_EXIST
) {
473 LOG_DBG("export_checkpoint: checkpoint already exists");
477 if (rv
!= SA_AIS_OK
) {
478 LOG_ERROR("[%s] Failed to open checkpoint for %u: %s",
479 SHORT_UUID(cp
->uuid
), cp
->requester
,
481 return -EIO
; /* FIXME: better error */
485 * Add section for sync_bits
487 section_id
.idLen
= snprintf(buf
, 32, "sync_bits");
488 section_id
.id
= (unsigned char *)buf
;
489 section_attr
.sectionId
= §ion_id
;
490 section_attr
.expirationTime
= SA_TIME_END
;
493 rv
= saCkptSectionCreate(h
, §ion_attr
,
494 cp
->sync_bits
, cp
->bitmap_size
);
495 if (rv
== SA_AIS_ERR_TRY_AGAIN
) {
496 LOG_ERROR("Sync checkpoint section create retry");
498 goto sync_create_retry
;
501 if (rv
== SA_AIS_ERR_EXIST
) {
502 LOG_DBG("Sync checkpoint section already exists");
503 saCkptCheckpointClose(h
);
507 if (rv
!= SA_AIS_OK
) {
508 LOG_ERROR("Sync checkpoint section creation failed: %s",
510 saCkptCheckpointClose(h
);
511 return -EIO
; /* FIXME: better error */
515 * Add section for clean_bits
517 section_id
.idLen
= snprintf(buf
, 32, "clean_bits");
518 section_id
.id
= (unsigned char *)buf
;
519 section_attr
.sectionId
= §ion_id
;
520 section_attr
.expirationTime
= SA_TIME_END
;
523 rv
= saCkptSectionCreate(h
, §ion_attr
, cp
->clean_bits
, cp
->bitmap_size
);
524 if (rv
== SA_AIS_ERR_TRY_AGAIN
) {
525 LOG_ERROR("Clean checkpoint section create retry");
527 goto clean_create_retry
;
530 if (rv
== SA_AIS_ERR_EXIST
) {
531 LOG_DBG("Clean checkpoint section already exists");
532 saCkptCheckpointClose(h
);
536 if (rv
!= SA_AIS_OK
) {
537 LOG_ERROR("Clean checkpoint section creation failed: %s",
539 saCkptCheckpointClose(h
);
540 return -EIO
; /* FIXME: better error */
544 * Add section for recovering_region
546 section_id
.idLen
= snprintf(buf
, 32, "recovering_region");
547 section_id
.id
= (unsigned char *)buf
;
548 section_attr
.sectionId
= §ion_id
;
549 section_attr
.expirationTime
= SA_TIME_END
;
552 rv
= saCkptSectionCreate(h
, §ion_attr
, cp
->recovering_region
,
553 strlen(cp
->recovering_region
) + 1);
554 if (rv
== SA_AIS_ERR_TRY_AGAIN
) {
555 LOG_ERROR("RR checkpoint section create retry");
557 goto rr_create_retry
;
560 if (rv
== SA_AIS_ERR_EXIST
) {
561 LOG_DBG("RR checkpoint section already exists");
562 saCkptCheckpointClose(h
);
566 if (rv
!= SA_AIS_OK
) {
567 LOG_ERROR("RR checkpoint section creation failed: %s",
569 saCkptCheckpointClose(h
);
570 return -EIO
; /* FIXME: better error */
573 LOG_DBG("export_checkpoint: closing checkpoint");
574 saCkptCheckpointClose(h
);
576 rq
= malloc(DM_ULOG_REQUEST_SIZE
);
578 LOG_ERROR("export_checkpoint: Unable to allocate transfer structs");
581 memset(rq
, 0, sizeof(*rq
));
583 dm_list_init(&rq
->list
);
584 rq
->u_rq
.request_type
= DM_ULOG_CHECKPOINT_READY
;
585 rq
->originator
= cp
->requester
; /* FIXME: hack to overload meaning of originator */
586 strncpy(rq
->u_rq
.uuid
, cp
->uuid
, CPG_MAX_NAME_LENGTH
);
587 rq
->u_rq
.seq
= my_cluster_id
;
589 r
= cluster_send(rq
);
591 LOG_ERROR("Failed to send checkpoint ready notice: %s",
598 static int import_checkpoint(struct clog_cpg
*entry
, int no_read
)
601 SaCkptCheckpointHandleT h
;
602 SaCkptSectionIterationHandleT itr
;
603 SaCkptSectionDescriptorT desc
;
604 SaCkptIOVectorElementT iov
;
610 bitmap
= malloc(1024*1024);
614 len
= snprintf((char *)(name
.value
), SA_MAX_NAME_LENGTH
, "bitmaps_%s_%u",
615 SHORT_UUID(entry
->name
.value
), my_cluster_id
);
619 rv
= saCkptCheckpointOpen(ckpt_handle
, &name
, NULL
,
620 SA_CKPT_CHECKPOINT_READ
, 0, &h
);
621 if (rv
== SA_AIS_ERR_TRY_AGAIN
) {
622 LOG_ERROR("import_checkpoint: ckpt open retry");
627 if (rv
!= SA_AIS_OK
) {
628 LOG_ERROR("[%s] Failed to open checkpoint: %s",
629 SHORT_UUID(entry
->name
.value
), str_ais_error(rv
));
630 return -EIO
; /* FIXME: better error */
634 rv
= saCkptCheckpointUnlink(ckpt_handle
, &name
);
635 if (rv
== SA_AIS_ERR_TRY_AGAIN
) {
636 LOG_ERROR("import_checkpoint: ckpt unlink retry");
642 LOG_DBG("Checkpoint for this log already received");
647 rv
= saCkptSectionIterationInitialize(h
, SA_CKPT_SECTIONS_ANY
,
649 if (rv
== SA_AIS_ERR_TRY_AGAIN
) {
650 LOG_ERROR("import_checkpoint: sync create retry");
655 if (rv
!= SA_AIS_OK
) {
656 LOG_ERROR("[%s] Sync checkpoint section creation failed: %s",
657 SHORT_UUID(entry
->name
.value
), str_ais_error(rv
));
658 return -EIO
; /* FIXME: better error */
663 rv
= saCkptSectionIterationNext(itr
, &desc
);
666 else if ((rv
== SA_AIS_ERR_NO_SECTIONS
) && len
)
668 else if (rv
!= SA_AIS_ERR_TRY_AGAIN
) {
669 LOG_ERROR("saCkptSectionIterationNext failure: %d", rv
);
673 saCkptSectionIterationFinalize(itr
);
675 LOG_ERROR("import_checkpoint: %d checkpoint sections found",
680 saCkptSectionIterationInitialize(h
, SA_CKPT_SECTIONS_ANY
,
684 rv
= saCkptSectionIterationNext(itr
, &desc
);
685 if (rv
== SA_AIS_ERR_NO_SECTIONS
)
688 if (rv
== SA_AIS_ERR_TRY_AGAIN
) {
689 LOG_ERROR("import_checkpoint: ckpt iternext retry");
694 if (rv
!= SA_AIS_OK
) {
695 LOG_ERROR("import_checkpoint: clean checkpoint section "
696 "creation failed: %s", str_ais_error(rv
));
697 rtn
= -EIO
; /* FIXME: better error */
701 if (!desc
.sectionSize
) {
702 LOG_ERROR("Checkpoint section empty");
706 memset(bitmap
, 0, sizeof(*bitmap
));
707 iov
.sectionId
= desc
.sectionId
;
708 iov
.dataBuffer
= bitmap
;
709 iov
.dataSize
= desc
.sectionSize
;
713 rv
= saCkptCheckpointRead(h
, &iov
, 1, NULL
);
714 if (rv
== SA_AIS_ERR_TRY_AGAIN
) {
715 LOG_ERROR("ckpt read retry");
720 if (rv
!= SA_AIS_OK
) {
721 LOG_ERROR("import_checkpoint: ckpt read error: %s",
723 rtn
= -EIO
; /* FIXME: better error */
728 if (pull_state(entry
->name
.value
, entry
->luid
,
729 (char *)desc
.sectionId
.id
, bitmap
,
731 LOG_ERROR("Error loading state");
736 /* Need to request new checkpoint */
743 saCkptSectionIterationFinalize(itr
);
745 saCkptCheckpointClose(h
);
751 static void do_checkpoints(struct clog_cpg
*entry
, int leaving
)
753 struct checkpoint_data
*cp
;
755 for (cp
= entry
->checkpoint_list
; cp
;) {
757 * FIXME: Check return code. Could send failure
758 * notice in rq in export_checkpoint function
759 * by setting rq->error
761 switch (export_checkpoint(cp
)) {
763 LOG_SPRINT(entry
, "[%s] Checkpoint for %u already handled%s",
764 SHORT_UUID(entry
->name
.value
), cp
->requester
,
765 (leaving
) ? "(L)": "");
766 LOG_COND(log_checkpoint
,
767 "[%s] Checkpoint for %u already handled%s",
768 SHORT_UUID(entry
->name
.value
), cp
->requester
,
769 (leaving
) ? "(L)": "");
770 entry
->checkpoint_list
= cp
->next
;
772 cp
= entry
->checkpoint_list
;
775 LOG_SPRINT(entry
, "[%s] Checkpoint data available for node %u%s",
776 SHORT_UUID(entry
->name
.value
), cp
->requester
,
777 (leaving
) ? "(L)": "");
778 LOG_COND(log_checkpoint
,
779 "[%s] Checkpoint data available for node %u%s",
780 SHORT_UUID(entry
->name
.value
), cp
->requester
,
781 (leaving
) ? "(L)": "");
782 entry
->checkpoint_list
= cp
->next
;
784 cp
= entry
->checkpoint_list
;
787 /* FIXME: Skipping will cause list corruption */
788 LOG_ERROR("[%s] Failed to export checkpoint for %u%s",
789 SHORT_UUID(entry
->name
.value
), cp
->requester
,
790 (leaving
) ? "(L)": "");
795 static int resend_requests(struct clog_cpg
*entry
)
798 struct clog_request
*rq
, *n
;
800 if (!entry
->resend_requests
|| entry
->delay
)
803 if (entry
->state
!= VALID
)
806 entry
->resend_requests
= 0;
808 dm_list_iterate_items_safe(rq
, n
, &entry
->working_list
) {
809 dm_list_del(&rq
->list
);
811 if (strcmp(entry
->name
.value
, rq
->u_rq
.uuid
)) {
812 LOG_ERROR("[%s] Stray request from another log (%s)",
813 SHORT_UUID(entry
->name
.value
),
814 SHORT_UUID(rq
->u_rq
.uuid
));
819 switch (rq
->u_rq
.request_type
) {
820 case DM_ULOG_SET_REGION_SYNC
:
822 * Some requests simply do not need to be resent.
823 * If it is a request that just changes log state,
824 * then it doesn't need to be resent (everyone makes
827 LOG_COND(log_resend_requests
,
828 "[%s] Skipping resend of %s/#%u...",
829 SHORT_UUID(entry
->name
.value
),
830 _RQ_TYPE(rq
->u_rq
.request_type
),
832 LOG_SPRINT(entry
, "### No resend: [%s] %s/%u ###",
833 SHORT_UUID(entry
->name
.value
),
834 _RQ_TYPE(rq
->u_rq
.request_type
),
837 rq
->u_rq
.data_size
= 0;
838 kernel_send(&rq
->u_rq
);
844 * If an action or a response is required, then
845 * the request must be resent.
847 LOG_COND(log_resend_requests
,
848 "[%s] Resending %s(#%u) due to new server(%u)",
849 SHORT_UUID(entry
->name
.value
),
850 _RQ_TYPE(rq
->u_rq
.request_type
),
851 rq
->u_rq
.seq
, entry
->lowest_id
);
852 LOG_SPRINT(entry
, "*** Resending: [%s] %s/%u ***",
853 SHORT_UUID(entry
->name
.value
),
854 _RQ_TYPE(rq
->u_rq
.request_type
),
856 r
= cluster_send(rq
);
858 LOG_ERROR("Failed resend");
866 static int do_cluster_work(void *data
)
869 struct clog_cpg
*entry
;
871 dm_list_iterate_items(entry
, &clog_cpg_list
) {
872 r
= cpg_dispatch(entry
->handle
, CPG_DISPATCH_ALL
);
874 LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r
));
876 if (entry
->free_me
) {
880 do_checkpoints(entry
, 0);
882 resend_requests(entry
);
885 return (r
== SA_AIS_OK
) ? 0 : -1; /* FIXME: good error number? */
888 static int flush_startup_list(struct clog_cpg
*entry
)
892 struct clog_request
*rq
, *n
;
893 struct checkpoint_data
*new;
895 dm_list_iterate_items_safe(rq
, n
, &entry
->startup_list
) {
896 dm_list_del(&rq
->list
);
898 if (rq
->u_rq
.request_type
== DM_ULOG_MEMBER_JOIN
) {
899 new = prepare_checkpoint(entry
, rq
->originator
);
902 * FIXME: Need better error handling. Other nodes
903 * will be trying to send the checkpoint too, and we
904 * must continue processing the list; so report error
907 LOG_ERROR("Failed to prepare checkpoint for %u!!!",
912 LOG_SPRINT(entry
, "[%s] Checkpoint prepared for %u",
913 SHORT_UUID(entry
->name
.value
), rq
->originator
);
914 LOG_COND(log_checkpoint
, "[%s] Checkpoint prepared for %u",
915 SHORT_UUID(entry
->name
.value
), rq
->originator
);
916 new->next
= entry
->checkpoint_list
;
917 entry
->checkpoint_list
= new;
919 LOG_DBG("[%s] Processing delayed request: %s",
920 SHORT_UUID(rq
->u_rq
.uuid
),
921 _RQ_TYPE(rq
->u_rq
.request_type
));
922 i_was_server
= (rq
->pit_server
== my_cluster_id
) ? 1 : 0;
923 r
= handle_cluster_request(entry
, rq
, i_was_server
);
927 * FIXME: If we error out here, we will never get
928 * another opportunity to retry these requests
930 LOG_ERROR("Error while processing delayed CPG message");
938 static void cpg_message_callback(cpg_handle_t handle
, const struct cpg_name
*gname
,
939 uint32_t nodeid
, uint32_t pid
,
940 void *msg
, size_t msg_len
)
946 struct clog_request
*rq
= msg
;
947 struct clog_request
*tmp_rq
;
948 struct clog_cpg
*match
;
950 match
= find_clog_cpg(handle
);
952 LOG_ERROR("Unable to find clog_cpg for cluster message");
956 if ((nodeid
== my_cluster_id
) &&
957 !(rq
->u_rq
.request_type
& DM_ULOG_RESPONSE
) &&
958 (rq
->u_rq
.request_type
!= DM_ULOG_RESUME
) &&
959 (rq
->u_rq
.request_type
!= DM_ULOG_CLEAR_REGION
) &&
960 (rq
->u_rq
.request_type
!= DM_ULOG_CHECKPOINT_READY
)) {
961 tmp_rq
= malloc(DM_ULOG_REQUEST_SIZE
);
964 * FIXME: It may be possible to continue... but we
965 * would not be able to resend any messages that might
966 * be necessary during membership changes
968 LOG_ERROR("[%s] Unable to record request: -ENOMEM",
969 SHORT_UUID(rq
->u_rq
.uuid
));
972 memcpy(tmp_rq
, rq
, sizeof(*rq
) + rq
->u_rq
.data_size
);
973 dm_list_init(&tmp_rq
->list
);
974 dm_list_add( &match
->working_list
, &tmp_rq
->list
);
977 if (rq
->u_rq
.request_type
== DM_ULOG_POSTSUSPEND
) {
979 * If the server (lowest_id) indicates it is leaving,
980 * then we must resend any outstanding requests. However,
981 * we do not want to resend them if the next server in
982 * line is in the process of leaving.
984 if (nodeid
== my_cluster_id
) {
985 LOG_COND(log_resend_requests
, "[%s] I am leaving.1.....",
986 SHORT_UUID(rq
->u_rq
.uuid
));
988 if (nodeid
< my_cluster_id
) {
989 if (nodeid
== match
->lowest_id
) {
990 match
->resend_requests
= 1;
991 LOG_COND(log_resend_requests
, "[%s] %u is leaving, resend required%s",
992 SHORT_UUID(rq
->u_rq
.uuid
), nodeid
,
993 (dm_list_empty(&match
->working_list
)) ? " -- working_list empty": "");
995 dm_list_iterate_items(tmp_rq
, &match
->working_list
)
996 LOG_COND(log_resend_requests
,
998 SHORT_UUID(tmp_rq
->u_rq
.uuid
),
999 _RQ_TYPE(tmp_rq
->u_rq
.request_type
),
1004 LOG_COND(log_resend_requests
, "[%s] %u is leaving, delay = %d",
1005 SHORT_UUID(rq
->u_rq
.uuid
), nodeid
, match
->delay
);
1007 rq
->originator
= nodeid
; /* don't really need this, but nice for debug */
1013 * We can receive messages after we do a cpg_leave but before we
1014 * get our config callback. However, since we can't respond after
1015 * leaving, we simply return.
1017 if (match
->state
== LEAVING
)
1020 i_am_server
= (my_cluster_id
== match
->lowest_id
) ? 1 : 0;
1022 if (rq
->u_rq
.request_type
== DM_ULOG_CHECKPOINT_READY
) {
1023 if (my_cluster_id
== rq
->originator
) {
1024 /* Redundant checkpoints ignored if match->valid */
1025 LOG_SPRINT(match
, "[%s] CHECKPOINT_READY notification from %u",
1026 SHORT_UUID(rq
->u_rq
.uuid
), nodeid
);
1027 if (import_checkpoint(match
, (match
->state
!= INVALID
))) {
1029 "[%s] Failed to import checkpoint from %u",
1030 SHORT_UUID(rq
->u_rq
.uuid
), nodeid
);
1031 LOG_ERROR("[%s] Failed to import checkpoint from %u",
1032 SHORT_UUID(rq
->u_rq
.uuid
), nodeid
);
1033 kill(getpid(), SIGUSR1
);
1034 /* Could we retry? */
1036 } else if (match
->state
== INVALID
) {
1038 "[%s] Checkpoint data received from %u. Log is now valid",
1039 SHORT_UUID(match
->name
.value
), nodeid
);
1040 LOG_COND(log_checkpoint
,
1041 "[%s] Checkpoint data received from %u. Log is now valid",
1042 SHORT_UUID(match
->name
.value
), nodeid
);
1043 match
->state
= VALID
;
1045 flush_startup_list(match
);
1048 "[%s] Redundant checkpoint from %u ignored.",
1049 SHORT_UUID(rq
->u_rq
.uuid
), nodeid
);
1055 if (rq
->u_rq
.request_type
& DM_ULOG_RESPONSE
) {
1057 r
= handle_cluster_response(match
, rq
);
1059 rq
->originator
= nodeid
;
1061 if (match
->state
== LEAVING
) {
1062 LOG_ERROR("[%s] Ignoring %s from %u. Reason: I'm leaving",
1063 SHORT_UUID(rq
->u_rq
.uuid
), _RQ_TYPE(rq
->u_rq
.request_type
),
1068 if (match
->state
== INVALID
) {
1069 LOG_DBG("Log not valid yet, storing request");
1070 tmp_rq
= malloc(DM_ULOG_REQUEST_SIZE
);
1072 LOG_ERROR("cpg_message_callback: Unable to"
1073 " allocate transfer structs");
1074 r
= -ENOMEM
; /* FIXME: Better error #? */
1078 memcpy(tmp_rq
, rq
, sizeof(*rq
) + rq
->u_rq
.data_size
);
1079 tmp_rq
->pit_server
= match
->lowest_id
;
1080 dm_list_init(&tmp_rq
->list
);
1081 dm_list_add(&match
->startup_list
, &tmp_rq
->list
);
1085 r
= handle_cluster_request(match
, rq
, i_am_server
);
1089 * If the log is now valid, we can queue the checkpoints
1091 for (i
= match
->checkpoints_needed
; i
; ) {
1092 struct checkpoint_data
*new;
1094 if (log_get_state(&rq
->u_rq
) != LOG_RESUMED
) {
1095 LOG_DBG("[%s] Withholding checkpoints until log is valid (%s from %u)",
1096 SHORT_UUID(rq
->u_rq
.uuid
), _RQ_TYPE(rq
->u_rq
.request_type
), nodeid
);
1101 new = prepare_checkpoint(match
, match
->checkpoint_requesters
[i
]);
1103 /* FIXME: Need better error handling */
1104 LOG_ERROR("[%s] Failed to prepare checkpoint for %u!!!",
1105 SHORT_UUID(rq
->u_rq
.uuid
), match
->checkpoint_requesters
[i
]);
1108 LOG_SPRINT(match
, "[%s] Checkpoint prepared for %u* (%s)",
1109 SHORT_UUID(rq
->u_rq
.uuid
), match
->checkpoint_requesters
[i
],
1110 (log_get_state(&rq
->u_rq
) != LOG_RESUMED
)? "LOG_RESUMED": "LOG_SUSPENDED");
1111 LOG_COND(log_checkpoint
, "[%s] Checkpoint prepared for %u*",
1112 SHORT_UUID(rq
->u_rq
.uuid
), match
->checkpoint_requesters
[i
]);
1113 match
->checkpoints_needed
--;
1115 new->next
= match
->checkpoint_list
;
1116 match
->checkpoint_list
= new;
1120 /* nothing happens after this point. It is just for debugging */
1122 LOG_ERROR("[%s] Error while processing CPG message, %s: %s",
1123 SHORT_UUID(rq
->u_rq
.uuid
),
1124 _RQ_TYPE(rq
->u_rq
.request_type
& ~DM_ULOG_RESPONSE
),
1126 LOG_ERROR("[%s] Response : %s", SHORT_UUID(rq
->u_rq
.uuid
),
1127 (response
) ? "YES" : "NO");
1128 LOG_ERROR("[%s] Originator: %u",
1129 SHORT_UUID(rq
->u_rq
.uuid
), rq
->originator
);
1131 LOG_ERROR("[%s] Responder : %u",
1132 SHORT_UUID(rq
->u_rq
.uuid
), nodeid
);
1134 LOG_ERROR("HISTORY::");
1135 for (i
= 0; i
< DEBUGGING_HISTORY
; i
++) {
1137 match
->idx
= match
->idx
% DEBUGGING_HISTORY
;
1138 if (match
->debugging
[match
->idx
][0] == '\0')
1140 LOG_ERROR("%d:%d) %s", i
, match
->idx
,
1141 match
->debugging
[match
->idx
]);
1143 } else if (!(rq
->u_rq
.request_type
& DM_ULOG_RESPONSE
) ||
1144 (rq
->originator
== my_cluster_id
)) {
1146 LOG_SPRINT(match
, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s",
1147 rq
->u_rq
.seq
, SHORT_UUID(rq
->u_rq
.uuid
),
1148 _RQ_TYPE(rq
->u_rq
.request_type
),
1149 rq
->originator
, (response
) ? "YES" : "NO");
1151 LOG_SPRINT(match
, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s, RSPR=%u",
1152 rq
->u_rq
.seq
, SHORT_UUID(rq
->u_rq
.uuid
),
1153 _RQ_TYPE(rq
->u_rq
.request_type
),
1154 rq
->originator
, (response
) ? "YES" : "NO",
1159 static void cpg_join_callback(struct clog_cpg
*match
,
1160 const struct cpg_address
*joined
,
1161 const struct cpg_address
*member_list
,
1162 size_t member_list_entries
)
1165 int my_pid
= getpid();
1166 uint32_t lowest
= match
->lowest_id
;
1167 struct clog_request
*rq
;
1170 /* Assign my_cluster_id */
1171 if ((my_cluster_id
== 0xDEAD) && (joined
->pid
== my_pid
))
1172 my_cluster_id
= joined
->nodeid
;
1174 /* Am I the very first to join? */
1175 if (member_list_entries
== 1) {
1176 match
->lowest_id
= joined
->nodeid
;
1177 match
->state
= VALID
;
1180 /* If I am part of the joining list, I do not send checkpoints */
1181 if (joined
->nodeid
== my_cluster_id
)
1184 memset(dbuf
, 0, sizeof(dbuf
));
1185 for (i
= 0; i
< (member_list_entries
-1); i
++)
1186 sprintf(dbuf
+strlen(dbuf
), "%u-", member_list
[i
].nodeid
);
1187 sprintf(dbuf
+strlen(dbuf
), "(%u)", joined
->nodeid
);
1188 LOG_COND(log_checkpoint
, "[%s] Joining node, %u needs checkpoint [%s]",
1189 SHORT_UUID(match
->name
.value
), joined
->nodeid
, dbuf
);
1192 * FIXME: remove checkpoint_requesters/checkpoints_needed, and use
1193 * the startup_list interface exclusively
1195 if (dm_list_empty(&match
->startup_list
) && (match
->state
== VALID
) &&
1196 (match
->checkpoints_needed
< MAX_CHECKPOINT_REQUESTERS
)) {
1197 match
->checkpoint_requesters
[match
->checkpoints_needed
++] = joined
->nodeid
;
1201 rq
= malloc(DM_ULOG_REQUEST_SIZE
);
1203 LOG_ERROR("cpg_config_callback: "
1204 "Unable to allocate transfer structs");
1205 LOG_ERROR("cpg_config_callback: "
1206 "Unable to perform checkpoint");
1209 rq
->u_rq
.request_type
= DM_ULOG_MEMBER_JOIN
;
1210 rq
->originator
= joined
->nodeid
;
1211 dm_list_init(&rq
->list
);
1212 dm_list_add(&match
->startup_list
, &rq
->list
);
1215 /* Find the lowest_id, i.e. the server */
1216 match
->lowest_id
= member_list
[0].nodeid
;
1217 for (i
= 0; i
< member_list_entries
; i
++)
1218 if (match
->lowest_id
> member_list
[i
].nodeid
)
1219 match
->lowest_id
= member_list
[i
].nodeid
;
1221 if (lowest
== 0xDEAD)
1222 LOG_COND(log_membership_change
, "[%s] Server change <none> -> %u (%u %s)",
1223 SHORT_UUID(match
->name
.value
), match
->lowest_id
,
1224 joined
->nodeid
, (member_list_entries
== 1) ?
1225 "is first to join" : "joined");
1226 else if (lowest
!= match
->lowest_id
)
1227 LOG_COND(log_membership_change
, "[%s] Server change %u -> %u (%u joined)",
1228 SHORT_UUID(match
->name
.value
), lowest
,
1229 match
->lowest_id
, joined
->nodeid
);
1231 LOG_COND(log_membership_change
, "[%s] Server unchanged at %u (%u joined)",
1232 SHORT_UUID(match
->name
.value
),
1233 lowest
, joined
->nodeid
);
1234 LOG_SPRINT(match
, "+++ UUID=%s %u join +++",
1235 SHORT_UUID(match
->name
.value
), joined
->nodeid
);
1238 static void cpg_leave_callback(struct clog_cpg
*match
,
1239 const struct cpg_address
*left
,
1240 const struct cpg_address
*member_list
,
1241 size_t member_list_entries
)
1244 uint32_t lowest
= match
->lowest_id
;
1245 struct clog_request
*rq
, *n
;
1246 struct checkpoint_data
*p_cp
, *c_cp
;
1248 LOG_SPRINT(match
, "--- UUID=%s %u left ---",
1249 SHORT_UUID(match
->name
.value
), left
->nodeid
);
1252 if (my_cluster_id
== left
->nodeid
) {
1253 LOG_DBG("Finalizing leave...");
1254 dm_list_del(&match
->list
);
1256 cpg_fd_get(match
->handle
, &fd
);
1257 links_unregister(fd
);
1259 cluster_postsuspend(match
->name
.value
, match
->luid
);
1261 dm_list_iterate_items_safe(rq
, n
, &match
->working_list
) {
1262 dm_list_del(&rq
->list
);
1264 if (rq
->u_rq
.request_type
== DM_ULOG_POSTSUSPEND
)
1265 kernel_send(&rq
->u_rq
);
1269 cpg_finalize(match
->handle
);
1272 match
->lowest_id
= 0xDEAD;
1273 match
->state
= INVALID
;
1276 /* Remove any pending checkpoints for the leaving node. */
1277 for (p_cp
= NULL
, c_cp
= match
->checkpoint_list
;
1278 c_cp
&& (c_cp
->requester
!= left
->nodeid
);
1279 p_cp
= c_cp
, c_cp
= c_cp
->next
);
1282 p_cp
->next
= c_cp
->next
;
1284 match
->checkpoint_list
= c_cp
->next
;
1286 LOG_COND(log_checkpoint
,
1287 "[%s] Removing pending checkpoint (%u is leaving)",
1288 SHORT_UUID(match
->name
.value
), left
->nodeid
);
1289 free_checkpoint(c_cp
);
1291 dm_list_iterate_items_safe(rq
, n
, &match
->startup_list
) {
1292 if ((rq
->u_rq
.request_type
== DM_ULOG_MEMBER_JOIN
) &&
1293 (rq
->originator
== left
->nodeid
)) {
1294 LOG_COND(log_checkpoint
,
1295 "[%s] Removing pending ckpt from startup list (%u is leaving)",
1296 SHORT_UUID(match
->name
.value
), left
->nodeid
);
1297 dm_list_del(&rq
->list
);
1301 for (i
= 0, j
= 0; i
< match
->checkpoints_needed
; i
++, j
++) {
1302 match
->checkpoint_requesters
[j
] = match
->checkpoint_requesters
[i
];
1303 if (match
->checkpoint_requesters
[i
] == left
->nodeid
) {
1304 LOG_ERROR("[%s] Removing pending ckpt from needed list (%u is leaving)",
1305 SHORT_UUID(match
->name
.value
), left
->nodeid
);
1309 match
->checkpoints_needed
= j
;
1311 if (left
->nodeid
< my_cluster_id
) {
1312 match
->delay
= (match
->delay
> 0) ? match
->delay
- 1 : 0;
1313 if (!match
->delay
&& dm_list_empty(&match
->working_list
))
1314 match
->resend_requests
= 0;
1315 LOG_COND(log_resend_requests
, "[%s] %u has left, delay = %d%s",
1316 SHORT_UUID(match
->name
.value
), left
->nodeid
,
1317 match
->delay
, (dm_list_empty(&match
->working_list
)) ?
1318 " -- working_list empty": "");
1321 /* Find the lowest_id, i.e. the server */
1322 if (!member_list_entries
) {
1323 match
->lowest_id
= 0xDEAD;
1324 LOG_COND(log_membership_change
, "[%s] Server change %u -> <none> "
1325 "(%u is last to leave)",
1326 SHORT_UUID(match
->name
.value
), left
->nodeid
,
1331 match
->lowest_id
= member_list
[0].nodeid
;
1332 for (i
= 0; i
< member_list_entries
; i
++)
1333 if (match
->lowest_id
> member_list
[i
].nodeid
)
1334 match
->lowest_id
= member_list
[i
].nodeid
;
1336 if (lowest
!= match
->lowest_id
) {
1337 LOG_COND(log_membership_change
, "[%s] Server change %u -> %u (%u left)",
1338 SHORT_UUID(match
->name
.value
), lowest
,
1339 match
->lowest_id
, left
->nodeid
);
1341 LOG_COND(log_membership_change
, "[%s] Server unchanged at %u (%u left)",
1342 SHORT_UUID(match
->name
.value
), lowest
, left
->nodeid
);
1344 if ((match
->state
== INVALID
) && !match
->free_me
) {
1346 * If all CPG members are waiting for checkpoints and they
1347 * are all present in my startup_list, then I was the first to
1348 * join and I must assume control.
1350 * We do not normally end up here, but if there was a quick
1351 * 'resume -> suspend -> resume' across the cluster, we may
1352 * have initially thought we were not the first to join because
1353 * of the presence of out-going (and unable to respond) members.
1356 i
= 1; /* We do not have a DM_ULOG_MEMBER_JOIN entry of our own */
1357 dm_list_iterate_items(rq
, &match
->startup_list
)
1358 if (rq
->u_rq
.request_type
== DM_ULOG_MEMBER_JOIN
)
1361 if (i
== member_list_entries
) {
1363 * Last node who could have given me a checkpoint just left.
1364 * Setting log state to VALID and acting as 'first join'.
1366 match
->state
= VALID
;
1367 flush_startup_list(match
);
1372 static void cpg_config_callback(cpg_handle_t handle
, const struct cpg_name
*gname
,
1373 const struct cpg_address
*member_list
,
1374 size_t member_list_entries
,
1375 const struct cpg_address
*left_list
,
1376 size_t left_list_entries
,
1377 const struct cpg_address
*joined_list
,
1378 size_t joined_list_entries
)
1380 struct clog_cpg
*match
;
1383 dm_list_iterate_items(match
, &clog_cpg_list
)
1384 if (match
->handle
== handle
) {
1390 LOG_ERROR("Unable to find match for CPG config callback");
1394 if ((joined_list_entries
+ left_list_entries
) > 1)
1395 LOG_ERROR("[%s] More than one node joining/leaving",
1396 SHORT_UUID(match
->name
.value
));
1398 if (joined_list_entries
)
1399 cpg_join_callback(match
, joined_list
,
1400 member_list
, member_list_entries
);
1402 cpg_leave_callback(match
, left_list
,
1403 member_list
, member_list_entries
);
1406 cpg_callbacks_t cpg_callbacks
= {
1407 .cpg_deliver_fn
= cpg_message_callback
,
1408 .cpg_confchg_fn
= cpg_config_callback
,
1415 * Returns: 1 if checkpoint removed, 0 if no checkpoints, -EXXX on error
1417 int remove_checkpoint(struct clog_cpg
*entry
)
1422 SaCkptCheckpointHandleT h
;
1424 len
= snprintf((char *)(name
.value
), SA_MAX_NAME_LENGTH
, "bitmaps_%s_%u",
1425 SHORT_UUID(entry
->name
.value
), my_cluster_id
);
1429 rv
= saCkptCheckpointOpen(ckpt_handle
, &name
, NULL
,
1430 SA_CKPT_CHECKPOINT_READ
, 0, &h
);
1431 if (rv
== SA_AIS_ERR_TRY_AGAIN
) {
1432 LOG_ERROR("abort_startup: ckpt open retry");
1437 if (rv
!= SA_AIS_OK
)
1440 LOG_DBG("[%s] Removing checkpoint", SHORT_UUID(entry
->name
.value
));
1442 rv
= saCkptCheckpointUnlink(ckpt_handle
, &name
);
1443 if (rv
== SA_AIS_ERR_TRY_AGAIN
) {
1444 LOG_ERROR("abort_startup: ckpt unlink retry");
1449 if (rv
!= SA_AIS_OK
) {
1450 LOG_ERROR("[%s] Failed to unlink checkpoint: %s",
1451 SHORT_UUID(entry
->name
.value
), str_ais_error(rv
));
1455 saCkptCheckpointClose(h
);
1460 int create_cluster_cpg(char *uuid
, uint64_t luid
)
1464 struct clog_cpg
*new = NULL
;
1465 struct clog_cpg
*tmp
;
1467 dm_list_iterate_items(tmp
, &clog_cpg_list
)
1468 if (!strncmp(tmp
->name
.value
, uuid
, CPG_MAX_NAME_LENGTH
)) {
1469 LOG_ERROR("Log entry already exists: %s", uuid
);
1473 new = malloc(sizeof(*new));
1475 LOG_ERROR("Unable to allocate memory for clog_cpg");
1478 memset(new, 0, sizeof(*new));
1479 dm_list_init(&new->list
);
1480 new->lowest_id
= 0xDEAD;
1481 dm_list_init(&new->startup_list
);
1482 dm_list_init(&new->working_list
);
1484 size
= ((strlen(uuid
) + 1) > CPG_MAX_NAME_LENGTH
) ?
1485 CPG_MAX_NAME_LENGTH
: (strlen(uuid
) + 1);
1486 strncpy(new->name
.value
, uuid
, size
);
1487 new->name
.length
= size
;
1491 * Ensure there are no stale checkpoints around before we join
1493 if (remove_checkpoint(new) == 1)
1494 LOG_COND(log_checkpoint
,
1495 "[%s] Removing checkpoints left from previous session",
1496 SHORT_UUID(new->name
.value
));
1498 r
= cpg_initialize(&new->handle
, &cpg_callbacks
);
1499 if (r
!= SA_AIS_OK
) {
1500 LOG_ERROR("cpg_initialize failed: Cannot join cluster");
1505 r
= cpg_join(new->handle
, &new->name
);
1506 if (r
!= SA_AIS_OK
) {
1507 LOG_ERROR("cpg_join failed: Cannot join cluster");
1512 new->cpg_state
= VALID
;
1513 dm_list_add(&clog_cpg_list
, &new->list
);
1514 LOG_DBG("New handle: %llu", (unsigned long long)new->handle
);
1515 LOG_DBG("New name: %s", new->name
.value
);
1517 /* FIXME: better variable */
1518 cpg_fd_get(new->handle
, &r
);
1519 links_register(r
, "cluster", do_cluster_work
, NULL
);
1524 static void abort_startup(struct clog_cpg
*del
)
1526 struct clog_request
*rq
, *n
;
1528 LOG_DBG("[%s] CPG teardown before checkpoint received",
1529 SHORT_UUID(del
->name
.value
));
1531 dm_list_iterate_items_safe(rq
, n
, &del
->startup_list
) {
1532 dm_list_del(&rq
->list
);
1534 LOG_DBG("[%s] Ignoring request from %u: %s",
1535 SHORT_UUID(del
->name
.value
), rq
->originator
,
1536 _RQ_TYPE(rq
->u_rq
.request_type
));
1540 remove_checkpoint(del
);
1543 static int _destroy_cluster_cpg(struct clog_cpg
*del
)
1548 LOG_COND(log_resend_requests
, "[%s] I am leaving.2.....",
1549 SHORT_UUID(del
->name
.value
));
1552 * We must send any left over checkpoints before
1553 * leaving. If we don't, an incoming node could
1554 * be stuck with no checkpoint and stall.
1555 do_checkpoints(del); --- THIS COULD BE CAUSING OUR PROBLEMS:
1557 - Incoming node deletes old checkpoints before joining
1558 - A stale checkpoint is issued here by leaving node
1559 - (leaving node leaves)
1560 - Incoming node joins cluster and finds stale checkpoint.
1561 - (leaving node leaves - option 2)
1563 do_checkpoints(del
, 1);
1567 del
->cpg_state
= INVALID
;
1568 del
->state
= LEAVING
;
1571 * If the state is VALID, we might be processing the
1572 * startup list. If so, we certainly don't want to
1573 * clear the startup_list here by calling abort_startup
1575 if (!dm_list_empty(&del
->startup_list
) && (state
!= VALID
))
1578 r
= cpg_leave(del
->handle
, &del
->name
);
1580 LOG_ERROR("Error leaving CPG!");
1584 int destroy_cluster_cpg(char *uuid
)
1586 struct clog_cpg
*del
, *tmp
;
1588 dm_list_iterate_items_safe(del
, tmp
, &clog_cpg_list
)
1589 if (!strncmp(del
->name
.value
, uuid
, CPG_MAX_NAME_LENGTH
))
1590 _destroy_cluster_cpg(del
);
1595 int init_cluster(void)
1599 dm_list_init(&clog_cpg_list
);
1600 rv
= saCkptInitialize(&ckpt_handle
, &callbacks
, &version
);
1602 if (rv
!= SA_AIS_OK
)
1603 return EXIT_CLUSTER_CKPT_INIT
;
1608 void cleanup_cluster(void)
1612 err
= saCkptFinalize(ckpt_handle
);
1613 if (err
!= SA_AIS_OK
)
1614 LOG_ERROR("Failed to finalize checkpoint handle");
1617 void cluster_debug(void)
1619 struct checkpoint_data
*cp
;
1620 struct clog_cpg
*entry
;
1621 struct clog_request
*rq
;
1625 LOG_ERROR("CLUSTER COMPONENT DEBUGGING::");
1626 dm_list_iterate_items(entry
, &clog_cpg_list
) {
1627 LOG_ERROR("%s::", SHORT_UUID(entry
->name
.value
));
1628 LOG_ERROR(" lowest_id : %u", entry
->lowest_id
);
1629 LOG_ERROR(" state : %s", (entry
->state
== INVALID
) ?
1630 "INVALID" : (entry
->state
== VALID
) ? "VALID" :
1631 (entry
->state
== LEAVING
) ? "LEAVING" : "UNKNOWN");
1632 LOG_ERROR(" cpg_state : %d", entry
->cpg_state
);
1633 LOG_ERROR(" free_me : %d", entry
->free_me
);
1634 LOG_ERROR(" delay : %d", entry
->delay
);
1635 LOG_ERROR(" resend_requests : %d", entry
->resend_requests
);
1636 LOG_ERROR(" checkpoints_needed: %d", entry
->checkpoints_needed
);
1637 for (i
= 0, cp
= entry
->checkpoint_list
;
1638 i
< MAX_CHECKPOINT_REQUESTERS
; i
++)
1643 LOG_ERROR(" CKPTs waiting : %d", i
);
1644 LOG_ERROR(" Working list:");
1645 dm_list_iterate_items(rq
, &entry
->working_list
)
1646 LOG_ERROR(" %s/%u", _RQ_TYPE(rq
->u_rq
.request_type
),
1649 LOG_ERROR(" Startup list:");
1650 dm_list_iterate_items(rq
, &entry
->startup_list
)
1651 LOG_ERROR(" %s/%u", _RQ_TYPE(rq
->u_rq
.request_type
),
1654 LOG_ERROR("Command History:");
1655 for (i
= 0; i
< DEBUGGING_HISTORY
; i
++) {
1657 entry
->idx
= entry
->idx
% DEBUGGING_HISTORY
;
1658 if (entry
->debugging
[entry
->idx
][0] == '\0')
1660 LOG_ERROR("%d:%d) %s", i
, entry
->idx
,
1661 entry
->debugging
[entry
->idx
]);