/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "rcom.h"
#include "recover.h"
#include "dir.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "util.h"

29 static int rcom_response(struct dlm_ls
*ls
)
31 return test_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
34 static int create_rcom(struct dlm_ls
*ls
, int to_nodeid
, int type
, int len
,
35 struct dlm_rcom
**rc_ret
, struct dlm_mhandle
**mh_ret
)
38 struct dlm_mhandle
*mh
;
40 int mb_len
= sizeof(struct dlm_rcom
) + len
;
42 mh
= dlm_lowcomms_get_buffer(to_nodeid
, mb_len
, GFP_NOFS
, &mb
);
44 log_print("create_rcom to %d type %d len %d ENOBUFS",
45 to_nodeid
, type
, len
);
48 memset(mb
, 0, mb_len
);
50 rc
= (struct dlm_rcom
*) mb
;
52 rc
->rc_header
.h_version
= (DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
);
53 rc
->rc_header
.h_lockspace
= ls
->ls_global_id
;
54 rc
->rc_header
.h_nodeid
= dlm_our_nodeid();
55 rc
->rc_header
.h_length
= mb_len
;
56 rc
->rc_header
.h_cmd
= DLM_RCOM
;
60 spin_lock(&ls
->ls_recover_lock
);
61 rc
->rc_seq
= ls
->ls_recover_seq
;
62 spin_unlock(&ls
->ls_recover_lock
);
/* Hand a fully built rcom message to lowcomms for transmission.

   NOTE(review): the third parameter and the statement preceding the commit
   were lost in extraction.  The parameter is restored from the callers
   (send_rcom(ls, mh, rc)); the dlm_rcom_out() call, which presumably puts
   the message into wire byte order, is restored from upstream - confirm. */
static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
		      struct dlm_rcom *rc)
{
	dlm_rcom_out(rc);
	dlm_lowcomms_commit_buffer(mh);
}

76 static void set_rcom_status(struct dlm_ls
*ls
, struct rcom_status
*rs
,
79 rs
->rs_flags
= cpu_to_le32(flags
);
82 /* When replying to a status request, a node also sends back its
83 configuration values. The requesting node then checks that the remote
84 node is configured the same way as itself. */
86 static void set_rcom_config(struct dlm_ls
*ls
, struct rcom_config
*rf
,
89 rf
->rf_lvblen
= cpu_to_le32(ls
->ls_lvblen
);
90 rf
->rf_lsflags
= cpu_to_le32(ls
->ls_exflags
);
92 rf
->rf_our_slot
= cpu_to_le16(ls
->ls_slot
);
93 rf
->rf_num_slots
= cpu_to_le16(num_slots
);
94 rf
->rf_generation
= cpu_to_le32(ls
->ls_generation
);
97 static int check_rcom_config(struct dlm_ls
*ls
, struct dlm_rcom
*rc
, int nodeid
)
99 struct rcom_config
*rf
= (struct rcom_config
*) rc
->rc_buf
;
101 if ((rc
->rc_header
.h_version
& 0xFFFF0000) != DLM_HEADER_MAJOR
) {
102 log_error(ls
, "version mismatch: %x nodeid %d: %x",
103 DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
, nodeid
,
104 rc
->rc_header
.h_version
);
108 if (le32_to_cpu(rf
->rf_lvblen
) != ls
->ls_lvblen
||
109 le32_to_cpu(rf
->rf_lsflags
) != ls
->ls_exflags
) {
110 log_error(ls
, "config mismatch: %d,%x nodeid %d: %d,%x",
111 ls
->ls_lvblen
, ls
->ls_exflags
, nodeid
,
112 le32_to_cpu(rf
->rf_lvblen
),
113 le32_to_cpu(rf
->rf_lsflags
));
119 static void allow_sync_reply(struct dlm_ls
*ls
, uint64_t *new_seq
)
121 spin_lock(&ls
->ls_rcom_spin
);
122 *new_seq
= ++ls
->ls_rcom_seq
;
123 set_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
);
124 spin_unlock(&ls
->ls_rcom_spin
);
127 static void disallow_sync_reply(struct dlm_ls
*ls
)
129 spin_lock(&ls
->ls_rcom_spin
);
130 clear_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
);
131 clear_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
132 spin_unlock(&ls
->ls_rcom_spin
);
136 * low nodeid gathers one slot value at a time from each node.
137 * it sets need_slots=0, and saves rf_our_slot returned from each
140 * other nodes gather all slot values at once from the low nodeid.
141 * they set need_slots=1, and ignore the rf_our_slot returned from each
142 * rcom_config. they use the rf_num_slots returned from the low
143 * node's rcom_config.
146 int dlm_rcom_status(struct dlm_ls
*ls
, int nodeid
, uint32_t status_flags
)
149 struct dlm_mhandle
*mh
;
152 ls
->ls_recover_nodeid
= nodeid
;
154 if (nodeid
== dlm_our_nodeid()) {
155 rc
= ls
->ls_recover_buf
;
156 rc
->rc_result
= dlm_recover_status(ls
);
160 error
= create_rcom(ls
, nodeid
, DLM_RCOM_STATUS
,
161 sizeof(struct rcom_status
), &rc
, &mh
);
165 set_rcom_status(ls
, (struct rcom_status
*)rc
->rc_buf
, status_flags
);
167 allow_sync_reply(ls
, &rc
->rc_id
);
168 memset(ls
->ls_recover_buf
, 0, dlm_config
.ci_buffer_size
);
170 send_rcom(ls
, mh
, rc
);
172 error
= dlm_wait_function(ls
, &rcom_response
);
173 disallow_sync_reply(ls
);
177 rc
= ls
->ls_recover_buf
;
179 if (rc
->rc_result
== -ESRCH
) {
180 /* we pretend the remote lockspace exists with 0 status */
181 log_debug(ls
, "remote node %d not ready", nodeid
);
185 error
= check_rcom_config(ls
, rc
, nodeid
);
188 /* the caller looks at rc_result for the remote recovery status */
193 static void receive_rcom_status(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
196 struct dlm_mhandle
*mh
;
197 struct rcom_status
*rs
;
199 int nodeid
= rc_in
->rc_header
.h_nodeid
;
200 int len
= sizeof(struct rcom_config
);
204 if (!dlm_slots_version(&rc_in
->rc_header
)) {
205 status
= dlm_recover_status(ls
);
209 rs
= (struct rcom_status
*)rc_in
->rc_buf
;
211 if (!(rs
->rs_flags
& DLM_RSF_NEED_SLOTS
)) {
212 status
= dlm_recover_status(ls
);
216 spin_lock(&ls
->ls_recover_lock
);
217 status
= ls
->ls_recover_status
;
218 num_slots
= ls
->ls_num_slots
;
219 spin_unlock(&ls
->ls_recover_lock
);
220 len
+= num_slots
* sizeof(struct rcom_slot
);
223 error
= create_rcom(ls
, nodeid
, DLM_RCOM_STATUS_REPLY
,
228 rc
->rc_id
= rc_in
->rc_id
;
229 rc
->rc_seq_reply
= rc_in
->rc_seq
;
230 rc
->rc_result
= status
;
232 set_rcom_config(ls
, (struct rcom_config
*)rc
->rc_buf
, num_slots
);
237 spin_lock(&ls
->ls_recover_lock
);
238 if (ls
->ls_num_slots
!= num_slots
) {
239 spin_unlock(&ls
->ls_recover_lock
);
240 log_debug(ls
, "receive_rcom_status num_slots %d to %d",
241 num_slots
, ls
->ls_num_slots
);
243 set_rcom_config(ls
, (struct rcom_config
*)rc
->rc_buf
, 0);
247 dlm_slots_copy_out(ls
, rc
);
248 spin_unlock(&ls
->ls_recover_lock
);
251 send_rcom(ls
, mh
, rc
);
254 static void receive_sync_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
256 spin_lock(&ls
->ls_rcom_spin
);
257 if (!test_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
) ||
258 rc_in
->rc_id
!= ls
->ls_rcom_seq
) {
259 log_debug(ls
, "reject reply %d from %d seq %llx expect %llx",
260 rc_in
->rc_type
, rc_in
->rc_header
.h_nodeid
,
261 (unsigned long long)rc_in
->rc_id
,
262 (unsigned long long)ls
->ls_rcom_seq
);
265 memcpy(ls
->ls_recover_buf
, rc_in
, rc_in
->rc_header
.h_length
);
266 set_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
267 clear_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
);
268 wake_up(&ls
->ls_wait_general
);
270 spin_unlock(&ls
->ls_rcom_spin
);
273 int dlm_rcom_names(struct dlm_ls
*ls
, int nodeid
, char *last_name
, int last_len
)
276 struct dlm_mhandle
*mh
;
278 int max_size
= dlm_config
.ci_buffer_size
- sizeof(struct dlm_rcom
);
280 ls
->ls_recover_nodeid
= nodeid
;
282 if (nodeid
== dlm_our_nodeid()) {
283 ls
->ls_recover_buf
->rc_header
.h_length
=
284 dlm_config
.ci_buffer_size
;
285 dlm_copy_master_names(ls
, last_name
, last_len
,
286 ls
->ls_recover_buf
->rc_buf
,
291 error
= create_rcom(ls
, nodeid
, DLM_RCOM_NAMES
, last_len
, &rc
, &mh
);
294 memcpy(rc
->rc_buf
, last_name
, last_len
);
296 allow_sync_reply(ls
, &rc
->rc_id
);
297 memset(ls
->ls_recover_buf
, 0, dlm_config
.ci_buffer_size
);
299 send_rcom(ls
, mh
, rc
);
301 error
= dlm_wait_function(ls
, &rcom_response
);
302 disallow_sync_reply(ls
);
307 static void receive_rcom_names(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
310 struct dlm_mhandle
*mh
;
311 int error
, inlen
, outlen
, nodeid
;
313 nodeid
= rc_in
->rc_header
.h_nodeid
;
314 inlen
= rc_in
->rc_header
.h_length
- sizeof(struct dlm_rcom
);
315 outlen
= dlm_config
.ci_buffer_size
- sizeof(struct dlm_rcom
);
317 error
= create_rcom(ls
, nodeid
, DLM_RCOM_NAMES_REPLY
, outlen
, &rc
, &mh
);
320 rc
->rc_id
= rc_in
->rc_id
;
321 rc
->rc_seq_reply
= rc_in
->rc_seq
;
323 dlm_copy_master_names(ls
, rc_in
->rc_buf
, inlen
, rc
->rc_buf
, outlen
,
325 send_rcom(ls
, mh
, rc
);
328 int dlm_send_rcom_lookup(struct dlm_rsb
*r
, int dir_nodeid
)
331 struct dlm_mhandle
*mh
;
332 struct dlm_ls
*ls
= r
->res_ls
;
335 error
= create_rcom(ls
, dir_nodeid
, DLM_RCOM_LOOKUP
, r
->res_length
,
339 memcpy(rc
->rc_buf
, r
->res_name
, r
->res_length
);
340 rc
->rc_id
= (unsigned long) r
;
342 send_rcom(ls
, mh
, rc
);
347 static void receive_rcom_lookup(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
350 struct dlm_mhandle
*mh
;
351 int error
, ret_nodeid
, nodeid
= rc_in
->rc_header
.h_nodeid
;
352 int len
= rc_in
->rc_header
.h_length
- sizeof(struct dlm_rcom
);
354 error
= create_rcom(ls
, nodeid
, DLM_RCOM_LOOKUP_REPLY
, 0, &rc
, &mh
);
358 error
= dlm_dir_lookup(ls
, nodeid
, rc_in
->rc_buf
, len
, &ret_nodeid
);
361 rc
->rc_result
= ret_nodeid
;
362 rc
->rc_id
= rc_in
->rc_id
;
363 rc
->rc_seq_reply
= rc_in
->rc_seq
;
365 send_rcom(ls
, mh
, rc
);
/* Pass a DLM_RCOM_LOOKUP_REPLY straight to dlm_recover_master_reply(). */
static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{
	dlm_recover_master_reply(ls, rc_in);
}

373 static void pack_rcom_lock(struct dlm_rsb
*r
, struct dlm_lkb
*lkb
,
374 struct rcom_lock
*rl
)
376 memset(rl
, 0, sizeof(*rl
));
378 rl
->rl_ownpid
= cpu_to_le32(lkb
->lkb_ownpid
);
379 rl
->rl_lkid
= cpu_to_le32(lkb
->lkb_id
);
380 rl
->rl_exflags
= cpu_to_le32(lkb
->lkb_exflags
);
381 rl
->rl_flags
= cpu_to_le32(lkb
->lkb_flags
);
382 rl
->rl_lvbseq
= cpu_to_le32(lkb
->lkb_lvbseq
);
383 rl
->rl_rqmode
= lkb
->lkb_rqmode
;
384 rl
->rl_grmode
= lkb
->lkb_grmode
;
385 rl
->rl_status
= lkb
->lkb_status
;
386 rl
->rl_wait_type
= cpu_to_le16(lkb
->lkb_wait_type
);
389 rl
->rl_asts
|= DLM_CB_BAST
;
391 rl
->rl_asts
|= DLM_CB_CAST
;
393 rl
->rl_namelen
= cpu_to_le16(r
->res_length
);
394 memcpy(rl
->rl_name
, r
->res_name
, r
->res_length
);
396 /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
397 If so, receive_rcom_lock_args() won't take this copy. */
400 memcpy(rl
->rl_lvb
, lkb
->lkb_lvbptr
, r
->res_ls
->ls_lvblen
);
403 int dlm_send_rcom_lock(struct dlm_rsb
*r
, struct dlm_lkb
*lkb
)
405 struct dlm_ls
*ls
= r
->res_ls
;
407 struct dlm_mhandle
*mh
;
408 struct rcom_lock
*rl
;
409 int error
, len
= sizeof(struct rcom_lock
);
412 len
+= ls
->ls_lvblen
;
414 error
= create_rcom(ls
, r
->res_nodeid
, DLM_RCOM_LOCK
, len
, &rc
, &mh
);
418 rl
= (struct rcom_lock
*) rc
->rc_buf
;
419 pack_rcom_lock(r
, lkb
, rl
);
420 rc
->rc_id
= (unsigned long) r
;
422 send_rcom(ls
, mh
, rc
);
427 /* needs at least dlm_rcom + rcom_lock */
428 static void receive_rcom_lock(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
431 struct dlm_mhandle
*mh
;
432 int error
, nodeid
= rc_in
->rc_header
.h_nodeid
;
434 dlm_recover_master_copy(ls
, rc_in
);
436 error
= create_rcom(ls
, nodeid
, DLM_RCOM_LOCK_REPLY
,
437 sizeof(struct rcom_lock
), &rc
, &mh
);
441 /* We send back the same rcom_lock struct we received, but
442 dlm_recover_master_copy() has filled in rl_remid and rl_result */
444 memcpy(rc
->rc_buf
, rc_in
->rc_buf
, sizeof(struct rcom_lock
));
445 rc
->rc_id
= rc_in
->rc_id
;
446 rc
->rc_seq_reply
= rc_in
->rc_seq
;
448 send_rcom(ls
, mh
, rc
);
451 /* If the lockspace doesn't exist then still send a status message
452 back; it's possible that it just doesn't have its global_id yet. */
454 int dlm_send_ls_not_ready(int nodeid
, struct dlm_rcom
*rc_in
)
457 struct rcom_config
*rf
;
458 struct dlm_mhandle
*mh
;
460 int mb_len
= sizeof(struct dlm_rcom
) + sizeof(struct rcom_config
);
462 mh
= dlm_lowcomms_get_buffer(nodeid
, mb_len
, GFP_NOFS
, &mb
);
465 memset(mb
, 0, mb_len
);
467 rc
= (struct dlm_rcom
*) mb
;
469 rc
->rc_header
.h_version
= (DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
);
470 rc
->rc_header
.h_lockspace
= rc_in
->rc_header
.h_lockspace
;
471 rc
->rc_header
.h_nodeid
= dlm_our_nodeid();
472 rc
->rc_header
.h_length
= mb_len
;
473 rc
->rc_header
.h_cmd
= DLM_RCOM
;
475 rc
->rc_type
= DLM_RCOM_STATUS_REPLY
;
476 rc
->rc_id
= rc_in
->rc_id
;
477 rc
->rc_seq_reply
= rc_in
->rc_seq
;
478 rc
->rc_result
= -ESRCH
;
480 rf
= (struct rcom_config
*) rc
->rc_buf
;
481 rf
->rf_lvblen
= cpu_to_le32(~0U);
484 dlm_lowcomms_commit_buffer(mh
);
489 static int is_old_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc
)
494 switch (rc
->rc_type
) {
495 case DLM_RCOM_STATUS_REPLY
:
496 case DLM_RCOM_NAMES_REPLY
:
497 case DLM_RCOM_LOOKUP_REPLY
:
498 case DLM_RCOM_LOCK_REPLY
:
499 spin_lock(&ls
->ls_recover_lock
);
500 seq
= ls
->ls_recover_seq
;
501 spin_unlock(&ls
->ls_recover_lock
);
502 if (rc
->rc_seq_reply
!= seq
) {
503 log_debug(ls
, "ignoring old reply %x from %d "
504 "seq_reply %llx expect %llx",
505 rc
->rc_type
, rc
->rc_header
.h_nodeid
,
506 (unsigned long long)rc
->rc_seq_reply
,
507 (unsigned long long)seq
);
514 /* Called by dlm_recv; corresponds to dlm_receive_message() but special
515 recovery-only comms are sent through here. */
517 void dlm_receive_rcom(struct dlm_ls
*ls
, struct dlm_rcom
*rc
, int nodeid
)
519 int lock_size
= sizeof(struct dlm_rcom
) + sizeof(struct rcom_lock
);
521 if (dlm_recovery_stopped(ls
) && (rc
->rc_type
!= DLM_RCOM_STATUS
)) {
522 log_debug(ls
, "ignoring recovery message %x from %d",
523 rc
->rc_type
, nodeid
);
527 if (is_old_reply(ls
, rc
))
530 switch (rc
->rc_type
) {
531 case DLM_RCOM_STATUS
:
532 receive_rcom_status(ls
, rc
);
536 receive_rcom_names(ls
, rc
);
539 case DLM_RCOM_LOOKUP
:
540 receive_rcom_lookup(ls
, rc
);
544 if (rc
->rc_header
.h_length
< lock_size
)
546 receive_rcom_lock(ls
, rc
);
549 case DLM_RCOM_STATUS_REPLY
:
550 receive_sync_reply(ls
, rc
);
553 case DLM_RCOM_NAMES_REPLY
:
554 receive_sync_reply(ls
, rc
);
557 case DLM_RCOM_LOOKUP_REPLY
:
558 receive_rcom_lookup_reply(ls
, rc
);
561 case DLM_RCOM_LOCK_REPLY
:
562 if (rc
->rc_header
.h_length
< lock_size
)
564 dlm_recover_process_copy(ls
, rc
);
568 log_error(ls
, "receive_rcom bad type %d", rc
->rc_type
);
573 log_error(ls
, "recovery message %x from %d is too short",
574 rc
->rc_type
, nodeid
);