/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
/* Central locking logic has four stages:

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
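
/* For example, a new request on a remotely mastered resource flows through
   the stages above as: dlm_lock() -> request_lock() -> _request_lock() ->
   send_request(), with receive_request() -> do_request() running on the
   master node and the result returning via receive_request_reply(). */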
#include <linux/types.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "requestqueue.h"
#include "lockspace.h"
#include "lvb_table.h"
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */
static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
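
/* Example reading of the matrix above: a PR holder (grmode row) is
   compatible with a new CR request, since
   __dlm_compat_matrix[DLM_LOCK_PR+1][DLM_LOCK_CR+1] is 1, but blocks a CW
   request, since the [PR+1][CW+1] entry is 0. */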
/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */
const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
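
/* Example: a lock converting up from NL to EX gets the resource's LVB copied
   back to the caller (the [NL+1][EX+1] entry is 1), while an EX holder
   converting down to NL writes its LVB into the resource (the [EX+1][NL+1]
   entry is 0). */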
#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */
static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}
void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}
void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}
/* Threads cannot use the lockspace while it's being recovered */

static inline void lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

static inline void unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

static inline int lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}
static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}
static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}
static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}
/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}
static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}
static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}
static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;

	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}
/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */
static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}
/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}
static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}
static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = (lkb->lkb_id >> 16);

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}
int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}
/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}
static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	if (!lkb)
		list_add_tail(new, head);
	else
		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}
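
/* Example of the ordering maintained above: inserting a PR lock into a
   grant queue holding EX, PR, NL places it after the existing PR and before
   the NL lock, keeping the queue in order of decreasing mode. */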
/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}
static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
}
static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}
/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}
static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms != &ls->ls_stub_ms)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type);
	if (ms != &ls->ls_stub_ms)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}
/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}
void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	if (dlm_locking_stopped(ls))
		return;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		cond_resched();
	}
}
/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}
static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}
/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}
/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
/* returns: 0 did nothing
	    1 moved lock to granted
	   -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}
static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}
/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}
/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes.  We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
		log_print("munge_demoted %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
		return;
	}

	lkb->lkb_grmode = DLM_LOCK_NL;
}
static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
	    ms->m_type != DLM_MSG_GRANT) {
		log_print("munge_altmode %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
		lkb->lkb_rqmode = DLM_LOCK_PR;
	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
		lkb->lkb_rqmode = DLM_LOCK_CW;
	else {
		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
		dlm_print_lkb(lkb);
	}
}
static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;
	return 0;
}
/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}
1274 * "A conversion deadlock arises with a pair of lock requests in the converting
1275 * queue for one resource. The granted mode of each lock blocks the requested
1276 * mode of the other lock."
1278 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1279 * convert queue from being granted, then demote lkb (set grmode to NL).
1280 * This second form requires that we check for conv-deadlk even when
1281 * now == 0 in _can_be_granted().
1284 * Granted Queue: empty
1285 * Convert Queue: NL->EX (first lock)
1286 * PR->EX (second lock)
1288 * The first lock can't be granted because of the granted mode of the second
1289 * lock and the second lock can't be granted because it's not first in the
1290 * list. We demote the granted mode of the second lock (the lkb passed to this
1293 * After the resolution, the "grant pending" function needs to go back and try
1294 * to grant locks on the convert queue again since the first lock can now be
1298 static int conversion_deadlock_detect(struct dlm_rsb
*rsb
, struct dlm_lkb
*lkb
)
1300 struct dlm_lkb
*this, *first
= NULL
, *self
= NULL
;
1302 list_for_each_entry(this, &rsb
->res_convertqueue
, lkb_statequeue
) {
1310 if (!modes_compat(this, lkb
) && !modes_compat(lkb
, this))
1314 /* if lkb is on the convert queue and is preventing the first
1315 from being granted, then there's deadlock and we demote lkb.
1316 multiple converting locks may need to do this before the first
1317 converting lock can be granted. */
1319 if (self
&& self
!= first
) {
1320 if (!modes_compat(lkb
, first
) &&
1321 !queue_conflict(&rsb
->res_grantqueue
, first
))
/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;

 out:
	/*
	 * The following, enabled by CONVDEADLK, departs from VMS.
	 */

	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
	    conversion_deadlock_detect(r, lkb)) {
		lkb->lkb_grmode = DLM_LOCK_NL;
		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
		return 1;
	}

	return 0;
}
/*
 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
 * simple way to provide a big optimization to applications that can use them.
 */

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	uint32_t flags = lkb->lkb_exflags;
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
		goto out;

	if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}
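
/* Worked example: with an EX lock granted on the resource, a new PR request
   (now=1) fails the grant-queue conflict check in _can_be_granted() and is
   either queued on res_waitqueue or, with NOQUEUE, fails with -EAGAIN; when
   the EX lock is later removed, grant_pending_locks() below re-runs
   can_be_granted() with now=0 for each queued lkb. */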
static int grant_pending_convert(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		if (can_be_granted(r, lkb, 0)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
		} else {
			hi = max_t(int, lkb->lkb_rqmode, hi);
			if (!demoted && is_demoted(lkb))
				demote_restart = 1;
		}
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}
static int grant_pending_wait(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;

	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
		if (can_be_granted(r, lkb, 0))
			grant_lock_pending(r, lkb);
		else
			high = max_t(int, lkb->lkb_rqmode, high);
	}

	return high;
}
static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high);
	high = grant_pending_wait(r, high);

	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above.  FIXME: highbast < high comparison not valid for PR/CW.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
			queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}
static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		if (gr->lkb_bastaddr &&
		    gr->lkb_highbast < lkb->lkb_rqmode &&
		    !modes_compat(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}
static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}
/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue for the lookup reply
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (;;) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del_init(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();
	}
}
/* confirm_master -- confirm (or deny) an rsb's master nodeid */

static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
		/* the remote master didn't queue our NOQUEUE request;
		   make a waiting lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del_init(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		} else
			r->res_nodeid = -1;
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}
static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
			 int namelen, uint32_t parent_lkid, void *ast,
			 void *astarg, void *bast, struct dlm_args *args)
{
	int rv = -EINVAL;

	/* check for invalid arg usage */

	if (mode < 0 || mode > DLM_LOCK_EX)
		goto out;

	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
		goto out;

	if (flags & DLM_LKF_CANCEL)
		goto out;

	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
		goto out;

	if (!ast || !lksb)
		goto out;

	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
		goto out;

	/* parent/child locks not yet supported */
	if (parent_lkid)
		goto out;

	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
		goto out;

	/* these args will be copied to the lkb in validate_lock_args,
	   it cannot be done now because when converting locks, fields in
	   an active lkb cannot be modified before locking the rsb */

	args->flags = flags;
	args->astaddr = ast;
	args->astparam = (long) astarg;
	args->bastaddr = bast;
	args->mode = mode;
	args->lksb = lksb;
	rv = 0;
 out:
	return rv;
}
static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
{
	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
		      DLM_LKF_FORCEUNLOCK))
		return -EINVAL;

	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
		return -EINVAL;

	args->flags = flags;
	args->astparam = (long) astarg;
	return 0;
}
static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		if (lkb->lkb_wait_type)
			goto out;

		if (is_overlap(lkb))
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astaddr = args->astaddr;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastaddr = args->bastaddr;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	rv = 0;
 out:
	return rv;
}
/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
   for success */

/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
   because there may be a lookup in progress and it's valid to do
   cancel/unlockf on it */

static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EINVAL;

	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	   cancel, unlock or failed noqueue request; an app can't use these
	   locks; return same error as if the lkid had not been found at all */

	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
		if (!list_empty(&lkb->lkb_rsb_lookup)) {
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
			rv = -EBUSY;
			goto out;
		}
	}

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		if (is_overlap(lkb))
			goto out;

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress?  in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		if (is_overlap_unlock(lkb))
			goto out;

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
		goto out_ok;
	}

	/* normal unlock not allowed if there's any op in progress */
	rv = -EBUSY;
	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
		goto out;

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}
/*
 * Four stage 4 varieties:
 * do_request(), do_convert(), do_unlock(), do_cancel()
 * These are called on the master node for the given lock and
 * from the central locking logic.
 */

static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	if (can_be_granted(r, lkb, 1)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		add_lkb(r, lkb, DLM_LKSTS_WAITING);
		send_blocking_asts(r, lkb);
		goto out;
	}

	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}
static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	/* changing an existing lock may allow others to be granted */

	if (can_be_granted(r, lkb, 1)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		grant_pending_locks(r);
		goto out;
	}

	/* is_demoted() means the can_be_granted() above set the grmode
	   to NL, and left us on the granted queue.  This auto-demotion
	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
	   now grantable.  We have to try to grant other converting locks
	   before we try again to grant this one. */

	if (is_demoted(lkb)) {
		grant_pending_convert(r, DLM_LOCK_IV);
		if (_can_be_granted(r, lkb, 1)) {
			grant_lock(r, lkb);
			queue_cast(r, lkb, 0);
			grant_pending_locks(r);
			goto out;
		}
		/* else fall through and move to convert queue */
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		send_blocking_asts(r, lkb);
		goto out;
	}

	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}
static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	remove_lock(r, lkb);
	queue_cast(r, lkb, -DLM_EUNLOCK);
	grant_pending_locks(r);
	return -DLM_EUNLOCK;
}
/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */

static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = revert_lock(r, lkb);
	if (error) {
		queue_cast(r, lkb, -DLM_ECANCEL);
		grant_pending_locks(r);
		return -DLM_ECANCEL;
	}
	return 0;
}
/*
 * Four stage 3 varieties:
 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
 */

/* add a new lkb to a possibly new rsb, called by requesting process */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r */

	error = set_master(r, lkb);
	if (error < 0)
		goto out;
	if (error) {
		error = 0;
		goto out;
	}

	if (is_remote(r))
		/* receive_request() calls do_request() on remote node */
		error = send_request(r, lkb);
	else
		error = do_request(r, lkb);
 out:
	return error;
}

/* change some property of an existing lkb, e.g. mode */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_convert() calls do_convert() on remote node */
		error = send_convert(r, lkb);
	else
		error = do_convert(r, lkb);

	return error;
}

/* remove an existing lkb from the granted queue */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_unlock() calls do_unlock() on remote node */
		error = send_unlock(r, lkb);
	else
		error = do_unlock(r, lkb);

	return error;
}

/* remove an existing lkb from the convert or wait queue */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_cancel() calls do_cancel() on remote node */
		error = send_cancel(r, lkb);
	else
		error = do_cancel(r, lkb);

	return error;
}
/*
 * Four stage 2 varieties:
 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
 */

static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
			int len, struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		goto out;

	lock_rsb(r);

	attach_lkb(r, lkb);
	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

	error = _request_lock(r, lkb);

	unlock_rsb(r);
	put_rsb(r);
 out:
	return error;
}
static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
			struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = _convert_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _unlock_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _cancel_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
/*
 * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
 */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	lock_recovery(ls);

	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	if (convert || error)
		__put_lkb(ls, lkb);
	if (error == -EAGAIN)
		error = 0;
 out:
	unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
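
/* A sketch of typical caller usage (illustrative only, not part of this
   file; my_ast/my_bast/my_arg are hypothetical caller-supplied callbacks):

	struct dlm_lksb lksb;
	int error;

	error = dlm_lock(ls, DLM_LOCK_PR, &lksb, 0, "myres", 5, 0,
			 my_ast, my_arg, my_bast);

	completion is asynchronous: my_ast() fires once lksb.sb_status is
	set, and lksb.sb_lkid identifies the lock for later calls, e.g.

	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, DLM_LKF_CONVERT, NULL, 0, 0,
			 my_ast, my_arg, my_bast);
*/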
int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
/*
 * send/receive routines for remote operations and replies
 *
 * send_request			receive_request
 * send_convert			receive_convert
 * send_unlock			receive_unlock
 * send_cancel			receive_cancel
 * send_grant			receive_grant
 * send_bast			receive_bast
 * send_lookup			receive_lookup
 * send_remove			receive_remove
 *
 * receive_request_reply	send_request_reply
 * receive_convert_reply	send_convert_reply
 * receive_unlock_reply		send_unlock_reply
 * receive_cancel_reply		send_cancel_reply
 * receive_lookup_reply		send_lookup_reply
 */
static int _create_message(struct dlm_ls *ls, int mb_len,
			   int to_nodeid, int mstype,
			   struct dlm_message **ms_ret,
			   struct dlm_mhandle **mh_ret)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	char *mb;

	/* get_buffer gives us a message handle (mh) that we need to
	   pass into lowcomms_commit and a message buffer (mb) that we
	   write our data into */

	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
	if (!mh)
		return -ENOBUFS;

	memset(mb, 0, mb_len);

	ms = (struct dlm_message *) mb;

	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	ms->m_header.h_lockspace = ls->ls_global_id;
	ms->m_header.h_nodeid = dlm_our_nodeid();
	ms->m_header.h_length = mb_len;
	ms->m_header.h_cmd = DLM_MSG;

	ms->m_type = mstype;

	*mh_ret = mh;
	*ms_ret = ms;
	return 0;
}
static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  int to_nodeid, int mstype,
			  struct dlm_message **ms_ret,
			  struct dlm_mhandle **mh_ret)
{
	int mb_len = sizeof(struct dlm_message);

	switch (mstype) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
	case DLM_MSG_REMOVE:
		mb_len += r->res_length;
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (lkb && lkb->lkb_lvbptr)
			mb_len += r->res_ls->ls_lvblen;
		break;
	}

	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
			       ms_ret, mh_ret);
}
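
/* Example sizing from the switch above: a DLM_MSG_REQUEST for an 8-byte
   resource name allocates sizeof(struct dlm_message) + 8, while a
   DLM_MSG_CONVERT carrying an LVB adds ls_lvblen instead. */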
/* further lowcomms enhancements or alternate implementations may make
   the return value from this function useful at some point */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);
	return 0;
}
static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
		      struct dlm_message *ms)
{
	ms->m_nodeid   = lkb->lkb_nodeid;
	ms->m_pid      = lkb->lkb_ownpid;
	ms->m_lkid     = lkb->lkb_id;
	ms->m_remid    = lkb->lkb_remid;
	ms->m_exflags  = lkb->lkb_exflags;
	ms->m_sbflags  = lkb->lkb_sbflags;
	ms->m_flags    = lkb->lkb_flags;
	ms->m_lvbseq   = lkb->lkb_lvbseq;
	ms->m_status   = lkb->lkb_status;
	ms->m_grmode   = lkb->lkb_grmode;
	ms->m_rqmode   = lkb->lkb_rqmode;
	ms->m_hash     = r->res_hash;

	/* m_result and m_bastmode are set from function args,
	   not from lkb fields */

	if (lkb->lkb_bastaddr)
		ms->m_asts |= AST_BAST;
	if (lkb->lkb_astaddr)
		ms->m_asts |= AST_COMP;

	/* compare with switch in create_message; send_remove() doesn't
	   use send_args() */

	switch (ms->m_type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
		memcpy(ms->m_extra, r->res_name, r->res_length);
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (!lkb->lkb_lvbptr)
			break;
		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
		break;
	}
}
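/* The switch above must stay in step with the one in create_message():
   any message type that reserves lvb space there copies the lvb here;
   otherwise the reserved bytes go out as the zeroes left by memset and
   the receiver sees an empty lvb. */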
static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	error = add_to_waiters(lkb, mstype);
	if (error)
		return error;

	to_nodeid = r->res_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, msg_reply_type(mstype));
	return error;
}

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_REQUEST);
}
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = send_common(r, lkb, DLM_MSG_CONVERT);

	/* down conversions go without a reply from the master */
	if (!error && down_conversion(lkb)) {
		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		r->res_ls->ls_stub_ms.m_result = 0;
		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
	}

	return error;
}

/* FIXME: if this lkb is the only lock we hold on the rsb, then set
   MASTER_UNCERTAIN to force the next request on the rsb to confirm
   that the master is still correct. */
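/* The stub reply used above lets a down-conversion complete without a
   round trip: the master grants it unconditionally and sends no reply,
   so we fabricate the DLM_MSG_CONVERT_REPLY it would have sent and run
   it through the normal reply path.  dlm_recover_waiters_pre() uses
   ls_stub_ms the same way to complete operations whose master died. */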
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_UNLOCK);
}

static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_CANCEL);
}
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_result = 0;

	error = send_message(mh, ms);
 out:
	return error;
}
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_bastmode = mode;

	error = send_message(mh, ms);
 out:
	return error;
}
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
	if (error)
		return error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	return error;
}
static int send_remove(struct dlm_rsb *r)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
	if (error)
		goto out;

	memcpy(ms->m_extra, r->res_name, r->res_length);
	ms->m_hash = r->res_hash;

	error = send_message(mh, ms);
 out:
	return error;
}
static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
			     int mstype, int rv)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_result = rv;

	error = send_message(mh, ms);
 out:
	return error;
}

static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
}

static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
}

static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
}

static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
}
static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
			     int ret_nodeid, int rv)
{
	struct dlm_rsb *r = &ls->ls_stub_rsb;
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error, nodeid = ms_in->m_header.h_nodeid;

	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
	if (error)
		goto out;

	ms->m_lkid = ms_in->m_lkid;
	ms->m_result = rv;
	ms->m_nodeid = ret_nodeid;

	error = send_message(mh, ms);
 out:
	return error;
}
/* which args we save from a received message depends heavily on the type
   of message, unlike the send side where we can safely send everything about
   the lkb for any type of message */

static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	lkb->lkb_exflags = ms->m_exflags;
	lkb->lkb_sbflags = ms->m_sbflags;
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
			 (ms->m_flags & 0x0000FFFF);
}

static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	lkb->lkb_sbflags = ms->m_sbflags;
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
			 (ms->m_flags & 0x0000FFFF);
}
static int receive_extralen(struct dlm_message *ms)
{
	return (ms->m_header.h_length - sizeof(struct dlm_message));
}
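/* Example: a DLM_MSG_REQUEST for a 12-byte resource name was built
   with h_length = sizeof(struct dlm_message) + 12 in create_message(),
   so receive_extralen() yields 12, the number of bytes valid in
   ms->m_extra. */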
static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_message *ms)
{
	int len;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		if (!lkb->lkb_lvbptr)
			lkb->lkb_lvbptr = allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		len = receive_extralen(ms);
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
	}
	return 0;
}
static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_ownpid = ms->m_pid;
	lkb->lkb_remid = ms->m_lkid;
	lkb->lkb_grmode = DLM_LOCK_IV;
	lkb->lkb_rqmode = ms->m_rqmode;
	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);

	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* lkb was just created so there won't be an lvb yet */
		lkb->lkb_lvbptr = allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
	}

	return 0;
}
static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
			  lkb->lkb_id, lkb->lkb_remid);
		return -EINVAL;
	}

	if (!is_master_copy(lkb))
		return -EINVAL;

	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
		return -EBUSY;

	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;

	lkb->lkb_rqmode = ms->m_rqmode;
	lkb->lkb_lvbseq = ms->m_lvbseq;

	return 0;
}
static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	if (!is_master_copy(lkb))
		return -EINVAL;
	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;
	return 0;
}
/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
   uses to send a reply and that the remote end uses to process the reply. */

static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_remid = ms->m_lkid;
}
static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, namelen;

	error = create_lkb(ls, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	error = receive_request_args(ls, lkb, ms);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	namelen = receive_extralen(ms);

	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	lock_rsb(r);

	attach_lkb(r, lkb);
	error = do_request(r, lkb);
	send_request_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);

	if (error == -EINPROGRESS)
		error = 0;
	if (error)
		dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, reply = 1;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags(lkb, ms);
	error = receive_convert_args(ls, lkb, ms);
	if (error)
		goto out;
	reply = !down_conversion(lkb);

	error = do_convert(r, lkb);
 out:
	if (reply)
		send_convert_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags(lkb, ms);
	error = receive_unlock_args(ls, lkb, ms);
	if (error)
		goto out;

	error = do_unlock(r, lkb);
 out:
	send_unlock_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = do_cancel(r, lkb);
	send_cancel_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_grant no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags_reply(lkb, ms);
	if (is_altmode(lkb))
		munge_altmode(lkb, ms);
	grant_lock_pc(r, lkb, ms);
	queue_cast(r, lkb, 0);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_bast no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	queue_bast(r, lkb, ms->m_bastmode);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;

	from_nodeid = ms->m_header.h_nodeid;
	our_nodeid = dlm_our_nodeid();

	len = receive_extralen(ms);

	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "lookup dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		error = -EINVAL;
		ret_nodeid = -1;
		goto out;
	}

	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);

	/* Optimization: we're master so treat lookup as a request */
	if (!error && ret_nodeid == our_nodeid) {
		receive_request(ls, ms);
		return;
	}
 out:
	send_lookup_reply(ls, ms, ret_nodeid, error);
}
static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, dir_nodeid, from_nodeid;

	from_nodeid = ms->m_header.h_nodeid;

	len = receive_extralen(ms);

	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != dlm_our_nodeid()) {
		log_error(ls, "remove dir entry dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		return;
	}

	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
}
static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
{
	do_purge(ls, ms->m_nodeid, ms->m_pid);
}
static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, mstype, result;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_request_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	mstype = lkb->lkb_wait_type;
	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
	if (error)
		goto out;

	/* Optimization: the dir node was also the master, so it took our
	   lookup as a request and sent request reply instead of lookup reply */
	if (mstype == DLM_MSG_LOOKUP) {
		r->res_nodeid = ms->m_header.h_nodeid;
		lkb->lkb_nodeid = r->res_nodeid;
	}

	/* this is the value returned from do_request() on the master */
	result = ms->m_result;

	switch (result) {
	case -EAGAIN:
		/* request would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		confirm_master(r, -EAGAIN);
		unhold_lkb(lkb); /* undoes create_lkb() */
		break;

	case -EINPROGRESS:
	case 0:
		/* request was queued or granted on remote master */
		receive_flags_reply(lkb, ms);
		lkb->lkb_remid = ms->m_lkid;
		if (is_altmode(lkb))
			munge_altmode(lkb, ms);
		if (result)
			add_lkb(r, lkb, DLM_LKSTS_WAITING);
		else {
			grant_lock_pc(r, lkb, ms);
			queue_cast(r, lkb, 0);
		}
		confirm_master(r, result);
		break;

	case -EBADR:
	case -ENOTBLK:
		/* find_rsb failed to find rsb or rsb wasn't master */
		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
		r->res_nodeid = -1;
		lkb->lkb_nodeid = -1;

		if (is_overlap(lkb)) {
			/* we'll ignore error in cancel/unlock reply */
			queue_cast_overlap(r, lkb);
			unhold_lkb(lkb); /* undoes create_lkb() */
		} else
			_request_lock(r, lkb);
		break;

	default:
		log_error(ls, "receive_request_reply %x error %d",
			  lkb->lkb_id, result);
	}

	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x result %d unlock",
			  lkb->lkb_id, result);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_unlock(r, lkb);
	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_cancel(r, lkb);
	} else {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
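/* The overlap cases above resolve an unlock or cancel that the caller
   issued while its original request was still in flight: if the
   request ended up granted or queued on the master, the deferred op is
   sent now; in every other case the overlap flags are simply cleared. */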
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms)
{
	/* this is the value returned from do_convert() on the master */
	switch (ms->m_result) {
	case -EAGAIN:
		/* convert would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		break;

	case -EINPROGRESS:
		/* convert was queued on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		break;

	case 0:
		/* convert was granted on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		grant_lock_pc(r, lkb, ms);
		queue_cast(r, lkb, 0);
		break;

	default:
		log_error(r->res_ls, "receive_convert_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
}
static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	__receive_convert_reply(r, lkb, ms);
 out:
	unlock_rsb(r);
	put_rsb(r);
}
static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_convert_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	_receive_convert_reply(lkb, ms);
	dlm_put_lkb(lkb);
}
static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_unlock() on the master */

	switch (ms->m_result) {
	case -DLM_EUNLOCK:
		receive_flags_reply(lkb, ms);
		remove_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_EUNLOCK);
		break;
	case -ENOENT:
		break;
	default:
		log_error(r->res_ls, "receive_unlock_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}
static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_unlock_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	_receive_unlock_reply(lkb, ms);
	dlm_put_lkb(lkb);
}
static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_cancel() on the master */

	switch (ms->m_result) {
	case -DLM_ECANCEL:
		receive_flags_reply(lkb, ms);
		revert_lock_pc(r, lkb);
		if (ms->m_result)
			queue_cast(r, lkb, -DLM_ECANCEL);
		break;
	case 0:
		break;
	default:
		log_error(r->res_ls, "receive_cancel_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}
static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_cancel_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	_receive_cancel_reply(lkb, ms);
	dlm_put_lkb(lkb);
}
static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, ret_nodeid;

	error = find_lkb(ls, ms->m_lkid, &lkb);
	if (error) {
		log_error(ls, "receive_lookup_reply no lkb");
		return;
	}

	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
	   FIXME: will a non-zero error ever be returned? */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	if (error)
		goto out;

	ret_nodeid = ms->m_nodeid;
	if (ret_nodeid == dlm_our_nodeid()) {
		/* we're the master of this rsb */
		r->res_nodeid = 0;
		ret_nodeid = 0;
		r->res_first_lkid = 0;
	} else {
		/* set_master() will copy res_nodeid to lkb_nodeid */
		r->res_nodeid = ret_nodeid;
	}

	if (is_overlap(lkb)) {
		log_debug(ls, "receive_lookup_reply %x unlock %x",
			  lkb->lkb_id, lkb->lkb_flags);
		queue_cast_overlap(r, lkb);
		unhold_lkb(lkb); /* undoes create_lkb() */
		goto out_list;
	}

	_request_lock(r, lkb);

 out_list:
	if (!ret_nodeid)
		process_lookup_list(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
{
	struct dlm_message *ms = (struct dlm_message *) hd;
	struct dlm_ls *ls;
	int error = 0;

	if (!recovery)
		dlm_message_in(ms);

	ls = dlm_find_lockspace_global(hd->h_lockspace);
	if (!ls) {
		log_print("drop message %d from %d for unknown lockspace %d",
			  ms->m_type, nodeid, hd->h_lockspace);
		return -EINVAL;
	}

	/* recovery may have just ended leaving a bunch of backed-up requests
	   in the requestqueue; wait while dlm_recoverd clears them */

	if (!recovery)
		dlm_wait_requestqueue(ls);

	/* recovery may have just started while there were a bunch of
	   in-flight requests -- save them in requestqueue to be processed
	   after recovery.  we can't let dlm_recvd block on the recovery
	   lock.  if dlm_recoverd is calling this function to clear the
	   requestqueue, it needs to be interrupted (-EINTR) if another
	   recovery operation is starting. */

	while (1) {
		if (dlm_locking_stopped(ls)) {
			if (recovery) {
				error = -EINTR;
				goto out;
			}
			error = dlm_add_requestqueue(ls, nodeid, hd);
			if (error == -EAGAIN)
				continue;
			else {
				error = -EINTR;
				goto out;
			}
		}

		if (lock_recovery_try(ls))
			break;
		schedule();
	}

	switch (ms->m_type) {

	/* messages sent to a master node */

	case DLM_MSG_REQUEST:
		receive_request(ls, ms);
		break;

	case DLM_MSG_CONVERT:
		receive_convert(ls, ms);
		break;

	case DLM_MSG_UNLOCK:
		receive_unlock(ls, ms);
		break;

	case DLM_MSG_CANCEL:
		receive_cancel(ls, ms);
		break;

	/* messages sent from a master node (replies to above) */

	case DLM_MSG_REQUEST_REPLY:
		receive_request_reply(ls, ms);
		break;

	case DLM_MSG_CONVERT_REPLY:
		receive_convert_reply(ls, ms);
		break;

	case DLM_MSG_UNLOCK_REPLY:
		receive_unlock_reply(ls, ms);
		break;

	case DLM_MSG_CANCEL_REPLY:
		receive_cancel_reply(ls, ms);
		break;

	/* messages sent from a master node (only two types of async msg) */

	case DLM_MSG_GRANT:
		receive_grant(ls, ms);
		break;

	case DLM_MSG_BAST:
		receive_bast(ls, ms);
		break;

	/* messages sent to a dir node */

	case DLM_MSG_LOOKUP:
		receive_lookup(ls, ms);
		break;

	case DLM_MSG_REMOVE:
		receive_remove(ls, ms);
		break;

	/* messages sent from a dir node (remove has no reply) */

	case DLM_MSG_LOOKUP_REPLY:
		receive_lookup_reply(ls, ms);
		break;

	/* other messages */

	case DLM_MSG_PURGE:
		receive_purge(ls, ms);
		break;

	default:
		log_error(ls, "unknown message type %d", ms->m_type);
	}

	unlock_recovery(ls);
 out:
	dlm_put_lockspace(ls);
	return error;
}
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (middle_conversion(lkb)) {
		hold_lkb(lkb);
		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		ls->ls_stub_ms.m_result = -EINPROGRESS;
		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		_receive_convert_reply(lkb, &ls->ls_stub_ms);

		/* Same special case as in receive_rcom_lock_args() */
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
		unhold_lkb(lkb);

	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
		lkb->lkb_flags |= DLM_IFL_RESEND;
	}

	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
	   conversions are async; there's no reply from the remote master */
}
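/* Why middle conversions are special: an in-flight PR<->CW conversion
   may or may not have been granted on the failed master, and the true
   granted mode can't be decided until every lock on the rsb has been
   rebuilt, so the lkb is parked with grmode IV and the rsb is flagged
   RSB_RECOVER_CONVERT for recover_conversion() to sort out. */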
/* A waiting lkb needs recovery if the master node has failed, or
   the master node is changing (only when no directory is used) */

static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (dlm_is_removed(ls, lkb->lkb_nodeid))
		return 1;

	if (!dlm_no_directory(ls))
		return 0;

	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
		return 1;

	return 0;
}
/* Recovery for locks that are waiting for replies from nodes that are now
   gone.  We can just complete unlocks and cancels by faking a reply from the
   dead node.  Requests and up-conversions we flag to be resent after
   recovery.  Down-conversions can just be completed with a fake reply like
   unlocks.  Conversions between PR and CW need special attention. */

void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);

		/* all outstanding lookups, regardless of destination, will be
		   resent after recovery is done */

		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
			lkb->lkb_flags |= DLM_IFL_RESEND;
			continue;
		}

		if (!waiter_needs_recovery(ls, lkb))
			continue;

		switch (lkb->lkb_wait_type) {

		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_RESEND;
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb);
			break;

		case DLM_MSG_UNLOCK:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		default:
			log_error(ls, "invalid lkb wait_type %d",
				  lkb->lkb_wait_type);
		}
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}
static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	int found = 0;

	mutex_lock(&ls->ls_waiters_mutex);
	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			hold_lkb(lkb);
			found = 1;
			break;
		}
	}
	mutex_unlock(&ls->ls_waiters_mutex);

	if (!found)
		lkb = NULL;
	return lkb;
}
/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
   master or dir-node for r.  Processing the lkb may result in it being
   placed back on the waiters list. */

/* We do this after normal locking has been enabled and any saved messages
   (in requestqueue) have been processed.  We should be confident that at
   this point we won't get or process a reply to any of these waiting
   operations.  But, new ops may be coming in on the rsbs/locks here from
   userspace or remotely. */

/* there may have been an overlap unlock/cancel prior to recovery or after
   recovery.  if before, the lkb may still have a positive wait_count; if
   after, the overlap flag would just have been set and nothing new sent.
   we can be confident here that any replies to either the initial op or
   overlap ops prior to recovery have been received. */

int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		mstype = lkb->lkb_wait_type;
		oc = is_overlap_cancel(lkb);
		ou = is_overlap_unlock(lkb);
		err = 0;

		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);

		/* At this point we assume that we won't get a reply to any
		   previous op or overlap op on this lock.  First, do a big
		   remove_from_waiters() for all previous ops. */

		lkb->lkb_flags &= ~DLM_IFL_RESEND;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count = 0;
		mutex_lock(&ls->ls_waiters_mutex);
		list_del_init(&lkb->lkb_wait_reply);
		mutex_unlock(&ls->ls_waiters_mutex);
		unhold_lkb(lkb); /* for waiters list */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				err = 1;
			}
		} else {
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				err = 1;
			}
		}

		if (err)
			log_error(ls, "recover_waiters_post %x %d %x %d %d",
				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
		unlock_rsb(r);
		put_rsb(r);
		dlm_put_lkb(lkb);
	}

	return error;
}
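/* _pre and _post bracket recovery: _pre runs while replies can no
   longer arrive, faking replies for unlocks, cancels and down
   conversions and marking everything else RESEND; _post runs after
   locking restarts and turns each RESEND lkb back into a fresh
   _request_lock()/_convert_lock(), or completes an overlapped
   unlock/cancel that was recorded before recovery. */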
static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
{
	struct dlm_ls *ls = r->res_ls;
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
		if (test(ls, lkb)) {
			rsb_set_flag(r, RSB_LOCKS_PURGED);
			del_lkb(r, lkb);
			/* this put should free the lkb */
			if (!dlm_put_lkb(lkb))
				log_error(ls, "purged lkb not released");
		}
	}
}

static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
}

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return is_master_copy(lkb);
}
static void purge_dead_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
}

void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
}
/* Get rid of locks held by nodes that are gone. */

int dlm_purge_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;

	log_debug(ls, "dlm_purge_locks");

	down_write(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		hold_rsb(r);
		lock_rsb(r);
		if (is_master(r))
			purge_dead_locks(r);
		unlock_rsb(r);
		unhold_rsb(r);

		schedule();
	}
	up_write(&ls->ls_root_sem);

	return 0;
}
static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
{
	struct dlm_rsb *r, *r_ret = NULL;

	read_lock(&ls->ls_rsbtbl[bucket].lock);
	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
		if (!rsb_flag(r, RSB_LOCKS_PURGED))
			continue;
		hold_rsb(r);
		rsb_clear_flag(r, RSB_LOCKS_PURGED);
		r_ret = r;
		break;
	}
	read_unlock(&ls->ls_rsbtbl[bucket].lock);
	return r_ret;
}
void dlm_grant_after_purge(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int bucket = 0;

	while (1) {
		r = find_purged_rsb(ls, bucket);
		if (!r) {
			if (bucket == ls->ls_rsbtbl_size - 1)
				break;
			bucket++;
			continue;
		}
		lock_rsb(r);
		if (is_master(r)) {
			grant_pending_locks(r);
			confirm_master(r, 0);
		}
		unlock_rsb(r);
		put_rsb(r);
		schedule();
	}
}
static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
					 uint32_t remid)
{
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, head, lkb_statequeue) {
		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
			return lkb;
	}
	return NULL;
}

static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
				    uint32_t remid)
{
	struct dlm_lkb *lkb;

	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
	if (lkb)
		return lkb;
	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
	if (lkb)
		return lkb;
	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
	if (lkb)
		return lkb;
	return NULL;
}
static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				  struct dlm_rsb *r, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	int lvblen;

	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
	lkb->lkb_ownpid = rl->rl_ownpid;
	lkb->lkb_remid = rl->rl_lkid;
	lkb->lkb_exflags = rl->rl_exflags;
	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	lkb->lkb_lvbseq = rl->rl_lvbseq;
	lkb->lkb_rqmode = rl->rl_rqmode;
	lkb->lkb_grmode = rl->rl_grmode;
	/* don't set lkb_status because add_lkb wants to itself */

	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		lkb->lkb_lvbptr = allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
			 sizeof(struct rcom_lock);
		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
	}

	/* Conversions between PR and CW (middle modes) need special handling.
	   The real granted mode of these converting locks cannot be determined
	   until all locks have been rebuilt on the rsb (recover_conversion) */

	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
		rl->rl_status = DLM_LKSTS_CONVERT;
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(r, RSB_RECOVER_CONVERT);
	}

	return 0;
}
/* This lkb may have been recovered in a previous aborted recovery so we need
   to check if the rsb already has an lkb with the given remote nodeid/lkid.
   If so we just send back a standard reply.  If not, we create a new lkb with
   the given values and send back our lkid.  We send back our lkid by sending
   back the rcom_lock struct we got but with the remid field filled in. */

int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
	if (lkb) {
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		__put_lkb(ls, lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);
	error = 0;

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	rl->rl_remid = lkb->lkb_id;

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	if (error)
		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
	rl->rl_result = error;
	return error;
}
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, rl->rl_lkid, &lkb);
	if (error) {
		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
		return error;
	}

	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	error = rl->rl_result;

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	switch (error) {
	case -EBADR:
		/* There's a chance the new master received our lock before
		   dlm_recover_master_reply(), this wouldn't happen if we did
		   a barrier between recover_masters and recover_locks. */
		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
			  (unsigned long)r, r->res_name);
		dlm_send_rcom_lock(r, lkb);
		goto out;
	case -EEXIST:
		log_debug(ls, "master copy exists %x", lkb->lkb_id);
		/* fall through */
	case 0:
		lkb->lkb_remid = rl->rl_remid;
		break;
	default:
		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
			  error, lkb->lkb_id);
	}

	/* an ack for dlm_recover_locks() which waits for replies from
	   all the locks it sends to new masters */
	dlm_recovered_lock(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);

	return 0;
}
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		     int mode, uint32_t flags, void *name, unsigned int namelen,
		     uint32_t parent_lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	lock_recovery(ls);

	error = create_lkb(ls, &lkb);
	if (error) {
		kfree(ua);
		goto out;
	}

	if (flags & DLM_LKF_VALBLK) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			kfree(ua);
			__put_lkb(ls, lkb);
			error = -ENOMEM;
			goto out;
		}
	}

	/* After ua is attached to lkb it will be freed by free_lkb().
	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */

	error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
	lkb->lkb_flags |= DLM_IFL_USER;
	ua->old_mode = DLM_LOCK_IV;

	if (error) {
		__put_lkb(ls, lkb);
		goto out;
	}

	error = request_lock(ls, lkb, name, namelen, &args);

	switch (error) {
	case 0:
		break;
	case -EINPROGRESS:
		error = 0;
		break;
	case -EAGAIN:
		error = 0;
		/* fall through */
	default:
		__put_lkb(ls, lkb);
		goto out;
	}

	/* add this new lkb to the per-process list of locks */
	spin_lock(&ua->proc->locks_spin);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock(&ua->proc->locks_spin);
 out:
	unlock_recovery(ls);
	return error;
}
int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* user can change the params on its lock when it converts it, or
	   add an lvb that didn't exist before */

	ua = (struct dlm_user_args *)lkb->lkb_astparam;

	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			error = -ENOMEM;
			goto out_put;
		}
	}
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;
	ua->old_mode = lkb->lkb_grmode;

	error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
			      ua, DLM_FAKE_USER_AST, &args);
	if (error)
		goto out_put;

	error = convert_lock(ls, lkb, &args);

	if (error == -EINPROGRESS || error == -EAGAIN)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = (struct dlm_user_args *)lkb->lkb_astparam;

	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
		error = 0;
	if (error)
		goto out_put;

	spin_lock(&ua->proc->locks_spin);
	/* dlm_user_add_ast() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock(&ua->proc->locks_spin);
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
*ls
, struct dlm_user_args
*ua_tmp
,
4203 uint32_t flags
, uint32_t lkid
)
4205 struct dlm_lkb
*lkb
;
4206 struct dlm_args args
;
4207 struct dlm_user_args
*ua
;
4212 error
= find_lkb(ls
, lkid
, &lkb
);
4216 ua
= (struct dlm_user_args
*)lkb
->lkb_astparam
;
4217 ua
->castparam
= ua_tmp
->castparam
;
4218 ua
->user_lksb
= ua_tmp
->user_lksb
;
4220 error
= set_unlock_args(flags
, ua
, &args
);
4224 error
= cancel_lock(ls
, lkb
, &args
);
4226 if (error
== -DLM_ECANCEL
)
4228 /* from validate_unlock_args() */
4229 if (error
== -EBUSY
)
4234 unlock_recovery(ls
);
/* lkb's that are removed from the waiters list by revert are just left on the
   orphans list with the granted orphan locks, to be freed by purge */

static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
	struct dlm_args args;
	int error;

	hold_lkb(lkb); /* reference for the ls_orphans list */
	mutex_lock(&ls->ls_orphans_mutex);
	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
	mutex_unlock(&ls->ls_orphans_mutex);

	set_unlock_args(0, ua, &args);

	error = cancel_lock(ls, lkb, &args);
	if (error == -DLM_ECANCEL)
		error = 0;
	return error;
}
/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
   Regardless of what rsb queue the lock is on, it's removed and freed. */

static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
	struct dlm_args args;
	int error;

	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);

	error = unlock_lock(ls, lkb, &args);
	if (error == -DLM_EUNLOCK)
		error = 0;
	return error;
}
/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
   (which does lock_rsb) due to deadlock with receiving a message that does
   lock_rsb followed by dlm_user_add_ast() */

static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
				     struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb = NULL;

	mutex_lock(&ls->ls_clear_proc_locks);
	if (list_empty(&proc->locks))
		goto out;

	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
	list_del_init(&lkb->lkb_ownqueue);

	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
		lkb->lkb_flags |= DLM_IFL_ORPHAN;
	else
		lkb->lkb_flags |= DLM_IFL_DEAD;
 out:
	mutex_unlock(&ls->ls_clear_proc_locks);
	return lkb;
}
/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* proc CLOSING flag is set so no more device_reads should look at proc->asts
   list, and no more device_writes should add lkb's to proc->locks list; so we
   shouldn't need to take asts_spin or locks_spin here.  this assumes that
   device reads/writes/closes are serialized -- FIXME: we may need to serialize
   them ourself if they are not */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	lock_recovery(ls);

	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	unlock_recovery(ls);
}
static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}
/* pid of 0 means purge all orphans */

static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_orphans_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
		if (pid && lkb->lkb_ownpid != pid)
			continue;
		unlock_proc_lock(ls, lkb);
		list_del_init(&lkb->lkb_ownqueue);
		dlm_put_lkb(lkb);
	}
	mutex_unlock(&ls->ls_orphans_mutex);
}
static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error;

	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
				DLM_MSG_PURGE, &ms, &mh);
	if (error)
		return error;
	ms->m_nodeid = nodeid;
	ms->m_pid = pid;

	return send_message(mh, ms);
}
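/* A purge travels as a plain DLM_MSG_PURGE with only m_nodeid and
   m_pid in the fixed part of the message, so it can use
   _create_message() directly; the receiving node hands it to
   do_purge() via receive_purge(). */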
int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
		   int nodeid, int pid)
{
	int error = 0;

	if (nodeid != dlm_our_nodeid()) {
		error = send_purge(ls, nodeid, pid);
	} else {
		lock_recovery(ls);
		if (pid == current->pid)
			purge_proc_locks(ls, proc);
		else
			do_purge(ls, nodeid, pid);
		unlock_recovery(ls);
	}
	return error;
}