1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
15 #include "lockspace.h"
28 static int rcom_response(struct dlm_ls
*ls
)
30 return test_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
33 static int create_rcom(struct dlm_ls
*ls
, int to_nodeid
, int type
, int len
,
34 struct dlm_rcom
**rc_ret
, struct dlm_mhandle
**mh_ret
)
37 struct dlm_mhandle
*mh
;
39 int mb_len
= sizeof(struct dlm_rcom
) + len
;
41 mh
= dlm_lowcomms_get_buffer(to_nodeid
, mb_len
, GFP_KERNEL
, &mb
);
43 log_print("create_rcom to %d type %d len %d ENOBUFS",
44 to_nodeid
, type
, len
);
47 memset(mb
, 0, mb_len
);
49 rc
= (struct dlm_rcom
*) mb
;
51 rc
->rc_header
.h_version
= (DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
);
52 rc
->rc_header
.h_lockspace
= ls
->ls_global_id
;
53 rc
->rc_header
.h_nodeid
= dlm_our_nodeid();
54 rc
->rc_header
.h_length
= mb_len
;
55 rc
->rc_header
.h_cmd
= DLM_RCOM
;
64 static void send_rcom(struct dlm_ls
*ls
, struct dlm_mhandle
*mh
,
68 dlm_lowcomms_commit_buffer(mh
);
71 /* When replying to a status request, a node also sends back its
72 configuration values. The requesting node then checks that the remote
73 node is configured the same way as itself. */
75 static void make_config(struct dlm_ls
*ls
, struct rcom_config
*rf
)
77 rf
->rf_lvblen
= ls
->ls_lvblen
;
78 rf
->rf_lsflags
= ls
->ls_exflags
;
81 static int check_config(struct dlm_ls
*ls
, struct rcom_config
*rf
, int nodeid
)
83 if (rf
->rf_lvblen
!= ls
->ls_lvblen
||
84 rf
->rf_lsflags
!= ls
->ls_exflags
) {
85 log_error(ls
, "config mismatch: %d,%x nodeid %d: %d,%x",
86 ls
->ls_lvblen
, ls
->ls_exflags
,
87 nodeid
, rf
->rf_lvblen
, rf
->rf_lsflags
);
93 int dlm_rcom_status(struct dlm_ls
*ls
, int nodeid
)
96 struct dlm_mhandle
*mh
;
99 memset(ls
->ls_recover_buf
, 0, dlm_config
.buffer_size
);
100 ls
->ls_recover_nodeid
= nodeid
;
102 if (nodeid
== dlm_our_nodeid()) {
103 rc
= (struct dlm_rcom
*) ls
->ls_recover_buf
;
104 rc
->rc_result
= dlm_recover_status(ls
);
108 error
= create_rcom(ls
, nodeid
, DLM_RCOM_STATUS
, 0, &rc
, &mh
);
111 rc
->rc_id
= ++ls
->ls_rcom_seq
;
113 send_rcom(ls
, mh
, rc
);
115 error
= dlm_wait_function(ls
, &rcom_response
);
116 clear_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
120 rc
= (struct dlm_rcom
*) ls
->ls_recover_buf
;
122 if (rc
->rc_result
== -ESRCH
) {
123 /* we pretend the remote lockspace exists with 0 status */
124 log_debug(ls
, "remote node %d not ready", nodeid
);
127 error
= check_config(ls
, (struct rcom_config
*) rc
->rc_buf
,
129 /* the caller looks at rc_result for the remote recovery status */
134 static void receive_rcom_status(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
137 struct dlm_mhandle
*mh
;
138 int error
, nodeid
= rc_in
->rc_header
.h_nodeid
;
140 error
= create_rcom(ls
, nodeid
, DLM_RCOM_STATUS_REPLY
,
141 sizeof(struct rcom_config
), &rc
, &mh
);
144 rc
->rc_id
= rc_in
->rc_id
;
145 rc
->rc_result
= dlm_recover_status(ls
);
146 make_config(ls
, (struct rcom_config
*) rc
->rc_buf
);
148 send_rcom(ls
, mh
, rc
);
151 static void receive_sync_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
153 if (rc_in
->rc_id
!= ls
->ls_rcom_seq
) {
154 log_debug(ls
, "reject old reply %d got %llx wanted %llx",
155 rc_in
->rc_type
, rc_in
->rc_id
, ls
->ls_rcom_seq
);
158 memcpy(ls
->ls_recover_buf
, rc_in
, rc_in
->rc_header
.h_length
);
159 set_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
160 wake_up(&ls
->ls_wait_general
);
163 static void receive_rcom_status_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
165 receive_sync_reply(ls
, rc_in
);
168 int dlm_rcom_names(struct dlm_ls
*ls
, int nodeid
, char *last_name
, int last_len
)
171 struct dlm_mhandle
*mh
;
172 int error
= 0, len
= sizeof(struct dlm_rcom
);
174 memset(ls
->ls_recover_buf
, 0, dlm_config
.buffer_size
);
175 ls
->ls_recover_nodeid
= nodeid
;
177 if (nodeid
== dlm_our_nodeid()) {
178 dlm_copy_master_names(ls
, last_name
, last_len
,
179 ls
->ls_recover_buf
+ len
,
180 dlm_config
.buffer_size
- len
, nodeid
);
184 error
= create_rcom(ls
, nodeid
, DLM_RCOM_NAMES
, last_len
, &rc
, &mh
);
187 memcpy(rc
->rc_buf
, last_name
, last_len
);
188 rc
->rc_id
= ++ls
->ls_rcom_seq
;
190 send_rcom(ls
, mh
, rc
);
192 error
= dlm_wait_function(ls
, &rcom_response
);
193 clear_bit(LSFL_RCOM_READY
, &ls
->ls_flags
);
198 static void receive_rcom_names(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
201 struct dlm_mhandle
*mh
;
202 int error
, inlen
, outlen
;
203 int nodeid
= rc_in
->rc_header
.h_nodeid
;
204 uint32_t status
= dlm_recover_status(ls
);
207 * We can't run dlm_dir_rebuild_send (which uses ls_nodes) while
208 * dlm_recoverd is running ls_nodes_reconfig (which changes ls_nodes).
209 * It could only happen in rare cases where we get a late NAMES
210 * message from a previous instance of recovery.
213 if (!(status
& DLM_RS_NODES
)) {
214 log_debug(ls
, "ignoring RCOM_NAMES from %u", nodeid
);
218 nodeid
= rc_in
->rc_header
.h_nodeid
;
219 inlen
= rc_in
->rc_header
.h_length
- sizeof(struct dlm_rcom
);
220 outlen
= dlm_config
.buffer_size
- sizeof(struct dlm_rcom
);
222 error
= create_rcom(ls
, nodeid
, DLM_RCOM_NAMES_REPLY
, outlen
, &rc
, &mh
);
225 rc
->rc_id
= rc_in
->rc_id
;
227 dlm_copy_master_names(ls
, rc_in
->rc_buf
, inlen
, rc
->rc_buf
, outlen
,
229 send_rcom(ls
, mh
, rc
);
232 static void receive_rcom_names_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
234 receive_sync_reply(ls
, rc_in
);
237 int dlm_send_rcom_lookup(struct dlm_rsb
*r
, int dir_nodeid
)
240 struct dlm_mhandle
*mh
;
241 struct dlm_ls
*ls
= r
->res_ls
;
244 error
= create_rcom(ls
, dir_nodeid
, DLM_RCOM_LOOKUP
, r
->res_length
,
248 memcpy(rc
->rc_buf
, r
->res_name
, r
->res_length
);
249 rc
->rc_id
= (unsigned long) r
;
251 send_rcom(ls
, mh
, rc
);
256 static void receive_rcom_lookup(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
259 struct dlm_mhandle
*mh
;
260 int error
, ret_nodeid
, nodeid
= rc_in
->rc_header
.h_nodeid
;
261 int len
= rc_in
->rc_header
.h_length
- sizeof(struct dlm_rcom
);
263 error
= create_rcom(ls
, nodeid
, DLM_RCOM_LOOKUP_REPLY
, 0, &rc
, &mh
);
267 error
= dlm_dir_lookup(ls
, nodeid
, rc_in
->rc_buf
, len
, &ret_nodeid
);
270 rc
->rc_result
= ret_nodeid
;
271 rc
->rc_id
= rc_in
->rc_id
;
273 send_rcom(ls
, mh
, rc
);
276 static void receive_rcom_lookup_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
278 dlm_recover_master_reply(ls
, rc_in
);
281 static void pack_rcom_lock(struct dlm_rsb
*r
, struct dlm_lkb
*lkb
,
282 struct rcom_lock
*rl
)
284 memset(rl
, 0, sizeof(*rl
));
286 rl
->rl_ownpid
= lkb
->lkb_ownpid
;
287 rl
->rl_lkid
= lkb
->lkb_id
;
288 rl
->rl_exflags
= lkb
->lkb_exflags
;
289 rl
->rl_flags
= lkb
->lkb_flags
;
290 rl
->rl_lvbseq
= lkb
->lkb_lvbseq
;
291 rl
->rl_rqmode
= lkb
->lkb_rqmode
;
292 rl
->rl_grmode
= lkb
->lkb_grmode
;
293 rl
->rl_status
= lkb
->lkb_status
;
294 rl
->rl_wait_type
= lkb
->lkb_wait_type
;
296 if (lkb
->lkb_bastaddr
)
297 rl
->rl_asts
|= AST_BAST
;
298 if (lkb
->lkb_astaddr
)
299 rl
->rl_asts
|= AST_COMP
;
301 rl
->rl_namelen
= r
->res_length
;
302 memcpy(rl
->rl_name
, r
->res_name
, r
->res_length
);
304 /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
305 If so, receive_rcom_lock_args() won't take this copy. */
308 memcpy(rl
->rl_lvb
, lkb
->lkb_lvbptr
, r
->res_ls
->ls_lvblen
);
311 int dlm_send_rcom_lock(struct dlm_rsb
*r
, struct dlm_lkb
*lkb
)
313 struct dlm_ls
*ls
= r
->res_ls
;
315 struct dlm_mhandle
*mh
;
316 struct rcom_lock
*rl
;
317 int error
, len
= sizeof(struct rcom_lock
);
320 len
+= ls
->ls_lvblen
;
322 error
= create_rcom(ls
, r
->res_nodeid
, DLM_RCOM_LOCK
, len
, &rc
, &mh
);
326 rl
= (struct rcom_lock
*) rc
->rc_buf
;
327 pack_rcom_lock(r
, lkb
, rl
);
328 rc
->rc_id
= (unsigned long) r
;
330 send_rcom(ls
, mh
, rc
);
335 static void receive_rcom_lock(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
338 struct dlm_mhandle
*mh
;
339 int error
, nodeid
= rc_in
->rc_header
.h_nodeid
;
341 dlm_recover_master_copy(ls
, rc_in
);
343 error
= create_rcom(ls
, nodeid
, DLM_RCOM_LOCK_REPLY
,
344 sizeof(struct rcom_lock
), &rc
, &mh
);
348 /* We send back the same rcom_lock struct we received, but
349 dlm_recover_master_copy() has filled in rl_remid and rl_result */
351 memcpy(rc
->rc_buf
, rc_in
->rc_buf
, sizeof(struct rcom_lock
));
352 rc
->rc_id
= rc_in
->rc_id
;
354 send_rcom(ls
, mh
, rc
);
357 static void receive_rcom_lock_reply(struct dlm_ls
*ls
, struct dlm_rcom
*rc_in
)
359 uint32_t status
= dlm_recover_status(ls
);
361 if (!(status
& DLM_RS_DIR
)) {
362 log_debug(ls
, "ignoring RCOM_LOCK_REPLY from %u",
363 rc_in
->rc_header
.h_nodeid
);
367 dlm_recover_process_copy(ls
, rc_in
);
370 static int send_ls_not_ready(int nodeid
, struct dlm_rcom
*rc_in
)
373 struct dlm_mhandle
*mh
;
375 int mb_len
= sizeof(struct dlm_rcom
);
377 mh
= dlm_lowcomms_get_buffer(nodeid
, mb_len
, GFP_KERNEL
, &mb
);
380 memset(mb
, 0, mb_len
);
382 rc
= (struct dlm_rcom
*) mb
;
384 rc
->rc_header
.h_version
= (DLM_HEADER_MAJOR
| DLM_HEADER_MINOR
);
385 rc
->rc_header
.h_lockspace
= rc_in
->rc_header
.h_lockspace
;
386 rc
->rc_header
.h_nodeid
= dlm_our_nodeid();
387 rc
->rc_header
.h_length
= mb_len
;
388 rc
->rc_header
.h_cmd
= DLM_RCOM
;
390 rc
->rc_type
= DLM_RCOM_STATUS_REPLY
;
391 rc
->rc_id
= rc_in
->rc_id
;
392 rc
->rc_result
= -ESRCH
;
395 dlm_lowcomms_commit_buffer(mh
);
400 /* Called by dlm_recvd; corresponds to dlm_receive_message() but special
401 recovery-only comms are sent through here. */
403 void dlm_receive_rcom(struct dlm_header
*hd
, int nodeid
)
405 struct dlm_rcom
*rc
= (struct dlm_rcom
*) hd
;
410 /* If the lockspace doesn't exist then still send a status message
411 back; it's possible that it just doesn't have its global_id yet. */
413 ls
= dlm_find_lockspace_global(hd
->h_lockspace
);
415 log_print("lockspace %x from %d not found",
416 hd
->h_lockspace
, nodeid
);
417 send_ls_not_ready(nodeid
, rc
);
421 if (dlm_recovery_stopped(ls
) && (rc
->rc_type
!= DLM_RCOM_STATUS
)) {
422 log_error(ls
, "ignoring recovery message %x from %d",
423 rc
->rc_type
, nodeid
);
427 if (nodeid
!= rc
->rc_header
.h_nodeid
) {
428 log_error(ls
, "bad rcom nodeid %d from %d",
429 rc
->rc_header
.h_nodeid
, nodeid
);
433 switch (rc
->rc_type
) {
434 case DLM_RCOM_STATUS
:
435 receive_rcom_status(ls
, rc
);
439 receive_rcom_names(ls
, rc
);
442 case DLM_RCOM_LOOKUP
:
443 receive_rcom_lookup(ls
, rc
);
447 receive_rcom_lock(ls
, rc
);
450 case DLM_RCOM_STATUS_REPLY
:
451 receive_rcom_status_reply(ls
, rc
);
454 case DLM_RCOM_NAMES_REPLY
:
455 receive_rcom_names_reply(ls
, rc
);
458 case DLM_RCOM_LOOKUP_REPLY
:
459 receive_rcom_lookup_reply(ls
, rc
);
462 case DLM_RCOM_LOCK_REPLY
:
463 receive_rcom_lock_reply(ls
, rc
);
467 DLM_ASSERT(0, printk("rc_type=%x\n", rc
->rc_type
););
470 dlm_put_lockspace(ls
);