1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
9 *******************************************************************************
10 ******************************************************************************/
12 #include "dlm_internal.h"
13 #include "lockspace.h"
25 * We use the upper 16 bits of the hash value to select the directory node.
26 * Low bits are used for distribution of rsb's among hash buckets on each node.
28 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
29 * num_nodes to the hash value. This value in the desired range is used as an
30 * offset into the sorted list of nodeid's to give the particular nodeid.
33 int dlm_hash2nodeid(struct dlm_ls
*ls
, uint32_t hash
)
37 if (ls
->ls_num_nodes
== 1)
38 return dlm_our_nodeid();
40 node
= (hash
>> 16) % ls
->ls_total_weight
;
41 return ls
->ls_node_array
[node
];
45 int dlm_dir_nodeid(struct dlm_rsb
*r
)
47 return r
->res_dir_nodeid
;
50 void dlm_recover_dir_nodeid(struct dlm_ls
*ls
)
54 down_read(&ls
->ls_root_sem
);
55 list_for_each_entry(r
, &ls
->ls_root_list
, res_root_list
) {
56 r
->res_dir_nodeid
= dlm_hash2nodeid(ls
, r
->res_hash
);
58 up_read(&ls
->ls_root_sem
);
61 int dlm_recover_directory(struct dlm_ls
*ls
)
63 struct dlm_member
*memb
;
64 char *b
, *last_name
= NULL
;
65 int error
= -ENOMEM
, last_len
, nodeid
, result
;
67 unsigned int count
= 0, count_match
= 0, count_bad
= 0, count_add
= 0;
69 log_rinfo(ls
, "dlm_recover_directory");
71 if (dlm_no_directory(ls
))
74 last_name
= kmalloc(DLM_RESNAME_MAXLEN
, GFP_NOFS
);
78 list_for_each_entry(memb
, &ls
->ls_nodes
, list
) {
79 if (memb
->nodeid
== dlm_our_nodeid())
82 memset(last_name
, 0, DLM_RESNAME_MAXLEN
);
87 error
= dlm_recovery_stopped(ls
);
91 error
= dlm_rcom_names(ls
, memb
->nodeid
,
99 * pick namelen/name pairs out of received buffer
102 b
= ls
->ls_recover_buf
->rc_buf
;
103 left
= ls
->ls_recover_buf
->rc_header
.h_length
;
104 left
-= sizeof(struct dlm_rcom
);
110 if (left
< sizeof(__be16
))
113 memcpy(&v
, b
, sizeof(__be16
));
114 namelen
= be16_to_cpu(v
);
116 left
-= sizeof(__be16
);
118 /* namelen of 0xFFFFF marks end of names for
119 this node; namelen of 0 marks end of the
122 if (namelen
== 0xFFFF)
130 if (namelen
> DLM_RESNAME_MAXLEN
)
133 error
= dlm_master_lookup(ls
, memb
->nodeid
,
138 log_error(ls
, "recover_dir lookup %d",
143 /* The name was found in rsbtbl, but the
144 * master nodeid is different from
145 * memb->nodeid which says it is the master.
146 * This should not happen. */
148 if (result
== DLM_LU_MATCH
&&
149 nodeid
!= memb
->nodeid
) {
151 log_error(ls
, "recover_dir lookup %d "
152 "nodeid %d memb %d bad %u",
153 result
, nodeid
, memb
->nodeid
,
155 print_hex_dump_bytes("dlm_recover_dir ",
160 /* The name was found in rsbtbl, and the
161 * master nodeid matches memb->nodeid. */
163 if (result
== DLM_LU_MATCH
&&
164 nodeid
== memb
->nodeid
) {
168 /* The name was not found in rsbtbl and was
169 * added with memb->nodeid as the master. */
171 if (result
== DLM_LU_ADD
) {
176 memcpy(last_name
, b
, namelen
);
188 dlm_set_recover_status(ls
, DLM_RS_DIR
);
190 log_rinfo(ls
, "dlm_recover_directory %u in %u new",
198 static struct dlm_rsb
*find_rsb_root(struct dlm_ls
*ls
, char *name
, int len
)
201 uint32_t hash
, bucket
;
204 hash
= jhash(name
, len
, 0);
205 bucket
= hash
& (ls
->ls_rsbtbl_size
- 1);
207 spin_lock(&ls
->ls_rsbtbl
[bucket
].lock
);
208 rv
= dlm_search_rsb_tree(&ls
->ls_rsbtbl
[bucket
].keep
, name
, len
, &r
);
210 rv
= dlm_search_rsb_tree(&ls
->ls_rsbtbl
[bucket
].toss
,
212 spin_unlock(&ls
->ls_rsbtbl
[bucket
].lock
);
217 down_read(&ls
->ls_root_sem
);
218 list_for_each_entry(r
, &ls
->ls_root_list
, res_root_list
) {
219 if (len
== r
->res_length
&& !memcmp(name
, r
->res_name
, len
)) {
220 up_read(&ls
->ls_root_sem
);
221 log_debug(ls
, "find_rsb_root revert to root_list %s",
226 up_read(&ls
->ls_root_sem
);
230 /* Find the rsb where we left off (or start again), then send rsb names
231 for rsb's we're master of and whose directory node matches the requesting
232 node. inbuf is the rsb name last sent, inlen is the name's length */
234 void dlm_copy_master_names(struct dlm_ls
*ls
, char *inbuf
, int inlen
,
235 char *outbuf
, int outlen
, int nodeid
)
237 struct list_head
*list
;
239 int offset
= 0, dir_nodeid
;
242 down_read(&ls
->ls_root_sem
);
245 r
= find_rsb_root(ls
, inbuf
, inlen
);
247 inbuf
[inlen
- 1] = '\0';
248 log_error(ls
, "copy_master_names from %d start %d %s",
249 nodeid
, inlen
, inbuf
);
252 list
= r
->res_root_list
.next
;
254 list
= ls
->ls_root_list
.next
;
257 for (offset
= 0; list
!= &ls
->ls_root_list
; list
= list
->next
) {
258 r
= list_entry(list
, struct dlm_rsb
, res_root_list
);
262 dir_nodeid
= dlm_dir_nodeid(r
);
263 if (dir_nodeid
!= nodeid
)
267 * The block ends when we can't fit the following in the
268 * remaining buffer space:
269 * namelen (uint16_t) +
270 * name (r->res_length) +
271 * end-of-block record 0x0000 (uint16_t)
274 if (offset
+ sizeof(uint16_t)*2 + r
->res_length
> outlen
) {
275 /* Write end-of-block record */
276 be_namelen
= cpu_to_be16(0);
277 memcpy(outbuf
+ offset
, &be_namelen
, sizeof(__be16
));
278 offset
+= sizeof(__be16
);
279 ls
->ls_recover_dir_sent_msg
++;
283 be_namelen
= cpu_to_be16(r
->res_length
);
284 memcpy(outbuf
+ offset
, &be_namelen
, sizeof(__be16
));
285 offset
+= sizeof(__be16
);
286 memcpy(outbuf
+ offset
, r
->res_name
, r
->res_length
);
287 offset
+= r
->res_length
;
288 ls
->ls_recover_dir_sent_res
++;
292 * If we've reached the end of the list (and there's room) write a
293 * terminating record.
296 if ((list
== &ls
->ls_root_list
) &&
297 (offset
+ sizeof(uint16_t) <= outlen
)) {
298 be_namelen
= cpu_to_be16(0xFFFF);
299 memcpy(outbuf
+ offset
, &be_namelen
, sizeof(__be16
));
300 offset
+= sizeof(__be16
);
301 ls
->ls_recover_dir_sent_msg
++;
304 up_read(&ls
->ls_root_sem
);