1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
9 *******************************************************************************
10 ******************************************************************************/
12 #include "dlm_internal.h"
13 #include "lockspace.h"
/*
 * We use the upper 16 bits of the hash value to select the directory node.
 * Low bits are used for distribution of rsb's among hash buckets on each node.
 *
 * To give the exact range wanted (0 to ls_total_weight-1), we apply a modulus
 * of ls_total_weight to the hash value.  This value in the desired range is
 * used as an offset into ls_node_array, the sorted list of nodeid's, to give
 * the particular nodeid.
 */
33 int dlm_hash2nodeid(struct dlm_ls
*ls
, uint32_t hash
)
37 if (ls
->ls_num_nodes
== 1)
38 return dlm_our_nodeid();
40 node
= (hash
>> 16) % ls
->ls_total_weight
;
41 return ls
->ls_node_array
[node
];
45 int dlm_dir_nodeid(struct dlm_rsb
*r
)
47 return r
->res_dir_nodeid
;
50 void dlm_recover_dir_nodeid(struct dlm_ls
*ls
, const struct list_head
*root_list
)
54 list_for_each_entry(r
, root_list
, res_root_list
) {
55 r
->res_dir_nodeid
= dlm_hash2nodeid(ls
, r
->res_hash
);
/*
 * dlm_recover_directory() - collect resource names from the other members.
 * @ls:  lockspace being recovered
 * @seq: value handed unchanged to dlm_rcom_names()
 *
 * Visible steps: log the start, test dlm_no_directory(), allocate a
 * DLM_RESNAME_MAXLEN scratch buffer, then for each entry of ls->ls_nodes
 * request names with dlm_rcom_names() and parse the reply held in
 * ls->ls_recover_buf as a sequence of (big-endian 16-bit length, name)
 * records.  Each name is passed to dlm_master_lookup() and the outcome is
 * compared against DLM_LU_MATCH and DLM_LU_ADD.  Finally DLM_RS_DIR is set
 * through dlm_set_recover_status() and a summary line is logged.
 *
 * NOTE(review): this copy of the function has gaps.  Braces, several
 * declarations (left, v, namelen), the loop constructs, every jump or return
 * statement and the cleanup path are not visible, so the return value and the
 * exact error handling cannot be confirmed from here.  error starts out as
 * -ENOMEM.  Restore the function from the complete source before changing it.
 */
59 int dlm_recover_directory(struct dlm_ls
*ls
, uint64_t seq
)
61 struct dlm_member
*memb
;
62 char *b
, *last_name
= NULL
;
63 int error
= -ENOMEM
, last_len
, nodeid
, result
;
65 unsigned int count
= 0, count_match
= 0, count_bad
= 0, count_add
= 0;
67 log_rinfo(ls
, "dlm_recover_directory");
69 if (dlm_no_directory(ls
))
/* Scratch buffer for the most recently parsed name; it is handed back to
 * dlm_rcom_names() on the next request. */
72 last_name
= kmalloc(DLM_RESNAME_MAXLEN
, GFP_NOFS
);
/* Walk every member of the lockspace; the entry carrying our own nodeid is
 * tested for explicitly (the action taken is not visible in this copy). */
76 list_for_each_entry(memb
, &ls
->ls_nodes
, list
) {
77 if (memb
->nodeid
== dlm_our_nodeid())
80 memset(last_name
, 0, DLM_RESNAME_MAXLEN
);
85 if (dlm_recovery_stopped(ls
)) {
/* Ask this member for names, passing the last name received so far. */
90 error
= dlm_rcom_names(ls
, memb
->nodeid
,
91 last_name
, last_len
, seq
);
98 * pick namelen/name pairs out of received buffer
/* left is the header length field (little-endian on the wire) minus the
 * fixed size of struct dlm_rcom. */
101 b
= ls
->ls_recover_buf
->rc_buf
;
102 left
= le16_to_cpu(ls
->ls_recover_buf
->rc_header
.h_length
);
103 left
-= sizeof(struct dlm_rcom
);
109 if (left
< sizeof(__be16
))
/* The length prefix is copied out with memcpy before byte-swapping. */
112 memcpy(&v
, b
, sizeof(__be16
));
113 namelen
= be16_to_cpu(v
);
115 left
-= sizeof(__be16
);
117 /* namelen of 0xFFFF marks end of names for
118 this node; namelen of 0 marks end of the
121 if (namelen
== 0xFFFF)
129 if (namelen
> DLM_RESNAME_MAXLEN
)
132 error
= dlm_master_lookup(ls
, memb
->nodeid
,
137 log_error(ls
, "recover_dir lookup %d",
142 /* The name was found in rsbtbl, but the
143 * master nodeid is different from
144 * memb->nodeid which says it is the master.
145 * This should not happen. */
147 if (result
== DLM_LU_MATCH
&&
148 nodeid
!= memb
->nodeid
) {
150 log_error(ls
, "recover_dir lookup %d "
151 "nodeid %d memb %d bad %u",
152 result
, nodeid
, memb
->nodeid
,
154 print_hex_dump_bytes("dlm_recover_dir ",
159 /* The name was found in rsbtbl, and the
160 * master nodeid matches memb->nodeid. */
162 if (result
== DLM_LU_MATCH
&&
163 nodeid
== memb
->nodeid
) {
167 /* The name was not found in rsbtbl and was
168 * added with memb->nodeid as the master. */
170 if (result
== DLM_LU_ADD
) {
175 memcpy(last_name
, b
, namelen
);
187 dlm_set_recover_status(ls
, DLM_RS_DIR
);
189 log_rinfo(ls
, "dlm_recover_directory %u in %u new",
/*
 * find_rsb_root() - locate an rsb by resource name.
 * @ls:   lockspace to search
 * @name: resource name bytes
 *
 * Visible steps: search ls->ls_rsbtbl with dlm_search_rsb_tree() while
 * holding ls_rsbtbl_lock for reading, then walk ls->ls_masters_list looking
 * for an entry whose res_length equals len and whose res_name compares equal
 * with memcmp(); a hit in that walk is reported with log_debug().
 *
 * NOTE(review): the rest of the parameter list (len is used but its
 * declaration is not visible), the locals r and rv, and every return
 * statement are missing from this copy.  Presumably the list walk is a
 * fallback for a failed tree search and NULL is returned when nothing
 * matches - confirm against the complete source.
 */
197 static struct dlm_rsb
*find_rsb_root(struct dlm_ls
*ls
, const char *name
,
203 read_lock_bh(&ls
->ls_rsbtbl_lock
);
204 rv
= dlm_search_rsb_tree(&ls
->ls_rsbtbl
, name
, len
, &r
);
205 read_unlock_bh(&ls
->ls_rsbtbl_lock
);
209 list_for_each_entry(r
, &ls
->ls_masters_list
, res_masters_list
) {
210 if (len
== r
->res_length
&& !memcmp(name
, r
->res_name
, len
)) {
211 log_debug(ls
, "find_rsb_root revert to root_list %s",
/*
 * struct dlm_dir_dump - per-node state of an in-progress directory name dump.
 *
 * Instances are linked on ls->ls_dir_dump_list through the list member and
 * are matched by nodeid_init in lookup_dir_dump() and drop_dir_ctx().
 * dlm_copy_master_names() compares last against the list position it resumes
 * from, and reports sent_res and sent_msg in a log line.
 *
 * NOTE(review): other functions in this file read and write a seq_init member
 * that is not visible in this copy of the definition, and the closing brace
 * and the ends of the two member comments are missing as well.  Restore the
 * definition from the complete source.
 */
219 struct dlm_dir_dump
{
220 /* init values to match if whole
221 * dump fits to one seq. Sanity check only.
224 uint64_t nodeid_init
;
225 /* compare local pointer with last lookup,
226 * just a sanity check.
228 struct list_head
*last
;
230 unsigned int sent_res
; /* for log info */
231 unsigned int sent_msg
; /* for log info */
233 struct list_head list
;
/*
 * drop_dir_ctx() - discard dump contexts belonging to one node.
 * @ls:     lockspace owning ls_dir_dump_list
 * @nodeid: node whose contexts are targeted
 *
 * Holds ls_dir_dump_lock for writing while walking ls_dir_dump_list with the
 * removal-safe iterator.  For each entry whose nodeid_init equals @nodeid an
 * error line carrying its seq_init is logged.
 *
 * NOTE(review): the statements that follow the log call inside the loop are
 * missing from this copy.  The removal-safe iterator suggests the entry is
 * unlinked and released there - confirm against the complete source.
 */
236 static void drop_dir_ctx(struct dlm_ls
*ls
, int nodeid
)
238 struct dlm_dir_dump
*dd
, *safe
;
240 write_lock_bh(&ls
->ls_dir_dump_lock
);
241 list_for_each_entry_safe(dd
, safe
, &ls
->ls_dir_dump_list
, list
) {
242 if (dd
->nodeid_init
== nodeid
) {
243 log_error(ls
, "drop dump seq %llu",
244 (unsigned long long)dd
->seq_init
);
249 write_unlock_bh(&ls
->ls_dir_dump_lock
);
/*
 * lookup_dir_dump() - find the dump context recorded for a node.
 * @ls:     lockspace owning ls_dir_dump_list
 * @nodeid: node to look for
 *
 * Scans ls_dir_dump_list under the ls_dir_dump_lock read lock, comparing each
 * entry nodeid_init with @nodeid.  The result pointer dd starts out as NULL.
 *
 * NOTE(review): the body of the match branch and the return statement are
 * missing from this copy.  Presumably the matching entry is stored in dd and
 * returned, with NULL meaning no context exists - confirm.
 */
252 static struct dlm_dir_dump
*lookup_dir_dump(struct dlm_ls
*ls
, int nodeid
)
254 struct dlm_dir_dump
*iter
, *dd
= NULL
;
256 read_lock_bh(&ls
->ls_dir_dump_lock
);
257 list_for_each_entry(iter
, &ls
->ls_dir_dump_list
, list
) {
258 if (iter
->nodeid_init
== nodeid
) {
263 read_unlock_bh(&ls
->ls_dir_dump_lock
);
/*
 * init_dir_dump() - create a fresh dump context for a node.
 * @ls:     lockspace owning ls_dir_dump_list
 * @nodeid: node the context is created for
 *
 * Visible steps: look for an existing context with lookup_dir_dump(); log
 * that an ongoing dump was found and call drop_dir_ctx(); allocate a zeroed
 * context with GFP_ATOMIC; record ls->ls_recover_seq in seq_init and @nodeid
 * in nodeid_init; add the context to ls_dir_dump_list under the
 * ls_dir_dump_lock write lock.
 *
 * NOTE(review): the conditions guarding the log and drop calls, the check of
 * the allocation result and the return statements are missing from this copy.
 * The caller tests the result against NULL, so presumably NULL signals
 * allocation failure - confirm against the complete source.
 */
268 static struct dlm_dir_dump
*init_dir_dump(struct dlm_ls
*ls
, int nodeid
)
270 struct dlm_dir_dump
*dd
;
272 dd
= lookup_dir_dump(ls
, nodeid
);
274 log_error(ls
, "found ongoing dir dump for node %d, will drop it",
276 drop_dir_ctx(ls
, nodeid
);
279 dd
= kzalloc(sizeof(*dd
), GFP_ATOMIC
);
283 dd
->seq_init
= ls
->ls_recover_seq
;
284 dd
->nodeid_init
= nodeid
;
286 write_lock_bh(&ls
->ls_dir_dump_lock
);
287 list_add(&dd
->list
, &ls
->ls_dir_dump_list
);
288 write_unlock_bh(&ls
->ls_dir_dump_lock
);
/* Find the rsb where we left off (or start again), then send rsb names
   for rsb's we're master of and whose directory node matches the requesting
   node.  inbuf is the rsb name last sent, inlen is the name's length */
/*
 * dlm_copy_master_names() - fill a reply buffer with resource names.
 * @ls:     lockspace whose ls_masters_list is walked
 * @inbuf:  name handed to find_rsb_root() to locate the resume position
 * @inlen:  length of @inbuf
 * @outbuf: destination for the encoded records
 * @outlen: capacity of @outbuf in bytes
 * @nodeid: requesting node; only rsbs whose dlm_dir_nodeid() equals it are
 *          compared for copying
 *
 * Record format written to @outbuf: a big-endian 16-bit length followed by
 * that many name bytes.  A length of 0 is written when the next name plus a
 * trailing end record would not fit; a length of 0xFFFF is written when the
 * end of ls_masters_list was reached and two more bytes fit.  In the latter
 * case a summary is logged and the dump context is unlinked from
 * ls_dir_dump_list.
 *
 * NOTE(review): this copy has gaps.  The declarations of r and be_namelen,
 * the condition choosing between resuming and starting a dump, every jump
 * statement, the updates of dd->last, dd->sent_res and dd->sent_msg, and the
 * release of dd are not visible.  Restore from the complete source before
 * changing this function.
 */
297 void dlm_copy_master_names(struct dlm_ls
*ls
, const char *inbuf
, int inlen
,
298 char *outbuf
, int outlen
, int nodeid
)
300 struct list_head
*list
;
302 int offset
= 0, dir_nodeid
;
303 struct dlm_dir_dump
*dd
;
/* The whole walk runs under the ls_masters_lock read lock, which is released
 * on the last visible line of the function. */
306 read_lock_bh(&ls
->ls_masters_lock
);
/* Presumably the resume path: an earlier request from this node should have
 * left a context behind. */
309 dd
= lookup_dir_dump(ls
, nodeid
);
311 log_error(ls
, "failed to lookup dir dump context nodeid: %d",
316 /* next chunk in dump */
317 r
= find_rsb_root(ls
, inbuf
, inlen
);
319 log_error(ls
, "copy_master_names from %d start %d %.*s",
320 nodeid
, inlen
, inlen
, inbuf
);
323 list
= r
->res_masters_list
.next
;
/* Sanity check: the rsb found must be the position stored in dd->last and
 * the recovery sequence must still equal the one stored at context setup. */
326 if (dd
->last
!= &r
->res_masters_list
||
327 dd
->seq_init
!= ls
->ls_recover_seq
) {
328 log_error(ls
, "failed dir dump sanity check seq_init: %llu seq: %llu",
329 (unsigned long long)dd
->seq_init
,
330 (unsigned long long)ls
->ls_recover_seq
);
/* Presumably the fresh-dump path: create a context and begin at the first
 * entry of ls_masters_list. */
334 dd
= init_dir_dump(ls
, nodeid
);
336 log_error(ls
, "failed to allocate dir dump context");
341 list
= ls
->ls_masters_list
.next
;
/* Iterate from the chosen position until the list head comes round again. */
345 for (offset
= 0; list
!= &ls
->ls_masters_list
; list
= list
->next
) {
346 r
= list_entry(list
, struct dlm_rsb
, res_masters_list
);
347 dir_nodeid
= dlm_dir_nodeid(r
);
348 if (dir_nodeid
!= nodeid
)
352 * The block ends when we can't fit the following in the
353 * remaining buffer space:
354 * namelen (uint16_t) +
355 * name (r->res_length) +
356 * end-of-block record 0x0000 (uint16_t)
359 if (offset
+ sizeof(uint16_t)*2 + r
->res_length
> outlen
) {
360 /* Write end-of-block record */
361 be_namelen
= cpu_to_be16(0);
362 memcpy(outbuf
+ offset
, &be_namelen
, sizeof(__be16
));
363 offset
+= sizeof(__be16
);
368 be_namelen
= cpu_to_be16(r
->res_length
);
369 memcpy(outbuf
+ offset
, &be_namelen
, sizeof(__be16
));
370 offset
+= sizeof(__be16
);
371 memcpy(outbuf
+ offset
, r
->res_name
, r
->res_length
);
372 offset
+= r
->res_length
;
378 * If we've reached the end of the list (and there's room) write a
379 * terminating record.
382 if ((list
== &ls
->ls_masters_list
) &&
383 (offset
+ sizeof(uint16_t) <= outlen
)) {
385 be_namelen
= cpu_to_be16(0xFFFF);
386 memcpy(outbuf
+ offset
, &be_namelen
, sizeof(__be16
));
387 offset
+= sizeof(__be16
);
389 log_rinfo(ls
, "dlm_recover_directory nodeid %d sent %u res out %u messages",
390 nodeid
, dd
->sent_res
, dd
->sent_msg
);
392 write_lock_bh(&ls
->ls_dir_dump_lock
);
393 list_del_init(&dd
->list
);
394 write_unlock_bh(&ls
->ls_dir_dump_lock
);
398 read_unlock_bh(&ls
->ls_masters_lock
);