Linux Kernel  3.7.1
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
dir.c
Go to the documentation of this file.
1 /******************************************************************************
2 *******************************************************************************
3 **
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
6 **
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
25 
26 /*
27  * We use the upper 16 bits of the hash value to select the directory node.
28  * Low bits are used for distribution of rsb's among hash buckets on each node.
29  *
30  * To give the exact range wanted (0 to ls_total_weight-1), we apply a modulus
31  * of ls_total_weight to the hash value. This value in the desired range is
32  * used as an index into ls_node_array to give the particular nodeid.
33  */
34 
36 {
37  uint32_t node;
38 
39  if (ls->ls_num_nodes == 1)
40  return dlm_our_nodeid();
41  else {
42  node = (hash >> 16) % ls->ls_total_weight;
43  return ls->ls_node_array[node];
44  }
45 }
46 
47 int dlm_dir_nodeid(struct dlm_rsb *r)
48 {
49  return r->res_dir_nodeid;
50 }
51 
53 {
54  struct dlm_rsb *r;
55 
56  down_read(&ls->ls_root_sem);
59  }
60  up_read(&ls->ls_root_sem);
61 }
62 
64 {
65  struct dlm_member *memb;
66  char *b, *last_name = NULL;
67  int error = -ENOMEM, last_len, nodeid, result;
69  unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
70 
71  log_debug(ls, "dlm_recover_directory");
72 
73  if (dlm_no_directory(ls))
74  goto out_status;
75 
76  last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
77  if (!last_name)
78  goto out;
79 
80  list_for_each_entry(memb, &ls->ls_nodes, list) {
81  if (memb->nodeid == dlm_our_nodeid())
82  continue;
83 
84  memset(last_name, 0, DLM_RESNAME_MAXLEN);
85  last_len = 0;
86 
87  for (;;) {
88  int left;
89  error = dlm_recovery_stopped(ls);
90  if (error)
91  goto out_free;
92 
93  error = dlm_rcom_names(ls, memb->nodeid,
94  last_name, last_len);
95  if (error)
96  goto out_free;
97 
98  cond_resched();
99 
100  /*
101  * pick namelen/name pairs out of received buffer
102  */
103 
104  b = ls->ls_recover_buf->rc_buf;
105  left = ls->ls_recover_buf->rc_header.h_length;
106  left -= sizeof(struct dlm_rcom);
107 
108  for (;;) {
109  __be16 v;
110 
111  error = -EINVAL;
112  if (left < sizeof(__be16))
113  goto out_free;
114 
115  memcpy(&v, b, sizeof(__be16));
116  namelen = be16_to_cpu(v);
117  b += sizeof(__be16);
118  left -= sizeof(__be16);
119 
120  /* namelen of 0xFFFF marks end of names for
121  this node; namelen of 0 marks end of the
122  buffer */
123 
124  if (namelen == 0xFFFF)
125  goto done;
126  if (!namelen)
127  break;
128 
129  if (namelen > left)
130  goto out_free;
131 
132  if (namelen > DLM_RESNAME_MAXLEN)
133  goto out_free;
134 
135  error = dlm_master_lookup(ls, memb->nodeid,
136  b, namelen,
138  &nodeid, &result);
139  if (error) {
140  log_error(ls, "recover_dir lookup %d",
141  error);
142  goto out_free;
143  }
144 
145  /* The name was found in rsbtbl, but the
146  * master nodeid is different from
147  * memb->nodeid which says it is the master.
148  * This should not happen. */
149 
150  if (result == DLM_LU_MATCH &&
151  nodeid != memb->nodeid) {
152  count_bad++;
153  log_error(ls, "recover_dir lookup %d "
154  "nodeid %d memb %d bad %u",
155  result, nodeid, memb->nodeid,
156  count_bad);
157  print_hex_dump_bytes("dlm_recover_dir ",
159  b, namelen);
160  }
161 
162  /* The name was found in rsbtbl, and the
163  * master nodeid matches memb->nodeid. */
164 
165  if (result == DLM_LU_MATCH &&
166  nodeid == memb->nodeid) {
167  count_match++;
168  }
169 
170  /* The name was not found in rsbtbl and was
171  * added with memb->nodeid as the master. */
172 
173  if (result == DLM_LU_ADD) {
174  count_add++;
175  }
176 
177  last_len = namelen;
178  memcpy(last_name, b, namelen);
179  b += namelen;
180  left -= namelen;
181  count++;
182  }
183  }
184  done:
185  ;
186  }
187 
188  out_status:
189  error = 0;
191 
192  log_debug(ls, "dlm_recover_directory %u in %u new",
193  count, count_add);
194  out_free:
195  kfree(last_name);
196  out:
197  return error;
198 }
199 
200 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
201 {
202  struct dlm_rsb *r;
203  uint32_t hash, bucket;
204  int rv;
205 
206  hash = jhash(name, len, 0);
207  bucket = hash & (ls->ls_rsbtbl_size - 1);
208 
209  spin_lock(&ls->ls_rsbtbl[bucket].lock);
210  rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
211  if (rv)
212  rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
213  name, len, &r);
214  spin_unlock(&ls->ls_rsbtbl[bucket].lock);
215 
216  if (!rv)
217  return r;
218 
219  down_read(&ls->ls_root_sem);
221  if (len == r->res_length && !memcmp(name, r->res_name, len)) {
222  up_read(&ls->ls_root_sem);
223  log_debug(ls, "find_rsb_root revert to root_list %s",
224  r->res_name);
225  return r;
226  }
227  }
228  up_read(&ls->ls_root_sem);
229  return NULL;
230 }
231 
232 /* Find the rsb where we left off (or start again), then send rsb names
233  for rsb's we're master of and whose directory node matches the requesting
234  node. inbuf is the rsb name last sent, inlen is the name's length */
235 
/*
 * dlm_copy_master_names - fill outbuf with (length, name) records for rsbs
 * found on ls->ls_root_list.
 *
 * ls:      lockspace whose root list is walked
 * inbuf:   name to resume after; only consulted when inlen > 1.  It may be
 *          modified: on a failed lookup its last byte is overwritten with
 *          '\0' so that it can be logged as a string
 * inlen:   length of inbuf; a value <= 1 means start at the head of the list
 * outbuf:  destination for the records
 * outlen:  capacity of outbuf in bytes
 * nodeid:  only rsbs for which dlm_dir_nodeid() equals this value are emitted
 *
 * Each record is a big-endian 16-bit length followed by that many name
 * bytes.  A length of 0x0000 is written when the next name would not fit,
 * and 0xFFFF when the end of the root list is reached and two bytes remain.
 *
 * Nothing is returned; the final value of offset is not reported to the
 * caller.  ls_root_sem is held for read for the duration of the call.
 *
 * NOTE(review): the listing numbers embedded in this extract skip 281, 290
 * and 303, so statements may be missing from the body as shown here.
 */
236 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
237  char *outbuf, int outlen, int nodeid)
238 {
239  struct list_head *list;
240  struct dlm_rsb *r;
241  int offset = 0, dir_nodeid;
242  __be16 be_namelen;
243 
244  down_read(&ls->ls_root_sem);
245 
/*
 * NOTE(review): find_rsb_root() takes ls_root_sem for read again on its
 * root-list fallback path while it is already held here -- confirm that
 * this nested read acquisition is safe.
 */
246  if (inlen > 1) {
247  r = find_rsb_root(ls, inbuf, inlen);
248  if (!r) {
/* terminate the caller's buffer in place so %s below is bounded */
249  inbuf[inlen - 1] = '\0';
250  log_error(ls, "copy_master_names from %d start %d %s",
251  nodeid, inlen, inbuf);
252  goto out;
253  }
/* resume with the entry following the last name already sent */
254  list = r->res_root_list.next;
255  } else {
256  list = ls->ls_root_list.next;
257  }
258 
259  for (offset = 0; list != &ls->ls_root_list; list = list->next) {
260  r = list_entry(list, struct dlm_rsb, res_root_list);
/* presumably res_nodeid == 0 means this node is the master -- confirm */
261  if (r->res_nodeid)
262  continue;
263 
264  dir_nodeid = dlm_dir_nodeid(r);
265  if (dir_nodeid != nodeid)
266  continue;
267 
268  /*
269  * The block ends when we can't fit the following in the
270  * remaining buffer space:
271  * namelen (uint16_t) +
272  * name (r->res_length) +
273  * end-of-block record 0x0000 (uint16_t)
274  */
275 
/*
 * NOTE(review): sizeof() makes this comparison unsigned, so a negative
 * outlen would be treated as a very large value.  The end-of-block write
 * below also assumes outlen >= sizeof(__be16) when offset is still 0 --
 * confirm against callers.
 */
276  if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
277  /* Write end-of-block record */
278  be_namelen = cpu_to_be16(0);
279  memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
280  offset += sizeof(__be16);
282  goto out;
283  }
284 
/* emit one record: big-endian length, then the raw name bytes */
285  be_namelen = cpu_to_be16(r->res_length);
286  memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
287  offset += sizeof(__be16);
288  memcpy(outbuf + offset, r->res_name, r->res_length);
289  offset += r->res_length;
291  }
292 
293  /*
294  * If we've reached the end of the list (and there's room) write a
295  * terminating record.
296  */
297 
298  if ((list == &ls->ls_root_list) &&
299  (offset + sizeof(uint16_t) <= outlen)) {
300  be_namelen = cpu_to_be16(0xFFFF);
301  memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
302  offset += sizeof(__be16);
304  }
305  out:
306  up_read(&ls->ls_root_sem);
307 }
308