/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
/* Central locking logic has four stages:

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

      dlm_lock          = request_lock
      dlm_lock+CONVERT  = convert_lock
      dlm_unlock        = unlock_lock
      dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
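/* Illustrative walk-through (added for clarity, not part of the original
   source): a plain dlm_lock() request with no CONVERT flag flows
   dlm_lock() -> request_lock() -> _request_lock() -> do_request() when this
   node is the master, or _request_lock() -> send_request() when the master
   is remote, in which case the remote node runs receive_request() and
   do_request() and answers via send_request_reply(). */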
#include <linux/types.h>
#include <linux/slab.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "requestqueue.h"
#include "lockspace.h"
#include "lvb_table.h"
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
                                    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
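/* Illustrative readings of the table above (added for clarity, not part of
   the original file): converting up from PR (granted) to EX gives
   dlm_lvb_operations[DLM_LOCK_PR + 1][DLM_LOCK_EX + 1] == 1, so the
   resource's LVB is returned to the caller; converting down from EX to NL
   gives dlm_lvb_operations[DLM_LOCK_EX + 1][DLM_LOCK_NL + 1] == 0, so the
   caller's LVB is written to the resource. */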
#define modes_compat(gr, rq) \
        __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
        return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
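/* Illustrative example (added, not in the original file): with a PR lock
   granted, another PR request is compatible but an EX request is not:

        dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) == 1
        dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX) == 0
*/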
/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
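/* Illustrative reading (added for clarity, not part of the original file):
   the table marks the conversions for which QUECVT is accepted, essentially
   up-conversions.  For example
   __quecvt_compat_matrix[DLM_LOCK_CR + 1][DLM_LOCK_PR + 1] is 1, while
   __quecvt_compat_matrix[DLM_LOCK_PR + 1][DLM_LOCK_CR + 1] is 0;
   validate_lock_args() rejects QUECVT for the latter. */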
void dlm_print_lkb(struct dlm_lkb *lkb)
{
        printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
               "     status %d rqmode %d grmode %d wait_type %d\n",
               lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
               lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
               lkb->lkb_grmode, lkb->lkb_wait_type);
}
static void dlm_print_rsb(struct dlm_rsb *r)
{
        printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
               r->res_nodeid, r->res_flags, r->res_first_lkid,
               r->res_recover_locks_count, r->res_name);
}
void dlm_dump_rsb(struct dlm_rsb *r)
{
        struct dlm_lkb *lkb;

        dlm_print_rsb(r);

        printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
               list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
        printk(KERN_ERR "rsb lookup list\n");
        list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb grant queue:\n");
        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb convert queue:\n");
        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb wait queue:\n");
        list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
}
/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
        down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
        up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
        return down_read_trylock(&ls->ls_in_recovery);
}
static inline int can_be_queued(struct dlm_lkb *lkb)
{
        return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
        return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
        return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
        return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
        return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
        DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
        return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
        return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
        if (lkb->lkb_flags & DLM_IFL_MSTCPY)
                DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
        return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
        if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
            (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
                return 1;
        return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
        return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
        return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
        return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
        return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
                                  DLM_IFL_OVERLAP_CANCEL));
}
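/* Illustrative examples (added, not in the original file): PR->CW and
   CW->PR are the "middle" conversions, since neither mode is strictly
   weaker than the other; EX->NL is a plain down-conversion, so for it
   middle_conversion() is 0 and down_conversion() is 1. */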
static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
        if (is_master_copy(lkb))
                return;

        del_timeout(lkb);

        DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

        /* if the operation was a cancel, then return -DLM_ECANCEL, if a
           timeout caused the cancel then return -ETIMEDOUT */
        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
                lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
                rv = -ETIMEDOUT;
        }

        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
                lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
                rv = -EDEADLK;
        }

        dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        queue_cast(r, lkb,
                   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}
static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
        if (is_master_copy(lkb)) {
                send_bast(r, lkb, rqmode);
                return;
        }

        dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0);
}
/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
        struct dlm_rsb *r;

        r = dlm_allocate_rsb(ls, len);
        if (!r)
                return NULL;

        r->res_ls = ls;
        r->res_length = len;
        memcpy(r->res_name, name, len);
        mutex_init(&r->res_mutex);

        INIT_LIST_HEAD(&r->res_lookup);
        INIT_LIST_HEAD(&r->res_grantqueue);
        INIT_LIST_HEAD(&r->res_convertqueue);
        INIT_LIST_HEAD(&r->res_waitqueue);
        INIT_LIST_HEAD(&r->res_root_list);
        INIT_LIST_HEAD(&r->res_recover_list);

        return r;
}
static int search_rsb_list(struct list_head *head, char *name, int len,
                           unsigned int flags, struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r;
        int error = 0;

        list_for_each_entry(r, head, res_hashchain) {
                if (len == r->res_length && !memcmp(name, r->res_name, len))
                        goto found;
        }
        *r_ret = NULL;
        return -EBADR;

 found:
        if (r->res_nodeid && (flags & R_MASTER))
                error = -ENOTBLK;
        *r_ret = r;
        return error;
}
static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
                       unsigned int flags, struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r;
        int error;

        error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
        if (!error) {
                kref_get(&r->res_ref);
                goto out;
        }
        error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
        if (error)
                goto out;

        list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

        if (dlm_no_directory(ls))
                goto out;

        if (r->res_nodeid == -1) {
                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
                r->res_first_lkid = 0;
        } else if (r->res_nodeid > 0) {
                rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
                r->res_first_lkid = 0;
        } else {
                DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
                DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
        }
 out:
        *r_ret = r;
        return error;
}
static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
                      unsigned int flags, struct dlm_rsb **r_ret)
{
        int error;

        spin_lock(&ls->ls_rsbtbl[b].lock);
        error = _search_rsb(ls, name, len, b, flags, r_ret);
        spin_unlock(&ls->ls_rsbtbl[b].lock);
        return error;
}
/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */
static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
                    unsigned int flags, struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r = NULL, *tmp;
        uint32_t hash, bucket;
        int error = -EINVAL;

        if (namelen > DLM_RESNAME_MAXLEN)
                goto out;

        if (dlm_no_directory(ls))
                flags |= R_CREATE;

        hash = jhash(name, namelen, 0);
        bucket = hash & (ls->ls_rsbtbl_size - 1);

        error = search_rsb(ls, name, namelen, bucket, flags, &r);
        if (!error)
                goto out;

        if (error == -EBADR && !(flags & R_CREATE))
                goto out;

        /* the rsb was found but wasn't a master copy */
        if (error == -ENOTBLK)
                goto out;

        error = -ENOMEM;
        r = create_rsb(ls, name, namelen);
        if (!r)
                goto out;

        r->res_bucket = bucket;
        r->res_nodeid = -1;
        kref_init(&r->res_ref);

        /* With no directory, the master can be set immediately */
        if (dlm_no_directory(ls)) {
                int nodeid = dlm_dir_nodeid(r);
                if (nodeid == dlm_our_nodeid())
                        nodeid = 0;
                r->res_nodeid = nodeid;
        }

        spin_lock(&ls->ls_rsbtbl[bucket].lock);
        error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
        if (!error) {
                spin_unlock(&ls->ls_rsbtbl[bucket].lock);
                dlm_free_rsb(r);
                r = tmp;
                goto out;
        }
        list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
        error = 0;
 out:
        *r_ret = r;
        return error;
}
/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
        kref_get(&r->res_ref);
}
void dlm_hold_rsb(struct dlm_rsb *r)
{
        hold_rsb(r);
}
static void toss_rsb(struct kref *kref)
{
        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
        struct dlm_ls *ls = r->res_ls;

        DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
        kref_init(&r->res_ref);
        list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
        r->res_toss_time = jiffies;
        if (r->res_lvbptr) {
                dlm_free_lvb(r->res_lvbptr);
                r->res_lvbptr = NULL;
        }
}
/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;
        uint32_t bucket = r->res_bucket;

        spin_lock(&ls->ls_rsbtbl[bucket].lock);
        kref_put(&r->res_ref, toss_rsb);
        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
        put_rsb(r);
}
/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
        int rv;

        rv = kref_put(&r->res_ref, toss_rsb);
        DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
static void kill_rsb(struct kref *kref)
{
        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

        /* All work is done after the return from kref_put() so we
           can release the write_lock before the remove and free. */

        DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        hold_rsb(r);
        lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
        if (lkb->lkb_resource) {
                put_rsb(lkb->lkb_resource);
                lkb->lkb_resource = NULL;
        }
}
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
        struct dlm_lkb *lkb, *tmp;
        uint32_t lkid = 0;
        uint16_t bucket;

        lkb = dlm_allocate_lkb(ls);
        if (!lkb)
                return -ENOMEM;

        lkb->lkb_nodeid = -1;
        lkb->lkb_grmode = DLM_LOCK_IV;
        kref_init(&lkb->lkb_ref);
        INIT_LIST_HEAD(&lkb->lkb_ownqueue);
        INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
        INIT_LIST_HEAD(&lkb->lkb_time_list);
        INIT_LIST_HEAD(&lkb->lkb_astqueue);

        get_random_bytes(&bucket, sizeof(bucket));
        bucket &= (ls->ls_lkbtbl_size - 1);

        write_lock(&ls->ls_lkbtbl[bucket].lock);

        /* counter can roll over so we must verify lkid is not in use */

        while (lkid == 0) {
                lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

                list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
                                    lkb_idtbl_list) {
                        if (tmp->lkb_id != lkid)
                                continue;
                        lkid = 0;
                        break;
                }
        }

        lkb->lkb_id = lkid;
        list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
        write_unlock(&ls->ls_lkbtbl[bucket].lock);

        *lkb_ret = lkb;
        return 0;
}
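/* Note (added for clarity, not part of the original file): the lock id packs
   the 16-bit lkbtbl bucket into the upper half and the per-bucket counter
   into the lower half, which is why __find_lkb() and find_lkb() below
   recover the bucket with (lkid >> 16). */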
static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
        struct dlm_lkb *lkb;
        uint16_t bucket = (lkid >> 16);

        list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
                if (lkb->lkb_id == lkid)
                        return lkb;
        }
        return NULL;
}
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
        struct dlm_lkb *lkb;
        uint16_t bucket = (lkid >> 16);

        if (bucket >= ls->ls_lkbtbl_size)
                return -EBADSLT;

        read_lock(&ls->ls_lkbtbl[bucket].lock);
        lkb = __find_lkb(ls, lkid);
        if (lkb)
                kref_get(&lkb->lkb_ref);
        read_unlock(&ls->ls_lkbtbl[bucket].lock);

        *lkb_ret = lkb;
        return lkb ? 0 : -ENOENT;
}
static void kill_lkb(struct kref *kref)
{
        struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

        /* All work is done after the return from kref_put() so we
           can release the write_lock before the detach_lkb */

        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
        uint16_t bucket = (lkb->lkb_id >> 16);

        write_lock(&ls->ls_lkbtbl[bucket].lock);
        if (kref_put(&lkb->lkb_ref, kill_lkb)) {
                list_del(&lkb->lkb_idtbl_list);
                write_unlock(&ls->ls_lkbtbl[bucket].lock);

                detach_lkb(lkb);

                /* for local/process lkbs, lvbptr points to caller's lksb */
                if (lkb->lkb_lvbptr && is_master_copy(lkb))
                        dlm_free_lvb(lkb->lkb_lvbptr);
                dlm_free_lkb(lkb);
                return 1;
        } else {
                write_unlock(&ls->ls_lkbtbl[bucket].lock);
                return 0;
        }
}
int dlm_put_lkb(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls;

        DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
        DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

        ls = lkb->lkb_resource->res_ls;
        return __put_lkb(ls, lkb);
}
/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
        kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
        int rv;

        rv = kref_put(&lkb->lkb_ref, kill_lkb);
        DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}
static void lkb_add_ordered(struct list_head *new, struct list_head *head,
                            int mode)
{
        struct dlm_lkb *lkb = NULL;

        list_for_each_entry(lkb, head, lkb_statequeue)
                if (lkb->lkb_rqmode < mode)
                        break;

        __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}
/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
        kref_get(&lkb->lkb_ref);

        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

        lkb->lkb_timestamp = ktime_get();

        lkb->lkb_status = status;

        switch (status) {
        case DLM_LKSTS_WAITING:
                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
                        list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
                else
                        list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
                break;
        case DLM_LKSTS_GRANTED:
                /* convention says granted locks kept in order of grmode */
                lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
                                lkb->lkb_grmode);
                break;
        case DLM_LKSTS_CONVERT:
                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
                        list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
                else
                        list_add_tail(&lkb->lkb_statequeue,
                                      &r->res_convertqueue);
                break;
        default:
                DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
        }
}
static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        lkb->lkb_status = 0;
        list_del(&lkb->lkb_statequeue);
        unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
        hold_lkb(lkb);
        del_lkb(r, lkb);
        add_lkb(r, lkb, sts);
        unhold_lkb(lkb);
}
static int msg_reply_type(int mstype)
{
        switch (mstype) {
        case DLM_MSG_REQUEST:
                return DLM_MSG_REQUEST_REPLY;
        case DLM_MSG_CONVERT:
                return DLM_MSG_CONVERT_REPLY;
        case DLM_MSG_UNLOCK:
                return DLM_MSG_UNLOCK_REPLY;
        case DLM_MSG_CANCEL:
                return DLM_MSG_CANCEL_REPLY;
        case DLM_MSG_LOOKUP:
                return DLM_MSG_LOOKUP_REPLY;
        }
        return -1;
}
static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
        int i;

        for (i = 0; i < num_nodes; i++) {
                if (!warned[i]) {
                        warned[i] = nodeid;
                        return 0;
                }
                if (warned[i] == nodeid)
                        return 1;
        }
        return 0;
}
void dlm_scan_waiters(struct dlm_ls *ls)
{
        struct dlm_lkb *lkb;
        ktime_t zero = ktime_set(0, 0);
        s64 us;
        s64 debug_maxus = 0;
        u32 debug_scanned = 0;
        u32 debug_expired = 0;
        int num_nodes = 0;
        int *warned = NULL;

        if (!dlm_config.ci_waitwarn_us)
                return;

        mutex_lock(&ls->ls_waiters_mutex);

        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
                if (ktime_equal(lkb->lkb_wait_time, zero))
                        continue;

                debug_scanned++;

                us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));

                if (us < dlm_config.ci_waitwarn_us)
                        continue;

                lkb->lkb_wait_time = zero;

                debug_expired++;
                if (us > debug_maxus)
                        debug_maxus = us;

                if (!num_nodes) {
                        num_nodes = ls->ls_num_nodes;
                        warned = kmalloc(num_nodes * sizeof(int), GFP_KERNEL);
                        if (warned)
                                memset(warned, 0, num_nodes * sizeof(int));
                }
                if (!warned)
                        continue;
                if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
                        continue;

                log_error(ls, "waitwarn %x %lld %d us check connection to "
                          "node %d", lkb->lkb_id, (long long)us,
                          dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
        }
        mutex_unlock(&ls->ls_waiters_mutex);
        kfree(warned);

        if (debug_expired)
                log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
                          debug_scanned, debug_expired,
                          dlm_config.ci_waitwarn_us, (long long)debug_maxus);
}
/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error = 0;

        mutex_lock(&ls->ls_waiters_mutex);

        if (is_overlap_unlock(lkb) ||
            (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
                error = -EINVAL;
                goto out;
        }

        if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
                switch (mstype) {
                case DLM_MSG_UNLOCK:
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
                        break;
                case DLM_MSG_CANCEL:
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
                        break;
                default:
                        error = -EBUSY;
                        goto out;
                }
                lkb->lkb_wait_count++;
                hold_lkb(lkb);

                log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
                          lkb->lkb_id, lkb->lkb_wait_type, mstype,
                          lkb->lkb_wait_count, lkb->lkb_flags);
                goto out;
        }

        DLM_ASSERT(!lkb->lkb_wait_count,
                   dlm_print_lkb(lkb);
                   printk("wait_count %d\n", lkb->lkb_wait_count););

        lkb->lkb_wait_count++;
        lkb->lkb_wait_type = mstype;
        lkb->lkb_wait_time = ktime_get();
        lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
        hold_lkb(lkb);
        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
        if (error)
                log_error(ls, "addwait error %x %d flags %x %d %d %s",
                          lkb->lkb_id, error, lkb->lkb_flags, mstype,
                          lkb->lkb_wait_type, lkb->lkb_resource->res_name);
        mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}
/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
                                struct dlm_message *ms)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int overlap_done = 0;

        if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
                log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
                overlap_done = 1;
                goto out_del;
        }

        if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
                log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                overlap_done = 1;
                goto out_del;
        }

        /* Cancel state was preemptively cleared by a successful convert,
           see next comment, nothing to do. */

        if ((mstype == DLM_MSG_CANCEL_REPLY) &&
            (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
                log_debug(ls, "remwait %x cancel_reply wait_type %d",
                          lkb->lkb_id, lkb->lkb_wait_type);
                return -1;
        }

        /* Remove for the convert reply, and premptively remove for the
           cancel reply.  A convert has been granted while there's still
           an outstanding cancel on it (the cancel is moot and the result
           in the cancel reply should be 0).  We preempt the cancel reply
           because the app gets the convert result and then can follow up
           with another op, like convert.  This subsequent op would see the
           lingering state of the cancel and fail with -EBUSY. */

        if ((mstype == DLM_MSG_CONVERT_REPLY) &&
            (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
            is_overlap_cancel(lkb) && ms && !ms->m_result) {
                log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
                          lkb->lkb_id);
                lkb->lkb_wait_type = 0;
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                lkb->lkb_wait_count--;
                goto out_del;
        }

        /* N.B. type of reply may not always correspond to type of original
           msg due to lookup->request optimization, verify others? */

        if (lkb->lkb_wait_type) {
                lkb->lkb_wait_type = 0;
                goto out_del;
        }

        log_error(ls, "remwait error %x reply %d flags %x no wait_type",
                  lkb->lkb_id, mstype, lkb->lkb_flags);
        return -1;

 out_del:
        /* the force-unlock/cancel has completed and we haven't recvd a reply
           to the op that was in progress prior to the unlock/cancel; we
           give up on any reply to the earlier op.  FIXME: not sure when/how
           this would happen */

        if (overlap_done && lkb->lkb_wait_type) {
                log_error(ls, "remwait error %x reply %d wait_type %d overlap",
                          lkb->lkb_id, mstype, lkb->lkb_wait_type);
                lkb->lkb_wait_count--;
                lkb->lkb_wait_type = 0;
        }

        DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

        lkb->lkb_flags &= ~DLM_IFL_RESEND;
        lkb->lkb_wait_count--;
        if (!lkb->lkb_wait_count)
                list_del_init(&lkb->lkb_wait_reply);
        unhold_lkb(lkb);
        return 0;
}
static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error;

        mutex_lock(&ls->ls_waiters_mutex);
        error = _remove_from_waiters(lkb, mstype, NULL);
        mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}
/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error;

        if (ms->m_flags != DLM_IFL_STUB_MS)
                mutex_lock(&ls->ls_waiters_mutex);
        error = _remove_from_waiters(lkb, ms->m_type, ms);
        if (ms->m_flags != DLM_IFL_STUB_MS)
                mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}
static void dir_remove(struct dlm_rsb *r)
{
        int to_nodeid;

        if (dlm_no_directory(r->res_ls))
                return;

        to_nodeid = dlm_dir_nodeid(r);
        if (to_nodeid != dlm_our_nodeid())
                send_remove(r);
        else
                dlm_dir_remove_entry(r->res_ls, to_nodeid,
                                     r->res_name, r->res_length);
}
/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
        struct dlm_rsb *r;
        int count = 0, found;

        for (;;) {
                found = 0;
                spin_lock(&ls->ls_rsbtbl[b].lock);
                list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
                                            res_hashchain) {
                        if (!time_after_eq(jiffies, r->res_toss_time +
                                           dlm_config.ci_toss_secs * HZ))
                                continue;
                        found = 1;
                        break;
                }

                if (!found) {
                        spin_unlock(&ls->ls_rsbtbl[b].lock);
                        break;
                }

                if (kref_put(&r->res_ref, kill_rsb)) {
                        list_del(&r->res_hashchain);
                        spin_unlock(&ls->ls_rsbtbl[b].lock);

                        if (is_master(r))
                                dir_remove(r);
                        dlm_free_rsb(r);
                        count++;
                } else {
                        spin_unlock(&ls->ls_rsbtbl[b].lock);
                        log_error(ls, "tossed rsb in use %s", r->res_name);
                }
        }

        return count;
}
void dlm_scan_rsbs(struct dlm_ls *ls)
{
        int i;

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                shrink_bucket(ls, i);
                if (dlm_locking_stopped(ls))
                        break;
                cond_resched();
        }
}
static void add_timeout(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;

        if (is_master_copy(lkb))
                return;

        if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
            !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
                lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
                goto add_it;
        }
        if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
                goto add_it;
        return;

 add_it:
        DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
        mutex_lock(&ls->ls_timeout_mutex);
        hold_lkb(lkb);
        list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
        mutex_unlock(&ls->ls_timeout_mutex);
}
static void del_timeout(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;

        mutex_lock(&ls->ls_timeout_mutex);
        if (!list_empty(&lkb->lkb_time_list)) {
                list_del_init(&lkb->lkb_time_list);
                unhold_lkb(lkb);
        }
        mutex_unlock(&ls->ls_timeout_mutex);
}
/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
   and then lock rsb because of lock ordering in add_timeout.  We may need
   to specify some special timeout-related bits in the lkb that are just to
   be accessed under the timeout_mutex. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        struct dlm_lkb *lkb;
        int do_cancel, do_warn;
        s64 wait_us;

        for (;;) {
                if (dlm_locking_stopped(ls))
                        break;

                do_cancel = 0;
                do_warn = 0;
                mutex_lock(&ls->ls_timeout_mutex);
                list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

                        wait_us = ktime_to_us(ktime_sub(ktime_get(),
                                                        lkb->lkb_timestamp));

                        if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
                            wait_us >= (lkb->lkb_timeout_cs * 10000))
                                do_cancel = 1;

                        if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
                            wait_us >= dlm_config.ci_timewarn_cs * 10000)
                                do_warn = 1;

                        if (!do_cancel && !do_warn)
                                continue;
                        hold_lkb(lkb);
                        break;
                }
                mutex_unlock(&ls->ls_timeout_mutex);

                if (!do_cancel && !do_warn)
                        break;

                r = lkb->lkb_resource;
                hold_rsb(r);
                lock_rsb(r);

                if (do_warn) {
                        /* clear flag so we only warn once */
                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
                        if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
                                del_timeout(lkb);
                        dlm_timeout_warn(lkb);
                }

                if (do_cancel) {
                        log_debug(ls, "timeout cancel %x node %d %s",
                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
                        lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
                        del_timeout(lkb);
                        _cancel_lock(r, lkb);
                }

                unlock_rsb(r);
                unhold_rsb(r);
                dlm_put_lkb(lkb);
        }
}
/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
   dlm_recoverd before checking/setting ls_recover_begin. */

void dlm_adjust_timeouts(struct dlm_ls *ls)
{
        struct dlm_lkb *lkb;
        u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);

        ls->ls_recover_begin = 0;
        mutex_lock(&ls->ls_timeout_mutex);
        list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
                lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
        mutex_unlock(&ls->ls_timeout_mutex);

        if (!dlm_config.ci_waitwarn_us)
                return;

        mutex_lock(&ls->ls_waiters_mutex);
        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
                if (ktime_to_us(lkb->lkb_wait_time))
                        lkb->lkb_wait_time = ktime_get();
        }
        mutex_unlock(&ls->ls_waiters_mutex);
}
/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int b, len = r->res_ls->ls_lvblen;

        /* b=1 lvb returned to caller
           b=0 lvb written to rsb or invalidated
           b=-1 do nothing */

        b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

        if (b == 1) {
                if (!lkb->lkb_lvbptr)
                        return;

                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        return;

                if (!r->res_lvbptr)
                        return;

                memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
                lkb->lkb_lvbseq = r->res_lvbseq;

        } else if (b == 0) {
                if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
                        rsb_set_flag(r, RSB_VALNOTVALID);
                        return;
                }

                if (!lkb->lkb_lvbptr)
                        return;

                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        return;

                if (!r->res_lvbptr)
                        r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

                if (!r->res_lvbptr)
                        return;

                memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
                r->res_lvbseq++;
                lkb->lkb_lvbseq = r->res_lvbseq;
                rsb_clear_flag(r, RSB_VALNOTVALID);
        }

        if (rsb_flag(r, RSB_VALNOTVALID))
                lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}
static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        if (lkb->lkb_grmode < DLM_LOCK_PW)
                return;

        if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
                rsb_set_flag(r, RSB_VALNOTVALID);
                return;
        }

        if (!lkb->lkb_lvbptr)
                return;

        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                return;

        if (!r->res_lvbptr)
                r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

        if (!r->res_lvbptr)
                return;

        memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
        r->res_lvbseq++;
        rsb_clear_flag(r, RSB_VALNOTVALID);
}
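/* Illustrative example (added, not in the original file): if the holder of a
   PW or EX lock unlocks with DLM_LKF_IVVALBLK, RSB_VALNOTVALID is set on the
   rsb, and set_lvb_lock() above will then report DLM_SBF_VALNOTVALID to
   subsequent lock holders until a new LVB is written and the flag cleared. */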
/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                            struct dlm_message *ms)
{
        int b;

        if (!lkb->lkb_lvbptr)
                return;

        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                return;

        b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
        if (b == 1) {
                int len = receive_extralen(ms);
                if (len > DLM_RESNAME_MAXLEN)
                        len = DLM_RESNAME_MAXLEN;
                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
                lkb->lkb_lvbseq = ms->m_lvbseq;
        }
}
/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        del_lkb(r, lkb);
        lkb->lkb_grmode = DLM_LOCK_IV;
        /* this unhold undoes the original ref from create_lkb()
           so this leads to the lkb being freed */
        unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_unlock(r, lkb);
        _remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        _remove_lock(r, lkb);
}
/* returns: 0 did nothing
            1 moved lock to granted
           -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int rv = 0;

        lkb->lkb_rqmode = DLM_LOCK_IV;

        switch (lkb->lkb_status) {
        case DLM_LKSTS_GRANTED:
                break;
        case DLM_LKSTS_CONVERT:
                move_lkb(r, lkb, DLM_LKSTS_GRANTED);
                rv = 1;
                break;
        case DLM_LKSTS_WAITING:
                del_lkb(r, lkb);
                lkb->lkb_grmode = DLM_LOCK_IV;
                /* this unhold undoes the original ref from create_lkb()
                   so this leads to the lkb being freed */
                unhold_lkb(lkb);
                rv = -1;
                break;
        default:
                log_print("invalid status for revert %d", lkb->lkb_status);
        }
        return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        return revert_lock(r, lkb);
}
static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        if (lkb->lkb_grmode != lkb->lkb_rqmode) {
                lkb->lkb_grmode = lkb->lkb_rqmode;
                if (lkb->lkb_status)
                        move_lkb(r, lkb, DLM_LKSTS_GRANTED);
                else
                        add_lkb(r, lkb, DLM_LKSTS_GRANTED);
        }

        lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_lock(r, lkb);
        _grant_lock(r, lkb);
        lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                          struct dlm_message *ms)
{
        set_lvb_lock_pc(r, lkb, ms);
        _grant_lock(r, lkb);
}
1448 static void grant_lock_pending(struct dlm_rsb
*r
, struct dlm_lkb
*lkb
)
1451 if (is_master_copy(lkb
))
1454 queue_cast(r
, lkb
, 0);
/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes.  We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb)
{
        if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
                log_print("munge_demoted %x invalid modes gr %d rq %d",
                          lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
                return;
        }

        lkb->lkb_grmode = DLM_LOCK_NL;
}

static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
        if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
            ms->m_type != DLM_MSG_GRANT) {
                log_print("munge_altmode %x invalid reply type %d",
                          lkb->lkb_id, ms->m_type);
                return;
        }

        if (lkb->lkb_exflags & DLM_LKF_ALTPR)
                lkb->lkb_rqmode = DLM_LOCK_PR;
        else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
                lkb->lkb_rqmode = DLM_LOCK_CW;
        else
                log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
}
static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
        struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
                                           lkb_statequeue);
        if (lkb->lkb_id == first->lkb_id)
                return 1;
        return 0;
}
/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
        struct dlm_lkb *this;

        list_for_each_entry(this, head, lkb_statequeue) {
                if (this == lkb)
                        continue;
                if (!modes_compat(this, lkb))
                        return 1;
        }
        return 0;
}
1521 * "A conversion deadlock arises with a pair of lock requests in the converting
1522 * queue for one resource. The granted mode of each lock blocks the requested
1523 * mode of the other lock."
1525 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1526 * convert queue from being granted, then deadlk/demote lkb.
1529 * Granted Queue: empty
1530 * Convert Queue: NL->EX (first lock)
1531 * PR->EX (second lock)
1533 * The first lock can't be granted because of the granted mode of the second
1534 * lock and the second lock can't be granted because it's not first in the
1535 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1536 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1537 * flag set and return DEMOTED in the lksb flags.
1539 * Originally, this function detected conv-deadlk in a more limited scope:
1540 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1541 * - if lkb1 was the first entry in the queue (not just earlier), and was
1542 * blocked by the granted mode of lkb2, and there was nothing on the
1543 * granted queue preventing lkb1 from being granted immediately, i.e.
1544 * lkb2 was the only thing preventing lkb1 from being granted.
1546 * That second condition meant we'd only say there was conv-deadlk if
1547 * resolving it (by demotion) would lead to the first lock on the convert
1548 * queue being granted right away. It allowed conversion deadlocks to exist
1549 * between locks on the convert queue while they couldn't be granted anyway.
1551 * Now, we detect and take action on conversion deadlocks immediately when
1552 * they're created, even if they may not be immediately consequential. If
1553 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1554 * mode that would prevent lkb1's conversion from being granted, we do a
1555 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1556 * I think this means that the lkb_is_ahead condition below should always
1557 * be zero, i.e. there will never be conv-deadlk between two locks that are
1558 * both already on the convert queue.
static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
{
        struct dlm_lkb *lkb1;
        int lkb_is_ahead = 0;

        list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
                if (lkb1 == lkb2) {
                        lkb_is_ahead = 1;
                        continue;
                }

                if (!lkb_is_ahead) {
                        if (!modes_compat(lkb2, lkb1))
                                return 1;
                } else {
                        if (!modes_compat(lkb2, lkb1) &&
                            !modes_compat(lkb1, lkb2))
                                return 1;
                }
        }
        return 0;
}
/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */
static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
        int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

        /*
         * 6-10: Version 5.4 introduced an option to address the phenomenon of
         * a new request for a NL mode lock being blocked.
         *
         * 6-11: If the optional EXPEDITE flag is used with the new NL mode
         * request, then it would be granted.  In essence, the use of this flag
         * tells the Lock Manager to expedite this request by not considering
         * what may be in the CONVERTING or WAITING queues...  As of this
         * writing, the EXPEDITE flag can be used only with new requests for NL
         * mode locks.  This flag is not valid for conversion requests.
         *
         * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
         * conversion or used with a non-NL requested mode.  We also know an
         * EXPEDITE request is always granted immediately, so now must always
         * be 1.  The full condition to grant an expedite request: (now &&
         * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
         * therefore be shortened to just checking the flag.
         */

        if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
                return 1;

        /*
         * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
         * added to the remaining conditions.
         */

        if (queue_conflict(&r->res_grantqueue, lkb))
                goto out;

        /*
         * 6-3: By default, a conversion request is immediately granted if the
         * requested mode is compatible with the modes of all other granted
         * locks
         */

        if (queue_conflict(&r->res_convertqueue, lkb))
                goto out;

        /*
         * 6-5: But the default algorithm for deciding whether to grant or
         * queue conversion requests does not by itself guarantee that such
         * requests are serviced on a "first come first serve" basis.  This, in
         * turn, can lead to a phenomenon known as "indefinite postponement".
         *
         * 6-7: This issue is dealt with by using the optional QUECVT flag with
         * the system service employed to request a lock conversion.  This flag
         * forces certain conversion requests to be queued, even if they are
         * compatible with the granted modes of other locks on the same
         * resource.  Thus, the use of this flag results in conversion requests
         * being ordered on a "first come first serve" basis.
         *
         * DCT: This condition is all about new conversions being able to occur
         * "in place" while the lock remains on the granted queue (assuming
         * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
         * doesn't _have_ to go onto the convert queue where it's processed in
         * order.  The "now" variable is necessary to distinguish converts
         * being received and processed for the first time now, because once a
         * convert is moved to the conversion queue the condition below applies
         * requiring fifo granting.
         */

        if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
                return 1;

        /*
         * The NOORDER flag is set to avoid the standard vms rules on grant
         * order.
         */

        if (lkb->lkb_exflags & DLM_LKF_NOORDER)
                return 1;

        /*
         * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
         * granted until all other conversion requests ahead of it are granted
         * and/or canceled.
         */

        if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
                return 1;

        /*
         * 6-4: By default, a new request is immediately granted only if all
         * three of the following conditions are satisfied when the request is
         * issued:
         * - The queue of ungranted conversion requests for the resource is
         *   empty.
         * - The queue of ungranted new requests for the resource is empty.
         * - The mode of the new request is compatible with the most
         *   restrictive mode of all granted locks on the resource.
         */

        if (now && !conv && list_empty(&r->res_convertqueue) &&
            list_empty(&r->res_waitqueue))
                return 1;

        /*
         * 6-4: Once a lock request is in the queue of ungranted new requests,
         * it cannot be granted until the queue of ungranted conversion
         * requests is empty, all ungranted new requests ahead of it are
         * granted and/or canceled, and it is compatible with the granted mode
         * of the most restrictive lock granted on the resource.
         */

        if (!now && !conv && list_empty(&r->res_convertqueue) &&
            first_in_list(lkb, &r->res_waitqueue))
                return 1;
 out:
        return 0;
}
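/* Worked example (added, not in the original file): a new CR request
   (now=1, conv=0) on an rsb whose grant queue holds only a PR lock is
   granted immediately when the convert and wait queues are empty: PR and CR
   are compatible, so neither queue_conflict() check fires and the 6-4 "new
   request" condition returns 1.  The same request arriving while another
   request is already waiting falls through and returns 0, so do_request()
   queues it instead. */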
static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
                          int *err)
{
        int rv;
        int8_t alt = 0, rqmode = lkb->lkb_rqmode;
        int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);

        if (err)
                *err = 0;

        rv = _can_be_granted(r, lkb, now);
        if (rv)
                goto out;

        /*
         * The CONVDEADLK flag is non-standard and tells the dlm to resolve
         * conversion deadlocks by demoting grmode to NL, otherwise the dlm
         * cancels one of the locks.
         */

        if (is_convert && can_be_queued(lkb) &&
            conversion_deadlock_detect(r, lkb)) {
                if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
                        lkb->lkb_grmode = DLM_LOCK_NL;
                        lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
                } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
                        if (err)
                                *err = -EDEADLK;
                        else {
                                log_print("can_be_granted deadlock %x now %d",
                                          lkb->lkb_id, now);
                                dlm_dump_rsb(r);
                        }
                }
                goto out;
        }

        /*
         * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
         * to grant a request in a mode other than the normal rqmode.  It's a
         * simple way to provide a big optimization to applications that can
         * use them.
         */

        if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
                alt = DLM_LOCK_PR;
        else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
                alt = DLM_LOCK_CW;

        if (alt) {
                lkb->lkb_rqmode = alt;
                rv = _can_be_granted(r, lkb, now);
                if (rv)
                        lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
                else
                        lkb->lkb_rqmode = rqmode;
        }
 out:
        return rv;
}
/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
   for locks pending on the convert list.  Once verified (watch for these
   log_prints), we should be able to just call _can_be_granted() and not
   bother with the demote/deadlk cases here (and there's no easy way to deal
   with a deadlk here, we'd have to generate something like grant_lock with
   the deadlk error.) */

/* Returns the highest requested mode of all blocked conversions; sets
   cw if there's a blocked conversion to DLM_LOCK_CW. */
static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
{
        struct dlm_lkb *lkb, *s;
        int hi, demoted, quit, grant_restart, demote_restart;
        int deadlk;

        quit = 0;
 restart:
        grant_restart = 0;
        demote_restart = 0;
        hi = DLM_LOCK_IV;

        list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
                demoted = is_demoted(lkb);
                deadlk = 0;

                if (can_be_granted(r, lkb, 0, &deadlk)) {
                        grant_lock_pending(r, lkb);
                        grant_restart = 1;
                        continue;
                }

                if (!demoted && is_demoted(lkb)) {
                        log_print("WARN: pending demoted %x node %d %s",
                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
                        demote_restart = 1;
                        continue;
                }

                if (deadlk) {
                        log_print("WARN: pending deadlock %x node %d %s",
                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
                        dlm_dump_rsb(r);
                        continue;
                }

                hi = max_t(int, lkb->lkb_rqmode, hi);

                if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
                        *cw = 1;
        }

        if (grant_restart)
                goto restart;
        if (demote_restart && !quit) {
                quit = 1;
                goto restart;
        }

        return max_t(int, high, hi);
}
static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
{
        struct dlm_lkb *lkb, *s;

        list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
                if (can_be_granted(r, lkb, 0, NULL))
                        grant_lock_pending(r, lkb);
                else {
                        high = max_t(int, lkb->lkb_rqmode, high);
                        if (lkb->lkb_rqmode == DLM_LOCK_CW)
                                *cw = 1;
                }
        }

        return high;
}
/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
   on either the convert or waiting queue.
   high is the largest rqmode of all locks blocked on the convert or
   waiting queue. */

static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
{
        if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
                if (gr->lkb_highbast < DLM_LOCK_EX)
                        return 1;
                return 0;
        }

        if (gr->lkb_highbast < high &&
            !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
                return 1;
        return 0;
}
static void grant_pending_locks(struct dlm_rsb *r)
{
        struct dlm_lkb *lkb, *s;
        int high = DLM_LOCK_IV;
        int cw = 0;

        DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

        high = grant_pending_convert(r, high, &cw);
        high = grant_pending_wait(r, high, &cw);

        if (high == DLM_LOCK_IV)
                return;

        /*
         * If there are locks left on the wait/convert queue then send blocking
         * ASTs to granted locks based on the largest requested mode (high)
         * found above.
         */

        list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
                if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
                        if (cw && high == DLM_LOCK_PR &&
                            lkb->lkb_grmode == DLM_LOCK_PR)
                                queue_bast(r, lkb, DLM_LOCK_CW);
                        else
                                queue_bast(r, lkb, high);
                        lkb->lkb_highbast = high;
                }
        }
}
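/* Illustrative example (added, not in the original file): if an EX
   conversion remains blocked behind a granted PR lock, high ends up as
   DLM_LOCK_EX; lock_requires_bast() is then true for the PR holder (its
   highbast is below EX and PR is incompatible with EX), so it is sent a
   blocking AST for mode EX and its lkb_highbast is raised. */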
static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
{
        if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
            (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
                if (gr->lkb_highbast < DLM_LOCK_EX)
                        return 1;
                return 0;
        }

        if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
                return 1;
        return 0;
}
static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
                            struct dlm_lkb *lkb)
{
        struct dlm_lkb *gr;

        list_for_each_entry(gr, head, lkb_statequeue) {
                /* skip self when sending basts to convertqueue */
                if (gr == lkb)
                        continue;
                if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
                        queue_bast(r, gr, lkb->lkb_rqmode);
                        gr->lkb_highbast = lkb->lkb_rqmode;
                }
        }
}
static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        send_bast_queue(r, &r->res_grantqueue, lkb);
        send_bast_queue(r, &r->res_convertqueue, lkb);
}
/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue for the lookup reply
*/
static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        struct dlm_ls *ls = r->res_ls;
        int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

        if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
                r->res_first_lkid = lkb->lkb_id;
                lkb->lkb_nodeid = r->res_nodeid;
                return 0;
        }

        if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
                list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
                return 1;
        }

        if (r->res_nodeid == 0) {
                lkb->lkb_nodeid = 0;
                return 0;
        }

        if (r->res_nodeid > 0) {
                lkb->lkb_nodeid = r->res_nodeid;
                return 0;
        }

        DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

        dir_nodeid = dlm_dir_nodeid(r);

        if (dir_nodeid != our_nodeid) {
                r->res_first_lkid = lkb->lkb_id;
                send_lookup(r, lkb);
                return 1;
        }

        for (i = 0; i < 2; i++) {
                /* It's possible for dlm_scand to remove an old rsb for
                   this same resource from the toss list, us to create
                   a new one, look up the master locally, and find it
                   already exists just before dlm_scand does the
                   dir_remove() on the previous rsb. */

                error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
                                       r->res_length, &ret_nodeid);
                if (!error)
                        break;
                log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
                schedule();
        }
        if (error && error != -EEXIST)
                return error;

        if (ret_nodeid == our_nodeid) {
                r->res_first_lkid = 0;
                r->res_nodeid = 0;
                lkb->lkb_nodeid = 0;
        } else {
                r->res_first_lkid = lkb->lkb_id;
                r->res_nodeid = ret_nodeid;
                lkb->lkb_nodeid = ret_nodeid;
        }
        return 0;
}
static void process_lookup_list(struct dlm_rsb *r)
{
        struct dlm_lkb *lkb, *safe;

        list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
                list_del_init(&lkb->lkb_rsb_lookup);
                _request_lock(r, lkb);
                schedule();
        }
}
2043 static void confirm_master(struct dlm_rsb
*r
, int error
)
2045 struct dlm_lkb
*lkb
;
2047 if (!r
->res_first_lkid
)
2053 r
->res_first_lkid
= 0;
2054 process_lookup_list(r
);
2060 /* the remote request failed and won't be retried (it was
2061 a NOQUEUE, or has been canceled/unlocked); make a waiting
2062 lkb the first_lkid */
2064 r
->res_first_lkid
= 0;
2066 if (!list_empty(&r
->res_lookup
)) {
2067 lkb
= list_entry(r
->res_lookup
.next
, struct dlm_lkb
,
2069 list_del_init(&lkb
->lkb_rsb_lookup
);
2070 r
->res_first_lkid
= lkb
->lkb_id
;
2071 _request_lock(r
, lkb
);
2076 log_error(r
->res_ls
, "confirm_master unknown error %d", error
);
static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
                         int namelen, unsigned long timeout_cs,
                         void (*ast) (void *astparam),
                         void *astparam,
                         void (*bast) (void *astparam, int mode),
                         struct dlm_args *args)
{
        int rv = -EINVAL;

        /* check for invalid arg usage */

        if (mode < 0 || mode > DLM_LOCK_EX)
                goto out;

        if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
                goto out;

        if (flags & DLM_LKF_CANCEL)
                goto out;

        if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
                goto out;

        if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
                goto out;

        if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
                goto out;

        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
                goto out;

        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
                goto out;

        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
                goto out;

        if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
                goto out;

        if (!ast || !lksb)
                goto out;

        if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
                goto out;

        if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
                goto out;

        /* these args will be copied to the lkb in validate_lock_args,
           it cannot be done now because when converting locks, fields in
           an active lkb cannot be modified before locking the rsb */

        args->flags = flags;
        args->astfn = ast;
        args->astparam = astparam;
        args->bastfn = bast;
        args->timeout = timeout_cs;
        args->mode = mode;
        args->lksb = lksb;
        rv = 0;
 out:
        return rv;
}
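/* Illustrative examples of combinations rejected above (added for clarity,
   not part of the original file): DLM_LKF_EXPEDITE with anything other than
   a new DLM_LOCK_NL request, DLM_LKF_QUECVT or DLM_LKF_CONVDEADLK without
   DLM_LKF_CONVERT, DLM_LKF_VALBLK without lksb->sb_lvbptr, and
   DLM_LKF_CONVERT without a previously granted lksb->sb_lkid. */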
static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
{
        if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
                      DLM_LKF_FORCEUNLOCK))
                return -EINVAL;

        if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
                return -EINVAL;

        args->flags = flags;
        args->astparam = astarg;
        return 0;
}
static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
                              struct dlm_args *args)
{
        int rv = -EINVAL;

        if (args->flags & DLM_LKF_CONVERT) {
                if (lkb->lkb_flags & DLM_IFL_MSTCPY)
                        goto out;

                if (args->flags & DLM_LKF_QUECVT &&
                    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
                        goto out;

                rv = -EBUSY;
                if (lkb->lkb_status != DLM_LKSTS_GRANTED)
                        goto out;

                if (lkb->lkb_wait_type)
                        goto out;

                if (is_overlap(lkb))
                        goto out;
        }

        lkb->lkb_exflags = args->flags;
        lkb->lkb_sbflags = 0;
        lkb->lkb_astfn = args->astfn;
        lkb->lkb_astparam = args->astparam;
        lkb->lkb_bastfn = args->bastfn;
        lkb->lkb_rqmode = args->mode;
        lkb->lkb_lksb = args->lksb;
        lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
        lkb->lkb_ownpid = (int) current->pid;
        lkb->lkb_timeout_cs = args->timeout;
        rv = 0;
 out:
        if (rv)
                log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
                          rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
                          lkb->lkb_status, lkb->lkb_wait_type,
                          lkb->lkb_resource->res_name);
        return rv;
}
/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
   for success */

/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
   because there may be a lookup in progress and it's valid to do
   cancel/unlockf on it */
static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int rv = -EINVAL;

        if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
                log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
                dlm_print_lkb(lkb);
                goto out;
        }

        /* an lkb may still exist even though the lock is EOL'ed due to a
           cancel, unlock or failed noqueue request; an app can't use these
           locks; return same error as if the lkid had not been found at all */

        if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
                log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
                rv = -ENOENT;
                goto out;
        }

        /* an lkb may be waiting for an rsb lookup to complete where the
           lookup was initiated by another lock */

        if (!list_empty(&lkb->lkb_rsb_lookup)) {
                if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
                        log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
                        list_del_init(&lkb->lkb_rsb_lookup);
                        queue_cast(lkb->lkb_resource, lkb,
                                   args->flags & DLM_LKF_CANCEL ?
                                   -DLM_ECANCEL : -DLM_EUNLOCK);
                        unhold_lkb(lkb); /* undoes create_lkb() */
                }
                /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
                rv = -EBUSY;
                goto out;
        }

        /* cancel not allowed with another cancel/unlock in progress */

        if (args->flags & DLM_LKF_CANCEL) {
                if (lkb->lkb_exflags & DLM_LKF_CANCEL)
                        goto out;

                if (is_overlap(lkb))
                        goto out;

                /* don't let scand try to do a cancel */
                del_timeout(lkb);

                if (lkb->lkb_flags & DLM_IFL_RESEND) {
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
                        rv = -EBUSY;
                        goto out;
                }

                /* there's nothing to cancel */
                if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
                    !lkb->lkb_wait_type) {
                        rv = -EBUSY;
                        goto out;
                }

                switch (lkb->lkb_wait_type) {
                case DLM_MSG_LOOKUP:
                case DLM_MSG_REQUEST:
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
                        rv = -EBUSY;
                        goto out;
                case DLM_MSG_UNLOCK:
                case DLM_MSG_CANCEL:
                        goto out;
                }
                /* add_to_waiters() will set OVERLAP_CANCEL */
                goto out_ok;
        }

        /* do we need to allow a force-unlock if there's a normal unlock
           already in progress?  in what conditions could the normal unlock
           fail such that we'd want to send a force-unlock to be sure? */

        if (args->flags & DLM_LKF_FORCEUNLOCK) {
                if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
                        goto out;

                if (is_overlap_unlock(lkb))
                        goto out;

                /* don't let scand try to do a cancel */
                del_timeout(lkb);

                if (lkb->lkb_flags & DLM_IFL_RESEND) {
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
                        rv = -EBUSY;
                        goto out;
                }

                switch (lkb->lkb_wait_type) {
                case DLM_MSG_LOOKUP:
                case DLM_MSG_REQUEST:
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
                        rv = -EBUSY;
                        goto out;
                case DLM_MSG_UNLOCK:
                        goto out;
                }
                /* add_to_waiters() will set OVERLAP_UNLOCK */
                goto out_ok;
        }

        /* normal unlock not allowed if there's any op in progress */
        rv = -EBUSY;
        if (lkb->lkb_wait_type || lkb->lkb_wait_count)
                goto out;

 out_ok:
        /* an overlapping op shouldn't blow away exflags from other op */
        lkb->lkb_exflags |= args->flags;
        lkb->lkb_sbflags = 0;
        lkb->lkb_astparam = args->astparam;
        rv = 0;
 out:
        if (rv)
                log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
                          lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
                          args->flags, lkb->lkb_wait_type,
                          lkb->lkb_resource->res_name);
        return rv;
}
/*
 * Four stage 4 varieties:
 * do_request(), do_convert(), do_unlock(), do_cancel()
 * These are called on the master node for the given lock and
 * from the central locking logic.
 */
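/* An illustrative walk-through of the stages above (no new code paths, just
 * the call chains already implemented below): for a new request on a locally
 * mastered rsb,
 *
 *	dlm_lock -> request_lock -> _request_lock -> do_request
 *
 * and for a remotely mastered rsb,
 *
 *	dlm_lock -> request_lock -> _request_lock -> send_request
 *	  (remote master)  receive_request -> do_request -> send_request_reply
 *	  (back home)      receive_request_reply
 */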
static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	if (can_be_granted(r, lkb, 1, NULL)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		add_lkb(r, lkb, DLM_LKSTS_WAITING);
		add_timeout(lkb);
		goto out;
	}

	error = -EAGAIN;
	queue_cast(r, lkb, -EAGAIN);
 out:
	return error;
}
static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			       int error)
{
	switch (error) {
	case -EAGAIN:
		if (force_blocking_asts(lkb))
			send_blocking_asts_all(r, lkb);
		break;
	case -EINPROGRESS:
		send_blocking_asts(r, lkb);
		break;
	}
}
static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;
	int deadlk = 0;

	/* changing an existing lock may allow others to be granted */

	if (can_be_granted(r, lkb, 1, &deadlk)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	/* can_be_granted() detected that this lock would block in a conversion
	   deadlock, so we leave it on the granted queue and return EDEADLK in
	   the ast for the convert. */

	if (deadlk) {
		/* it's left on the granted queue */
		log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
			  lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
			  lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
		revert_lock(r, lkb);
		queue_cast(r, lkb, -EDEADLK);
		error = -EDEADLK;
		goto out;
	}

	/* is_demoted() means the can_be_granted() above set the grmode
	   to NL, and left us on the granted queue.  This auto-demotion
	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
	   now grantable.  We have to try to grant other converting locks
	   before we try again to grant this one. */

	if (is_demoted(lkb)) {
		grant_pending_convert(r, DLM_LOCK_IV, NULL);
		if (_can_be_granted(r, lkb, 1)) {
			grant_lock(r, lkb);
			queue_cast(r, lkb, 0);
			goto out;
		}
		/* else fall through and move to convert queue */
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		add_timeout(lkb);
		goto out;
	}

	error = -EAGAIN;
	queue_cast(r, lkb, -EAGAIN);
 out:
	return error;
}
static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			       int error)
{
	switch (error) {
	case 0:
		grant_pending_locks(r);
		/* grant_pending_locks also sends basts */
		break;
	case -EAGAIN:
		if (force_blocking_asts(lkb))
			send_blocking_asts_all(r, lkb);
		break;
	case -EINPROGRESS:
		send_blocking_asts(r, lkb);
		break;
	}
}
static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	remove_lock(r, lkb);
	queue_cast(r, lkb, -DLM_EUNLOCK);
	return -DLM_EUNLOCK;
}
static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	grant_pending_locks(r);
}
/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */

static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = revert_lock(r, lkb);
	if (error) {
		queue_cast(r, lkb, -DLM_ECANCEL);
		return -DLM_ECANCEL;
	}
	return 0;
}

static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	if (error)
		grant_pending_locks(r);
}
/*
 * Four stage 3 varieties:
 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
 */

/* add a new lkb to a possibly new rsb, called by requesting process */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r */

	error = set_master(r, lkb);
	if (error < 0)
		goto out;
	if (error) {
		error = 0;
		goto out;
	}

	if (is_remote(r)) {
		/* receive_request() calls do_request() on remote node */
		error = send_request(r, lkb);
	} else {
		error = do_request(r, lkb);
		/* for remote locks the request_reply is sent
		   between do_request and do_request_effects */
		do_request_effects(r, lkb, error);
	}
 out:
	return error;
}
/* change some property of an existing lkb, e.g. mode */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r)) {
		/* receive_convert() calls do_convert() on remote node */
		error = send_convert(r, lkb);
	} else {
		error = do_convert(r, lkb);
		/* for remote locks the convert_reply is sent
		   between do_convert and do_convert_effects */
		do_convert_effects(r, lkb, error);
	}

	return error;
}
/* remove an existing lkb from the granted queue */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r)) {
		/* receive_unlock() calls do_unlock() on remote node */
		error = send_unlock(r, lkb);
	} else {
		error = do_unlock(r, lkb);
		/* for remote locks the unlock_reply is sent
		   between do_unlock and do_unlock_effects */
		do_unlock_effects(r, lkb, error);
	}

	return error;
}
/* remove an existing lkb from the convert or wait queue */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r)) {
		/* receive_cancel() calls do_cancel() on remote node */
		error = send_cancel(r, lkb);
	} else {
		error = do_cancel(r, lkb);
		/* for remote locks the cancel_reply is sent
		   between do_cancel and do_cancel_effects */
		do_cancel_effects(r, lkb, error);
	}

	return error;
}
/*
 * Four stage 2 varieties:
 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
 */

static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
			int len, struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		goto out;

	lock_rsb(r);

	attach_lkb(r, lkb);
	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

	error = _request_lock(r, lkb);

	unlock_rsb(r);
	put_rsb(r);
 out:
	return error;
}
static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
			struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = _convert_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _unlock_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _cancel_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
/*
 * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
 */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	dlm_lock_recovery(ls);

	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	if (convert || error)
		__put_lkb(ls, lkb);
	if (error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
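/* Illustrative caller sketch (not part of the dlm itself): an in-kernel user
 * of the two stage 1 entry points might take and drop an EX lock on the
 * 7-byte resource name "example" like this.  The lockspace "ls", the
 * completion and example_ast() are assumptions made for the sketch:
 *
 *	static void example_ast(void *arg)
 *	{
 *		complete(arg);
 *	}
 *
 *	struct dlm_lksb lksb = {};
 *	DECLARE_COMPLETION_ONSTACK(done);
 *	int error;
 *
 *	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "example", 7, 0,
 *			 example_ast, &done, NULL);
 *	if (!error) {
 *		wait_for_completion(&done);
 *		error = lksb.sb_status;
 *	}
 *	...
 *	error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, &done);
 *
 * A zero return from dlm_lock()/dlm_unlock() only means the request was
 * accepted; the result of the operation arrives via the ast and sb_status.
 */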
/*
 * send/receive routines for remote operations and replies
 *
 * send_request			receive_request
 * send_convert			receive_convert
 * send_unlock			receive_unlock
 * send_cancel			receive_cancel
 * send_grant			receive_grant
 * send_bast			receive_bast
 * send_lookup			receive_lookup
 * send_remove			receive_remove
 *
 * receive_request_reply	send_request_reply
 * receive_convert_reply	send_convert_reply
 * receive_unlock_reply		send_unlock_reply
 * receive_cancel_reply		send_cancel_reply
 * receive_lookup_reply		send_lookup_reply
 */
static int _create_message(struct dlm_ls *ls, int mb_len,
			   int to_nodeid, int mstype,
			   struct dlm_message **ms_ret,
			   struct dlm_mhandle **mh_ret)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	char *mb;

	/* get_buffer gives us a message handle (mh) that we need to
	   pass into lowcomms_commit and a message buffer (mb) that we
	   write our data into */

	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
	if (!mh)
		return -ENOBUFS;

	memset(mb, 0, mb_len);

	ms = (struct dlm_message *) mb;

	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	ms->m_header.h_lockspace = ls->ls_global_id;
	ms->m_header.h_nodeid = dlm_our_nodeid();
	ms->m_header.h_length = mb_len;
	ms->m_header.h_cmd = DLM_MSG;

	ms->m_type = mstype;

	*mh_ret = mh;
	*ms_ret = ms;
	return 0;
}

static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  int to_nodeid, int mstype,
			  struct dlm_message **ms_ret,
			  struct dlm_mhandle **mh_ret)
{
	int mb_len = sizeof(struct dlm_message);

	switch (mstype) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
	case DLM_MSG_REMOVE:
		mb_len += r->res_length;
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (lkb && lkb->lkb_lvbptr)
			mb_len += r->res_ls->ls_lvblen;
		break;
	}

	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
			       ms_ret, mh_ret);
}
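/* A minimal sketch of how the send_xxxx() routines below use this pair
 * (MSTYPE stands in for a real message type; the lkb may be NULL when no
 * lvb space is needed):
 *
 *	error = create_message(r, lkb, to_nodeid, MSTYPE, &ms, &mh);
 *	if (error)
 *		return error;
 *	send_args(r, lkb, ms);
 *	error = send_message(mh, ms);
 *
 * create_message() sizes the buffer as the fixed struct dlm_message plus
 * whatever the type carries in m_extra (resource name or lvb).
 */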
/* further lowcomms enhancements or alternate implementations may make
   the return value from this function useful at some point */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);
	return 0;
}
static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
		      struct dlm_message *ms)
{
	ms->m_nodeid   = lkb->lkb_nodeid;
	ms->m_pid      = lkb->lkb_ownpid;
	ms->m_lkid     = lkb->lkb_id;
	ms->m_remid    = lkb->lkb_remid;
	ms->m_exflags  = lkb->lkb_exflags;
	ms->m_sbflags  = lkb->lkb_sbflags;
	ms->m_flags    = lkb->lkb_flags;
	ms->m_lvbseq   = lkb->lkb_lvbseq;
	ms->m_status   = lkb->lkb_status;
	ms->m_grmode   = lkb->lkb_grmode;
	ms->m_rqmode   = lkb->lkb_rqmode;
	ms->m_hash     = r->res_hash;

	/* m_result and m_bastmode are set from function args,
	   not from lkb fields */

	if (lkb->lkb_bastfn)
		ms->m_asts |= DLM_CB_BAST;
	if (lkb->lkb_astfn)
		ms->m_asts |= DLM_CB_CAST;

	/* compare with switch in create_message; send_remove() doesn't
	   use send_args() */

	switch (ms->m_type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
		memcpy(ms->m_extra, r->res_name, r->res_length);
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (!lkb->lkb_lvbptr)
			break;
		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
		break;
	}
}
static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = r->res_nodeid;

	error = add_to_waiters(lkb, mstype, to_nodeid);
	if (error)
		return error;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, msg_reply_type(mstype));
	return error;
}

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_REQUEST);
}
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = send_common(r, lkb, DLM_MSG_CONVERT);

	/* down conversions go without a reply from the master */
	if (!error && down_conversion(lkb)) {
		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
		r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		r->res_ls->ls_stub_ms.m_result = 0;
		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
	}

	return error;
}
/* FIXME: if this lkb is the only lock we hold on the rsb, then set
   MASTER_UNCERTAIN to force the next request on the rsb to confirm
   that the master is still correct. */

static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_UNLOCK);
}

static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_CANCEL);
}
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_result = 0;

	error = send_message(mh, ms);
 out:
	return error;
}
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_bastmode = mode;

	error = send_message(mh, ms);
 out:
	return error;
}
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = dlm_dir_nodeid(r);

	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
	if (error)
		return error;

	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	return error;
}
static int send_remove(struct dlm_rsb *r)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
	if (error)
		goto out;

	memcpy(ms->m_extra, r->res_name, r->res_length);
	ms->m_hash = r->res_hash;

	error = send_message(mh, ms);
 out:
	return error;
}
static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
			     int mstype, int rv)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_result = rv;

	error = send_message(mh, ms);
 out:
	return error;
}

static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
}

static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
}

static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
}

static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
}
static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
			     int ret_nodeid, int rv)
{
	struct dlm_rsb *r = &ls->ls_stub_rsb;
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error, nodeid = ms_in->m_header.h_nodeid;

	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
	if (error)
		goto out;

	ms->m_lkid = ms_in->m_lkid;
	ms->m_result = rv;
	ms->m_nodeid = ret_nodeid;

	error = send_message(mh, ms);
 out:
	return error;
}
/* which args we save from a received message depends heavily on the type
   of message, unlike the send side where we can safely send everything about
   the lkb for any type of message */

static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	lkb->lkb_exflags = ms->m_exflags;
	lkb->lkb_sbflags = ms->m_sbflags;
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
			 (ms->m_flags & 0x0000FFFF);
}

static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_flags == DLM_IFL_STUB_MS)
		return;

	lkb->lkb_sbflags = ms->m_sbflags;
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
			 (ms->m_flags & 0x0000FFFF);
}

static int receive_extralen(struct dlm_message *ms)
{
	return (ms->m_header.h_length - sizeof(struct dlm_message));
}
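/* For example (illustrative numbers only): a DLM_MSG_REQUEST carrying a
   12-byte resource name arrives with h_length = sizeof(struct dlm_message)
   + 12, so receive_extralen() returns 12, the number of bytes that are
   valid in m_extra. */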
static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_message *ms)
{
	int len;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		if (!lkb->lkb_lvbptr)
			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		len = receive_extralen(ms);
		if (len > DLM_RESNAME_MAXLEN)
			len = DLM_RESNAME_MAXLEN;
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
	}
	return 0;
}
static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}

static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}
static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_ownpid = ms->m_pid;
	lkb->lkb_remid = ms->m_lkid;
	lkb->lkb_grmode = DLM_LOCK_IV;
	lkb->lkb_rqmode = ms->m_rqmode;

	lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* lkb was just created so there won't be an lvb yet */
		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
	}

	return 0;
}
static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
		return -EBUSY;

	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;

	lkb->lkb_rqmode = ms->m_rqmode;
	lkb->lkb_lvbseq = ms->m_lvbseq;

	return 0;
}

static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;
	return 0;
}
/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
   uses to send a reply and that the remote end uses to process the reply. */

static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_remid = ms->m_lkid;
}
/* This is called after the rsb is locked so that we can safely inspect
   fields in the lkb. */

static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	int from = ms->m_header.h_nodeid;
	int error = 0;

	switch (ms->m_type) {
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_CANCEL:
		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_UNLOCK_REPLY:
	case DLM_MSG_CANCEL_REPLY:
	case DLM_MSG_GRANT:
	case DLM_MSG_BAST:
		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	case DLM_MSG_REQUEST_REPLY:
		if (!is_process_copy(lkb))
			error = -EINVAL;
		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	default:
		error = -EINVAL;
	}

	if (error)
		log_error(lkb->lkb_resource->res_ls,
			  "ignore invalid message %d from %d %x %x %x %d",
			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
			  lkb->lkb_flags, lkb->lkb_nodeid);
	return error;
}
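/* Usage sketch: the receive_xxxx() handlers below call this with the rsb
 * held and locked, and simply stop processing the message if it fails:
 *
 *	hold_rsb(r);
 *	lock_rsb(r);
 *
 *	error = validate_message(lkb, ms);
 *	if (error)
 *		goto out;
 */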
static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, namelen;

	error = create_lkb(ls, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	error = receive_request_args(ls, lkb, ms);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	namelen = receive_extralen(ms);

	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	lock_rsb(r);

	attach_lkb(r, lkb);
	error = do_request(r, lkb);
	send_request_reply(r, lkb, error);
	do_request_effects(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);

	if (error == -EINPROGRESS)
		error = 0;
	if (error)
		dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, reply = 1;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags(lkb, ms);

	error = receive_convert_args(ls, lkb, ms);
	if (error) {
		send_convert_reply(r, lkb, error);
		goto out;
	}

	reply = !down_conversion(lkb);

	error = do_convert(r, lkb);
	if (reply)
		send_convert_reply(r, lkb, error);
	do_convert_effects(r, lkb, error);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags(lkb, ms);

	error = receive_unlock_args(ls, lkb, ms);
	if (error) {
		send_unlock_reply(r, lkb, error);
		goto out;
	}

	error = do_unlock(r, lkb);
	send_unlock_reply(r, lkb, error);
	do_unlock_effects(r, lkb, error);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	error = do_cancel(r, lkb);
	send_cancel_reply(r, lkb, error);
	do_cancel_effects(r, lkb, error);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_grant from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags_reply(lkb, ms);
	if (is_altmode(lkb))
		munge_altmode(lkb, ms);
	grant_lock_pc(r, lkb, ms);
	queue_cast(r, lkb, 0);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_bast from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	queue_bast(r, lkb, ms->m_bastmode);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;

	from_nodeid = ms->m_header.h_nodeid;
	our_nodeid = dlm_our_nodeid();

	len = receive_extralen(ms);

	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "lookup dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		error = -EINVAL;
		ret_nodeid = -1;
		goto out;
	}

	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);

	/* Optimization: we're master so treat lookup as a request */
	if (!error && ret_nodeid == our_nodeid) {
		receive_request(ls, ms);
		return;
	}
 out:
	send_lookup_reply(ls, ms, ret_nodeid, error);
}
static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, dir_nodeid, from_nodeid;

	from_nodeid = ms->m_header.h_nodeid;

	len = receive_extralen(ms);

	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != dlm_our_nodeid()) {
		log_error(ls, "remove dir entry dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		return;
	}

	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
}
static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
{
	do_purge(ls, ms->m_nodeid, ms->m_pid);
}
static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, mstype, result;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_request_reply from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	mstype = lkb->lkb_wait_type;
	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
	if (error)
		goto out;

	/* Optimization: the dir node was also the master, so it took our
	   lookup as a request and sent request reply instead of lookup reply */
	if (mstype == DLM_MSG_LOOKUP) {
		r->res_nodeid = ms->m_header.h_nodeid;
		lkb->lkb_nodeid = r->res_nodeid;
	}

	/* this is the value returned from do_request() on the master */
	result = ms->m_result;

	switch (result) {
	case -EAGAIN:
		/* request would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		confirm_master(r, -EAGAIN);
		unhold_lkb(lkb); /* undoes create_lkb() */
		break;

	case -EINPROGRESS:
	case 0:
		/* request was queued or granted on remote master */
		receive_flags_reply(lkb, ms);
		lkb->lkb_remid = ms->m_lkid;
		if (is_altmode(lkb))
			munge_altmode(lkb, ms);
		if (result) {
			add_lkb(r, lkb, DLM_LKSTS_WAITING);
			add_timeout(lkb);
		} else {
			grant_lock_pc(r, lkb, ms);
			queue_cast(r, lkb, 0);
		}
		confirm_master(r, result);
		break;

	case -EBADR:
	case -ENOTBLK:
		/* find_rsb failed to find rsb or rsb wasn't master */
		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
		r->res_nodeid = -1;
		lkb->lkb_nodeid = -1;

		if (is_overlap(lkb)) {
			/* we'll ignore error in cancel/unlock reply */
			queue_cast_overlap(r, lkb);
			confirm_master(r, result);
			unhold_lkb(lkb); /* undoes create_lkb() */
		} else
			_request_lock(r, lkb);
		break;

	default:
		log_error(ls, "receive_request_reply %x error %d",
			  lkb->lkb_id, result);
	}

	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x result %d unlock",
			  lkb->lkb_id, result);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_unlock(r, lkb);
	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_cancel(r, lkb);
	} else {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms)
{
	/* this is the value returned from do_convert() on the master */
	switch (ms->m_result) {
	case -EAGAIN:
		/* convert would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		break;

	case -EDEADLK:
		receive_flags_reply(lkb, ms);
		revert_lock_pc(r, lkb);
		queue_cast(r, lkb, -EDEADLK);
		break;

	case -EINPROGRESS:
		/* convert was queued on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		add_timeout(lkb);
		break;

	case 0:
		/* convert was granted on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		grant_lock_pc(r, lkb, ms);
		queue_cast(r, lkb, 0);
		break;

	default:
		log_error(r->res_ls, "receive_convert_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
}
static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	__receive_convert_reply(r, lkb, ms);
 out:
	unlock_rsb(r);
	put_rsb(r);
}

static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_convert_reply from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	_receive_convert_reply(lkb, ms);
	dlm_put_lkb(lkb);
}
static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_unlock() on the master */

	switch (ms->m_result) {
	case -DLM_EUNLOCK:
		receive_flags_reply(lkb, ms);
		remove_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_EUNLOCK);
		break;
	case -ENOENT:
		break;
	default:
		log_error(r->res_ls, "receive_unlock_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}

static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_unlock_reply from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	_receive_unlock_reply(lkb, ms);
	dlm_put_lkb(lkb);
}
static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_cancel() on the master */

	switch (ms->m_result) {
	case -DLM_ECANCEL:
		receive_flags_reply(lkb, ms);
		revert_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_ECANCEL);
		break;
	case 0:
		break;
	default:
		log_error(r->res_ls, "receive_cancel_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}

static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_cancel_reply from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	_receive_cancel_reply(lkb, ms);
	dlm_put_lkb(lkb);
}
static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, ret_nodeid;

	error = find_lkb(ls, ms->m_lkid, &lkb);
	if (error) {
		log_error(ls, "receive_lookup_reply no lkb");
		return;
	}

	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
	   FIXME: will a non-zero error ever be returned? */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	if (error)
		goto out;

	ret_nodeid = ms->m_nodeid;
	if (ret_nodeid == dlm_our_nodeid()) {
		r->res_nodeid = 0;
		ret_nodeid = 0;
		r->res_first_lkid = 0;
	} else {
		/* set_master() will copy res_nodeid to lkb_nodeid */
		r->res_nodeid = ret_nodeid;
	}

	if (is_overlap(lkb)) {
		log_debug(ls, "receive_lookup_reply %x unlock %x",
			  lkb->lkb_id, lkb->lkb_flags);
		queue_cast_overlap(r, lkb);
		unhold_lkb(lkb); /* undoes create_lkb() */
		goto out_list;
	}

	_request_lock(r, lkb);

 out_list:
	if (!ret_nodeid)
		process_lookup_list(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
{
	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
		log_debug(ls, "ignore non-member message %d from %d %x %x %d",
			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
			  ms->m_remid, ms->m_result);
		return;
	}

	switch (ms->m_type) {

	/* messages sent to a master node */

	case DLM_MSG_REQUEST:
		receive_request(ls, ms);
		break;

	case DLM_MSG_CONVERT:
		receive_convert(ls, ms);
		break;

	case DLM_MSG_UNLOCK:
		receive_unlock(ls, ms);
		break;

	case DLM_MSG_CANCEL:
		receive_cancel(ls, ms);
		break;

	/* messages sent from a master node (replies to above) */

	case DLM_MSG_REQUEST_REPLY:
		receive_request_reply(ls, ms);
		break;

	case DLM_MSG_CONVERT_REPLY:
		receive_convert_reply(ls, ms);
		break;

	case DLM_MSG_UNLOCK_REPLY:
		receive_unlock_reply(ls, ms);
		break;

	case DLM_MSG_CANCEL_REPLY:
		receive_cancel_reply(ls, ms);
		break;

	/* messages sent from a master node (only two types of async msg) */

	case DLM_MSG_GRANT:
		receive_grant(ls, ms);
		break;

	case DLM_MSG_BAST:
		receive_bast(ls, ms);
		break;

	/* messages sent to a dir node */

	case DLM_MSG_LOOKUP:
		receive_lookup(ls, ms);
		break;

	case DLM_MSG_REMOVE:
		receive_remove(ls, ms);
		break;

	/* messages sent from a dir node (remove has no reply) */

	case DLM_MSG_LOOKUP_REPLY:
		receive_lookup_reply(ls, ms);
		break;

	/* other messages */

	case DLM_MSG_PURGE:
		receive_purge(ls, ms);
		break;

	default:
		log_error(ls, "unknown message type %d", ms->m_type);
	}
}

/* If the lockspace is in recovery mode (locking stopped), then normal
   messages are saved on the requestqueue for processing after recovery is
   done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
   messages off the requestqueue before we process new ones.  This occurs right
   after recovery completes when we transition from saving all messages on
   requestqueue, to processing all the saved messages, to processing new
   messages as they arrive. */
static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
				int nodeid)
{
	if (dlm_locking_stopped(ls)) {
		dlm_add_requestqueue(ls, nodeid, ms);
	} else {
		dlm_wait_requestqueue(ls);
		_receive_message(ls, ms);
	}
}

/* This is called by dlm_recoverd to process messages that were saved on
   the requestqueue. */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
	_receive_message(ls, ms);
}

/* This is called by the midcomms layer when something is received for
   the lockspace.  It could be either a MSG (normal message sent as part of
   standard locking activity) or an RCOM (recovery message sent as part of
   lockspace recovery). */

void dlm_receive_buffer(union dlm_packet *p, int nodeid)
{
	struct dlm_header *hd = &p->header;
	struct dlm_ls *ls;
	int type = 0;

	switch (hd->h_cmd) {
	case DLM_MSG:
		dlm_message_in(&p->message);
		type = p->message.m_type;
		break;
	case DLM_RCOM:
		dlm_rcom_in(&p->rcom);
		type = p->rcom.rc_type;
		break;
	default:
		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
		return;
	}

	if (hd->h_nodeid != nodeid) {
		log_print("invalid h_nodeid %d from %d lockspace %x",
			  hd->h_nodeid, nodeid, hd->h_lockspace);
		return;
	}

	ls = dlm_find_lockspace_global(hd->h_lockspace);
	if (!ls) {
		if (dlm_config.ci_log_debug)
			log_print("invalid lockspace %x from %d cmd %d type %d",
				  hd->h_lockspace, nodeid, hd->h_cmd, type);

		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
			dlm_send_ls_not_ready(nodeid, &p->rcom);
		return;
	}

	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
	   be inactive (in this ls) before transitioning to recovery mode */

	down_read(&ls->ls_recv_active);
	if (hd->h_cmd == DLM_MSG)
		dlm_receive_message(ls, &p->message, nodeid);
	else
		dlm_receive_rcom(ls, &p->rcom, nodeid);
	up_read(&ls->ls_recv_active);

	dlm_put_lockspace(ls);
}
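/* A minimal sketch of the hand-off into this function (the buffer handling
 * shown here is an assumption about the midcomms caller, not code in this
 * file): once a complete packet has been reassembled from the stream it is
 * passed in as
 *
 *	union dlm_packet *p = (union dlm_packet *)buf;
 *
 *	dlm_receive_buffer(p, nodeid);
 *
 * so all of the receive_xxxx() processing above runs in dlm_recv context,
 * serialized against recovery transitions by ls_recv_active.
 */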
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
				   struct dlm_message *ms_stub)
{
	if (middle_conversion(lkb)) {
		hold_lkb(lkb);
		memset(ms_stub, 0, sizeof(struct dlm_message));
		ms_stub->m_flags = DLM_IFL_STUB_MS;
		ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
		ms_stub->m_result = -EINPROGRESS;
		ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
		_receive_convert_reply(lkb, ms_stub);

		/* Same special case as in receive_rcom_lock_args() */
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
		unhold_lkb(lkb);

	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
		lkb->lkb_flags |= DLM_IFL_RESEND;
	}

	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
	   conversions are async; there's no reply from the remote master */
}
/* A waiting lkb needs recovery if the master node has failed, or
   the master node is changing (only when no directory is used) */

static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (dlm_is_removed(ls, lkb->lkb_nodeid))
		return 1;

	if (!dlm_no_directory(ls))
		return 0;

	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
		return 1;

	return 0;
}
/* Recovery for locks that are waiting for replies from nodes that are now
   gone.  We can just complete unlocks and cancels by faking a reply from the
   dead node.  Requests and up-conversions we flag to be resent after
   recovery.  Down-conversions can just be completed with a fake reply like
   unlocks.  Conversions between PR and CW need special attention. */

void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;
	struct dlm_message *ms_stub;
	int wait_type, stub_unlock_result, stub_cancel_result;

	ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
	if (!ms_stub) {
		log_error(ls, "dlm_recover_waiters_pre no mem");
		return;
	}

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {

		/* exclude debug messages about unlocks because there can be so
		   many and they aren't very interesting */

		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
			log_debug(ls, "recover_waiter %x nodeid %d "
				  "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
				  lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
		}

		/* all outstanding lookups, regardless of destination  will be
		   resent after recovery is done */

		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
			lkb->lkb_flags |= DLM_IFL_RESEND;
			continue;
		}

		if (!waiter_needs_recovery(ls, lkb))
			continue;

		wait_type = lkb->lkb_wait_type;
		stub_unlock_result = -DLM_EUNLOCK;
		stub_cancel_result = -DLM_ECANCEL;

		/* Main reply may have been received leaving a zero wait_type,
		   but a reply for the overlapping op may not have been
		   received.  In that case we need to fake the appropriate
		   reply for the overlap op. */

		if (!wait_type) {
			if (is_overlap_cancel(lkb)) {
				wait_type = DLM_MSG_CANCEL;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					stub_cancel_result = 0;
			}
			if (is_overlap_unlock(lkb)) {
				wait_type = DLM_MSG_UNLOCK;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					stub_unlock_result = -ENOENT;
			}

			log_debug(ls, "rwpre overlap %x %x %d %d %d",
				  lkb->lkb_id, lkb->lkb_flags, wait_type,
				  stub_cancel_result, stub_unlock_result);
		}

		switch (wait_type) {

		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_RESEND;
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb, ms_stub);
			break;

		case DLM_MSG_UNLOCK:
			hold_lkb(lkb);
			memset(ms_stub, 0, sizeof(struct dlm_message));
			ms_stub->m_flags = DLM_IFL_STUB_MS;
			ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
			ms_stub->m_result = stub_unlock_result;
			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_unlock_reply(lkb, ms_stub);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			hold_lkb(lkb);
			memset(ms_stub, 0, sizeof(struct dlm_message));
			ms_stub->m_flags = DLM_IFL_STUB_MS;
			ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
			ms_stub->m_result = stub_cancel_result;
			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_cancel_reply(lkb, ms_stub);
			dlm_put_lkb(lkb);
			break;

		default:
			log_error(ls, "invalid lkb wait_type %d %d",
				  lkb->lkb_wait_type, wait_type);
		}
		schedule();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
	kfree(ms_stub);
}
static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	int found = 0;

	mutex_lock(&ls->ls_waiters_mutex);
	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			hold_lkb(lkb);
			found = 1;
			break;
		}
	}
	mutex_unlock(&ls->ls_waiters_mutex);

	if (!found)
		lkb = NULL;
	return lkb;
}

/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
   master or dir-node for r.  Processing the lkb may result in it being placed
   back on waiters. */

/* We do this after normal locking has been enabled and any saved messages
   (in requestqueue) have been processed.  We should be confident that at
   this point we won't get or process a reply to any of these waiting
   operations.  But, new ops may be coming in on the rsbs/locks here from
   userspace or remotely. */

/* there may have been an overlap unlock/cancel prior to recovery or after
   recovery.  if before, the lkb may still have a positive wait_count; if
   after, the overlap flag would just have been set and nothing new sent.
   we can be confident here that any replies to either the initial op or
   overlap ops prior to recovery have been received. */
int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		mstype = lkb->lkb_wait_type;
		oc = is_overlap_cancel(lkb);
		ou = is_overlap_unlock(lkb);
		err = 0;

		log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
			  lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);

		/* At this point we assume that we won't get a reply to any
		   previous op or overlap op on this lock.  First, do a big
		   remove_from_waiters() for all previous ops. */

		lkb->lkb_flags &= ~DLM_IFL_RESEND;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count = 0;
		mutex_lock(&ls->ls_waiters_mutex);
		list_del_init(&lkb->lkb_wait_reply);
		mutex_unlock(&ls->ls_waiters_mutex);
		unhold_lkb(lkb); /* for waiters list */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				err = 1;
			}
		} else {
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				err = 1;
			}
		}

		if (err)
			log_error(ls, "recover_waiters_post %x %d %x %d %d",
				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
		unlock_rsb(r);
		put_rsb(r);
		dlm_put_lkb(lkb);
	}

	return error;
}
*r
, struct list_head
*queue
,
4361 int (*test
)(struct dlm_ls
*ls
, struct dlm_lkb
*lkb
))
4363 struct dlm_ls
*ls
= r
->res_ls
;
4364 struct dlm_lkb
*lkb
, *safe
;
4366 list_for_each_entry_safe(lkb
, safe
, queue
, lkb_statequeue
) {
4367 if (test(ls
, lkb
)) {
4368 rsb_set_flag(r
, RSB_LOCKS_PURGED
);
4370 /* this put should free the lkb */
4371 if (!dlm_put_lkb(lkb
))
4372 log_error(ls
, "purged lkb not released");
4377 static int purge_dead_test(struct dlm_ls
*ls
, struct dlm_lkb
*lkb
)
4379 return (is_master_copy(lkb
) && dlm_is_removed(ls
, lkb
->lkb_nodeid
));
4382 static int purge_mstcpy_test(struct dlm_ls
*ls
, struct dlm_lkb
*lkb
)
4384 return is_master_copy(lkb
);
4387 static void purge_dead_locks(struct dlm_rsb
*r
)
4389 purge_queue(r
, &r
->res_grantqueue
, &purge_dead_test
);
4390 purge_queue(r
, &r
->res_convertqueue
, &purge_dead_test
);
4391 purge_queue(r
, &r
->res_waitqueue
, &purge_dead_test
);
4394 void dlm_purge_mstcpy_locks(struct dlm_rsb
*r
)
4396 purge_queue(r
, &r
->res_grantqueue
, &purge_mstcpy_test
);
4397 purge_queue(r
, &r
->res_convertqueue
, &purge_mstcpy_test
);
4398 purge_queue(r
, &r
->res_waitqueue
, &purge_mstcpy_test
);
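/* purge_queue() is parameterized by a predicate, so another purge policy is
 * just another test function.  A hypothetical example (not used anywhere in
 * the dlm) that would purge master copies held for one departed node, where
 * "purge_nodeid" is a file-scope variable assumed only for this sketch:
 *
 *	static int purge_one_node_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
 *	{
 *		return is_master_copy(lkb) && lkb->lkb_nodeid == purge_nodeid;
 *	}
 *
 *	purge_queue(r, &r->res_grantqueue, &purge_one_node_test);
 */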
/* Get rid of locks held by nodes that are gone. */

int dlm_purge_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;

	log_debug(ls, "dlm_purge_locks");

	down_write(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		hold_rsb(r);
		lock_rsb(r);
		if (is_master(r))
			purge_dead_locks(r);
		unlock_rsb(r);
		unhold_rsb(r);

		schedule();
	}
	up_write(&ls->ls_root_sem);

	return 0;
}

static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
{
	struct dlm_rsb *r, *r_ret = NULL;

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
		if (!rsb_flag(r, RSB_LOCKS_PURGED))
			continue;
		hold_rsb(r);
		rsb_clear_flag(r, RSB_LOCKS_PURGED);
		r_ret = r;
		break;
	}
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
	return r_ret;
}

void dlm_grant_after_purge(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int bucket = 0;

	while (1) {
		r = find_purged_rsb(ls, bucket);
		if (!r) {
			if (bucket == ls->ls_rsbtbl_size - 1)
				break;
			bucket++;
			continue;
		}
		lock_rsb(r);
		if (is_master(r)) {
			grant_pending_locks(r);
			confirm_master(r, 0);
		}
		unlock_rsb(r);
		put_rsb(r);
		schedule();
	}
}
static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
					 uint32_t remid)
{
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, head, lkb_statequeue) {
		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
			return lkb;
	}
	return NULL;
}

static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
				    uint32_t remid)
{
	struct dlm_lkb *lkb;

	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
	if (lkb)
		return lkb;
	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
	if (lkb)
		return lkb;
	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
	if (lkb)
		return lkb;
	return NULL;
}
/* needs at least dlm_rcom + rcom_lock */
static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				  struct dlm_rsb *r, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;

	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
	lkb->lkb_rqmode = rl->rl_rqmode;
	lkb->lkb_grmode = rl->rl_grmode;
	/* don't set lkb_status because add_lkb wants to itself */

	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
			 sizeof(struct rcom_lock);
		if (lvblen > ls->ls_lvblen)
			return -EINVAL;
		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
	}

	/* Conversions between PR and CW (middle modes) need special handling.
	   The real granted mode of these converting locks cannot be determined
	   until all locks have been rebuilt on the rsb (recover_conversion) */

	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
	    middle_conversion(lkb)) {
		rl->rl_status = DLM_LKSTS_CONVERT;
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(r, RSB_RECOVER_CONVERT);
	}

	return 0;
}
/* This lkb may have been recovered in a previous aborted recovery so we need
   to check if the rsb already has an lkb with the given remote nodeid/lkid.
   If so we just send back a standard reply.  If not, we create a new lkb with
   the given values and send back our lkid.  We send back our lkid by sending
   back the rcom_lock struct we got but with the remid field filled in. */

/* needs at least dlm_rcom + rcom_lock */
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
			 R_MASTER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
	if (lkb) {
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		__put_lkb(ls, lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);
	error = 0;

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	rl->rl_remid = cpu_to_le32(lkb->lkb_id);

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	if (error)
		log_debug(ls, "recover_master_copy %d %x", error,
			  le32_to_cpu(rl->rl_lkid));
	rl->rl_result = cpu_to_le32(error);
	return error;
}
/* needs at least dlm_rcom + rcom_lock */
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
	if (error) {
		log_error(ls, "recover_process_copy no lkid %x",
			  le32_to_cpu(rl->rl_lkid));
		return error;
	}

	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	error = le32_to_cpu(rl->rl_result);

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	switch (error) {
	case -EBADR:
		/* There's a chance the new master received our lock before
		   dlm_recover_master_reply(), this wouldn't happen if we did
		   a barrier between recover_masters and recover_locks. */
		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
			  (unsigned long)r, r->res_name);
		dlm_send_rcom_lock(r, lkb);
		goto out;
	case -EEXIST:
		log_debug(ls, "master copy exists %x", lkb->lkb_id);
		/* fall through */
	case 0:
		lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
		break;
	default:
		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
			  error, lkb->lkb_id);
	}

	/* an ack for dlm_recover_locks() which waits for replies from
	   all the locks it sends to new masters */
	dlm_recovered_lock(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);

	return 0;
}
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		     int mode, uint32_t flags, void *name, unsigned int namelen,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	dlm_lock_recovery(ls);

	error = create_lkb(ls, &lkb);
	if (error) {
		kfree(ua);
		goto out;
	}

	if (flags & DLM_LKF_VALBLK) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
		if (!ua->lksb.sb_lvbptr) {
			kfree(ua);
			__put_lkb(ls, lkb);
			error = -ENOMEM;
			goto out;
		}
	}

	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */

	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
			      fake_astfn, ua, fake_bastfn, &args);
	lkb->lkb_flags |= DLM_IFL_USER;

	if (error) {
		__put_lkb(ls, lkb);
		goto out;
	}

	error = request_lock(ls, lkb, name, namelen, &args);

	switch (error) {
	case 0:
		break;
	case -EINPROGRESS:
		error = 0;
		break;
	case -EAGAIN:
		error = 0;
		/* fall through */
	default:
		__put_lkb(ls, lkb);
		goto out;
	}

	/* add this new lkb to the per-process list of locks */
	spin_lock(&ua->proc->locks_spin);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock(&ua->proc->locks_spin);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* user can change the params on its lock when it converts it, or
	   add an lvb that didn't exist before */

	ua = lkb->lkb_ua;

	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
		if (!ua->lksb.sb_lvbptr) {
			error = -ENOMEM;
			goto out_put;
		}
	}
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	ua->xid = ua_tmp->xid;
	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
			      fake_astfn, ua, fake_bastfn, &args);
	if (error)
		goto out_put;

	error = convert_lock(ls, lkb, &args);

	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;

	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
		error = 0;
	if (error)
		goto out_put;

	spin_lock(&ua->proc->locks_spin);
	/* dlm_user_add_ast() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock(&ua->proc->locks_spin);
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = cancel_lock(ls, lkb, &args);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	struct dlm_rsb *r;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, &args);
	if (error)
		goto out_r;
	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;

	error = _cancel_lock(r, lkb);
 out_r:
	unlock_rsb(r);
	put_rsb(r);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
/* lkb's that are removed from the waiters list by revert are just left on the
   orphans list with the granted orphan locks, to be freed by purge */

static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	hold_lkb(lkb);
	mutex_lock(&ls->ls_orphans_mutex);
	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
	mutex_unlock(&ls->ls_orphans_mutex);

	set_unlock_args(0, lkb->lkb_ua, &args);

	error = cancel_lock(ls, lkb, &args);
	if (error == -DLM_ECANCEL)
		error = 0;
	return error;
}

/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
   Regardless of what rsb queue the lock is on, it's removed and freed. */

static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);

	error = unlock_lock(ls, lkb, &args);
	if (error == -DLM_EUNLOCK)
		error = 0;
	return error;
}
/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
   (which does lock_rsb) due to deadlock with receiving a message that does
   lock_rsb followed by dlm_user_add_ast() */

static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
				     struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb = NULL;

	mutex_lock(&ls->ls_clear_proc_locks);
	if (list_empty(&proc->locks))
		goto out;

	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
	list_del_init(&lkb->lkb_ownqueue);

	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
		lkb->lkb_flags |= DLM_IFL_ORPHAN;
	else
		lkb->lkb_flags |= DLM_IFL_DEAD;
 out:
	mutex_unlock(&ls->ls_clear_proc_locks);
	return lkb;
}

/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* proc CLOSING flag is set so no more device_reads should look at proc->asts
   list, and no more device_writes should add lkb's to proc->locks list; so we
   shouldn't need to take asts_spin or locks_spin here.  this assumes that
   device reads/writes/closes are serialized -- FIXME: we may need to serialize
   them ourself if they are not */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	dlm_lock_recovery(ls);

	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		del_timeout(lkb);
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	dlm_unlock_recovery(ls);
}
static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}
/* pid of 0 means purge all orphans */

static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_orphans_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
		if (pid && lkb->lkb_ownpid != pid)
			continue;
		unlock_proc_lock(ls, lkb);
		list_del_init(&lkb->lkb_ownqueue);
		dlm_put_lkb(lkb);
	}
	mutex_unlock(&ls->ls_orphans_mutex);
}

static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error;

	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
				DLM_MSG_PURGE, &ms, &mh);
	if (error)
		return error;
	ms->m_nodeid = nodeid;
	ms->m_pid = pid;

	return send_message(mh, ms);
}

int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
		   int nodeid, int pid)
{
	int error = 0;

	if (nodeid != dlm_our_nodeid()) {
		error = send_purge(ls, nodeid, pid);
	} else {
		dlm_lock_recovery(ls);
		if (pid == current->pid)
			purge_proc_locks(ls, proc);
		else
			do_purge(ls, nodeid, pid);
		dlm_unlock_recovery(ls);
	}
	return error;
}