sync hh.org
[hh.org.git] / fs / dlm / dir.c
blob46754553fdcc4ae06799d7f0a1c9a102c4a6900f
1 /******************************************************************************
2 *******************************************************************************
3 **
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
6 **
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
29 spin_lock(&ls->ls_recover_list_lock);
30 list_add(&de->list, &ls->ls_recover_list);
31 spin_unlock(&ls->ls_recover_list_lock);
34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
36 int found = 0;
37 struct dlm_direntry *de;
39 spin_lock(&ls->ls_recover_list_lock);
40 list_for_each_entry(de, &ls->ls_recover_list, list) {
41 if (de->length == len) {
42 list_del(&de->list);
43 de->master_nodeid = 0;
44 memset(de->name, 0, len);
45 found = 1;
46 break;
49 spin_unlock(&ls->ls_recover_list_lock);
51 if (!found)
52 de = allocate_direntry(ls, len);
53 return de;
56 void dlm_clear_free_entries(struct dlm_ls *ls)
58 struct dlm_direntry *de;
60 spin_lock(&ls->ls_recover_list_lock);
61 while (!list_empty(&ls->ls_recover_list)) {
62 de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
63 list);
64 list_del(&de->list);
65 free_direntry(de);
67 spin_unlock(&ls->ls_recover_list_lock);
71 * We use the upper 16 bits of the hash value to select the directory node.
72 * Low bits are used for distribution of rsb's among hash buckets on each node.
74 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
75 * num_nodes to the hash value. This value in the desired range is used as an
76 * offset into the sorted list of nodeid's to give the particular nodeid.
79 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
81 struct list_head *tmp;
82 struct dlm_member *memb = NULL;
83 uint32_t node, n = 0;
84 int nodeid;
86 if (ls->ls_num_nodes == 1) {
87 nodeid = dlm_our_nodeid();
88 goto out;
91 if (ls->ls_node_array) {
92 node = (hash >> 16) % ls->ls_total_weight;
93 nodeid = ls->ls_node_array[node];
94 goto out;
97 /* make_member_array() failed to kmalloc ls_node_array... */
99 node = (hash >> 16) % ls->ls_num_nodes;
101 list_for_each(tmp, &ls->ls_nodes) {
102 if (n++ != node)
103 continue;
104 memb = list_entry(tmp, struct dlm_member, list);
105 break;
108 DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109 ls->ls_num_nodes, n, node););
110 nodeid = memb->nodeid;
111 out:
112 return nodeid;
115 int dlm_dir_nodeid(struct dlm_rsb *r)
117 return dlm_hash2nodeid(r->res_ls, r->res_hash);
120 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
122 uint32_t val;
124 val = jhash(name, len, 0);
125 val &= (ls->ls_dirtbl_size - 1);
127 return val;
130 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
132 uint32_t bucket;
134 bucket = dir_hash(ls, de->name, de->length);
135 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
138 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
139 int namelen, uint32_t bucket)
141 struct dlm_direntry *de;
143 list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144 if (de->length == namelen && !memcmp(name, de->name, namelen))
145 goto out;
147 de = NULL;
148 out:
149 return de;
152 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
154 struct dlm_direntry *de;
155 uint32_t bucket;
157 bucket = dir_hash(ls, name, namelen);
159 write_lock(&ls->ls_dirtbl[bucket].lock);
161 de = search_bucket(ls, name, namelen, bucket);
163 if (!de) {
164 log_error(ls, "remove fr %u none", nodeid);
165 goto out;
168 if (de->master_nodeid != nodeid) {
169 log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170 goto out;
173 list_del(&de->list);
174 free_direntry(de);
175 out:
176 write_unlock(&ls->ls_dirtbl[bucket].lock);
179 void dlm_dir_clear(struct dlm_ls *ls)
181 struct list_head *head;
182 struct dlm_direntry *de;
183 int i;
185 DLM_ASSERT(list_empty(&ls->ls_recover_list), );
187 for (i = 0; i < ls->ls_dirtbl_size; i++) {
188 write_lock(&ls->ls_dirtbl[i].lock);
189 head = &ls->ls_dirtbl[i].list;
190 while (!list_empty(head)) {
191 de = list_entry(head->next, struct dlm_direntry, list);
192 list_del(&de->list);
193 put_free_de(ls, de);
195 write_unlock(&ls->ls_dirtbl[i].lock);
199 int dlm_recover_directory(struct dlm_ls *ls)
201 struct dlm_member *memb;
202 struct dlm_direntry *de;
203 char *b, *last_name = NULL;
204 int error = -ENOMEM, last_len, count = 0;
205 uint16_t namelen;
207 log_debug(ls, "dlm_recover_directory");
209 if (dlm_no_directory(ls))
210 goto out_status;
212 dlm_dir_clear(ls);
214 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_KERNEL);
215 if (!last_name)
216 goto out;
218 list_for_each_entry(memb, &ls->ls_nodes, list) {
219 memset(last_name, 0, DLM_RESNAME_MAXLEN);
220 last_len = 0;
222 for (;;) {
223 error = dlm_recovery_stopped(ls);
224 if (error)
225 goto out_free;
227 error = dlm_rcom_names(ls, memb->nodeid,
228 last_name, last_len);
229 if (error)
230 goto out_free;
232 schedule();
235 * pick namelen/name pairs out of received buffer
238 b = ls->ls_recover_buf + sizeof(struct dlm_rcom);
240 for (;;) {
241 memcpy(&namelen, b, sizeof(uint16_t));
242 namelen = be16_to_cpu(namelen);
243 b += sizeof(uint16_t);
245 /* namelen of 0xFFFFF marks end of names for
246 this node; namelen of 0 marks end of the
247 buffer */
249 if (namelen == 0xFFFF)
250 goto done;
251 if (!namelen)
252 break;
254 error = -ENOMEM;
255 de = get_free_de(ls, namelen);
256 if (!de)
257 goto out_free;
259 de->master_nodeid = memb->nodeid;
260 de->length = namelen;
261 last_len = namelen;
262 memcpy(de->name, b, namelen);
263 memcpy(last_name, b, namelen);
264 b += namelen;
266 add_entry_to_hash(ls, de);
267 count++;
270 done:
274 out_status:
275 error = 0;
276 dlm_set_recover_status(ls, DLM_RS_DIR);
277 log_debug(ls, "dlm_recover_directory %d entries", count);
278 out_free:
279 kfree(last_name);
280 out:
281 dlm_clear_free_entries(ls);
282 return error;
285 static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
286 int namelen, int *r_nodeid)
288 struct dlm_direntry *de, *tmp;
289 uint32_t bucket;
291 bucket = dir_hash(ls, name, namelen);
293 write_lock(&ls->ls_dirtbl[bucket].lock);
294 de = search_bucket(ls, name, namelen, bucket);
295 if (de) {
296 *r_nodeid = de->master_nodeid;
297 write_unlock(&ls->ls_dirtbl[bucket].lock);
298 if (*r_nodeid == nodeid)
299 return -EEXIST;
300 return 0;
303 write_unlock(&ls->ls_dirtbl[bucket].lock);
305 de = allocate_direntry(ls, namelen);
306 if (!de)
307 return -ENOMEM;
309 de->master_nodeid = nodeid;
310 de->length = namelen;
311 memcpy(de->name, name, namelen);
313 write_lock(&ls->ls_dirtbl[bucket].lock);
314 tmp = search_bucket(ls, name, namelen, bucket);
315 if (tmp) {
316 free_direntry(de);
317 de = tmp;
318 } else {
319 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
321 *r_nodeid = de->master_nodeid;
322 write_unlock(&ls->ls_dirtbl[bucket].lock);
323 return 0;
/*
 * Public directory lookup: forwards to get_entry() and returns its result
 * (0, -EEXIST or -ENOMEM), with the master nodeid stored in *r_nodeid.
 */
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
		   int *r_nodeid)
{
	int error;

	error = get_entry(ls, nodeid, name, namelen, r_nodeid);
	return error;
}
332 /* Copy the names of master rsb's into the buffer provided.
333 Only select names whose dir node is the given nodeid. */
335 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
336 char *outbuf, int outlen, int nodeid)
338 struct list_head *list;
339 struct dlm_rsb *start_r = NULL, *r = NULL;
340 int offset = 0, start_namelen, error, dir_nodeid;
341 char *start_name;
342 uint16_t be_namelen;
345 * Find the rsb where we left off (or start again)
348 start_namelen = inlen;
349 start_name = inbuf;
351 if (start_namelen > 1) {
353 * We could also use a find_rsb_root() function here that
354 * searched the ls_root_list.
356 error = dlm_find_rsb(ls, start_name, start_namelen, R_MASTER,
357 &start_r);
358 DLM_ASSERT(!error && start_r,
359 printk("error %d\n", error););
360 DLM_ASSERT(!list_empty(&start_r->res_root_list),
361 dlm_print_rsb(start_r););
362 dlm_put_rsb(start_r);
366 * Send rsb names for rsb's we're master of and whose directory node
367 * matches the requesting node.
370 down_read(&ls->ls_root_sem);
371 if (start_r)
372 list = start_r->res_root_list.next;
373 else
374 list = ls->ls_root_list.next;
376 for (offset = 0; list != &ls->ls_root_list; list = list->next) {
377 r = list_entry(list, struct dlm_rsb, res_root_list);
378 if (r->res_nodeid)
379 continue;
381 dir_nodeid = dlm_dir_nodeid(r);
382 if (dir_nodeid != nodeid)
383 continue;
386 * The block ends when we can't fit the following in the
387 * remaining buffer space:
388 * namelen (uint16_t) +
389 * name (r->res_length) +
390 * end-of-block record 0x0000 (uint16_t)
393 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
394 /* Write end-of-block record */
395 be_namelen = 0;
396 memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
397 offset += sizeof(uint16_t);
398 goto out;
401 be_namelen = cpu_to_be16(r->res_length);
402 memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
403 offset += sizeof(uint16_t);
404 memcpy(outbuf + offset, r->res_name, r->res_length);
405 offset += r->res_length;
409 * If we've reached the end of the list (and there's room) write a
410 * terminating record.
413 if ((list == &ls->ls_root_list) &&
414 (offset + sizeof(uint16_t) <= outlen)) {
415 be_namelen = 0xFFFF;
416 memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
417 offset += sizeof(uint16_t);
420 out:
421 up_read(&ls->ls_root_sem);