1 /* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
6 * underlying calls for lock conversion
8 * Copyright (C) 2004 Oracle. All rights reserved.
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */
28 #include <linux/module.h>
30 #include <linux/types.h>
31 #include <linux/highmem.h>
32 #include <linux/init.h>
33 #include <linux/sysctl.h>
34 #include <linux/random.h>
35 #include <linux/blkdev.h>
36 #include <linux/socket.h>
37 #include <linux/inet.h>
38 #include <linux/spinlock.h>
41 #include "cluster/heartbeat.h"
42 #include "cluster/nodemanager.h"
43 #include "cluster/tcp.h"
46 #include "dlmcommon.h"
48 #include "dlmconvert.h"
50 #define MLOG_MASK_PREFIX ML_DLM
51 #include "cluster/masklog.h"
/* NOTE: __dlmconvert_master is the only function in here that
 * needs a spinlock held on entry (res->spinlock) and it is the
 * only one that holds a lock on exit (res->spinlock).
 * All other functions in here need no locks and drop all of
 * the locks that they acquire. */
static enum dlm_status __dlmconvert_master(struct dlm_ctxt *dlm,
					   struct dlm_lock_resource *res,
					   struct dlm_lock *lock, int flags,
					   int type, int *call_ast,
					   int *kick_thread);
static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
					   struct dlm_lock_resource *res,
					   struct dlm_lock *lock, int flags,
					   int type);
68 * this is only called directly by dlmlock(), and only when the
69 * local node is the owner of the lockres
72 * taken: takes and drops res->spinlock
74 * returns: see __dlmconvert_master
76 enum dlm_status
dlmconvert_master(struct dlm_ctxt
*dlm
,
77 struct dlm_lock_resource
*res
,
78 struct dlm_lock
*lock
, int flags
, int type
)
80 int call_ast
= 0, kick_thread
= 0;
81 enum dlm_status status
;
83 spin_lock(&res
->spinlock
);
84 /* we are not in a network handler, this is fine */
85 __dlm_wait_on_lockres(res
);
86 __dlm_lockres_reserve_ast(res
);
87 res
->state
|= DLM_LOCK_RES_IN_PROGRESS
;
89 status
= __dlmconvert_master(dlm
, res
, lock
, flags
, type
,
90 &call_ast
, &kick_thread
);
92 res
->state
&= ~DLM_LOCK_RES_IN_PROGRESS
;
93 spin_unlock(&res
->spinlock
);
95 if (status
!= DLM_NORMAL
&& status
!= DLM_NOTQUEUED
)
98 /* either queue the ast or release it */
100 dlm_queue_ast(dlm
, lock
);
102 dlm_lockres_release_ast(dlm
, res
);
105 dlm_kick_thread(dlm
, res
);
110 /* performs lock conversion at the lockres master site
112 * caller needs: res->spinlock
113 * taken: takes and drops lock->spinlock
114 * held on exit: res->spinlock
115 * returns: DLM_NORMAL, DLM_NOTQUEUED, DLM_DENIED
116 * call_ast: whether ast should be called for this lock
117 * kick_thread: whether dlm_kick_thread should be called
119 static enum dlm_status
__dlmconvert_master(struct dlm_ctxt
*dlm
,
120 struct dlm_lock_resource
*res
,
121 struct dlm_lock
*lock
, int flags
,
122 int type
, int *call_ast
,
125 enum dlm_status status
= DLM_NORMAL
;
126 struct dlm_lock
*tmplock
=NULL
;
128 assert_spin_locked(&res
->spinlock
);
130 mlog(0, "type=%d, convert_type=%d, new convert_type=%d\n",
131 lock
->ml
.type
, lock
->ml
.convert_type
, type
);
133 spin_lock(&lock
->spinlock
);
135 /* already converting? */
136 if (lock
->ml
.convert_type
!= LKM_IVMODE
) {
137 mlog(ML_ERROR
, "attempted to convert a lock with a lock "
138 "conversion pending\n");
143 /* must be on grant queue to convert */
144 if (!dlm_lock_on_list(&res
->granted
, lock
)) {
145 mlog(ML_ERROR
, "attempted to convert a lock not on grant "
151 if (flags
& LKM_VALBLK
) {
152 switch (lock
->ml
.type
) {
154 /* EX + LKM_VALBLK + convert == set lvb */
155 mlog(0, "will set lvb: converting %s->%s\n",
156 dlm_lock_mode_name(lock
->ml
.type
),
157 dlm_lock_mode_name(type
));
158 lock
->lksb
->flags
|= DLM_LKSB_PUT_LVB
;
162 /* refetch if new level is not NL */
163 if (type
> LKM_NLMODE
) {
164 mlog(0, "will fetch new value into "
165 "lvb: converting %s->%s\n",
166 dlm_lock_mode_name(lock
->ml
.type
),
167 dlm_lock_mode_name(type
));
168 lock
->lksb
->flags
|= DLM_LKSB_GET_LVB
;
170 mlog(0, "will NOT fetch new value "
171 "into lvb: converting %s->%s\n",
172 dlm_lock_mode_name(lock
->ml
.type
),
173 dlm_lock_mode_name(type
));
174 flags
&= ~(LKM_VALBLK
);
181 /* in-place downconvert? */
182 if (type
<= lock
->ml
.type
)
185 /* upconvert from here on */
187 list_for_each_entry(tmplock
, &res
->granted
, list
) {
190 if (!dlm_lock_compatible(tmplock
->ml
.type
, type
))
194 list_for_each_entry(tmplock
, &res
->converting
, list
) {
195 if (!dlm_lock_compatible(tmplock
->ml
.type
, type
))
197 /* existing conversion requests take precedence */
198 if (!dlm_lock_compatible(tmplock
->ml
.convert_type
, type
))
202 /* fall thru to grant */
205 mlog(0, "res %.*s, granting %s lock\n", res
->lockname
.len
,
206 res
->lockname
.name
, dlm_lock_mode_name(type
));
207 /* immediately grant the new lock type */
208 lock
->lksb
->status
= DLM_NORMAL
;
209 if (lock
->ml
.node
== dlm
->node_num
)
210 mlog(0, "doing in-place convert for nonlocal lock\n");
211 lock
->ml
.type
= type
;
212 if (lock
->lksb
->flags
& DLM_LKSB_PUT_LVB
)
213 memcpy(res
->lvb
, lock
->lksb
->lvb
, DLM_LVB_LEN
);
216 * Move the lock to the tail because it may be the only lock which has
219 list_move_tail(&lock
->list
, &res
->granted
);
226 if (flags
& LKM_NOQUEUE
) {
227 mlog(0, "failed to convert NOQUEUE lock %.*s from "
228 "%d to %d...\n", res
->lockname
.len
, res
->lockname
.name
,
229 lock
->ml
.type
, type
);
230 status
= DLM_NOTQUEUED
;
233 mlog(0, "res %.*s, queueing...\n", res
->lockname
.len
,
236 lock
->ml
.convert_type
= type
;
237 /* do not alter lock refcount. switching lists. */
238 list_move_tail(&lock
->list
, &res
->converting
);
241 spin_unlock(&lock
->spinlock
);
242 if (status
== DLM_DENIED
) {
243 __dlm_print_one_lock_resource(res
);
245 if (status
== DLM_NORMAL
)
250 void dlm_revert_pending_convert(struct dlm_lock_resource
*res
,
251 struct dlm_lock
*lock
)
253 /* do not alter lock refcount. switching lists. */
254 list_move_tail(&lock
->list
, &res
->granted
);
255 lock
->ml
.convert_type
= LKM_IVMODE
;
256 lock
->lksb
->flags
&= ~(DLM_LKSB_GET_LVB
|DLM_LKSB_PUT_LVB
);
259 /* messages the master site to do lock conversion
262 * taken: takes and drops res->spinlock, uses DLM_LOCK_RES_IN_PROGRESS
264 * returns: DLM_NORMAL, DLM_RECOVERING, status from remote node
266 enum dlm_status
dlmconvert_remote(struct dlm_ctxt
*dlm
,
267 struct dlm_lock_resource
*res
,
268 struct dlm_lock
*lock
, int flags
, int type
)
270 enum dlm_status status
;
272 mlog(0, "type=%d, convert_type=%d, busy=%d\n", lock
->ml
.type
,
273 lock
->ml
.convert_type
, res
->state
& DLM_LOCK_RES_IN_PROGRESS
);
275 spin_lock(&res
->spinlock
);
276 if (res
->state
& DLM_LOCK_RES_RECOVERING
) {
277 mlog(0, "bailing out early since res is RECOVERING "
278 "on secondary queue\n");
279 /* __dlm_print_one_lock_resource(res); */
280 status
= DLM_RECOVERING
;
283 /* will exit this call with spinlock held */
284 __dlm_wait_on_lockres(res
);
286 if (lock
->ml
.convert_type
!= LKM_IVMODE
) {
287 __dlm_print_one_lock_resource(res
);
288 mlog(ML_ERROR
, "converting a remote lock that is already "
289 "converting! (cookie=%u:%llu, conv=%d)\n",
290 dlm_get_lock_cookie_node(be64_to_cpu(lock
->ml
.cookie
)),
291 dlm_get_lock_cookie_seq(be64_to_cpu(lock
->ml
.cookie
)),
292 lock
->ml
.convert_type
);
297 if (lock
->ml
.type
== type
&& lock
->ml
.convert_type
== LKM_IVMODE
) {
298 mlog(0, "last convert request returned DLM_RECOVERING, but "
299 "owner has already queued and sent ast to me. res %.*s, "
300 "(cookie=%u:%llu, type=%d, conv=%d)\n",
301 res
->lockname
.len
, res
->lockname
.name
,
302 dlm_get_lock_cookie_node(be64_to_cpu(lock
->ml
.cookie
)),
303 dlm_get_lock_cookie_seq(be64_to_cpu(lock
->ml
.cookie
)),
304 lock
->ml
.type
, lock
->ml
.convert_type
);
309 res
->state
|= DLM_LOCK_RES_IN_PROGRESS
;
310 /* move lock to local convert queue */
311 /* do not alter lock refcount. switching lists. */
312 list_move_tail(&lock
->list
, &res
->converting
);
313 lock
->convert_pending
= 1;
314 lock
->ml
.convert_type
= type
;
316 if (flags
& LKM_VALBLK
) {
317 if (lock
->ml
.type
== LKM_EXMODE
) {
318 flags
|= LKM_PUT_LVB
;
319 lock
->lksb
->flags
|= DLM_LKSB_PUT_LVB
;
321 if (lock
->ml
.convert_type
== LKM_NLMODE
)
322 flags
&= ~LKM_VALBLK
;
324 flags
|= LKM_GET_LVB
;
325 lock
->lksb
->flags
|= DLM_LKSB_GET_LVB
;
329 spin_unlock(&res
->spinlock
);
331 /* no locks held here.
332 * need to wait for a reply as to whether it got queued or not. */
333 status
= dlm_send_remote_convert_request(dlm
, res
, lock
, flags
, type
);
335 spin_lock(&res
->spinlock
);
336 res
->state
&= ~DLM_LOCK_RES_IN_PROGRESS
;
337 /* if it failed, move it back to granted queue.
338 * if master returns DLM_NORMAL and then down before sending ast,
339 * it may have already been moved to granted queue, reset to
340 * DLM_RECOVERING and retry convert */
341 if (status
!= DLM_NORMAL
) {
342 if (status
!= DLM_NOTQUEUED
)
344 dlm_revert_pending_convert(res
, lock
);
345 } else if (!lock
->convert_pending
) {
346 mlog(0, "%s: res %.*s, owner died and lock has been moved back "
347 "to granted list, retry convert.\n",
348 dlm
->name
, res
->lockname
.len
, res
->lockname
.name
);
349 status
= DLM_RECOVERING
;
352 lock
->convert_pending
= 0;
354 spin_unlock(&res
->spinlock
);
356 /* TODO: should this be a wake_one? */
357 /* wake up any IN_PROGRESS waiters */
363 /* sends DLM_CONVERT_LOCK_MSG to master site
368 * returns: DLM_NOLOCKMGR, status from remote node
370 static enum dlm_status
dlm_send_remote_convert_request(struct dlm_ctxt
*dlm
,
371 struct dlm_lock_resource
*res
,
372 struct dlm_lock
*lock
, int flags
, int type
)
374 struct dlm_convert_lock convert
;
381 mlog(0, "%.*s\n", res
->lockname
.len
, res
->lockname
.name
);
383 memset(&convert
, 0, sizeof(struct dlm_convert_lock
));
384 convert
.node_idx
= dlm
->node_num
;
385 convert
.requested_type
= type
;
386 convert
.cookie
= lock
->ml
.cookie
;
387 convert
.namelen
= res
->lockname
.len
;
388 convert
.flags
= cpu_to_be32(flags
);
389 memcpy(convert
.name
, res
->lockname
.name
, convert
.namelen
);
391 vec
[0].iov_len
= sizeof(struct dlm_convert_lock
);
392 vec
[0].iov_base
= &convert
;
394 if (flags
& LKM_PUT_LVB
) {
395 /* extra data to send if we are updating lvb */
396 vec
[1].iov_len
= DLM_LVB_LEN
;
397 vec
[1].iov_base
= lock
->lksb
->lvb
;
401 tmpret
= o2net_send_message_vec(DLM_CONVERT_LOCK_MSG
, dlm
->key
,
402 vec
, veclen
, res
->owner
, &status
);
404 // successfully sent and received
405 ret
= status
; // this is already a dlm_status
406 if (ret
== DLM_RECOVERING
) {
407 mlog(0, "node %u returned DLM_RECOVERING from convert "
408 "message!\n", res
->owner
);
409 } else if (ret
== DLM_MIGRATING
) {
410 mlog(0, "node %u returned DLM_MIGRATING from convert "
411 "message!\n", res
->owner
);
412 } else if (ret
== DLM_FORWARD
) {
413 mlog(0, "node %u returned DLM_FORWARD from convert "
414 "message!\n", res
->owner
);
415 } else if (ret
!= DLM_NORMAL
&& ret
!= DLM_NOTQUEUED
)
418 mlog(ML_ERROR
, "Error %d when sending message %u (key 0x%x) to "
419 "node %u\n", tmpret
, DLM_CONVERT_LOCK_MSG
, dlm
->key
,
421 if (dlm_is_host_down(tmpret
)) {
422 /* instead of logging the same network error over
423 * and over, sleep here and wait for the heartbeat
424 * to notice the node is dead. times out after 5s. */
425 dlm_wait_for_node_death(dlm
, res
->owner
,
426 DLM_NODE_DEATH_WAIT_MAX
);
427 ret
= DLM_RECOVERING
;
428 mlog(0, "node %u died so returning DLM_RECOVERING "
429 "from convert message!\n", res
->owner
);
431 ret
= dlm_err_to_dlm_status(tmpret
);
438 /* handler for DLM_CONVERT_LOCK_MSG on master site
441 * taken: takes and drop res->spinlock
443 * returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS,
444 * status from __dlmconvert_master
446 int dlm_convert_lock_handler(struct o2net_msg
*msg
, u32 len
, void *data
,
449 struct dlm_ctxt
*dlm
= data
;
450 struct dlm_convert_lock
*cnv
= (struct dlm_convert_lock
*)msg
->buf
;
451 struct dlm_lock_resource
*res
= NULL
;
452 struct dlm_lock
*lock
= NULL
;
453 struct dlm_lock
*tmp_lock
;
454 struct dlm_lockstatus
*lksb
;
455 enum dlm_status status
= DLM_NORMAL
;
457 int call_ast
= 0, kick_thread
= 0, ast_reserved
= 0, wake
= 0;
459 if (!dlm_grab(dlm
)) {
460 dlm_error(DLM_REJECTED
);
464 mlog_bug_on_msg(!dlm_domain_fully_joined(dlm
),
465 "Domain %s not fully joined!\n", dlm
->name
);
467 if (cnv
->namelen
> DLM_LOCKID_NAME_MAX
) {
468 status
= DLM_IVBUFLEN
;
473 flags
= be32_to_cpu(cnv
->flags
);
475 if ((flags
& (LKM_PUT_LVB
|LKM_GET_LVB
)) ==
476 (LKM_PUT_LVB
|LKM_GET_LVB
)) {
477 mlog(ML_ERROR
, "both PUT and GET lvb specified\n");
478 status
= DLM_BADARGS
;
482 mlog(0, "lvb: %s\n", flags
& LKM_PUT_LVB
? "put lvb" :
483 (flags
& LKM_GET_LVB
? "get lvb" : "none"));
485 status
= DLM_IVLOCKID
;
486 res
= dlm_lookup_lockres(dlm
, cnv
->name
, cnv
->namelen
);
492 spin_lock(&res
->spinlock
);
493 status
= __dlm_lockres_state_to_status(res
);
494 if (status
!= DLM_NORMAL
) {
495 spin_unlock(&res
->spinlock
);
499 list_for_each_entry(tmp_lock
, &res
->granted
, list
) {
500 if (tmp_lock
->ml
.cookie
== cnv
->cookie
&&
501 tmp_lock
->ml
.node
== cnv
->node_idx
) {
507 spin_unlock(&res
->spinlock
);
509 status
= DLM_IVLOCKID
;
510 mlog(ML_ERROR
, "did not find lock to convert on grant queue! "
512 dlm_get_lock_cookie_node(be64_to_cpu(cnv
->cookie
)),
513 dlm_get_lock_cookie_seq(be64_to_cpu(cnv
->cookie
)));
514 dlm_print_one_lock_resource(res
);
521 /* see if caller needed to get/put lvb */
522 if (flags
& LKM_PUT_LVB
) {
523 BUG_ON(lksb
->flags
& (DLM_LKSB_PUT_LVB
|DLM_LKSB_GET_LVB
));
524 lksb
->flags
|= DLM_LKSB_PUT_LVB
;
525 memcpy(&lksb
->lvb
[0], &cnv
->lvb
[0], DLM_LVB_LEN
);
526 } else if (flags
& LKM_GET_LVB
) {
527 BUG_ON(lksb
->flags
& (DLM_LKSB_PUT_LVB
|DLM_LKSB_GET_LVB
));
528 lksb
->flags
|= DLM_LKSB_GET_LVB
;
531 spin_lock(&res
->spinlock
);
532 status
= __dlm_lockres_state_to_status(res
);
533 if (status
== DLM_NORMAL
) {
534 __dlm_lockres_reserve_ast(res
);
536 res
->state
|= DLM_LOCK_RES_IN_PROGRESS
;
537 status
= __dlmconvert_master(dlm
, res
, lock
, flags
,
539 &call_ast
, &kick_thread
);
540 res
->state
&= ~DLM_LOCK_RES_IN_PROGRESS
;
543 spin_unlock(&res
->spinlock
);
547 if (status
!= DLM_NORMAL
) {
548 if (status
!= DLM_NOTQUEUED
)
550 lksb
->flags
&= ~(DLM_LKSB_GET_LVB
|DLM_LKSB_PUT_LVB
);
557 /* either queue the ast or release it, if reserved */
559 dlm_queue_ast(dlm
, lock
);
560 else if (ast_reserved
)
561 dlm_lockres_release_ast(dlm
, res
);
564 dlm_kick_thread(dlm
, res
);
567 dlm_lockres_put(res
);