1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* -*- mode: c; c-basic-offset: 8; -*-
3 * vim: noexpandtab sw=8 ts=8 sts=0:
7 * underlying calls for lock conversion
9 * Copyright (C) 2004 Oracle. All rights reserved.
13 #include <linux/module.h>
15 #include <linux/types.h>
16 #include <linux/highmem.h>
17 #include <linux/init.h>
18 #include <linux/sysctl.h>
19 #include <linux/random.h>
20 #include <linux/blkdev.h>
21 #include <linux/socket.h>
22 #include <linux/inet.h>
23 #include <linux/spinlock.h>
26 #include "cluster/heartbeat.h"
27 #include "cluster/nodemanager.h"
28 #include "cluster/tcp.h"
31 #include "dlmcommon.h"
33 #include "dlmconvert.h"
35 #define MLOG_MASK_PREFIX ML_DLM
36 #include "cluster/masklog.h"
38 /* NOTE: __dlmconvert_master is the only function in here that
39 * needs a spinlock held on entry (res->spinlock) and it is the
40 * only one that holds a lock on exit (res->spinlock).
41 * All other functions in here need no locks and drop all of
42 * the locks that they acquire. */
43 static enum dlm_status
__dlmconvert_master(struct dlm_ctxt
*dlm
,
44 struct dlm_lock_resource
*res
,
45 struct dlm_lock
*lock
, int flags
,
46 int type
, int *call_ast
,
48 static enum dlm_status
dlm_send_remote_convert_request(struct dlm_ctxt
*dlm
,
49 struct dlm_lock_resource
*res
,
50 struct dlm_lock
*lock
, int flags
, int type
);
/*
 * this is only called directly by dlmlock(), and only when the
 * local node is the owner of the lockres
 * taken: takes and drops res->spinlock
 * returns: see __dlmconvert_master
 */
/*
 * dlmconvert_master() - run a lock conversion through __dlmconvert_master()
 * while holding res->spinlock.
 *
 * Visible sequence:
 *   1. take res->spinlock, call __dlm_wait_on_lockres() and
 *      __dlm_lockres_reserve_ast();
 *   2. set DLM_LOCK_RES_IN_PROGRESS in res->state, call
 *      __dlmconvert_master() (which fills call_ast / kick_thread), then
 *      clear the flag and drop res->spinlock;
 *   3. calls to dlm_queue_ast(), dlm_lockres_release_ast() and
 *      dlm_kick_thread() follow.
 *
 * @dlm:   DLM context, passed through to the helpers.
 * @res:   lock resource whose spinlock and state are manipulated here.
 * @lock:  lock being converted.
 * @flags: conversion flags, forwarded unchanged.
 * @type:  requested lock mode, forwarded unchanged.
 *
 * NOTE(review): this listing has lost lines (the embedded numbering skips,
 * e.g. 78 -> 80 -> 83 -> 85).  The function braces, the statement guarded
 * by the status test, the conditions that choose between the ast/kick
 * calls and the return statement are not visible here -- confirm against
 * a pristine copy before relying on this description.
 */
61 enum dlm_status
dlmconvert_master(struct dlm_ctxt
*dlm
,
62 struct dlm_lock_resource
*res
,
63 struct dlm_lock
*lock
, int flags
, int type
)
65 int call_ast
= 0, kick_thread
= 0;
66 enum dlm_status status
;
68 spin_lock(&res
->spinlock
);
69 /* we are not in a network handler, this is fine */
70 __dlm_wait_on_lockres(res
);
71 __dlm_lockres_reserve_ast(res
);
/* IN_PROGRESS is set only for the duration of the __dlmconvert_master() call */
72 res
->state
|= DLM_LOCK_RES_IN_PROGRESS
;
74 status
= __dlmconvert_master(dlm
, res
, lock
, flags
, type
,
75 &call_ast
, &kick_thread
);
77 res
->state
&= ~DLM_LOCK_RES_IN_PROGRESS
;
78 spin_unlock(&res
->spinlock
);
/* NOTE(review): the statement controlled by this test is missing from the listing */
80 if (status
!= DLM_NORMAL
&& status
!= DLM_NOTQUEUED
)
83 /* either queue the ast or release it */
/* NOTE(review): the conditions selecting among the next three calls are not visible */
85 dlm_queue_ast(dlm
, lock
);
87 dlm_lockres_release_ast(dlm
, res
);
90 dlm_kick_thread(dlm
, res
);
/* performs lock conversion at the lockres master site
 * caller needs: res->spinlock
 * taken: takes and drops lock->spinlock
 * held on exit: res->spinlock
 * returns: DLM_NORMAL, DLM_NOTQUEUED, DLM_DENIED
 * call_ast: whether ast should be called for this lock
 * kick_thread: whether dlm_kick_thread should be called
 */
/*
 * __dlmconvert_master() - try to convert `lock` on `res` to mode `type`.
 *
 * What the visible lines establish:
 *   - res->spinlock must already be held (assert_spin_locked); the body
 *     takes lock->spinlock and releases it near the end;
 *   - an error is logged when lock->ml.convert_type is not LKM_IVMODE
 *     (a conversion is already pending) or when the lock is not on
 *     res->granted;
 *   - when LKM_VALBLK is set, DLM_LKSB_PUT_LVB or DLM_LKSB_GET_LVB is set
 *     in lock->lksb->flags, or LKM_VALBLK is cleared from the local flags;
 *   - the requested type is checked with dlm_lock_compatible() against
 *     locks on res->granted and res->converting (both their current type
 *     and, for converting locks, their pending convert_type);
 *   - grant statements: lksb->status = DLM_NORMAL, ml.type = type, copy the
 *     lksb lvb into res->lvb when PUT_LVB is set, move the lock to the tail
 *     of res->granted;
 *   - queue statements: status = DLM_NOTQUEUED under LKM_NOQUEUE; set
 *     ml.convert_type = type and move the lock to the tail of
 *     res->converting.
 *
 * @call_ast: out-parameter; the store to it is not visible in this listing.
 *
 * NOTE(review): this listing has lost lines (embedded numbering skips, e.g.
 * 123 -> 128, 130 -> 136, 204 -> 211).  The last parameter, braces, case
 * labels, jump statements/labels between the paths above, the writes to the
 * out-parameters and the return are not visible -- the control flow
 * connecting the paths must be confirmed against a pristine copy.
 */
104 static enum dlm_status
__dlmconvert_master(struct dlm_ctxt
*dlm
,
105 struct dlm_lock_resource
*res
,
106 struct dlm_lock
*lock
, int flags
,
107 int type
, int *call_ast
,
110 enum dlm_status status
= DLM_NORMAL
;
111 struct dlm_lock
*tmplock
=NULL
;
113 assert_spin_locked(&res
->spinlock
);
115 mlog(0, "type=%d, convert_type=%d, new convert_type=%d\n",
116 lock
->ml
.type
, lock
->ml
.convert_type
, type
);
/* lock->spinlock is held from here until the spin_unlock() further down */
118 spin_lock(&lock
->spinlock
);
120 /* already converting? */
121 if (lock
->ml
.convert_type
!= LKM_IVMODE
) {
122 mlog(ML_ERROR
, "attempted to convert a lock with a lock "
123 "conversion pending\n");
128 /* must be on grant queue to convert */
129 if (!dlm_lock_on_list(&res
->granted
, lock
)) {
130 mlog(ML_ERROR
, "attempted to convert a lock not on grant "
/* lock value block handling depends on the lock's current mode */
136 if (flags
& LKM_VALBLK
) {
137 switch (lock
->ml
.type
) {
139 /* EX + LKM_VALBLK + convert == set lvb */
140 mlog(0, "will set lvb: converting %s->%s\n",
141 dlm_lock_mode_name(lock
->ml
.type
),
142 dlm_lock_mode_name(type
));
143 lock
->lksb
->flags
|= DLM_LKSB_PUT_LVB
;
147 /* refetch if new level is not NL */
148 if (type
> LKM_NLMODE
) {
149 mlog(0, "will fetch new value into "
150 "lvb: converting %s->%s\n",
151 dlm_lock_mode_name(lock
->ml
.type
),
152 dlm_lock_mode_name(type
));
153 lock
->lksb
->flags
|= DLM_LKSB_GET_LVB
;
155 mlog(0, "will NOT fetch new value "
156 "into lvb: converting %s->%s\n",
157 dlm_lock_mode_name(lock
->ml
.type
),
158 dlm_lock_mode_name(type
));
159 flags
&= ~(LKM_VALBLK
);
166 /* in-place downconvert? */
167 if (type
<= lock
->ml
.type
)
170 /* upconvert from here on */
/* scan the granted queue for a holder incompatible with the requested mode */
172 list_for_each_entry(tmplock
, &res
->granted
, list
) {
175 if (!dlm_lock_compatible(tmplock
->ml
.type
, type
))
/* then the converting queue: both current and pending modes are checked */
179 list_for_each_entry(tmplock
, &res
->converting
, list
) {
180 if (!dlm_lock_compatible(tmplock
->ml
.type
, type
))
182 /* existing conversion requests take precedence */
183 if (!dlm_lock_compatible(tmplock
->ml
.convert_type
, type
))
187 /* fall thru to grant */
190 mlog(0, "res %.*s, granting %s lock\n", res
->lockname
.len
,
191 res
->lockname
.name
, dlm_lock_mode_name(type
));
192 /* immediately grant the new lock type */
193 lock
->lksb
->status
= DLM_NORMAL
;
/* NOTE(review): the test is equality with dlm->node_num, yet the message
 * says "nonlocal" -- confirm which of the two is intended */
194 if (lock
->ml
.node
== dlm
->node_num
)
195 mlog(0, "doing in-place convert for nonlocal lock\n");
196 lock
->ml
.type
= type
;
/* publish the holder's lvb into the resource when a put was requested */
197 if (lock
->lksb
->flags
& DLM_LKSB_PUT_LVB
)
198 memcpy(res
->lvb
, lock
->lksb
->lvb
, DLM_LVB_LEN
);
201 * Move the lock to the tail because it may be the only lock which has
204 list_move_tail(&lock
->list
, &res
->granted
);
211 if (flags
& LKM_NOQUEUE
) {
212 mlog(0, "failed to convert NOQUEUE lock %.*s from "
213 "%d to %d...\n", res
->lockname
.len
, res
->lockname
.name
,
214 lock
->ml
.type
, type
);
215 status
= DLM_NOTQUEUED
;
218 mlog(0, "res %.*s, queueing...\n", res
->lockname
.len
,
221 lock
->ml
.convert_type
= type
;
222 /* do not alter lock refcount. switching lists. */
223 list_move_tail(&lock
->list
, &res
->converting
);
226 spin_unlock(&lock
->spinlock
);
/* dump the whole lock resource when the request was denied */
227 if (status
== DLM_DENIED
) {
228 __dlm_print_one_lock_resource(res
);
/* NOTE(review): the statement controlled by this test is missing from the listing */
230 if (status
== DLM_NORMAL
)
/*
 * dlm_revert_pending_convert() - undo a conversion that was queued locally.
 *
 * Moves `lock` to the tail of res->granted, resets lock->ml.convert_type to
 * LKM_IVMODE and clears DLM_LKSB_GET_LVB / DLM_LKSB_PUT_LVB in
 * lock->lksb->flags.
 *
 * @res:  lock resource that owns the granted list.
 * @lock: lock whose pending conversion is being cancelled.
 *
 * No locking is performed in the visible statements.
 * NOTE(review): presumably the caller holds res->spinlock (the call site in
 * dlmconvert_remote does) -- confirm for other callers.  The function's
 * braces are missing from this listing.
 */
235 void dlm_revert_pending_convert(struct dlm_lock_resource
*res
,
236 struct dlm_lock
*lock
)
238 /* do not alter lock refcount. switching lists. */
239 list_move_tail(&lock
->list
, &res
->granted
);
240 lock
->ml
.convert_type
= LKM_IVMODE
;
241 lock
->lksb
->flags
&= ~(DLM_LKSB_GET_LVB
|DLM_LKSB_PUT_LVB
);
/* messages the master site to do lock conversion
 * taken: takes and drops res->spinlock, uses DLM_LOCK_RES_IN_PROGRESS
 * returns: DLM_NORMAL, DLM_RECOVERING, status from remote node
 */
/*
 * dlmconvert_remote() - queue a conversion locally and ask the resource
 * owner to perform it via dlm_send_remote_convert_request().
 *
 * Visible sequence:
 *   1. under res->spinlock: set status to DLM_RECOVERING when the resource
 *      is in DLM_LOCK_RES_RECOVERING; call __dlm_wait_on_lockres(); log an
 *      error if a conversion is already pending on the lock; log when the
 *      lock already has the requested type with no conversion pending;
 *   2. mark the resource IN_PROGRESS, move the lock to res->converting,
 *      set lock->convert_pending = 1 and lock->ml.convert_type = type, and
 *      translate LKM_VALBLK into LKM_PUT_LVB / LKM_GET_LVB (or clear it);
 *   3. drop the spinlock and send the request;
 *   4. retake the spinlock, clear IN_PROGRESS; on a non-NORMAL status call
 *      dlm_revert_pending_convert(); if the status is DLM_NORMAL but
 *      convert_pending was cleared meanwhile, change it to DLM_RECOVERING;
 *      finally reset convert_pending and drop the spinlock.
 *
 * @flags: conversion flags; modified locally before being sent.
 * @type:  requested lock mode.
 *
 * NOTE(review): this listing has lost lines (embedded numbering skips, e.g.
 * 265 -> 268, 277 -> 282, 289 -> 294, 342 -> end).  Early-exit jumps, the
 * status assigned in the two early error/short-circuit branches, else
 * keywords in the LVB block, the wake-up announced by the final comments
 * and the return are not visible.
 */
251 enum dlm_status
dlmconvert_remote(struct dlm_ctxt
*dlm
,
252 struct dlm_lock_resource
*res
,
253 struct dlm_lock
*lock
, int flags
, int type
)
255 enum dlm_status status
;
257 mlog(0, "type=%d, convert_type=%d, busy=%d\n", lock
->ml
.type
,
258 lock
->ml
.convert_type
, res
->state
& DLM_LOCK_RES_IN_PROGRESS
);
260 spin_lock(&res
->spinlock
);
261 if (res
->state
& DLM_LOCK_RES_RECOVERING
) {
262 mlog(0, "bailing out early since res is RECOVERING "
263 "on secondary queue\n");
264 /* __dlm_print_one_lock_resource(res); */
265 status
= DLM_RECOVERING
;
268 /* will exit this call with spinlock held */
269 __dlm_wait_on_lockres(res
);
/* a conversion is already outstanding on this lock: report it */
271 if (lock
->ml
.convert_type
!= LKM_IVMODE
) {
272 __dlm_print_one_lock_resource(res
);
273 mlog(ML_ERROR
, "converting a remote lock that is already "
274 "converting! (cookie=%u:%llu, conv=%d)\n",
275 dlm_get_lock_cookie_node(be64_to_cpu(lock
->ml
.cookie
)),
276 dlm_get_lock_cookie_seq(be64_to_cpu(lock
->ml
.cookie
)),
277 lock
->ml
.convert_type
);
/* the lock already holds the requested mode and nothing is pending */
282 if (lock
->ml
.type
== type
&& lock
->ml
.convert_type
== LKM_IVMODE
) {
283 mlog(0, "last convert request returned DLM_RECOVERING, but "
284 "owner has already queued and sent ast to me. res %.*s, "
285 "(cookie=%u:%llu, type=%d, conv=%d)\n",
286 res
->lockname
.len
, res
->lockname
.name
,
287 dlm_get_lock_cookie_node(be64_to_cpu(lock
->ml
.cookie
)),
288 dlm_get_lock_cookie_seq(be64_to_cpu(lock
->ml
.cookie
)),
289 lock
->ml
.type
, lock
->ml
.convert_type
);
294 res
->state
|= DLM_LOCK_RES_IN_PROGRESS
;
295 /* move lock to local convert queue */
296 /* do not alter lock refcount. switching lists. */
297 list_move_tail(&lock
->list
, &res
->converting
);
298 lock
->convert_pending
= 1;
299 lock
->ml
.convert_type
= type
;
/* translate LKM_VALBLK into an explicit put/get request for the owner */
301 if (flags
& LKM_VALBLK
) {
302 if (lock
->ml
.type
== LKM_EXMODE
) {
303 flags
|= LKM_PUT_LVB
;
304 lock
->lksb
->flags
|= DLM_LKSB_PUT_LVB
;
306 if (lock
->ml
.convert_type
== LKM_NLMODE
)
307 flags
&= ~LKM_VALBLK
;
309 flags
|= LKM_GET_LVB
;
310 lock
->lksb
->flags
|= DLM_LKSB_GET_LVB
;
314 spin_unlock(&res
->spinlock
);
316 /* no locks held here.
317 * need to wait for a reply as to whether it got queued or not. */
318 status
= dlm_send_remote_convert_request(dlm
, res
, lock
, flags
, type
);
320 spin_lock(&res
->spinlock
);
321 res
->state
&= ~DLM_LOCK_RES_IN_PROGRESS
;
322 /* if it failed, move it back to granted queue.
323 * if master returns DLM_NORMAL and then down before sending ast,
324 * it may have already been moved to granted queue, reset to
325 * DLM_RECOVERING and retry convert */
326 if (status
!= DLM_NORMAL
) {
327 if (status
!= DLM_NOTQUEUED
)
329 dlm_revert_pending_convert(res
, lock
);
330 } else if (!lock
->convert_pending
) {
331 mlog(0, "%s: res %.*s, owner died and lock has been moved back "
332 "to granted list, retry convert.\n",
333 dlm
->name
, res
->lockname
.len
, res
->lockname
.name
);
334 status
= DLM_RECOVERING
;
337 lock
->convert_pending
= 0;
339 spin_unlock(&res
->spinlock
);
341 /* TODO: should this be a wake_one? */
342 /* wake up any IN_PROGRESS waiters */
/* sends DLM_CONVERT_LOCK_MSG to master site
 * returns: DLM_NOLOCKMGR, status from remote node
 */
/*
 * dlm_send_remote_convert_request() - build a struct dlm_convert_lock and
 * send it to res->owner as DLM_CONVERT_LOCK_MSG.
 *
 * Visible sequence:
 *   - zero the message, then fill node_idx (dlm->node_num), requested_type,
 *     cookie, namelen, flags (converted with cpu_to_be32) and the name;
 *   - vec[0] carries the message; when LKM_PUT_LVB is set, vec[1] carries
 *     DLM_LVB_LEN bytes of lock->lksb->lvb;
 *   - o2net_send_message_vec() returns into tmpret and stores the remote
 *     status through &status;
 *   - the remote status is copied into ret and logged for DLM_RECOVERING,
 *     DLM_MIGRATING and DLM_FORWARD;
 *   - a send error is logged; if dlm_is_host_down(tmpret), wait for node
 *     death (up to DLM_NODE_DEATH_WAIT_MAX) and use DLM_RECOVERING;
 *     a further assignment maps tmpret through dlm_err_to_dlm_status().
 *
 * NOTE(review): this listing has lost lines (embedded numbering skips, e.g.
 * 359 -> 366, 382 -> 386, 387 -> 389, 400 -> 403).  The declarations of
 * tmpret, ret, status, vec and veclen, any update of veclen for the LVB
 * segment, the test on tmpret separating the success and error paths, else
 * keywords and the return are not visible.
 */
355 static enum dlm_status
dlm_send_remote_convert_request(struct dlm_ctxt
*dlm
,
356 struct dlm_lock_resource
*res
,
357 struct dlm_lock
*lock
, int flags
, int type
)
359 struct dlm_convert_lock convert
;
366 mlog(0, "%.*s\n", res
->lockname
.len
, res
->lockname
.name
);
368 memset(&convert
, 0, sizeof(struct dlm_convert_lock
));
369 convert
.node_idx
= dlm
->node_num
;
370 convert
.requested_type
= type
;
371 convert
.cookie
= lock
->ml
.cookie
;
372 convert
.namelen
= res
->lockname
.len
;
373 convert
.flags
= cpu_to_be32(flags
);
/* NOTE(review): no bound check on namelen is visible before this copy;
 * presumably lockname.len is limited where the resource is created */
374 memcpy(convert
.name
, res
->lockname
.name
, convert
.namelen
);
/* first segment: the fixed-size convert message */
376 vec
[0].iov_len
= sizeof(struct dlm_convert_lock
);
377 vec
[0].iov_base
= &convert
;
379 if (flags
& LKM_PUT_LVB
) {
380 /* extra data to send if we are updating lvb */
381 vec
[1].iov_len
= DLM_LVB_LEN
;
382 vec
[1].iov_base
= lock
->lksb
->lvb
;
/* tmpret is the transport result; status receives the remote reply */
386 tmpret
= o2net_send_message_vec(DLM_CONVERT_LOCK_MSG
, dlm
->key
,
387 vec
, veclen
, res
->owner
, &status
);
389 // successfully sent and received
390 ret
= status
; // this is already a dlm_status
391 if (ret
== DLM_RECOVERING
) {
392 mlog(0, "node %u returned DLM_RECOVERING from convert "
393 "message!\n", res
->owner
);
394 } else if (ret
== DLM_MIGRATING
) {
395 mlog(0, "node %u returned DLM_MIGRATING from convert "
396 "message!\n", res
->owner
);
397 } else if (ret
== DLM_FORWARD
) {
398 mlog(0, "node %u returned DLM_FORWARD from convert "
399 "message!\n", res
->owner
);
400 } else if (ret
!= DLM_NORMAL
&& ret
!= DLM_NOTQUEUED
)
/* transport-level failure reporting starts here */
403 mlog(ML_ERROR
, "Error %d when sending message %u (key 0x%x) to "
404 "node %u\n", tmpret
, DLM_CONVERT_LOCK_MSG
, dlm
->key
,
406 if (dlm_is_host_down(tmpret
)) {
407 /* instead of logging the same network error over
408 * and over, sleep here and wait for the heartbeat
409 * to notice the node is dead. times out after 5s. */
410 dlm_wait_for_node_death(dlm
, res
->owner
,
411 DLM_NODE_DEATH_WAIT_MAX
);
412 ret
= DLM_RECOVERING
;
413 mlog(0, "node %u died so returning DLM_RECOVERING "
414 "from convert message!\n", res
->owner
);
/* other transport errors are mapped to a dlm_status value */
416 ret
= dlm_err_to_dlm_status(tmpret
);
/* handler for DLM_CONVERT_LOCK_MSG on master site
 * taken: takes and drops res->spinlock
 * returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS,
 *          status from __dlmconvert_master
 */
/*
 * dlm_convert_lock_handler() - network handler that services a
 * struct dlm_convert_lock request carried in msg->buf.
 *
 * Visible sequence:
 *   - dlm_grab() the context (DLM_REJECTED is reported on failure);
 *   - reject namelen > DLM_LOCKID_NAME_MAX (DLM_IVBUFLEN) and requests
 *     carrying both LKM_PUT_LVB and LKM_GET_LVB (DLM_BADARGS);
 *   - look the resource up by name, then under res->spinlock check
 *     __dlm_lockres_state_to_status() and scan res->granted for a lock
 *     whose cookie and node match the request;
 *   - record the put/get request in lksb->flags, copying the lvb from the
 *     message for a put;
 *   - retake res->spinlock; when the resource state maps to DLM_NORMAL,
 *     reserve an ast, set IN_PROGRESS, call __dlmconvert_master(), clear
 *     IN_PROGRESS;
 *   - on a non-NORMAL status clear the GET/PUT lvb flags in lksb;
 *   - queue or release the ast, kick the thread, drop the resource ref.
 *
 * @msg:  incoming message; msg->buf is read as struct dlm_convert_lock.
 * @len:  message length; not referenced in the visible lines.
 * @data: handler cookie, used as the struct dlm_ctxt pointer.
 *
 * NOTE(review): this listing has lost lines (embedded numbering skips, e.g.
 * 445 -> 449, 471 -> 477, 486 -> 492, 499 -> 506) and ends at statement 552,
 * so the tail of the function may lie outside this view.  Not visible: the
 * last parameter, the declaration of flags, the NULL check on the lookup
 * result, the statements that assign `lock` and `lksb`, the requested-type
 * argument to __dlmconvert_master(), the stores to ast_reserved and wake,
 * exit jumps/labels, and the return.
 */
431 int dlm_convert_lock_handler(struct o2net_msg
*msg
, u32 len
, void *data
,
434 struct dlm_ctxt
*dlm
= data
;
435 struct dlm_convert_lock
*cnv
= (struct dlm_convert_lock
*)msg
->buf
;
436 struct dlm_lock_resource
*res
= NULL
;
437 struct dlm_lock
*lock
= NULL
;
438 struct dlm_lock
*tmp_lock
;
439 struct dlm_lockstatus
*lksb
;
440 enum dlm_status status
= DLM_NORMAL
;
442 int call_ast
= 0, kick_thread
= 0, ast_reserved
= 0, wake
= 0;
444 if (!dlm_grab(dlm
)) {
445 dlm_error(DLM_REJECTED
);
449 mlog_bug_on_msg(!dlm_domain_fully_joined(dlm
),
450 "Domain %s not fully joined!\n", dlm
->name
);
/* the name length comes from the wire: bound it before it is used */
452 if (cnv
->namelen
> DLM_LOCKID_NAME_MAX
) {
453 status
= DLM_IVBUFLEN
;
458 flags
= be32_to_cpu(cnv
->flags
);
/* put and get of the lvb are mutually exclusive in one request */
460 if ((flags
& (LKM_PUT_LVB
|LKM_GET_LVB
)) ==
461 (LKM_PUT_LVB
|LKM_GET_LVB
)) {
462 mlog(ML_ERROR
, "both PUT and GET lvb specified\n");
463 status
= DLM_BADARGS
;
467 mlog(0, "lvb: %s\n", flags
& LKM_PUT_LVB
? "put lvb" :
468 (flags
& LKM_GET_LVB
? "get lvb" : "none"));
470 status
= DLM_IVLOCKID
;
471 res
= dlm_lookup_lockres(dlm
, cnv
->name
, cnv
->namelen
);
/* NOTE(review): no NULL check on res is visible before it is dereferenced
 * below; statements 472-476 are missing from the listing -- confirm */
477 spin_lock(&res
->spinlock
);
478 status
= __dlm_lockres_state_to_status(res
);
479 if (status
!= DLM_NORMAL
) {
480 spin_unlock(&res
->spinlock
);
/* find the granted lock identified by (cookie, node) in the request */
484 list_for_each_entry(tmp_lock
, &res
->granted
, list
) {
485 if (tmp_lock
->ml
.cookie
== cnv
->cookie
&&
486 tmp_lock
->ml
.node
== cnv
->node_idx
) {
492 spin_unlock(&res
->spinlock
);
494 status
= DLM_IVLOCKID
;
495 mlog(ML_ERROR
, "did not find lock to convert on grant queue! "
497 dlm_get_lock_cookie_node(be64_to_cpu(cnv
->cookie
)),
498 dlm_get_lock_cookie_seq(be64_to_cpu(cnv
->cookie
)));
499 dlm_print_one_lock_resource(res
);
506 /* see if caller needed to get/put lvb */
507 if (flags
& LKM_PUT_LVB
) {
508 BUG_ON(lksb
->flags
& (DLM_LKSB_PUT_LVB
|DLM_LKSB_GET_LVB
));
509 lksb
->flags
|= DLM_LKSB_PUT_LVB
;
510 memcpy(&lksb
->lvb
[0], &cnv
->lvb
[0], DLM_LVB_LEN
);
511 } else if (flags
& LKM_GET_LVB
) {
512 BUG_ON(lksb
->flags
& (DLM_LKSB_PUT_LVB
|DLM_LKSB_GET_LVB
));
513 lksb
->flags
|= DLM_LKSB_GET_LVB
;
/* the resource state is re-checked because the spinlock was dropped above */
516 spin_lock(&res
->spinlock
);
517 status
= __dlm_lockres_state_to_status(res
);
518 if (status
== DLM_NORMAL
) {
519 __dlm_lockres_reserve_ast(res
);
521 res
->state
|= DLM_LOCK_RES_IN_PROGRESS
;
522 status
= __dlmconvert_master(dlm
, res
, lock
, flags
,
524 &call_ast
, &kick_thread
);
525 res
->state
&= ~DLM_LOCK_RES_IN_PROGRESS
;
528 spin_unlock(&res
->spinlock
);
/* a failed conversion must not leave lvb get/put requests set in the lksb */
532 if (status
!= DLM_NORMAL
) {
533 if (status
!= DLM_NOTQUEUED
)
535 lksb
->flags
&= ~(DLM_LKSB_GET_LVB
|DLM_LKSB_PUT_LVB
);
542 /* either queue the ast or release it, if reserved */
544 dlm_queue_ast(dlm
, lock
);
545 else if (ast_reserved
)
546 dlm_lockres_release_ast(dlm
, res
);
549 dlm_kick_thread(dlm
, res
);
552 dlm_lockres_put(res
);