Linux Kernel 3.7.1
lockspace.c
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;


static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n = simple_strtol(buf, NULL, 0);

        ls = dlm_find_lockspace_local(ls->ls_local_handle);
        if (!ls)
                return -EINVAL;

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        dlm_put_lockspace(ls);
        return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_global_id = simple_strtoul(buf, NULL, 0);
        return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int val = simple_strtoul(buf, NULL, 0);
        if (val == 1)
                set_bit(LSFL_NODIR, &ls->ls_flags);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
        .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_nodir_show,
        .store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr = {.name = "recover_status", .mode = S_IRUGO},
        .show = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_nodir.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};
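
/*
 * These attributes appear under sysfs once the per-lockspace kobject is
 * added to the "dlm" kset created in dlm_lockspace_init() below, i.e. as
 * /sys/kernel/dlm/<lockspace>/{control,event_done,id,nodir,recover_status,
 * recover_nodeid}.  Roughly, and purely as an illustration (a lockspace
 * named "ex" is assumed here), dlm_controld drives a lockspace like:
 *
 *      echo <global_id> > /sys/kernel/dlm/ex/id        (dlm_id_store)
 *      echo 1 > /sys/kernel/dlm/ex/control             (dlm_ls_start)
 *      echo 0 > /sys/kernel/dlm/ex/event_done          (dlm_event_store)
 */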

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
        struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
        kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_attrs = dlm_attrs,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

static int do_uevent(struct dlm_ls *ls, int in)
{
        int error;

        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");

        /* dlm_controld will see the uevent, do the necessary group management
           and then write to sysfs to wake us */

        error = wait_event_interruptible(ls->ls_uevent_wait,
                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

        log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);

        if (error)
                goto out;

        error = ls->ls_uevent_result;
 out:
        if (error)
                log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
                          error, ls->ls_uevent_result);
        return error;
}
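
/*
 * To summarize the handshake above: do_uevent() emits KOBJ_ONLINE or
 * KOBJ_OFFLINE for the lockspace kobject, dlm_controld performs the group
 * join or leave and then writes the result to the event_done sysfs file;
 * dlm_event_store() records that result, sets LSFL_UEVENT_WAIT and wakes
 * the wait_event_interruptible() above.
 */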

static int dlm_uevent(struct kset *kset, struct kobject *kobj,
                      struct kobj_uevent_env *env)
{
        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
        return 0;
}

static struct kset_uevent_ops dlm_uevent_ops = {
        .uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
        if (!dlm_kset) {
                printk(KERN_WARNING "%s: can not create kset\n", __func__);
                return -ENOMEM;
        }
        return 0;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}
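
/*
 * dlm_scand below is one kthread shared by all lockspaces: every
 * ci_scan_secs it visits each lockspace whose scan time has expired and
 * trims unused rsbs (dlm_scan_rsbs) and checks lock timeouts and stalled
 * waiters.  A lockspace that cannot be locked for recovery is retried
 * about a second later.
 */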

static struct dlm_ls *find_ls_to_scan(void)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (time_after_eq(jiffies, ls->ls_scan_time +
                                           dlm_config.ci_scan_secs * HZ)) {
                        spin_unlock(&lslist_lock);
                        return ls;
                }
        }
        spin_unlock(&lslist_lock);
        return NULL;
}

static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                ls = find_ls_to_scan();
                if (ls) {
                        if (dlm_lock_recovery_try(ls)) {
                                ls->ls_scan_time = jiffies;
                                dlm_scan_rsbs(ls);
                                dlm_scan_timeout(ls);
                                dlm_scan_waiters(ls);
                                dlm_unlock_recovery(ls);
                        } else {
                                ls->ls_scan_time += HZ;
                        }
                        continue;
                }
                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}
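
/*
 * ls_count is a short-term reference: the dlm_find_lockspace_*() lookups
 * above take it under lslist_lock and dlm_put_lockspace() drops it.
 * remove_lockspace() below polls until this count drains to zero before
 * unlinking the lockspace, so a lockspace returned by a find cannot be
 * freed while its user still holds it.
 */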

static void remove_lockspace(struct dlm_ls *ls)
{
        for (;;) {
                spin_lock(&lslist_lock);
                if (ls->ls_count == 0) {
                        WARN_ON(ls->ls_create_count != 0);
                        list_del(&ls->ls_list);
                        spin_unlock(&lslist_lock);
                        return;
                }
                spin_unlock(&lslist_lock);
                ssleep(1);
        }
}

static int threads_start(void)
{
        int error;

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto fail;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 fail:
        return error;
}

static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
}
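
/*
 * dlm_scand and the lowcomms layer are shared by every lockspace:
 * threads_start() runs when the first lockspace is created and
 * threads_stop() when the last one is released, driven by ls_count in
 * dlm_new_lockspace() and dlm_release_lockspace() below.
 */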

static int new_lockspace(const char *name, const char *cluster,
                         uint32_t flags, int lvblen,
                         const struct dlm_lockspace_ops *ops, void *ops_arg,
                         int *ops_result, dlm_lockspace_t **lockspace)
{
        struct dlm_ls *ls;
        int i, size, error;
        int do_unreg = 0;
        int namelen = strlen(name);

        if (namelen > DLM_LOCKSPACE_LEN)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        if (!dlm_user_daemon_available()) {
                log_print("dlm user daemon not available");
                error = -EUNATCH;
                goto out;
        }

        if (ops && ops_result) {
                if (!dlm_config.ci_recover_callbacks)
                        *ops_result = -EOPNOTSUPP;
                else
                        *ops_result = 0;
        }

        if (dlm_config.ci_recover_callbacks && cluster &&
            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
                log_print("dlm cluster name %s mismatch %s",
                          dlm_config.ci_cluster_name, cluster);
                error = -EBADR;
                goto out;
        }

        error = 0;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                WARN_ON(ls->ls_create_count <= 0);
                if (ls->ls_namelen != namelen)
                        continue;
                if (memcmp(ls->ls_name, name, namelen))
                        continue;
                if (flags & DLM_LSFL_NEWEXCL) {
                        error = -EEXIST;
                        break;
                }
                ls->ls_create_count++;
                *lockspace = ls;
                error = 1;
                break;
        }
        spin_unlock(&lslist_lock);

        if (error)
                goto out;

        error = -ENOMEM;

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;
        ls->ls_scan_time = jiffies;

        if (ops && dlm_config.ci_recover_callbacks) {
                ls->ls_ops = ops;
                ls->ls_ops_arg = ops_arg;
        }

        if (flags & DLM_LSFL_TIMEWARN)
                set_bit(LSFL_TIMEWARN, &ls->ls_flags);

        /* ls_exflags are forced to match among nodes, and we don't
           need to require all nodes to have some flags set */
        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
                                    DLM_LSFL_NEWEXCL));

        size = dlm_config.ci_rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                ls->ls_rsbtbl[i].keep.rb_node = NULL;
                ls->ls_rsbtbl[i].toss.rb_node = NULL;
                spin_lock_init(&ls->ls_rsbtbl[i].lock);
        }

        spin_lock_init(&ls->ls_remove_spin);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
                ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
                                                 GFP_KERNEL);
                if (!ls->ls_remove_names[i])
                        goto out_rsbtbl;
        }

        idr_init(&ls->ls_lkbidr);
        spin_lock_init(&ls->ls_lkbidr_spin);

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);

        INIT_LIST_HEAD(&ls->ls_new_rsb);
        spin_lock_init(&ls->ls_new_rsb_spin);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_members_done);
        ls->ls_members_result = -1;

        mutex_init(&ls->ls_cb_mutex);
        INIT_LIST_HEAD(&ls->ls_cb_delay);

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        init_rwsem(&ls->ls_recv_active);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
        if (!ls->ls_recover_buf)
                goto out_lkbidr;

        ls->ls_slot = 0;
        ls->ls_num_slots = 0;
        ls->ls_slots_size = 0;
        ls->ls_slots = NULL;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        idr_init(&ls->ls_recover_idr);
        spin_lock_init(&ls->ls_recover_idr_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        spin_lock(&lslist_lock);
        ls->ls_create_count = 1;
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        if (flags & DLM_LSFL_FS) {
                error = dlm_callback_start(ls);
                if (error) {
                        log_error(ls, "can't start dlm_callback %d", error);
                        goto out_delist;
                }
        }

        init_waitqueue_head(&ls->ls_recover_lock_wait);

        /*
         * Once started, dlm_recoverd first looks for ls in lslist, then
         * initializes ls_in_recovery as locked in "down" mode.  We need
         * to wait for the wakeup from dlm_recoverd because in_recovery
         * has to start out in down mode.
         */

        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_callback;
        }

        wait_event(ls->ls_recover_lock_wait,
                   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

        ls->ls_kobj.kset = dlm_kset;
        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
                                     "%s", ls->ls_name);
        if (error)
                goto out_recoverd;
        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure.)  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_recoverd;

        wait_for_completion(&ls->ls_members_done);
        error = ls->ls_members_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_debug(ls, "join complete");
        *lockspace = ls;
        return 0;

 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_recoverd:
        dlm_recoverd_stop(ls);
 out_callback:
        dlm_callback_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);
 out_lkbidr:
        idr_destroy(&ls->ls_lkbidr);
        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
                if (ls->ls_remove_names[i])
                        kfree(ls->ls_remove_names[i]);
        }
 out_rsbtbl:
        vfree(ls->ls_rsbtbl);
 out_lsfree:
        if (do_unreg)
                kobject_put(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

int dlm_new_lockspace(const char *name, const char *cluster,
                      uint32_t flags, int lvblen,
                      const struct dlm_lockspace_ops *ops, void *ops_arg,
                      int *ops_result, dlm_lockspace_t **lockspace)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
                              ops_result, lockspace);
        if (!error)
                ls_count++;
        if (error > 0)
                error = 0;
        if (!ls_count)
                threads_stop();
 out:
        mutex_unlock(&ls_lock);
        return error;
}
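
/*
 * A minimal usage sketch (illustrative only; the lockspace and cluster
 * names below are made up, not taken from this file):
 *
 *      dlm_lockspace_t *ls;
 *      int ops_result, error;
 *
 *      error = dlm_new_lockspace("example", "mycluster",
 *                                DLM_LSFL_FS | DLM_LSFL_NEWEXCL,
 *                                64,   (lvblen: nonzero multiple of 8)
 *                                NULL, NULL, &ops_result, &ls);
 *
 * On success the handle is later passed to dlm_release_lockspace().  A
 * second creator of the same name gets the existing lockspace back
 * (error 1 internally, returned as 0) unless DLM_LSFL_NEWEXCL was set.
 */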

static int lkb_idr_is_local(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        if (!lkb->lkb_nodeid)
                return 1;
        return 0;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
        return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                dlm_free_lvb(lkb->lkb_lvbptr);

        dlm_free_lkb(lkb);
        return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
        int rv;

        spin_lock(&ls->ls_lkbidr_spin);
        if (force == 0) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
        } else if (force == 1) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
        } else {
                rv = 0;
        }
        spin_unlock(&ls->ls_lkbidr_spin);
        return rv;
}
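
/*
 * lockspace_busy() maps the "force" argument of dlm_release_lockspace()
 * (documented below) onto the idr scans above: force 0 treats any lkb as
 * busy, force 1 only lkbs with a zero nodeid (locally held locks), and
 * force 2 or more never reports busy.
 */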

static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_rsb *rsb;
        struct rb_node *n;
        int i, busy, rv;

        busy = lockspace_busy(ls, force);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy) {
                        rv = -EBUSY;
                } else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        dlm_callback_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        kfree(ls->ls_recover_buf);

        /*
         * Free all lkb's in idr
         */

        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
        idr_remove_all(&ls->ls_lkbidr);
        idr_destroy(&ls->ls_lkbidr);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
                        dlm_free_rsb(rsb);
                }

                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
                        dlm_free_rsb(rsb);
                }
        }

        vfree(ls->ls_rsbtbl);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);

        while (!list_empty(&ls->ls_new_rsb)) {
                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
                                       res_hashchain);
                list_del(&rsb->res_hashchain);
                dlm_free_rsb(rsb);
        }

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_debug(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is done with it */

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;
        int error;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);

        mutex_lock(&ls_lock);
        error = release_lockspace(ls, force);
        if (!error)
                ls_count--;
        if (!ls_count)
                threads_stop();
        mutex_unlock(&ls_lock);

        return error;
}
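
/*
 * Stop every running lockspace.  Used when the userland control daemon
 * has gone away, since without it no recovery can be driven (hence the
 * error logged below).
 */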

void dlm_stop_lockspaces(void)
{
        struct dlm_ls *ls;

 restart:
        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
                        continue;
                spin_unlock(&lslist_lock);
                log_error(ls, "no userland control daemon, stopping lockspace");
                dlm_ls_stop(ls);
                goto restart;
        }
        spin_unlock(&lslist_lock);
}