Linux Kernel 3.7.1
recoverd.c
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "dir.h"
#include "ast.h"
#include "recover.h"
#include "lowcomms.h"
#include "lock.h"
#include "requestqueue.h"
#include "recoverd.h"

/* If the start for which we're re-enabling locking (seq) has been superseded
   by a newer stop (ls_recover_seq), we need to leave locking disabled.

   We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
   locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
   enables locking and clears the requestqueue between a and b. */

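/* For illustration, the lost-message interleaving being prevented here:

     dlm_recv                          dlm_recoverd
     --------                          ------------
     a) sees locking stopped
                                       enables locking
                                       clears the requestqueue
     b) adds message to requestqueue
                                       (message is never processed)

   Taking ls_recv_active for write excludes dlm_recv between a and b. */
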
static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
	int error = -EINTR;

	down_write(&ls->ls_recv_active);

	spin_lock(&ls->ls_recover_lock);
	if (ls->ls_recover_seq == seq) {
		set_bit(LSFL_RUNNING, &ls->ls_flags);
		/* unblocks processes waiting to enter the dlm */
		up_write(&ls->ls_in_recovery);
		error = 0;
	}
	spin_unlock(&ls->ls_recover_lock);

	up_write(&ls->ls_recv_active);
	return error;
}

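/* ls_recover() runs one full recovery sequence: update membership, rebuild
   the resource directory, and, when nodes have departed, purge/remaster/
   resend locks, synchronizing with the other nodes at each
   dlm_recover_*_wait() barrier before locking is re-enabled. */
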
static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
{
	unsigned long start;
	int error, neg = 0;

	log_debug(ls, "dlm_recover %llu", (unsigned long long)rv->seq);

	mutex_lock(&ls->ls_recoverd_active);

	dlm_callback_suspend(ls);

	dlm_clear_toss(ls);

	/*
	 * This list of root rsb's will be the basis of most of the recovery
	 * routines.
	 */

	dlm_create_root_list(ls);

	/*
	 * Add or remove nodes from the lockspace's ls_nodes list.
	 */

	error = dlm_recover_members(ls, rv, &neg);
	if (error) {
		log_debug(ls, "dlm_recover_members error %d", error);
		goto fail;
	}
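
	/*
	 * Compute new directory nodeids and reset per-recovery counters,
	 * then wait for all nodes to finish the members stage (the
	 * DLM_RS_NODES barrier).
	 */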

	dlm_recover_dir_nodeid(ls);

	ls->ls_recover_dir_sent_res = 0;
	ls->ls_recover_dir_sent_msg = 0;
	ls->ls_recover_locks_in = 0;

	dlm_set_recover_status(ls, DLM_RS_NODES);

	error = dlm_recover_members_wait(ls);
	if (error) {
		log_debug(ls, "dlm_recover_members_wait error %d", error);
		goto fail;
	}

	start = jiffies;

	/*
	 * Rebuild our own share of the directory by collecting from all other
	 * nodes their master rsb names that hash to us.
	 */

	error = dlm_recover_directory(ls);
	if (error) {
		log_debug(ls, "dlm_recover_directory error %d", error);
		goto fail;
	}
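
	/*
	 * Every node must finish rebuilding its share of the directory
	 * before any node relies on it (the DLM_RS_DIR barrier).
	 */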

	dlm_set_recover_status(ls, DLM_RS_DIR);

	error = dlm_recover_directory_wait(ls);
	if (error) {
		log_debug(ls, "dlm_recover_directory_wait error %d", error);
		goto fail;
	}

	log_debug(ls, "dlm_recover_directory %u out %u messages",
		  ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);

	/*
	 * We may have outstanding operations that are waiting for a reply from
	 * a failed node. Mark these to be resent after recovery. Unlock and
	 * cancel ops can just be completed.
	 */

	dlm_recover_waiters_pre(ls);

	error = dlm_recovery_stopped(ls);
	if (error)
		goto fail;

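	/*
	 * neg counts the nodes that departed, per dlm_recover_members()
	 * above; locks and rsb masters they held must be recovered.
	 */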
	if (neg || dlm_no_directory(ls)) {
		/*
		 * Clear lkb's for departed nodes.
		 */

		dlm_recover_purge(ls);

		/*
		 * Get new master nodeid's for rsb's that were mastered on
		 * departed nodes.
		 */

		error = dlm_recover_masters(ls);
		if (error) {
			log_debug(ls, "dlm_recover_masters error %d", error);
			goto fail;
		}

		/*
		 * Send our locks on remastered rsb's to the new masters.
		 */

		error = dlm_recover_locks(ls);
		if (error) {
			log_debug(ls, "dlm_recover_locks error %d", error);
			goto fail;
		}

		dlm_set_recover_status(ls, DLM_RS_LOCKS);

		error = dlm_recover_locks_wait(ls);
		if (error) {
			log_debug(ls, "dlm_recover_locks_wait error %d", error);
			goto fail;
		}

		log_debug(ls, "dlm_recover_locks %u in",
			  ls->ls_recover_locks_in);

		/*
		 * Finalize state in master rsb's now that all locks can be
		 * checked. This includes conversion resolution and lvb
		 * settings.
		 */

		dlm_recover_rsbs(ls);
	} else {
		/*
		 * Other lockspace members may be going through the "neg" steps
		 * while also adding us to the lockspace, in which case they'll
		 * be doing the recover_locks (RS_LOCKS) barrier.
		 */
		dlm_set_recover_status(ls, DLM_RS_LOCKS);

		error = dlm_recover_locks_wait(ls);
		if (error) {
			log_debug(ls, "dlm_recover_locks_wait error %d", error);
			goto fail;
		}
	}

	dlm_release_root_list(ls);

	/*
	 * Purge directory-related requests that are saved in requestqueue.
	 * All dir requests from before recovery are invalid now due to the dir
	 * rebuild and will be resent by the requesting nodes.
	 */

	dlm_purge_requestqueue(ls);

	dlm_set_recover_status(ls, DLM_RS_DONE);

	error = dlm_recover_done_wait(ls);
	if (error) {
		log_debug(ls, "dlm_recover_done_wait error %d", error);
		goto fail;
	}

	dlm_clear_members_gone(ls);

	dlm_adjust_timeouts(ls);

	dlm_callback_resume(ls);

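	/*
	 * Re-enable locking (unless a newer dlm_ls_stop() superseded this
	 * recovery), then replay the requests that were queued while
	 * locking was stopped.
	 */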
	error = enable_locking(ls, rv->seq);
	if (error) {
		log_debug(ls, "enable_locking error %d", error);
		goto fail;
	}

	error = dlm_process_requestqueue(ls);
	if (error) {
		log_debug(ls, "dlm_process_requestqueue error %d", error);
		goto fail;
	}

	error = dlm_recover_waiters_post(ls);
	if (error) {
		log_debug(ls, "dlm_recover_waiters_post error %d", error);
		goto fail;
	}

	dlm_recover_grant(ls);

	log_debug(ls, "dlm_recover %llu generation %u done: %u ms",
		  (unsigned long long)rv->seq, ls->ls_generation,
		  jiffies_to_msecs(jiffies - start));
	mutex_unlock(&ls->ls_recoverd_active);

	dlm_lsop_recover_done(ls);
	return 0;

 fail:
	dlm_release_root_list(ls);
	log_debug(ls, "dlm_recover %llu error %d",
		  (unsigned long long)rv->seq, error);
	mutex_unlock(&ls->ls_recoverd_active);
	return error;
}

/* The dlm_ls_start() that created the rv we take here may already have been
   stopped via dlm_ls_stop(); in that case we need to leave the RECOVERY_STOP
   flag set. */

static void do_ls_recovery(struct dlm_ls *ls)
{
	struct dlm_recover *rv = NULL;

	spin_lock(&ls->ls_recover_lock);
	rv = ls->ls_recover_args;
	ls->ls_recover_args = NULL;
	if (rv && ls->ls_recover_seq == rv->seq)
		clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
	spin_unlock(&ls->ls_recover_lock);

	if (rv) {
		ls_recover(ls, rv);
		kfree(rv->nodes);
		kfree(rv);
	}
}
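
/* dlm_recoverd is the lockspace's recovery thread: it keeps ls_in_recovery
   held (tracked by LSFL_RECOVER_LOCK) while locking is stopped, and calls
   do_ls_recovery() whenever recovery work has been queued
   (LSFL_RECOVER_WORK, set by dlm_ls_start()). */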

static int dlm_recoverd(void *arg)
{
	struct dlm_ls *ls;

	ls = dlm_find_lockspace_local(arg);
	if (!ls) {
		log_print("dlm_recoverd: no lockspace %p", arg);
		return -1;
	}

	down_write(&ls->ls_in_recovery);
	set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
	wake_up(&ls->ls_recover_lock_wait);

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
		    !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags))
			schedule();
		set_current_state(TASK_RUNNING);

		if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
			down_write(&ls->ls_in_recovery);
			set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
			wake_up(&ls->ls_recover_lock_wait);
		}

		if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags))
			do_ls_recovery(ls);
	}

	if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags))
		up_write(&ls->ls_in_recovery);

	dlm_put_lockspace(ls);
	return 0;
}
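
/* dlm_recoverd_start() and dlm_recoverd_stop() bracket the thread's lifetime
   and are called when the lockspace is created and released (lockspace.c);
   suspend/resume let callers hold ls_recoverd_active so no recovery pass
   runs while they work. */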

int dlm_recoverd_start(struct dlm_ls *ls)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		ls->ls_recoverd_task = p;
	return error;
}

void dlm_recoverd_stop(struct dlm_ls *ls)
{
	kthread_stop(ls->ls_recoverd_task);
}

void dlm_recoverd_suspend(struct dlm_ls *ls)
{
	wake_up(&ls->ls_wait_general);
	mutex_lock(&ls->ls_recoverd_active);
}

void dlm_recoverd_resume(struct dlm_ls *ls)
{
	mutex_unlock(&ls->ls_recoverd_active);
}