member.c (Linux kernel 3.7.1)
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "recover.h"
#include "rcom.h"
#include "config.h"
#include "lowcomms.h"

int dlm_slots_version(struct dlm_header *h)
{
	if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
		return 0;
	return 1;
}

void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
		   struct dlm_member *memb)
{
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;

	if (!dlm_slots_version(&rc->rc_header))
		return;

	memb->slot = le16_to_cpu(rf->rf_our_slot);
	memb->generation = le32_to_cpu(rf->rf_generation);
}

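/*
 * Slot support arrived as a minor header version bump (DLM_HEADER_SLOTS),
 * which is why dlm_slots_version() checks only the low 16 bits of
 * h_version.  Members running older code never send slot data, so
 * dlm_slot_save() simply skips them.
 */
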
void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct dlm_slot *slot;
	struct rcom_slot *ro;
	int i;

	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

	/* ls_slots array is sparse, but not rcom_slots */

	for (i = 0; i < ls->ls_slots_size; i++) {
		slot = &ls->ls_slots[i];
		if (!slot->nodeid)
			continue;
		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
		ro->ro_slot = cpu_to_le16(slot->slot);
		ro++;
	}
}

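/*
 * Example: with ls_slots_size = 4 and only slots 1 and 3 in use, the
 * sparse ls_slots array [ slot 1, empty, slot 3, empty ] is copied out
 * as the dense two-entry rcom_slot array [ slot 1, slot 3 ].
 */
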
#define SLOT_DEBUG_LINE 128

static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
			    struct rcom_slot *ro0, struct dlm_slot *array,
			    int array_size)
{
	char line[SLOT_DEBUG_LINE];
	int len = SLOT_DEBUG_LINE - 1;
	int pos = 0;
	int ret, i;

	if (!dlm_config.ci_log_debug)
		return;

	memset(line, 0, sizeof(line));

	if (array) {
		for (i = 0; i < array_size; i++) {
			if (!array[i].nodeid)
				continue;

			ret = snprintf(line + pos, len - pos, " %d:%d",
				       array[i].slot, array[i].nodeid);
			if (ret >= len - pos)
				break;
			pos += ret;
		}
	} else if (ro0) {
		for (i = 0; i < num_slots; i++) {
			ret = snprintf(line + pos, len - pos, " %d:%d",
				       ro0[i].ro_slot, ro0[i].ro_nodeid);
			if (ret >= len - pos)
				break;
			pos += ret;
		}
	}

	log_debug(ls, "generation %u slots %d%s", gen, num_slots, line);
}

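/*
 * Each " %d:%d" pair above is slot:nodeid, so a lockspace at generation 5
 * with three members logs, for example:
 *
 *	generation 5 slots 3 1:101 2:102 3:103
 */
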
int dlm_slots_copy_in(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_rcom *rc = ls->ls_recover_buf;
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
	struct rcom_slot *ro0, *ro;
	int our_nodeid = dlm_our_nodeid();
	int i, num_slots;
	uint32_t gen;

	if (!dlm_slots_version(&rc->rc_header))
		return -1;

	gen = le32_to_cpu(rf->rf_generation);
	if (gen <= ls->ls_generation) {
		log_error(ls, "dlm_slots_copy_in gen %u old %u",
			  gen, ls->ls_generation);
	}
	ls->ls_generation = gen;

	num_slots = le16_to_cpu(rf->rf_num_slots);
	if (!num_slots)
		return -1;

	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

	for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
		ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
		ro->ro_slot = le16_to_cpu(ro->ro_slot);
	}

	log_debug_slots(ls, gen, num_slots, ro0, NULL, 0);

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
			if (ro->ro_nodeid != memb->nodeid)
				continue;
			memb->slot = ro->ro_slot;
			memb->slot_prev = memb->slot;
			break;
		}

		if (memb->nodeid == our_nodeid) {
			if (ls->ls_slot && ls->ls_slot != memb->slot) {
				log_error(ls, "dlm_slots_copy_in our slot "
					  "changed %d %d", ls->ls_slot,
					  memb->slot);
				return -1;
			}

			if (!ls->ls_slot)
				ls->ls_slot = memb->slot;
		}

		if (!memb->slot) {
			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
				  memb->nodeid);
			return -1;
		}
	}

	return 0;
}

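/*
 * Note that the le32/le16 conversion loop above rewrites the slot entries
 * in the recover buffer in place, so they are host-endian from then on.
 */
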
/* for any nodes that do not support slots, we will not have set memb->slot
   in wait_status_all(), so memb->slot will remain -1, and we will not
   assign slots or set ls_num_slots here */

int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
		     struct dlm_slot **slots_out, uint32_t *gen_out)
{
	struct dlm_member *memb;
	struct dlm_slot *array;
	int our_nodeid = dlm_our_nodeid();
	int array_size, max_slots, i;
	int need = 0;
	int max = 0;
	int num = 0;
	uint32_t gen = 0;

	/* our own memb struct will have slot -1 gen 0 */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->nodeid == our_nodeid) {
			memb->slot = ls->ls_slot;
			memb->generation = ls->ls_generation;
			break;
		}
	}

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->generation > gen)
			gen = memb->generation;

		/* node doesn't support slots */

		if (memb->slot == -1)
			return -1;

		/* node needs a slot assigned */

		if (!memb->slot)
			need++;

		/* node has a slot assigned */

		num++;

		if (!max || max < memb->slot)
			max = memb->slot;

		/* sanity check, once slot is assigned it shouldn't change */

		if (memb->slot_prev && memb->slot &&
		    memb->slot_prev != memb->slot) {
			log_error(ls, "nodeid %d slot changed %d %d",
				  memb->nodeid, memb->slot_prev, memb->slot);
			return -1;
		}
		memb->slot_prev = memb->slot;
	}

	array_size = max + need;

	array = kzalloc(array_size * sizeof(struct dlm_slot), GFP_NOFS);
	if (!array)
		return -ENOMEM;

	num = 0;

	/* fill in slots (offsets) that are used */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!memb->slot)
			continue;

		if (memb->slot > array_size) {
			log_error(ls, "invalid slot number %d", memb->slot);
			kfree(array);
			return -1;
		}

		array[memb->slot - 1].nodeid = memb->nodeid;
		array[memb->slot - 1].slot = memb->slot;
		num++;
	}

	/* assign new slots from unused offsets */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->slot)
			continue;

		for (i = 0; i < array_size; i++) {
			if (array[i].nodeid)
				continue;

			memb->slot = i + 1;
			memb->slot_prev = memb->slot;
			array[i].nodeid = memb->nodeid;
			array[i].slot = memb->slot;
			num++;

			if (!ls->ls_slot && memb->nodeid == our_nodeid)
				ls->ls_slot = memb->slot;
			break;
		}

		if (!memb->slot) {
			log_error(ls, "no free slot found");
			kfree(array);
			return -1;
		}
	}

	gen++;

	log_debug_slots(ls, gen, num, NULL, array, array_size);

	max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);

	if (num > max_slots) {
		log_error(ls, "num_slots %d exceeds max_slots %d",
			  num, max_slots);
		kfree(array);
		return -1;
	}

	*gen_out = gen;
	*slots_out = array;
	*slots_size = array_size;
	*num_slots = num;
	return 0;
}

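/*
 * Example: members 101, 102 and 103, where 101 already holds slot 1 and
 * 103 holds slot 3, while 102 is new.  Then max = 3, need = 1 and
 * array_size = 4, so 102 is assigned the first unused offset, slot 2.
 * The new generation is the highest generation seen among the members
 * plus one.
 */
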
static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
{
	struct dlm_member *memb = NULL;
	struct list_head *tmp;
	struct list_head *newlist = &new->list;
	struct list_head *head = &ls->ls_nodes;

	list_for_each(tmp, head) {
		memb = list_entry(tmp, struct dlm_member, list);
		if (new->nodeid < memb->nodeid)
			break;
	}

	if (!memb)
		list_add_tail(newlist, head);
	else {
		/* FIXME: can use list macro here */
		newlist->prev = tmp->prev;
		newlist->next = tmp;
		tmp->prev->next = newlist;
		tmp->prev = newlist;
	}
}

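/*
 * The open-coded insertion above places newlist immediately before tmp,
 * which is exactly what list_add_tail(newlist, tmp) does; that is the
 * list macro the FIXME refers to.
 */
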
static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
{
	struct dlm_member *memb;
	int error;

	memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
	if (!memb)
		return -ENOMEM;

	error = dlm_lowcomms_connect_node(node->nodeid);
	if (error < 0) {
		kfree(memb);
		return error;
	}

	memb->nodeid = node->nodeid;
	memb->weight = node->weight;
	memb->comm_seq = node->comm_seq;
	add_ordered_member(ls, memb);
	ls->ls_num_nodes++;
	return 0;
}

static struct dlm_member *find_memb(struct list_head *head, int nodeid)
{
	struct dlm_member *memb;

	list_for_each_entry(memb, head, list) {
		if (memb->nodeid == nodeid)
			return memb;
	}
	return NULL;
}

int dlm_is_member(struct dlm_ls *ls, int nodeid)
{
	if (find_memb(&ls->ls_nodes, nodeid))
		return 1;
	return 0;
}

int dlm_is_removed(struct dlm_ls *ls, int nodeid)
{
	if (find_memb(&ls->ls_nodes_gone, nodeid))
		return 1;
	return 0;
}

static void clear_memb_list(struct list_head *head)
{
	struct dlm_member *memb;

	while (!list_empty(head)) {
		memb = list_entry(head->next, struct dlm_member, list);
		list_del(&memb->list);
		kfree(memb);
	}
}

void dlm_clear_members(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes);
	ls->ls_num_nodes = 0;
}

void dlm_clear_members_gone(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes_gone);
}

static void make_member_array(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	int i, w, x = 0, total = 0, all_zero = 0, *array;

	kfree(ls->ls_node_array);
	ls->ls_node_array = NULL;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->weight)
			total += memb->weight;
	}

	/* all nodes revert to weight of 1 if all have weight 0 */

	if (!total) {
		total = ls->ls_num_nodes;
		all_zero = 1;
	}

	ls->ls_total_weight = total;

	array = kmalloc(sizeof(int) * total, GFP_NOFS);
	if (!array)
		return;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!all_zero && !memb->weight)
			continue;

		if (all_zero)
			w = 1;
		else
			w = memb->weight;

		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););

		for (i = 0; i < w; i++)
			array[x++] = memb->nodeid;
	}

	ls->ls_node_array = array;
}

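/*
 * Example: weights 2, 0 and 1 for nodes 101, 102 and 103 give total = 3
 * and ls_node_array = [101, 101, 103]; node 102 gets no entries.  If all
 * members have weight 0, every node gets exactly one entry instead.
 */
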
/* send a status request to all members just to establish comms connections */

static int ping_members(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	int error = 0;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		error = dlm_recovery_stopped(ls);
		if (error)
			break;
		error = dlm_rcom_status(ls, memb->nodeid, 0);
		if (error)
			break;
	}
	if (error)
		log_debug(ls, "ping_members aborted %d last nodeid %d",
			  error, ls->ls_recover_nodeid);
	return error;
}

static void dlm_lsop_recover_prep(struct dlm_ls *ls)
{
	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
		return;
	ls->ls_ops->recover_prep(ls->ls_ops_arg);
}

static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
{
	struct dlm_slot slot;
	uint32_t seq;
	int error;

	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
		return;

	/* if there is no comms connection with this node
	   or the present comms connection is newer
	   than the one when this member was added, then
	   we consider the node to have failed (versus
	   being removed due to dlm_release_lockspace) */

	error = dlm_comm_seq(memb->nodeid, &seq);

	if (!error && seq == memb->comm_seq)
		return;

	slot.nodeid = memb->nodeid;
	slot.slot = memb->slot;

	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
}

static void dlm_lsop_recover_done(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_slot *slots;
	int i, num;

	if (!ls->ls_ops || !ls->ls_ops->recover_done)
		return;

	num = ls->ls_num_nodes;

	slots = kzalloc(num * sizeof(struct dlm_slot), GFP_KERNEL);
	if (!slots)
		return;

	i = 0;
	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (i == num) {
			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
			goto out;
		}
		slots[i].nodeid = memb->nodeid;
		slots[i].slot = memb->slot;
		i++;
	}

	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
				 ls->ls_slot, ls->ls_generation);
 out:
	kfree(slots);
}

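/*
 * A lockspace user (gfs2, for example) receives these callbacks through
 * the dlm_lockspace_ops it passed to dlm_new_lockspace().  A minimal
 * recover_done handler might look like this sketch, where my_recover_done
 * and my_cluster are illustrative names, not part of the dlm API:
 *
 *	static void my_recover_done(void *arg, struct dlm_slot *slots,
 *				    int num_slots, int our_slot,
 *				    uint32_t generation)
 *	{
 *		struct my_cluster *mc = arg;
 *
 *		mc->our_slot = our_slot;
 *		mc->generation = generation;
 *	}
 */
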
static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
						int nodeid)
{
	int i;

	for (i = 0; i < rv->nodes_count; i++) {
		if (rv->nodes[i].nodeid == nodeid)
			return &rv->nodes[i];
	}
	return NULL;
}

int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
{
	struct dlm_member *memb, *safe;
	struct dlm_config_node *node;
	int i, error, neg = 0, low = -1;

	/* previously removed members that we've not finished removing need to
	   count as a negative change so the "neg" recovery steps will happen */

	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
		log_debug(ls, "prev removed member %d", memb->nodeid);
		neg++;
	}

	/* move departed members from ls_nodes to ls_nodes_gone */

	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
		node = find_config_node(rv, memb->nodeid);
		if (node && !node->new)
			continue;

		if (!node) {
			log_debug(ls, "remove member %d", memb->nodeid);
		} else {
			/* removed and re-added */
			log_debug(ls, "remove member %d comm_seq %u %u",
				  memb->nodeid, memb->comm_seq, node->comm_seq);
		}

		neg++;
		list_move(&memb->list, &ls->ls_nodes_gone);
		ls->ls_num_nodes--;
		dlm_lsop_recover_slot(ls, memb);
	}

	/* add new members to ls_nodes */

	for (i = 0; i < rv->nodes_count; i++) {
		node = &rv->nodes[i];
		if (dlm_is_member(ls, node->nodeid))
			continue;
		dlm_add_member(ls, node);
		log_debug(ls, "add member %d", node->nodeid);
	}

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (low == -1 || memb->nodeid < low)
			low = memb->nodeid;
	}
	ls->ls_low_nodeid = low;

	make_member_array(ls);
	*neg_out = neg;

	error = ping_members(ls);
	if (!error || error == -EPROTO) {
		/* new_lockspace() may be waiting to know if the config
		   is good or bad */
		ls->ls_members_result = error;
		complete(&ls->ls_members_done);
	}

	log_debug(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
	return error;
}

/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
   dlm_ls_start() is called on any of them to start the new recovery. */

int dlm_ls_stop(struct dlm_ls *ls)
{
	int new;

	/*
	 * Prevent dlm_recv from being in the middle of something when we do
	 * the stop.  This includes ensuring dlm_recv isn't processing a
	 * recovery message (rcom), while dlm_recoverd is aborting and
	 * resetting things from an in-progress recovery.  i.e. we want
	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
	 * processing an rcom at the same time.  Stopping dlm_recv also makes
	 * it easy for dlm_receive_message() to check locking stopped and add a
	 * message to the requestqueue without races.
	 */

	down_write(&ls->ls_recv_active);

	/*
	 * Abort any recovery that's in progress (see RECOVER_STOP,
	 * dlm_recovery_stopped()) and tell any other threads running in the
	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
	 */

	spin_lock(&ls->ls_recover_lock);
	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
	ls->ls_recover_seq++;
	spin_unlock(&ls->ls_recover_lock);

	/*
	 * Let dlm_recv run again, now any normal messages will be saved on the
	 * requestqueue for later.
	 */

	up_write(&ls->ls_recv_active);

	/*
	 * This in_recovery lock does two things:
	 * 1) Keeps this function from returning until all threads are out
	 *    of locking routines and locking is truly stopped.
	 * 2) Keeps any new requests from being processed until it's unlocked
	 *    when recovery is complete.
	 */

	if (new) {
		set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
		wake_up_process(ls->ls_recoverd_task);
		wait_event(ls->ls_recover_lock_wait,
			   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
	}

	/*
	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
	 * running) has noticed RECOVER_STOP above and quit processing the
	 * previous recovery.
	 */

	dlm_recoverd_suspend(ls);

	spin_lock(&ls->ls_recover_lock);
	kfree(ls->ls_slots);
	ls->ls_slots = NULL;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_recover_status = 0;
	spin_unlock(&ls->ls_recover_lock);

	dlm_recoverd_resume(ls);

	if (!ls->ls_recover_begin)
		ls->ls_recover_begin = jiffies;

	dlm_lsop_recover_prep(ls);
	return 0;
}

int dlm_ls_start(struct dlm_ls *ls)
{
	struct dlm_recover *rv = NULL, *rv_old;
	struct dlm_config_node *nodes = NULL;
	int error, count;

	rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS);
	if (!rv)
		return -ENOMEM;

	/* nodes must stay NULL-initialized: the fail path below frees it,
	   and dlm_config_nodes() does not set it on failure */

	error = dlm_config_nodes(ls->ls_name, &nodes, &count);
	if (error < 0)
		goto fail;

	spin_lock(&ls->ls_recover_lock);

	/* the lockspace needs to be stopped before it can be started */

	if (!dlm_locking_stopped(ls)) {
		spin_unlock(&ls->ls_recover_lock);
		log_error(ls, "start ignored: lockspace running");
		error = -EINVAL;
		goto fail;
	}

	rv->nodes = nodes;
	rv->nodes_count = count;
	rv->seq = ++ls->ls_recover_seq;
	rv_old = ls->ls_recover_args;
	ls->ls_recover_args = rv;
	spin_unlock(&ls->ls_recover_lock);

	if (rv_old) {
		log_error(ls, "unused recovery %llx %d",
			  (unsigned long long)rv_old->seq, rv_old->nodes_count);
		kfree(rv_old->nodes);
		kfree(rv_old);
	}

	set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
	wake_up_process(ls->ls_recoverd_task);
	return 0;

 fail:
	kfree(rv);
	kfree(nodes);
	return error;
}
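
/*
 * Typical sequence, driven from userspace (dlm_controld) on a membership
 * change: dlm_ls_stop() on every node, then dlm_ls_start() with the new
 * node list on each of them, after which dlm_recoverd picks up
 * ls_recover_args and calls dlm_recover_members() as part of recovery.
 */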