1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
15 #include "lockspace.h"
27 static int rcom_response(struct dlm_ls
*ls
)
29 return test_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
32 static int create_rcom(struct dlm_ls
*ls
, int to_nodeid
, int type
, int len
,
33 struct dlm_rcom
**rc_ret
, struct dlm_mhandle
**mh_ret
)
36 struct dlm_mhandle
*mh
;
38 int mb_len
= sizeof(struct dlm_rcom
) + len
;
40 mh
= dlm_lowcomms_get_buffer(to_nodeid
, mb_len
, GFP_NOFS
, &mb
);
42 log_print("create_rcom to %d type %d len %d ENOBUFS",
43 to_nodeid
, type
, len
);
46 memset(mb
, 0, mb_len
);
48 rc
= (struct dlm_rcom
*) mb
;
50 rc
->rc_header
.h_version
= (DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
);
51 rc
->rc_header
.h_lockspace
= ls
->ls_global_id
;
52 rc
->rc_header
.h_nodeid
= dlm_our_nodeid();
53 rc
->rc_header
.h_length
= mb_len
;
54 rc
->rc_header
.h_cmd
= DLM_RCOM
;
58 spin_lock(&ls
->ls_recover_lock
);
59 rc
->rc_seq
= ls
->ls_recover_seq
;
60 spin_unlock(&ls
->ls_recover_lock
);
67 static void send_rcom(struct dlm_ls
*ls
, struct dlm_mhandle
*mh
,
71 dlm_lowcomms_commit_buffer(mh
);
74 static void set_rcom_status(struct dlm_ls
*ls
, struct rcom_status
*rs
,
77 rs
->rs_flags
= cpu_to_le32(flags
);
80 /* When replying to a status request, a node also sends back its
81 configuration values. The requesting node then checks that the remote
82 node is configured the same way as itself. */
84 static void set_rcom_config(struct dlm_ls
*ls
, struct rcom_config
*rf
,
87 rf
->rf_lvblen
= cpu_to_le32(ls
->ls_lvblen
);
88 rf
->rf_lsflags
= cpu_to_le32(ls
->ls_exflags
);
90 rf
->rf_our_slot
= cpu_to_le16(ls
->ls_slot
);
91 rf
->rf_num_slots
= cpu_to_le16(num_slots
);
92 rf
->rf_generation
= cpu_to_le32(ls
->ls_generation
);
95 static int check_rcom_config(struct dlm_ls
*ls
, struct dlm_rcom
*rc
, int nodeid
)
97 struct rcom_config
*rf
= (struct rcom_config
*) rc
->rc_buf
;
99 if ((rc
->rc_header
.h_version
& 0xFFFF0000) != DLM_HEADER_MAJOR
) {
100 log_error(ls
, "version mismatch: %x nodeid %d: %x",
101 DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
, nodeid
,
102 rc
->rc_header
.h_version
);
106 if (le32_to_cpu(rf
->rf_lvblen
) != ls
->ls_lvblen
||
107 le32_to_cpu(rf
->rf_lsflags
) != ls
->ls_exflags
) {
108 log_error(ls
, "config mismatch: %d,%x nodeid %d: %d,%x",
109 ls
->ls_lvblen
, ls
->ls_exflags
, nodeid
,
110 le32_to_cpu(rf
->rf_lvblen
),
111 le32_to_cpu(rf
->rf_lsflags
));
117 static void allow_sync_reply(struct dlm_ls
*ls
, uint64_t *new_seq
)
119 spin_lock(&ls
->ls_rcom_spin
);
120 *new_seq
= ++ls
->ls_rcom_seq
;
121 set_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
);
122 spin_unlock(&ls
->ls_rcom_spin
);
125 static void disallow_sync_reply(struct dlm_ls
*ls
)
127 spin_lock(&ls
->ls_rcom_spin
);
128 clear_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
);
129 clear_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
130 spin_unlock(&ls
->ls_rcom_spin
);
134 * low nodeid gathers one slot value at a time from each node.
135 * it sets need_slots=0, and saves rf_our_slot returned from each
138 * other nodes gather all slot values at once from the low nodeid.
139 * they set need_slots=1, and ignore the rf_our_slot returned from each
140 * rcom_config. they use the rf_num_slots returned from the low
141 * node's rcom_config.
144 int dlm_rcom_status(struct dlm_ls
*ls
, int nodeid
, uint32_t status_flags
)
147 struct dlm_mhandle
*mh
;
150 ls
->ls_recover_nodeid
= nodeid
;
152 if (nodeid
== dlm_our_nodeid()) {
153 rc
= ls
->ls_recover_buf
;
154 rc
->rc_result
= dlm_recover_status(ls
);
158 error
= create_rcom(ls
, nodeid
, DLM_RCOM_STATUS
,
159 sizeof(struct rcom_status
), &rc
, &mh
);
163 set_rcom_status(ls
, (struct rcom_status
*)rc
->rc_buf
, status_flags
);
165 allow_sync_reply(ls
, &rc
->rc_id
);
166 memset(ls
->ls_recover_buf
, 0, dlm_config
.ci_buffer_size
);
168 send_rcom(ls
, mh
, rc
);
170 error
= dlm_wait_function(ls
, &rcom_response
);
171 disallow_sync_reply(ls
);
175 rc
= ls
->ls_recover_buf
;
177 if (rc
->rc_result
== -ESRCH
) {
178 /* we pretend the remote lockspace exists with 0 status */
179 log_debug(ls
, "remote node %d not ready", nodeid
);
183 error
= check_rcom_config(ls
, rc
, nodeid
);
186 /* the caller looks at rc_result for the remote recovery status */
191 static void receive_rcom_status(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
194 struct dlm_mhandle
*mh
;
195 struct rcom_status
*rs
;
197 int nodeid
= rc_in
->rc_header
.h_nodeid
;
198 int len
= sizeof(struct rcom_config
);
202 if (!dlm_slots_version(&rc_in
->rc_header
)) {
203 status
= dlm_recover_status(ls
);
207 rs
= (struct rcom_status
*)rc_in
->rc_buf
;
209 if (!(rs
->rs_flags
& DLM_RSF_NEED_SLOTS
)) {
210 status
= dlm_recover_status(ls
);
214 spin_lock(&ls
->ls_recover_lock
);
215 status
= ls
->ls_recover_status
;
216 num_slots
= ls
->ls_num_slots
;
217 spin_unlock(&ls
->ls_recover_lock
);
218 len
+= num_slots
* sizeof(struct rcom_slot
);
221 error
= create_rcom(ls
, nodeid
, DLM_RCOM_STATUS_REPLY
,
226 rc
->rc_id
= rc_in
->rc_id
;
227 rc
->rc_seq_reply
= rc_in
->rc_seq
;
228 rc
->rc_result
= status
;
230 set_rcom_config(ls
, (struct rcom_config
*)rc
->rc_buf
, num_slots
);
235 spin_lock(&ls
->ls_recover_lock
);
236 if (ls
->ls_num_slots
!= num_slots
) {
237 spin_unlock(&ls
->ls_recover_lock
);
238 log_debug(ls
, "receive_rcom_status num_slots %d to %d",
239 num_slots
, ls
->ls_num_slots
);
241 set_rcom_config(ls
, (struct rcom_config
*)rc
->rc_buf
, 0);
245 dlm_slots_copy_out(ls
, rc
);
246 spin_unlock(&ls
->ls_recover_lock
);
249 send_rcom(ls
, mh
, rc
);
252 static void receive_sync_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
254 spin_lock(&ls
->ls_rcom_spin
);
255 if (!test_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
) ||
256 rc_in
->rc_id
!= ls
->ls_rcom_seq
) {
257 log_debug(ls
, "reject reply %d from %d seq %llx expect %llx",
258 rc_in
->rc_type
, rc_in
->rc_header
.h_nodeid
,
259 (unsigned long long)rc_in
->rc_id
,
260 (unsigned long long)ls
->ls_rcom_seq
);
263 memcpy(ls
->ls_recover_buf
, rc_in
, rc_in
->rc_header
.h_length
);
264 set_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
265 clear_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
);
266 wake_up(&ls
->ls_wait_general
);
268 spin_unlock(&ls
->ls_rcom_spin
);
271 int dlm_rcom_names(struct dlm_ls
*ls
, int nodeid
, char *last_name
, int last_len
)
274 struct dlm_mhandle
*mh
;
277 ls
->ls_recover_nodeid
= nodeid
;
279 error
= create_rcom(ls
, nodeid
, DLM_RCOM_NAMES
, last_len
, &rc
, &mh
);
282 memcpy(rc
->rc_buf
, last_name
, last_len
);
284 allow_sync_reply(ls
, &rc
->rc_id
);
285 memset(ls
->ls_recover_buf
, 0, dlm_config
.ci_buffer_size
);
287 send_rcom(ls
, mh
, rc
);
289 error
= dlm_wait_function(ls
, &rcom_response
);
290 disallow_sync_reply(ls
);
295 static void receive_rcom_names(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
298 struct dlm_mhandle
*mh
;
299 int error
, inlen
, outlen
, nodeid
;
301 nodeid
= rc_in
->rc_header
.h_nodeid
;
302 inlen
= rc_in
->rc_header
.h_length
- sizeof(struct dlm_rcom
);
303 outlen
= dlm_config
.ci_buffer_size
- sizeof(struct dlm_rcom
);
305 error
= create_rcom(ls
, nodeid
, DLM_RCOM_NAMES_REPLY
, outlen
, &rc
, &mh
);
308 rc
->rc_id
= rc_in
->rc_id
;
309 rc
->rc_seq_reply
= rc_in
->rc_seq
;
311 dlm_copy_master_names(ls
, rc_in
->rc_buf
, inlen
, rc
->rc_buf
, outlen
,
313 send_rcom(ls
, mh
, rc
);
316 int dlm_send_rcom_lookup(struct dlm_rsb
*r
, int dir_nodeid
)
319 struct dlm_mhandle
*mh
;
320 struct dlm_ls
*ls
= r
->res_ls
;
323 error
= create_rcom(ls
, dir_nodeid
, DLM_RCOM_LOOKUP
, r
->res_length
,
327 memcpy(rc
->rc_buf
, r
->res_name
, r
->res_length
);
328 rc
->rc_id
= (unsigned long) r
->res_id
;
330 send_rcom(ls
, mh
, rc
);
335 int dlm_send_rcom_lookup_dump(struct dlm_rsb
*r
, int to_nodeid
)
338 struct dlm_mhandle
*mh
;
339 struct dlm_ls
*ls
= r
->res_ls
;
342 error
= create_rcom(ls
, to_nodeid
, DLM_RCOM_LOOKUP
, r
->res_length
,
346 memcpy(rc
->rc_buf
, r
->res_name
, r
->res_length
);
347 rc
->rc_id
= 0xFFFFFFFF;
349 send_rcom(ls
, mh
, rc
);
354 static void receive_rcom_lookup(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
357 struct dlm_mhandle
*mh
;
358 int error
, ret_nodeid
, nodeid
= rc_in
->rc_header
.h_nodeid
;
359 int len
= rc_in
->rc_header
.h_length
- sizeof(struct dlm_rcom
);
361 error
= create_rcom(ls
, nodeid
, DLM_RCOM_LOOKUP_REPLY
, 0, &rc
, &mh
);
365 if (rc_in
->rc_id
== 0xFFFFFFFF) {
366 log_error(ls
, "receive_rcom_lookup dump from %d", nodeid
);
367 dlm_dump_rsb_name(ls
, rc_in
->rc_buf
, len
);
371 error
= dlm_master_lookup(ls
, nodeid
, rc_in
->rc_buf
, len
,
372 DLM_LU_RECOVER_MASTER
, &ret_nodeid
, NULL
);
375 rc
->rc_result
= ret_nodeid
;
376 rc
->rc_id
= rc_in
->rc_id
;
377 rc
->rc_seq_reply
= rc_in
->rc_seq
;
379 send_rcom(ls
, mh
, rc
);
382 static void receive_rcom_lookup_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
384 dlm_recover_master_reply(ls
, rc_in
);
387 static void pack_rcom_lock(struct dlm_rsb
*r
, struct dlm_lkb
*lkb
,
388 struct rcom_lock
*rl
)
390 memset(rl
, 0, sizeof(*rl
));
392 rl
->rl_ownpid
= cpu_to_le32(lkb
->lkb_ownpid
);
393 rl
->rl_lkid
= cpu_to_le32(lkb
->lkb_id
);
394 rl
->rl_exflags
= cpu_to_le32(lkb
->lkb_exflags
);
395 rl
->rl_flags
= cpu_to_le32(lkb
->lkb_flags
);
396 rl
->rl_lvbseq
= cpu_to_le32(lkb
->lkb_lvbseq
);
397 rl
->rl_rqmode
= lkb
->lkb_rqmode
;
398 rl
->rl_grmode
= lkb
->lkb_grmode
;
399 rl
->rl_status
= lkb
->lkb_status
;
400 rl
->rl_wait_type
= cpu_to_le16(lkb
->lkb_wait_type
);
403 rl
->rl_asts
|= DLM_CB_BAST
;
405 rl
->rl_asts
|= DLM_CB_CAST
;
407 rl
->rl_namelen
= cpu_to_le16(r
->res_length
);
408 memcpy(rl
->rl_name
, r
->res_name
, r
->res_length
);
410 /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
411 If so, receive_rcom_lock_args() won't take this copy. */
414 memcpy(rl
->rl_lvb
, lkb
->lkb_lvbptr
, r
->res_ls
->ls_lvblen
);
417 int dlm_send_rcom_lock(struct dlm_rsb
*r
, struct dlm_lkb
*lkb
)
419 struct dlm_ls
*ls
= r
->res_ls
;
421 struct dlm_mhandle
*mh
;
422 struct rcom_lock
*rl
;
423 int error
, len
= sizeof(struct rcom_lock
);
426 len
+= ls
->ls_lvblen
;
428 error
= create_rcom(ls
, r
->res_nodeid
, DLM_RCOM_LOCK
, len
, &rc
, &mh
);
432 rl
= (struct rcom_lock
*) rc
->rc_buf
;
433 pack_rcom_lock(r
, lkb
, rl
);
434 rc
->rc_id
= (unsigned long) r
;
436 send_rcom(ls
, mh
, rc
);
441 /* needs at least dlm_rcom + rcom_lock */
442 static void receive_rcom_lock(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
445 struct dlm_mhandle
*mh
;
446 int error
, nodeid
= rc_in
->rc_header
.h_nodeid
;
448 dlm_recover_master_copy(ls
, rc_in
);
450 error
= create_rcom(ls
, nodeid
, DLM_RCOM_LOCK_REPLY
,
451 sizeof(struct rcom_lock
), &rc
, &mh
);
455 /* We send back the same rcom_lock struct we received, but
456 dlm_recover_master_copy() has filled in rl_remid and rl_result */
458 memcpy(rc
->rc_buf
, rc_in
->rc_buf
, sizeof(struct rcom_lock
));
459 rc
->rc_id
= rc_in
->rc_id
;
460 rc
->rc_seq_reply
= rc_in
->rc_seq
;
462 send_rcom(ls
, mh
, rc
);
465 /* If the lockspace doesn't exist then still send a status message
466 back; it's possible that it just doesn't have its global_id yet. */
468 int dlm_send_ls_not_ready(int nodeid
, struct dlm_rcom
*rc_in
)
471 struct rcom_config
*rf
;
472 struct dlm_mhandle
*mh
;
474 int mb_len
= sizeof(struct dlm_rcom
) + sizeof(struct rcom_config
);
476 mh
= dlm_lowcomms_get_buffer(nodeid
, mb_len
, GFP_NOFS
, &mb
);
479 memset(mb
, 0, mb_len
);
481 rc
= (struct dlm_rcom
*) mb
;
483 rc
->rc_header
.h_version
= (DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
);
484 rc
->rc_header
.h_lockspace
= rc_in
->rc_header
.h_lockspace
;
485 rc
->rc_header
.h_nodeid
= dlm_our_nodeid();
486 rc
->rc_header
.h_length
= mb_len
;
487 rc
->rc_header
.h_cmd
= DLM_RCOM
;
489 rc
->rc_type
= DLM_RCOM_STATUS_REPLY
;
490 rc
->rc_id
= rc_in
->rc_id
;
491 rc
->rc_seq_reply
= rc_in
->rc_seq
;
492 rc
->rc_result
= -ESRCH
;
494 rf
= (struct rcom_config
*) rc
->rc_buf
;
495 rf
->rf_lvblen
= cpu_to_le32(~0U);
498 dlm_lowcomms_commit_buffer(mh
);
504 * Ignore messages for stage Y before we set
505 * recover_status bit for stage X:
509 * dlm_recover_members()
512 * - ignore NAMES, NAMES_REPLY
513 * - ignore LOOKUP, LOOKUP_REPLY
514 * - ignore LOCK, LOCK_REPLY
516 * recover_status |= NODES
518 * dlm_recover_members_wait()
520 * dlm_recover_directory()
523 * - ignore LOOKUP, LOOKUP_REPLY
524 * - ignore LOCK, LOCK_REPLY
526 * recover_status |= DIR
528 * dlm_recover_directory_wait()
530 * dlm_recover_masters()
532 * - recv LOOKUP_REPLY
534 * dlm_recover_locks()
538 * recover_status |= LOCKS
540 * dlm_recover_locks_wait()
542 * recover_status |= DONE
545 /* Called by dlm_recv; corresponds to dlm_receive_message() but special
546 recovery-only comms are sent through here. */
548 void dlm_receive_rcom(struct dlm_ls
*ls
, struct dlm_rcom
*rc
, int nodeid
)
550 int lock_size
= sizeof(struct dlm_rcom
) + sizeof(struct rcom_lock
);
551 int stop
, reply
= 0, names
= 0, lookup
= 0, lock
= 0;
555 switch (rc
->rc_type
) {
556 case DLM_RCOM_STATUS_REPLY
:
562 case DLM_RCOM_NAMES_REPLY
:
566 case DLM_RCOM_LOOKUP
:
569 case DLM_RCOM_LOOKUP_REPLY
:
576 case DLM_RCOM_LOCK_REPLY
:
582 spin_lock(&ls
->ls_recover_lock
);
583 status
= ls
->ls_recover_status
;
584 stop
= test_bit(LSFL_RECOVERY_STOP
, &ls
->ls_flags
);
585 seq
= ls
->ls_recover_seq
;
586 spin_unlock(&ls
->ls_recover_lock
);
588 if (stop
&& (rc
->rc_type
!= DLM_RCOM_STATUS
))
591 if (reply
&& (rc
->rc_seq_reply
!= seq
))
594 if (!(status
& DLM_RS_NODES
) && (names
|| lookup
|| lock
))
597 if (!(status
& DLM_RS_DIR
) && (lookup
|| lock
))
600 switch (rc
->rc_type
) {
601 case DLM_RCOM_STATUS
:
602 receive_rcom_status(ls
, rc
);
606 receive_rcom_names(ls
, rc
);
609 case DLM_RCOM_LOOKUP
:
610 receive_rcom_lookup(ls
, rc
);
614 if (rc
->rc_header
.h_length
< lock_size
)
616 receive_rcom_lock(ls
, rc
);
619 case DLM_RCOM_STATUS_REPLY
:
620 receive_sync_reply(ls
, rc
);
623 case DLM_RCOM_NAMES_REPLY
:
624 receive_sync_reply(ls
, rc
);
627 case DLM_RCOM_LOOKUP_REPLY
:
628 receive_rcom_lookup_reply(ls
, rc
);
631 case DLM_RCOM_LOCK_REPLY
:
632 if (rc
->rc_header
.h_length
< lock_size
)
634 dlm_recover_process_copy(ls
, rc
);
638 log_error(ls
, "receive_rcom bad type %d", rc
->rc_type
);
643 log_limit(ls
, "dlm_receive_rcom ignore msg %d "
644 "from %d %llu %llu recover seq %llu sts %x gen %u",
647 (unsigned long long)rc
->rc_seq
,
648 (unsigned long long)rc
->rc_seq_reply
,
649 (unsigned long long)seq
,
650 status
, ls
->ls_generation
);
653 log_error(ls
, "recovery message %d from %d is too short",
654 rc
->rc_type
, nodeid
);