1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6 ** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
9 *******************************************************************************
10 ******************************************************************************/
12 #include "dlm_internal.h"
13 #include "lockspace.h"
25 static int rcom_response(struct dlm_ls
*ls
)
27 return test_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
30 static void _create_rcom(struct dlm_ls
*ls
, int to_nodeid
, int type
, int len
,
31 struct dlm_rcom
**rc_ret
, char *mb
, int mb_len
,
36 rc
= (struct dlm_rcom
*) mb
;
38 rc
->rc_header
.h_version
= cpu_to_le32(DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
);
39 rc
->rc_header
.u
.h_lockspace
= cpu_to_le32(ls
->ls_global_id
);
40 rc
->rc_header
.h_nodeid
= cpu_to_le32(dlm_our_nodeid());
41 rc
->rc_header
.h_length
= cpu_to_le16(mb_len
);
42 rc
->rc_header
.h_cmd
= DLM_RCOM
;
44 rc
->rc_type
= cpu_to_le32(type
);
45 rc
->rc_seq
= cpu_to_le64(seq
);
50 static int create_rcom(struct dlm_ls
*ls
, int to_nodeid
, int type
, int len
,
51 struct dlm_rcom
**rc_ret
, struct dlm_mhandle
**mh_ret
,
54 int mb_len
= sizeof(struct dlm_rcom
) + len
;
55 struct dlm_mhandle
*mh
;
58 mh
= dlm_midcomms_get_mhandle(to_nodeid
, mb_len
, &mb
);
60 log_print("%s to %d type %d len %d ENOBUFS",
61 __func__
, to_nodeid
, type
, len
);
65 _create_rcom(ls
, to_nodeid
, type
, len
, rc_ret
, mb
, mb_len
, seq
);
70 static int create_rcom_stateless(struct dlm_ls
*ls
, int to_nodeid
, int type
,
71 int len
, struct dlm_rcom
**rc_ret
,
72 struct dlm_msg
**msg_ret
, uint64_t seq
)
74 int mb_len
= sizeof(struct dlm_rcom
) + len
;
78 msg
= dlm_lowcomms_new_msg(to_nodeid
, mb_len
, &mb
, NULL
, NULL
);
80 log_print("create_rcom to %d type %d len %d ENOBUFS",
81 to_nodeid
, type
, len
);
85 _create_rcom(ls
, to_nodeid
, type
, len
, rc_ret
, mb
, mb_len
, seq
);
90 static void send_rcom(struct dlm_mhandle
*mh
, struct dlm_rcom
*rc
)
92 dlm_midcomms_commit_mhandle(mh
, NULL
, 0);
95 static void send_rcom_stateless(struct dlm_msg
*msg
, struct dlm_rcom
*rc
)
97 dlm_lowcomms_commit_msg(msg
);
98 dlm_lowcomms_put_msg(msg
);
101 static void set_rcom_status(struct dlm_ls
*ls
, struct rcom_status
*rs
,
104 rs
->rs_flags
= cpu_to_le32(flags
);
107 /* When replying to a status request, a node also sends back its
108 configuration values. The requesting node then checks that the remote
109 node is configured the same way as itself. */
111 static void set_rcom_config(struct dlm_ls
*ls
, struct rcom_config
*rf
,
114 rf
->rf_lvblen
= cpu_to_le32(ls
->ls_lvblen
);
115 rf
->rf_lsflags
= cpu_to_le32(ls
->ls_exflags
);
117 rf
->rf_our_slot
= cpu_to_le16(ls
->ls_slot
);
118 rf
->rf_num_slots
= cpu_to_le16(num_slots
);
119 rf
->rf_generation
= cpu_to_le32(ls
->ls_generation
);
122 static int check_rcom_config(struct dlm_ls
*ls
, struct dlm_rcom
*rc
, int nodeid
)
124 struct rcom_config
*rf
= (struct rcom_config
*) rc
->rc_buf
;
126 if ((le32_to_cpu(rc
->rc_header
.h_version
) & 0xFFFF0000) != DLM_HEADER_MAJOR
) {
127 log_error(ls
, "version mismatch: %x nodeid %d: %x",
128 DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
, nodeid
,
129 le32_to_cpu(rc
->rc_header
.h_version
));
133 if (le32_to_cpu(rf
->rf_lvblen
) != ls
->ls_lvblen
||
134 le32_to_cpu(rf
->rf_lsflags
) != ls
->ls_exflags
) {
135 log_error(ls
, "config mismatch: %d,%x nodeid %d: %d,%x",
136 ls
->ls_lvblen
, ls
->ls_exflags
, nodeid
,
137 le32_to_cpu(rf
->rf_lvblen
),
138 le32_to_cpu(rf
->rf_lsflags
));
144 static void allow_sync_reply(struct dlm_ls
*ls
, __le64
*new_seq
)
146 spin_lock_bh(&ls
->ls_rcom_spin
);
147 *new_seq
= cpu_to_le64(++ls
->ls_rcom_seq
);
148 set_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
);
149 spin_unlock_bh(&ls
->ls_rcom_spin
);
152 static void disallow_sync_reply(struct dlm_ls
*ls
)
154 spin_lock_bh(&ls
->ls_rcom_spin
);
155 clear_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
);
156 clear_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
157 spin_unlock_bh(&ls
->ls_rcom_spin
);
161 * low nodeid gathers one slot value at a time from each node.
162 * it sets need_slots=0, and saves rf_our_slot returned from each
165 * other nodes gather all slot values at once from the low nodeid.
166 * they set need_slots=1, and ignore the rf_our_slot returned from each
167 * rcom_config. they use the rf_num_slots returned from the low
168 * node's rcom_config.
171 int dlm_rcom_status(struct dlm_ls
*ls
, int nodeid
, uint32_t status_flags
,
178 ls
->ls_recover_nodeid
= nodeid
;
180 if (nodeid
== dlm_our_nodeid()) {
181 rc
= ls
->ls_recover_buf
;
182 rc
->rc_result
= cpu_to_le32(dlm_recover_status(ls
));
187 error
= create_rcom_stateless(ls
, nodeid
, DLM_RCOM_STATUS
,
188 sizeof(struct rcom_status
), &rc
, &msg
,
193 set_rcom_status(ls
, (struct rcom_status
*)rc
->rc_buf
, status_flags
);
195 allow_sync_reply(ls
, &rc
->rc_id
);
196 memset(ls
->ls_recover_buf
, 0, DLM_MAX_SOCKET_BUFSIZE
);
198 send_rcom_stateless(msg
, rc
);
200 error
= dlm_wait_function(ls
, &rcom_response
);
201 disallow_sync_reply(ls
);
202 if (error
== -ETIMEDOUT
)
207 rc
= ls
->ls_recover_buf
;
209 if (rc
->rc_result
== cpu_to_le32(-ESRCH
)) {
210 /* we pretend the remote lockspace exists with 0 status */
211 log_debug(ls
, "remote node %d not ready", nodeid
);
215 error
= check_rcom_config(ls
, rc
, nodeid
);
218 /* the caller looks at rc_result for the remote recovery status */
223 static void receive_rcom_status(struct dlm_ls
*ls
,
224 const struct dlm_rcom
*rc_in
,
228 struct rcom_status
*rs
;
230 int nodeid
= le32_to_cpu(rc_in
->rc_header
.h_nodeid
);
231 int len
= sizeof(struct rcom_config
);
236 if (!dlm_slots_version(&rc_in
->rc_header
)) {
237 status
= dlm_recover_status(ls
);
241 rs
= (struct rcom_status
*)rc_in
->rc_buf
;
243 if (!(le32_to_cpu(rs
->rs_flags
) & DLM_RSF_NEED_SLOTS
)) {
244 status
= dlm_recover_status(ls
);
248 spin_lock_bh(&ls
->ls_recover_lock
);
249 status
= ls
->ls_recover_status
;
250 num_slots
= ls
->ls_num_slots
;
251 spin_unlock_bh(&ls
->ls_recover_lock
);
252 len
+= num_slots
* sizeof(struct rcom_slot
);
255 error
= create_rcom_stateless(ls
, nodeid
, DLM_RCOM_STATUS_REPLY
,
256 len
, &rc
, &msg
, seq
);
260 rc
->rc_id
= rc_in
->rc_id
;
261 rc
->rc_seq_reply
= rc_in
->rc_seq
;
262 rc
->rc_result
= cpu_to_le32(status
);
264 set_rcom_config(ls
, (struct rcom_config
*)rc
->rc_buf
, num_slots
);
269 spin_lock_bh(&ls
->ls_recover_lock
);
270 if (ls
->ls_num_slots
!= num_slots
) {
271 spin_unlock_bh(&ls
->ls_recover_lock
);
272 log_debug(ls
, "receive_rcom_status num_slots %d to %d",
273 num_slots
, ls
->ls_num_slots
);
275 set_rcom_config(ls
, (struct rcom_config
*)rc
->rc_buf
, 0);
279 dlm_slots_copy_out(ls
, rc
);
280 spin_unlock_bh(&ls
->ls_recover_lock
);
283 send_rcom_stateless(msg
, rc
);
286 static void receive_sync_reply(struct dlm_ls
*ls
, const struct dlm_rcom
*rc_in
)
288 spin_lock_bh(&ls
->ls_rcom_spin
);
289 if (!test_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
) ||
290 le64_to_cpu(rc_in
->rc_id
) != ls
->ls_rcom_seq
) {
291 log_debug(ls
, "reject reply %d from %d seq %llx expect %llx",
292 le32_to_cpu(rc_in
->rc_type
),
293 le32_to_cpu(rc_in
->rc_header
.h_nodeid
),
294 (unsigned long long)le64_to_cpu(rc_in
->rc_id
),
295 (unsigned long long)ls
->ls_rcom_seq
);
298 memcpy(ls
->ls_recover_buf
, rc_in
,
299 le16_to_cpu(rc_in
->rc_header
.h_length
));
300 set_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
301 clear_bit(LSFL_RCOM_WAIT
, &ls
->ls_flags
);
302 wake_up(&ls
->ls_wait_general
);
304 spin_unlock_bh(&ls
->ls_rcom_spin
);
307 int dlm_rcom_names(struct dlm_ls
*ls
, int nodeid
, char *last_name
,
308 int last_len
, uint64_t seq
)
310 struct dlm_mhandle
*mh
;
314 ls
->ls_recover_nodeid
= nodeid
;
317 error
= create_rcom(ls
, nodeid
, DLM_RCOM_NAMES
, last_len
,
321 memcpy(rc
->rc_buf
, last_name
, last_len
);
323 allow_sync_reply(ls
, &rc
->rc_id
);
324 memset(ls
->ls_recover_buf
, 0, DLM_MAX_SOCKET_BUFSIZE
);
328 error
= dlm_wait_function(ls
, &rcom_response
);
329 disallow_sync_reply(ls
);
330 if (error
== -ETIMEDOUT
)
336 static void receive_rcom_names(struct dlm_ls
*ls
, const struct dlm_rcom
*rc_in
,
339 struct dlm_mhandle
*mh
;
341 int error
, inlen
, outlen
, nodeid
;
343 nodeid
= le32_to_cpu(rc_in
->rc_header
.h_nodeid
);
344 inlen
= le16_to_cpu(rc_in
->rc_header
.h_length
) -
345 sizeof(struct dlm_rcom
);
346 outlen
= DLM_MAX_APP_BUFSIZE
- sizeof(struct dlm_rcom
);
348 error
= create_rcom(ls
, nodeid
, DLM_RCOM_NAMES_REPLY
, outlen
,
352 rc
->rc_id
= rc_in
->rc_id
;
353 rc
->rc_seq_reply
= rc_in
->rc_seq
;
355 dlm_copy_master_names(ls
, rc_in
->rc_buf
, inlen
, rc
->rc_buf
, outlen
,
360 int dlm_send_rcom_lookup(struct dlm_rsb
*r
, int dir_nodeid
, uint64_t seq
)
363 struct dlm_mhandle
*mh
;
364 struct dlm_ls
*ls
= r
->res_ls
;
367 error
= create_rcom(ls
, dir_nodeid
, DLM_RCOM_LOOKUP
, r
->res_length
,
371 memcpy(rc
->rc_buf
, r
->res_name
, r
->res_length
);
372 rc
->rc_id
= cpu_to_le64(r
->res_id
);
379 static void receive_rcom_lookup(struct dlm_ls
*ls
,
380 const struct dlm_rcom
*rc_in
, uint64_t seq
)
383 struct dlm_mhandle
*mh
;
384 int error
, ret_nodeid
, nodeid
= le32_to_cpu(rc_in
->rc_header
.h_nodeid
);
385 int len
= le16_to_cpu(rc_in
->rc_header
.h_length
) -
386 sizeof(struct dlm_rcom
);
388 /* Old code would send this special id to trigger a debug dump. */
389 if (rc_in
->rc_id
== cpu_to_le64(0xFFFFFFFF)) {
390 log_error(ls
, "receive_rcom_lookup dump from %d", nodeid
);
391 dlm_dump_rsb_name(ls
, rc_in
->rc_buf
, len
);
395 error
= create_rcom(ls
, nodeid
, DLM_RCOM_LOOKUP_REPLY
, 0, &rc
, &mh
,
400 error
= dlm_master_lookup(ls
, nodeid
, rc_in
->rc_buf
, len
,
401 DLM_LU_RECOVER_MASTER
, &ret_nodeid
, NULL
);
404 rc
->rc_result
= cpu_to_le32(ret_nodeid
);
405 rc
->rc_id
= rc_in
->rc_id
;
406 rc
->rc_seq_reply
= rc_in
->rc_seq
;
411 static void receive_rcom_lookup_reply(struct dlm_ls
*ls
,
412 const struct dlm_rcom
*rc_in
)
414 dlm_recover_master_reply(ls
, rc_in
);
417 static void pack_rcom_lock(struct dlm_rsb
*r
, struct dlm_lkb
*lkb
,
418 struct rcom_lock
*rl
)
420 memset(rl
, 0, sizeof(*rl
));
422 rl
->rl_ownpid
= cpu_to_le32(lkb
->lkb_ownpid
);
423 rl
->rl_lkid
= cpu_to_le32(lkb
->lkb_id
);
424 rl
->rl_exflags
= cpu_to_le32(lkb
->lkb_exflags
);
425 rl
->rl_flags
= cpu_to_le32(dlm_dflags_val(lkb
));
426 rl
->rl_lvbseq
= cpu_to_le32(lkb
->lkb_lvbseq
);
427 rl
->rl_rqmode
= lkb
->lkb_rqmode
;
428 rl
->rl_grmode
= lkb
->lkb_grmode
;
429 rl
->rl_status
= lkb
->lkb_status
;
430 rl
->rl_wait_type
= cpu_to_le16(lkb
->lkb_wait_type
);
433 rl
->rl_asts
|= DLM_CB_BAST
;
435 rl
->rl_asts
|= DLM_CB_CAST
;
437 rl
->rl_namelen
= cpu_to_le16(r
->res_length
);
438 memcpy(rl
->rl_name
, r
->res_name
, r
->res_length
);
440 /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
441 If so, receive_rcom_lock_args() won't take this copy. */
444 memcpy(rl
->rl_lvb
, lkb
->lkb_lvbptr
, r
->res_ls
->ls_lvblen
);
447 int dlm_send_rcom_lock(struct dlm_rsb
*r
, struct dlm_lkb
*lkb
, uint64_t seq
)
449 struct dlm_ls
*ls
= r
->res_ls
;
451 struct dlm_mhandle
*mh
;
452 struct rcom_lock
*rl
;
453 int error
, len
= sizeof(struct rcom_lock
);
456 len
+= ls
->ls_lvblen
;
458 error
= create_rcom(ls
, r
->res_nodeid
, DLM_RCOM_LOCK
, len
, &rc
, &mh
,
463 rl
= (struct rcom_lock
*) rc
->rc_buf
;
464 pack_rcom_lock(r
, lkb
, rl
);
465 rc
->rc_id
= cpu_to_le64((uintptr_t)r
);
472 /* needs at least dlm_rcom + rcom_lock */
473 static void receive_rcom_lock(struct dlm_ls
*ls
, const struct dlm_rcom
*rc_in
,
476 __le32 rl_remid
, rl_result
;
477 struct rcom_lock
*rl
;
479 struct dlm_mhandle
*mh
;
480 int error
, nodeid
= le32_to_cpu(rc_in
->rc_header
.h_nodeid
);
482 dlm_recover_master_copy(ls
, rc_in
, &rl_remid
, &rl_result
);
484 error
= create_rcom(ls
, nodeid
, DLM_RCOM_LOCK_REPLY
,
485 sizeof(struct rcom_lock
), &rc
, &mh
, seq
);
489 memcpy(rc
->rc_buf
, rc_in
->rc_buf
, sizeof(struct rcom_lock
));
490 rl
= (struct rcom_lock
*)rc
->rc_buf
;
491 /* set rl_remid and rl_result from dlm_recover_master_copy() */
492 rl
->rl_remid
= rl_remid
;
493 rl
->rl_result
= rl_result
;
495 rc
->rc_id
= rc_in
->rc_id
;
496 rc
->rc_seq_reply
= rc_in
->rc_seq
;
501 /* If the lockspace doesn't exist then still send a status message
502 back; it's possible that it just doesn't have its global_id yet. */
504 int dlm_send_ls_not_ready(int nodeid
, const struct dlm_rcom
*rc_in
)
507 struct rcom_config
*rf
;
508 struct dlm_mhandle
*mh
;
510 int mb_len
= sizeof(struct dlm_rcom
) + sizeof(struct rcom_config
);
512 mh
= dlm_midcomms_get_mhandle(nodeid
, mb_len
, &mb
);
516 rc
= (struct dlm_rcom
*) mb
;
518 rc
->rc_header
.h_version
= cpu_to_le32(DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
);
519 rc
->rc_header
.u
.h_lockspace
= rc_in
->rc_header
.u
.h_lockspace
;
520 rc
->rc_header
.h_nodeid
= cpu_to_le32(dlm_our_nodeid());
521 rc
->rc_header
.h_length
= cpu_to_le16(mb_len
);
522 rc
->rc_header
.h_cmd
= DLM_RCOM
;
524 rc
->rc_type
= cpu_to_le32(DLM_RCOM_STATUS_REPLY
);
525 rc
->rc_id
= rc_in
->rc_id
;
526 rc
->rc_seq_reply
= rc_in
->rc_seq
;
527 rc
->rc_result
= cpu_to_le32(-ESRCH
);
529 rf
= (struct rcom_config
*) rc
->rc_buf
;
530 rf
->rf_lvblen
= cpu_to_le32(~0U);
532 dlm_midcomms_commit_mhandle(mh
, NULL
, 0);
538 * Ignore messages for stage Y before we set
539 * recover_status bit for stage X:
543 * dlm_recover_members()
546 * - ignore NAMES, NAMES_REPLY
547 * - ignore LOOKUP, LOOKUP_REPLY
548 * - ignore LOCK, LOCK_REPLY
550 * recover_status |= NODES
552 * dlm_recover_members_wait()
554 * dlm_recover_directory()
557 * - ignore LOOKUP, LOOKUP_REPLY
558 * - ignore LOCK, LOCK_REPLY
560 * recover_status |= DIR
562 * dlm_recover_directory_wait()
564 * dlm_recover_masters()
566 * - recv LOOKUP_REPLY
568 * dlm_recover_locks()
572 * recover_status |= LOCKS
574 * dlm_recover_locks_wait()
576 * recover_status |= DONE
579 /* Called by dlm_recv; corresponds to dlm_receive_message() but special
580 recovery-only comms are sent through here. */
582 void dlm_receive_rcom(struct dlm_ls
*ls
, const struct dlm_rcom
*rc
, int nodeid
)
584 int lock_size
= sizeof(struct dlm_rcom
) + sizeof(struct rcom_lock
);
585 int stop
, reply
= 0, names
= 0, lookup
= 0, lock
= 0;
589 switch (rc
->rc_type
) {
590 case cpu_to_le32(DLM_RCOM_STATUS_REPLY
):
593 case cpu_to_le32(DLM_RCOM_NAMES
):
596 case cpu_to_le32(DLM_RCOM_NAMES_REPLY
):
600 case cpu_to_le32(DLM_RCOM_LOOKUP
):
603 case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY
):
607 case cpu_to_le32(DLM_RCOM_LOCK
):
610 case cpu_to_le32(DLM_RCOM_LOCK_REPLY
):
616 spin_lock_bh(&ls
->ls_recover_lock
);
617 status
= ls
->ls_recover_status
;
618 stop
= dlm_recovery_stopped(ls
);
619 seq
= ls
->ls_recover_seq
;
620 spin_unlock_bh(&ls
->ls_recover_lock
);
622 if (stop
&& (rc
->rc_type
!= cpu_to_le32(DLM_RCOM_STATUS
)))
625 if (reply
&& (le64_to_cpu(rc
->rc_seq_reply
) != seq
))
628 if (!(status
& DLM_RS_NODES
) && (names
|| lookup
|| lock
))
631 if (!(status
& DLM_RS_DIR
) && (lookup
|| lock
))
634 switch (rc
->rc_type
) {
635 case cpu_to_le32(DLM_RCOM_STATUS
):
636 receive_rcom_status(ls
, rc
, seq
);
639 case cpu_to_le32(DLM_RCOM_NAMES
):
640 receive_rcom_names(ls
, rc
, seq
);
643 case cpu_to_le32(DLM_RCOM_LOOKUP
):
644 receive_rcom_lookup(ls
, rc
, seq
);
647 case cpu_to_le32(DLM_RCOM_LOCK
):
648 if (le16_to_cpu(rc
->rc_header
.h_length
) < lock_size
)
650 receive_rcom_lock(ls
, rc
, seq
);
653 case cpu_to_le32(DLM_RCOM_STATUS_REPLY
):
654 receive_sync_reply(ls
, rc
);
657 case cpu_to_le32(DLM_RCOM_NAMES_REPLY
):
658 receive_sync_reply(ls
, rc
);
661 case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY
):
662 receive_rcom_lookup_reply(ls
, rc
);
665 case cpu_to_le32(DLM_RCOM_LOCK_REPLY
):
666 if (le16_to_cpu(rc
->rc_header
.h_length
) < lock_size
)
668 dlm_recover_process_copy(ls
, rc
, seq
);
672 log_error(ls
, "receive_rcom bad type %d",
673 le32_to_cpu(rc
->rc_type
));
678 log_limit(ls
, "dlm_receive_rcom ignore msg %d "
679 "from %d %llu %llu recover seq %llu sts %x gen %u",
680 le32_to_cpu(rc
->rc_type
),
682 (unsigned long long)le64_to_cpu(rc
->rc_seq
),
683 (unsigned long long)le64_to_cpu(rc
->rc_seq_reply
),
684 (unsigned long long)seq
,
685 status
, ls
->ls_generation
);
688 log_error(ls
, "recovery message %d from %d is too short",
689 le32_to_cpu(rc
->rc_type
), nodeid
);