1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
15 #include "lockspace.h"
27 * We use the upper 16 bits of the hash value to select the directory node.
28 * Low bits are used for distribution of rsb's among hash buckets on each node.
30 * To give the exact range wanted (0 to ls_total_weight-1), we apply a modulus
31 * of ls_total_weight to the hash value. This value in the desired range is used
32 * as an offset into ls_node_array, the sorted list of nodeid's, to give the particular nodeid.
35 int dlm_hash2nodeid(struct dlm_ls
*ls
, uint32_t hash
)
39 if (ls
->ls_num_nodes
== 1)
40 return dlm_our_nodeid();
42 node
= (hash
>> 16) % ls
->ls_total_weight
;
43 return ls
->ls_node_array
[node
];
47 int dlm_dir_nodeid(struct dlm_rsb
*r
)
49 return r
->res_dir_nodeid
;
52 void dlm_recover_dir_nodeid(struct dlm_ls
*ls
)
56 down_read(&ls
->ls_root_sem
);
57 list_for_each_entry(r
, &ls
->ls_root_list
, res_root_list
) {
58 r
->res_dir_nodeid
= dlm_hash2nodeid(ls
, r
->res_hash
);
60 up_read(&ls
->ls_root_sem
);
/*
 * dlm_recover_directory - collect resource names from the lockspace members
 * and feed them to dlm_master_lookup().
 *
 * What the visible code shows:
 *  - dlm_no_directory(ls) is tested first (the action taken is not shown);
 *  - last_name is allocated with kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
 *  - every entry on ls->ls_nodes is visited; the member's nodeid is compared
 *    with dlm_our_nodeid() (the action taken is not shown), last_name is
 *    zeroed, dlm_recovery_stopped() is consulted, and dlm_rcom_names() is
 *    called for memb->nodeid;
 *  - the reply in ls->ls_recover_buf->rc_buf is parsed as a series of
 *    big-endian 16-bit name lengths, each compared against 0xFFFF and
 *    DLM_RESNAME_MAXLEN, and dlm_master_lookup() is called;
 *  - the lookup result is compared with DLM_LU_MATCH / DLM_LU_ADD and the
 *    returned nodeid with memb->nodeid; a DLM_LU_MATCH with a different
 *    nodeid is logged with log_error() and hex-dumped;
 *  - b is copied into last_name (namelen bytes);
 *  - finally dlm_set_recover_status(ls, DLM_RS_DIR) is called and a summary
 *    is logged.
 *
 * error starts as -ENOMEM and is reassigned from dlm_recovery_stopped(),
 * dlm_rcom_names() and dlm_master_lookup().
 *
 * NOTE(review): this listing omits lines of the function (loop headers,
 * error branches, remaining call arguments, cleanup and the return);
 * confirm the details above against the complete source.
 */
63 int dlm_recover_directory(struct dlm_ls
*ls
)
65 struct dlm_member
*memb
;
66 char *b
, *last_name
= NULL
;
67 int error
= -ENOMEM
, last_len
, nodeid
, result
;
69 unsigned int count
= 0, count_match
= 0, count_bad
= 0, count_add
= 0;
71 log_debug(ls
, "dlm_recover_directory");
73 if (dlm_no_directory(ls
))
/* Allocate a DLM_RESNAME_MAXLEN-byte buffer for last_name. */
76 last_name
= kmalloc(DLM_RESNAME_MAXLEN
, GFP_NOFS
);
/* Visit every member on ls->ls_nodes. */
80 list_for_each_entry(memb
, &ls
->ls_nodes
, list
) {
81 if (memb
->nodeid
== dlm_our_nodeid())
84 memset(last_name
, 0, DLM_RESNAME_MAXLEN
);
89 error
= dlm_recovery_stopped(ls
);
93 error
= dlm_rcom_names(ls
, memb
->nodeid
,
101 * pick namelen/name pairs out of received buffer
/* b points at the receive buffer's rc_buf; left starts as the header's
   h_length and is then reduced by sizeof(struct dlm_rcom). */
104 b
= ls
->ls_recover_buf
->rc_buf
;
105 left
= ls
->ls_recover_buf
->rc_header
.h_length
;
106 left
-= sizeof(struct dlm_rcom
);
112 if (left
< sizeof(__be16
))
/* The 16-bit length is copied out with memcpy() and converted from
   big-endian to CPU byte order. */
115 memcpy(&v
, b
, sizeof(__be16
));
116 namelen
= be16_to_cpu(v
);
118 left
-= sizeof(__be16
);
120 /* namelen of 0xFFFF marks end of names for
121 this node; namelen of 0 marks end of the
124 if (namelen
== 0xFFFF)
132 if (namelen
> DLM_RESNAME_MAXLEN
)
135 error
= dlm_master_lookup(ls
, memb
->nodeid
,
140 log_error(ls
, "recover_dir lookup %d",
145 /* The name was found in rsbtbl, but the
146 * master nodeid is different from
147 * memb->nodeid which says it is the master.
148 * This should not happen. */
150 if (result
== DLM_LU_MATCH
&&
151 nodeid
!= memb
->nodeid
) {
153 log_error(ls
, "recover_dir lookup %d "
154 "nodeid %d memb %d bad %u",
155 result
, nodeid
, memb
->nodeid
,
157 print_hex_dump_bytes("dlm_recover_dir ",
162 /* The name was found in rsbtbl, and the
163 * master nodeid matches memb->nodeid. */
165 if (result
== DLM_LU_MATCH
&&
166 nodeid
== memb
->nodeid
) {
170 /* The name was not found in rsbtbl and was
171 * added with memb->nodeid as the master. */
173 if (result
== DLM_LU_ADD
) {
178 memcpy(last_name
, b
, namelen
);
/* Set the DLM_RS_DIR recovery status on the lockspace. */
190 dlm_set_recover_status(ls
, DLM_RS_DIR
);
192 log_debug(ls
, "dlm_recover_directory %u in %u new",
/*
 * find_rsb_root - look up an rsb by name.
 *
 * @ls:   lockspace to search
 * @name: resource name bytes
 * @len:  number of bytes in @name
 *
 * The name is hashed with jhash(name, len, 0) and the hash is masked with
 * (ls->ls_rsbtbl_size - 1) to pick a bucket in ls->ls_rsbtbl (the mask
 * presumably relies on the table size being a power of two - confirm).
 * With that bucket's spinlock held, dlm_search_rsb_tree() is run on the
 * bucket's keep tree, and a second call on its toss tree is also present.
 *
 * ls->ls_root_list is additionally scanned with ls->ls_root_sem held for
 * read, comparing res_length and the name bytes with memcmp(); a match there
 * releases the semaphore and logs "find_rsb_root revert to root_list".
 *
 * NOTE(review): the declarations of r and rv, the conditions linking the
 * searches, and every return statement are missing from this listing;
 * confirm the exact fall-through order and return values against the
 * complete source.
 */
200 static struct dlm_rsb
*find_rsb_root(struct dlm_ls
*ls
, char *name
, int len
)
203 uint32_t hash
, bucket
;
206 hash
= jhash(name
, len
, 0);
207 bucket
= hash
& (ls
->ls_rsbtbl_size
- 1);
/* Both tree searches happen under the per-bucket spinlock. */
209 spin_lock(&ls
->ls_rsbtbl
[bucket
].lock
);
210 rv
= dlm_search_rsb_tree(&ls
->ls_rsbtbl
[bucket
].keep
, name
, len
, &r
);
212 rv
= dlm_search_rsb_tree(&ls
->ls_rsbtbl
[bucket
].toss
,
214 spin_unlock(&ls
->ls_rsbtbl
[bucket
].lock
);
/* Linear scan of the root list, protected by ls_root_sem (read side). */
219 down_read(&ls
->ls_root_sem
);
220 list_for_each_entry(r
, &ls
->ls_root_list
, res_root_list
) {
221 if (len
== r
->res_length
&& !memcmp(name
, r
->res_name
, len
)) {
222 up_read(&ls
->ls_root_sem
);
223 log_debug(ls
, "find_rsb_root revert to root_list %s",
228 up_read(&ls
->ls_root_sem
);
/*
 * dlm_copy_master_names - fill @outbuf with length-prefixed rsb names for
 * the requesting node.
 *
 * @ls:     lockspace whose ls_root_list is walked
 * @inbuf:  name last sent; passed to find_rsb_root() to find the resume
 *          point.  Its last byte (inbuf[inlen - 1]) is overwritten with
 *          '\0' before the log_error() call.
 * @inlen:  length of @inbuf
 * @outbuf: destination buffer
 * @outlen: size of @outbuf in bytes
 * @nodeid: requesting node; compared against dlm_dir_nodeid() of each rsb
 *
 * Output format, as written by the visible code: for each name a
 * big-endian 16-bit length (cpu_to_be16(r->res_length)) followed by
 * res_length bytes of r->res_name.  When the next name plus an end-of-block
 * record would not fit, a 16-bit 0 is written.  When the walk reaches the
 * list head and two more bytes fit, a 16-bit 0xFFFF is written.
 * ls_recover_dir_sent_res is incremented per name and
 * ls_recover_dir_sent_msg per end/terminator record.
 *
 * The whole function runs with ls->ls_root_sem held for read.
 *
 * NOTE(review): find_rsb_root() itself takes ls->ls_root_sem for read while
 * it is already held here - confirm that nested read acquisition is safe
 * for this semaphore.
 * NOTE(review): this listing omits lines (declarations of r and be_namelen,
 * the branch choosing the start position, and the statements following the
 * dir_nodeid test and the end-of-block write); confirm against the complete
 * source.
 */
232 /* Find the rsb where we left off (or start again), then send rsb names
233 for rsb's we're master of and whose directory node matches the requesting
234 node. inbuf is the rsb name last sent, inlen is the name's length */
236 void dlm_copy_master_names(struct dlm_ls
*ls
, char *inbuf
, int inlen
,
237 char *outbuf
, int outlen
, int nodeid
)
239 struct list_head
*list
;
241 int offset
= 0, dir_nodeid
;
244 down_read(&ls
->ls_root_sem
);
247 r
= find_rsb_root(ls
, inbuf
, inlen
);
249 inbuf
[inlen
- 1] = '\0';
250 log_error(ls
, "copy_master_names from %d start %d %s",
251 nodeid
, inlen
, inbuf
);
254 list
= r
->res_root_list
.next
;
256 list
= ls
->ls_root_list
.next
;
/* Walk from the chosen starting entry until the list head is reached. */
259 for (offset
= 0; list
!= &ls
->ls_root_list
; list
= list
->next
) {
260 r
= list_entry(list
, struct dlm_rsb
, res_root_list
);
264 dir_nodeid
= dlm_dir_nodeid(r
);
265 if (dir_nodeid
!= nodeid
)
269 * The block ends when we can't fit the following in the
270 * remaining buffer space:
271 * namelen (uint16_t) +
272 * name (r->res_length) +
273 * end-of-block record 0x0000 (uint16_t)
276 if (offset
+ sizeof(uint16_t)*2 + r
->res_length
> outlen
) {
277 /* Write end-of-block record */
278 be_namelen
= cpu_to_be16(0);
279 memcpy(outbuf
+ offset
, &be_namelen
, sizeof(__be16
));
280 offset
+= sizeof(__be16
);
281 ls
->ls_recover_dir_sent_msg
++;
/* Emit one record: big-endian 16-bit length, then the raw name bytes. */
285 be_namelen
= cpu_to_be16(r
->res_length
);
286 memcpy(outbuf
+ offset
, &be_namelen
, sizeof(__be16
));
287 offset
+= sizeof(__be16
);
288 memcpy(outbuf
+ offset
, r
->res_name
, r
->res_length
);
289 offset
+= r
->res_length
;
290 ls
->ls_recover_dir_sent_res
++;
294 * If we've reached the end of the list (and there's room) write a
295 * terminating record.
298 if ((list
== &ls
->ls_root_list
) &&
299 (offset
+ sizeof(uint16_t) <= outlen
)) {
300 be_namelen
= cpu_to_be16(0xFFFF);
301 memcpy(outbuf
+ offset
, &be_namelen
, sizeof(__be16
));
302 offset
+= sizeof(__be16
);
303 ls
->ls_recover_dir_sent_msg
++;
306 up_read(&ls
->ls_root_sem
);