dlmmaster.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmmaster.c
5  *
6  * standalone DLM module
7  *
8  * Copyright (C) 2004 Oracle. All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  */
26 
27 
28 #include <linux/module.h>
29 #include <linux/fs.h>
30 #include <linux/types.h>
31 #include <linux/slab.h>
32 #include <linux/highmem.h>
33 #include <linux/init.h>
34 #include <linux/sysctl.h>
35 #include <linux/random.h>
36 #include <linux/blkdev.h>
37 #include <linux/socket.h>
38 #include <linux/inet.h>
39 #include <linux/spinlock.h>
40 #include <linux/delay.h>
41 
42 
43 #include "cluster/heartbeat.h"
44 #include "cluster/nodemanager.h"
45 #include "cluster/tcp.h"
46 
47 #include "dlmapi.h"
48 #include "dlmcommon.h"
49 #include "dlmdomain.h"
50 #include "dlmdebug.h"
51 
52 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
53 #include "cluster/masklog.h"
54 
55 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
56  struct dlm_master_list_entry *mle,
57  struct o2nm_node *node,
58  int idx);
59 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
60  struct dlm_master_list_entry *mle,
61  struct o2nm_node *node,
62  int idx);
63 
64 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
65 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
66  struct dlm_lock_resource *res,
67  void *nodemap, u32 flags);
68 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
69 
70 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
71  struct dlm_master_list_entry *mle,
72  const char *name,
73  unsigned int namelen)
74 {
75  if (dlm != mle->dlm)
76  return 0;
77 
78  if (namelen != mle->mnamelen ||
79  memcmp(name, mle->mname, namelen) != 0)
80  return 0;
81 
82  return 1;
83 }
84 
85 static struct kmem_cache *dlm_lockres_cache = NULL;
86 static struct kmem_cache *dlm_lockname_cache = NULL;
87 static struct kmem_cache *dlm_mle_cache = NULL;
88 
89 static void dlm_mle_release(struct kref *kref);
90 static void dlm_init_mle(struct dlm_master_list_entry *mle,
91  enum dlm_mle_type type,
92  struct dlm_ctxt *dlm,
93  struct dlm_lock_resource *res,
94  const char *name,
95  unsigned int namelen);
96 static void dlm_put_mle(struct dlm_master_list_entry *mle);
97 static void __dlm_put_mle(struct dlm_master_list_entry *mle);
98 static int dlm_find_mle(struct dlm_ctxt *dlm,
99  struct dlm_master_list_entry **mle,
100  char *name, unsigned int namelen);
101 
102 static int dlm_do_master_request(struct dlm_lock_resource *res,
103  struct dlm_master_list_entry *mle, int to);
104 
105 
106 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
107  struct dlm_lock_resource *res,
108  struct dlm_master_list_entry *mle,
109  int *blocked);
110 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
111  struct dlm_lock_resource *res,
112  struct dlm_master_list_entry *mle,
113  int blocked);
114 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
115  struct dlm_lock_resource *res,
116  struct dlm_master_list_entry *mle,
117  struct dlm_master_list_entry **oldmle,
118  const char *name, unsigned int namelen,
119  u8 new_master, u8 master);
120 
121 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
122  struct dlm_lock_resource *res);
123 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
124  struct dlm_lock_resource *res);
125 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
126  struct dlm_lock_resource *res,
127  u8 target);
128 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
129  struct dlm_lock_resource *res);
130 
131 
132 int dlm_is_host_down(int errno)
133 {
134  switch (errno) {
135  case -EBADF:
136  case -ECONNREFUSED:
137  case -ENOTCONN:
138  case -ECONNRESET:
139  case -EPIPE:
140  case -EHOSTDOWN:
141  case -EHOSTUNREACH:
142  case -ETIMEDOUT:
143  case -ECONNABORTED:
144  case -ENETDOWN:
145  case -ENETUNREACH:
146  case -ENETRESET:
147  case -ESHUTDOWN:
148  case -ENOPROTOOPT:
149  case -EINVAL: /* if returned from our tcp code,
150  this means there is no socket */
151  return 1;
152  }
153  return 0;
154 }
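/*
 * Illustrative sketch (not part of the original file): how a send path is
 * expected to use dlm_is_host_down().  Any errno it classifies as a
 * host-down condition is treated as node death rather than a local bug;
 * the hypothetical helper below mirrors the pattern used later by
 * dlm_do_master_request().
 */
static inline int dlm_send_error_means_node_death(int status, u8 node)
{
	if (status >= 0)
		return 0;	/* message was sent */
	if (dlm_is_host_down(status))
		return 1;	/* assume 'node' died; let recovery handle it */
	/* anything else is a programming error on this node */
	mlog(ML_ERROR, "unexpected send error %d to node %u\n", status, node);
	BUG();
	return 0;
}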
155 
156 
157 /*
158  * MASTER LIST FUNCTIONS
159  */
160 
161 
162 /*
163  * regarding master list entries and heartbeat callbacks:
164  *
165  * in order to avoid sleeping and allocation that occurs in
166  * heartbeat, master list entries are simply attached to the
167  * dlm's established heartbeat callbacks. the mle is attached
168  * when it is created, and since the dlm->spinlock is held at
169  * that time, any heartbeat event will be properly discovered
170  * by the mle. the mle needs to be detached from the
171  * dlm->mle_hb_events list as soon as heartbeat events are no
172  * longer useful to the mle, and before the mle is freed.
173  *
174  * as a general rule, heartbeat events are no longer needed by
175  * the mle once an "answer" regarding the lock master has been
176  * received.
177  */
178 static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
179  struct dlm_master_list_entry *mle)
180 {
181  assert_spin_locked(&dlm->spinlock);
182 
183  list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
184 }
185 
186 
187 static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
188  struct dlm_master_list_entry *mle)
189 {
190  if (!list_empty(&mle->hb_events))
191  list_del_init(&mle->hb_events);
192 }
193 
194 
195 static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
196  struct dlm_master_list_entry *mle)
197 {
198  spin_lock(&dlm->spinlock);
199  __dlm_mle_detach_hb_events(dlm, mle);
200  spin_unlock(&dlm->spinlock);
201 }
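/*
 * Minimal lifecycle sketch for the helpers above (illustrative only; the
 * function name is hypothetical).  An mle is attached to the heartbeat
 * event list while the master is still unknown and detached as soon as an
 * answer arrives, before the final reference is dropped.
 */
static void dlm_mle_hb_lifecycle_sketch(struct dlm_ctxt *dlm,
					struct dlm_master_list_entry *mle)
{
	/* creation: attach while dlm->spinlock is held (dlm_init_mle()
	 * normally does this on the caller's behalf) */
	spin_lock(&dlm->spinlock);
	__dlm_mle_attach_hb_events(dlm, mle);
	spin_unlock(&dlm->spinlock);

	/* ... master request / assert traffic decides the owner ... */

	/* answer received: heartbeat events no longer matter, so detach
	 * before dropping the last reference */
	dlm_mle_detach_hb_events(dlm, mle);
	dlm_put_mle(mle);
}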
202 
203 static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
204 {
205  struct dlm_ctxt *dlm;
206  dlm = mle->dlm;
207 
208  assert_spin_locked(&dlm->spinlock);
209  assert_spin_locked(&dlm->master_lock);
210  mle->inuse++;
211  kref_get(&mle->mle_refs);
212 }
213 
214 static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
215 {
216  struct dlm_ctxt *dlm;
217  dlm = mle->dlm;
218 
219  spin_lock(&dlm->spinlock);
220  spin_lock(&dlm->master_lock);
221  mle->inuse--;
222  __dlm_put_mle(mle);
223  spin_unlock(&dlm->master_lock);
224  spin_unlock(&dlm->spinlock);
225 
226 }
227 
228 /* remove from list and free */
229 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
230 {
231  struct dlm_ctxt *dlm;
232  dlm = mle->dlm;
233 
234  assert_spin_locked(&dlm->spinlock);
235  assert_spin_locked(&dlm->master_lock);
236  if (!atomic_read(&mle->mle_refs.refcount)) {
237  /* this may or may not crash, but who cares.
238  * it's a BUG. */
239  mlog(ML_ERROR, "bad mle: %p\n", mle);
240  dlm_print_one_mle(mle);
241  BUG();
242  } else
243  kref_put(&mle->mle_refs, dlm_mle_release);
244 }
245 
246 
247 /* must not have any spinlocks coming in */
248 static void dlm_put_mle(struct dlm_master_list_entry *mle)
249 {
250  struct dlm_ctxt *dlm;
251  dlm = mle->dlm;
252 
253  spin_lock(&dlm->spinlock);
254  spin_lock(&dlm->master_lock);
255  __dlm_put_mle(mle);
256  spin_unlock(&dlm->master_lock);
257  spin_unlock(&dlm->spinlock);
258 }
259 
260 static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
261 {
262  kref_get(&mle->mle_refs);
263 }
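/*
 * Reference pairing sketch (illustrative; function name hypothetical).
 * __dlm_put_mle() expects dlm->spinlock and dlm->master_lock to already be
 * held, dlm_put_mle() takes them itself, and the _inuse pair pins an mle
 * across a blocking section such as a network round trip, matching the way
 * dlm_get_lock_resource() uses it further down.
 */
static void dlm_mle_pin_sketch(struct dlm_ctxt *dlm,
			       struct dlm_master_list_entry *mle)
{
	spin_lock(&dlm->spinlock);
	spin_lock(&dlm->master_lock);
	dlm_get_mle_inuse(mle);		/* extra ref plus inuse count */
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	/* ... may block here, e.g. waiting for master responses ... */

	dlm_put_mle_inuse(mle);		/* takes both locks itself */
}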
264 
265 static void dlm_init_mle(struct dlm_master_list_entry *mle,
266  enum dlm_mle_type type,
267  struct dlm_ctxt *dlm,
268  struct dlm_lock_resource *res,
269  const char *name,
270  unsigned int namelen)
271 {
272  assert_spin_locked(&dlm->spinlock);
273 
274  mle->dlm = dlm;
275  mle->type = type;
276  INIT_HLIST_NODE(&mle->master_hash_node);
277  INIT_LIST_HEAD(&mle->hb_events);
278  memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
279  spin_lock_init(&mle->spinlock);
280  init_waitqueue_head(&mle->wq);
281  atomic_set(&mle->woken, 0);
282  kref_init(&mle->mle_refs);
283  memset(mle->response_map, 0, sizeof(mle->response_map));
284  mle->master = O2NM_MAX_NODES;
285  mle->new_master = O2NM_MAX_NODES;
286  mle->inuse = 0;
287 
288  BUG_ON(mle->type != DLM_MLE_BLOCK &&
289  mle->type != DLM_MLE_MASTER &&
290  mle->type != DLM_MLE_MIGRATION);
291 
292  if (mle->type == DLM_MLE_MASTER) {
293  BUG_ON(!res);
294  mle->mleres = res;
295  memcpy(mle->mname, res->lockname.name, res->lockname.len);
296  mle->mnamelen = res->lockname.len;
297  mle->mnamehash = res->lockname.hash;
298  } else {
299  BUG_ON(!name);
300  mle->mleres = NULL;
301  memcpy(mle->mname, name, namelen);
302  mle->mnamelen = namelen;
303  mle->mnamehash = dlm_lockid_hash(name, namelen);
304  }
305 
306  atomic_inc(&dlm->mle_tot_count[mle->type]);
307  atomic_inc(&dlm->mle_cur_count[mle->type]);
308 
309  /* copy off the node_map and register hb callbacks on our copy */
310  memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
311  memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
312  clear_bit(dlm->node_num, mle->vote_map);
313  clear_bit(dlm->node_num, mle->node_map);
314 
315  /* attach the mle to the domain node up/down events */
316  __dlm_mle_attach_hb_events(dlm, mle);
317 }
318 
319 void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
320 {
321  assert_spin_locked(&dlm->spinlock);
322  assert_spin_locked(&dlm->master_lock);
323 
324  if (!hlist_unhashed(&mle->master_hash_node))
325  hlist_del_init(&mle->master_hash_node);
326 }
327 
328 void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
329 {
330  struct hlist_head *bucket;
331 
332  assert_spin_locked(&dlm->master_lock);
333 
334  bucket = dlm_master_hash(dlm, mle->mnamehash);
335  hlist_add_head(&mle->master_hash_node, bucket);
336 }
337 
338 /* returns 1 if found, 0 if not */
339 static int dlm_find_mle(struct dlm_ctxt *dlm,
340  struct dlm_master_list_entry **mle,
341  char *name, unsigned int namelen)
342 {
343  struct dlm_master_list_entry *tmpmle;
344  struct hlist_head *bucket;
345  struct hlist_node *list;
346  unsigned int hash;
347 
348  assert_spin_locked(&dlm->master_lock);
349 
350  hash = dlm_lockid_hash(name, namelen);
351  bucket = dlm_master_hash(dlm, hash);
352  hlist_for_each(list, bucket) {
353  tmpmle = hlist_entry(list, struct dlm_master_list_entry,
354  master_hash_node);
355  if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
356  continue;
357  dlm_get_mle(tmpmle);
358  *mle = tmpmle;
359  return 1;
360  }
361  return 0;
362 }
363 
364 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
365 {
366  struct dlm_master_list_entry *mle;
367 
368  assert_spin_locked(&dlm->spinlock);
369 
370  list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
371  if (node_up)
372  dlm_mle_node_up(dlm, mle, NULL, idx);
373  else
374  dlm_mle_node_down(dlm, mle, NULL, idx);
375  }
376 }
377 
378 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
379  struct dlm_master_list_entry *mle,
380  struct o2nm_node *node, int idx)
381 {
382  spin_lock(&mle->spinlock);
383 
384  if (!test_bit(idx, mle->node_map))
385  mlog(0, "node %u already removed from nodemap!\n", idx);
386  else
387  clear_bit(idx, mle->node_map);
388 
389  spin_unlock(&mle->spinlock);
390 }
391 
392 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
393  struct dlm_master_list_entry *mle,
394  struct o2nm_node *node, int idx)
395 {
396  spin_lock(&mle->spinlock);
397 
398  if (test_bit(idx, mle->node_map))
399  mlog(0, "node %u already in node map!\n", idx);
400  else
401  set_bit(idx, mle->node_map);
402 
403  spin_unlock(&mle->spinlock);
404 }
405 
406 
407 int dlm_init_mle_cache(void)
408 {
409  dlm_mle_cache = kmem_cache_create("o2dlm_mle",
410  sizeof(struct dlm_master_list_entry),
411  0, SLAB_HWCACHE_ALIGN,
412  NULL);
413  if (dlm_mle_cache == NULL)
414  return -ENOMEM;
415  return 0;
416 }
417 
418 void dlm_destroy_mle_cache(void)
419 {
420  if (dlm_mle_cache)
421  kmem_cache_destroy(dlm_mle_cache);
422 }
423 
424 static void dlm_mle_release(struct kref *kref)
425 {
426  struct dlm_master_list_entry *mle;
427  struct dlm_ctxt *dlm;
428 
429  mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
430  dlm = mle->dlm;
431 
432  assert_spin_locked(&dlm->spinlock);
433  assert_spin_locked(&dlm->master_lock);
434 
435  mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
436  mle->type);
437 
438  /* remove from list if not already */
439  __dlm_unlink_mle(dlm, mle);
440 
441  /* detach the mle from the domain node up/down events */
442  __dlm_mle_detach_hb_events(dlm, mle);
443 
444  atomic_dec(&dlm->mle_cur_count[mle->type]);
445 
446  /* NOTE: kfree under spinlock here.
447  * if this is bad, we can move this to a freelist. */
448  kmem_cache_free(dlm_mle_cache, mle);
449 }
450 
451 
452 /*
453  * LOCK RESOURCE FUNCTIONS
454  */
455 
456 int dlm_init_master_caches(void)
457 {
458  dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
459  sizeof(struct dlm_lock_resource),
460  0, SLAB_HWCACHE_ALIGN, NULL);
461  if (!dlm_lockres_cache)
462  goto bail;
463 
464  dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
465  DLM_LOCKID_NAME_MAX, 0,
466  SLAB_HWCACHE_ALIGN, NULL);
467  if (!dlm_lockname_cache)
468  goto bail;
469 
470  return 0;
471 bail:
472  dlm_destroy_master_caches();
473  return -ENOMEM;
474 }
475 
476 void dlm_destroy_master_caches(void)
477 {
478  if (dlm_lockname_cache)
479  kmem_cache_destroy(dlm_lockname_cache);
480 
481  if (dlm_lockres_cache)
482  kmem_cache_destroy(dlm_lockres_cache);
483 }
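/*
 * Sketch of the expected init/teardown pairing for the slab caches above
 * (illustrative; the real callers live in the module init/exit path
 * outside this file, and these wrapper names are hypothetical).
 */
static int __init dlm_caches_init_sketch(void)
{
	if (dlm_init_mle_cache())
		return -ENOMEM;
	if (dlm_init_master_caches()) {
		dlm_destroy_mle_cache();
		return -ENOMEM;
	}
	return 0;
}

static void __exit dlm_caches_exit_sketch(void)
{
	dlm_destroy_master_caches();
	dlm_destroy_mle_cache();
}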
484 
485 static void dlm_lockres_release(struct kref *kref)
486 {
487  struct dlm_lock_resource *res;
488  struct dlm_ctxt *dlm;
489 
490  res = container_of(kref, struct dlm_lock_resource, refs);
491  dlm = res->dlm;
492 
493  /* This should not happen -- all lockres' have a name
494  * associated with them at init time. */
495  BUG_ON(!res->lockname.name);
496 
497  mlog(0, "destroying lockres %.*s\n", res->lockname.len,
498  res->lockname.name);
499 
500  spin_lock(&dlm->track_lock);
501  if (!list_empty(&res->tracking))
502  list_del_init(&res->tracking);
503  else {
504  mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
505  res->lockname.len, res->lockname.name);
506  dlm_print_one_lock_resource(res);
507  }
508  spin_unlock(&dlm->track_lock);
509 
510  atomic_dec(&dlm->res_cur_count);
511 
512  if (!hlist_unhashed(&res->hash_node) ||
513  !list_empty(&res->granted) ||
514  !list_empty(&res->converting) ||
515  !list_empty(&res->blocked) ||
516  !list_empty(&res->dirty) ||
517  !list_empty(&res->recovering) ||
518  !list_empty(&res->purge)) {
519  mlog(ML_ERROR,
520  "Going to BUG for resource %.*s."
521  " We're on a list! [%c%c%c%c%c%c%c]\n",
522  res->lockname.len, res->lockname.name,
523  !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
524  !list_empty(&res->granted) ? 'G' : ' ',
525  !list_empty(&res->converting) ? 'C' : ' ',
526  !list_empty(&res->blocked) ? 'B' : ' ',
527  !list_empty(&res->dirty) ? 'D' : ' ',
528  !list_empty(&res->recovering) ? 'R' : ' ',
529  !list_empty(&res->purge) ? 'P' : ' ');
530 
531  dlm_print_one_lock_resource(res);
532  }
533 
534  /* By the time we're ready to blow this guy away, we shouldn't
535  * be on any lists. */
536  BUG_ON(!hlist_unhashed(&res->hash_node));
537  BUG_ON(!list_empty(&res->granted));
538  BUG_ON(!list_empty(&res->converting));
539  BUG_ON(!list_empty(&res->blocked));
540  BUG_ON(!list_empty(&res->dirty));
541  BUG_ON(!list_empty(&res->recovering));
542  BUG_ON(!list_empty(&res->purge));
543 
544  kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
545 
546  kmem_cache_free(dlm_lockres_cache, res);
547 }
548 
549 void dlm_lockres_put(struct dlm_lock_resource *res)
550 {
551  kref_put(&res->refs, dlm_lockres_release);
552 }
553 
554 static void dlm_init_lockres(struct dlm_ctxt *dlm,
555  struct dlm_lock_resource *res,
556  const char *name, unsigned int namelen)
557 {
558  char *qname;
559 
560  /* If we memset here, we lose our reference to the kmalloc'd
561  * res->lockname.name, so be sure to init every field
562  * correctly! */
563 
564  qname = (char *) res->lockname.name;
565  memcpy(qname, name, namelen);
566 
567  res->lockname.len = namelen;
568  res->lockname.hash = dlm_lockid_hash(name, namelen);
569 
570  init_waitqueue_head(&res->wq);
571  spin_lock_init(&res->spinlock);
572  INIT_HLIST_NODE(&res->hash_node);
573  INIT_LIST_HEAD(&res->granted);
574  INIT_LIST_HEAD(&res->converting);
575  INIT_LIST_HEAD(&res->blocked);
576  INIT_LIST_HEAD(&res->dirty);
577  INIT_LIST_HEAD(&res->recovering);
578  INIT_LIST_HEAD(&res->purge);
579  INIT_LIST_HEAD(&res->tracking);
580  atomic_set(&res->asts_reserved, 0);
581  res->migration_pending = 0;
582  res->inflight_locks = 0;
583 
584  res->dlm = dlm;
585 
586  kref_init(&res->refs);
587 
588  atomic_inc(&dlm->res_tot_count);
589  atomic_inc(&dlm->res_cur_count);
590 
591  /* just for consistency */
592  spin_lock(&res->spinlock);
593  dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
594  spin_unlock(&res->spinlock);
595 
596  res->state = DLM_LOCK_RES_IN_PROGRESS;
597 
598  res->last_used = 0;
599 
600  spin_lock(&dlm->spinlock);
601  list_add_tail(&res->tracking, &dlm->tracking_list);
602  spin_unlock(&dlm->spinlock);
603 
604  memset(res->lvb, 0, DLM_LVB_LEN);
605  memset(res->refmap, 0, sizeof(res->refmap));
606 }
607 
608 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
609  const char *name,
610  unsigned int namelen)
611 {
612  struct dlm_lock_resource *res = NULL;
613 
614  res = kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
615  if (!res)
616  goto error;
617 
618  res->lockname.name = kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
619  if (!res->lockname.name)
620  goto error;
621 
622  dlm_init_lockres(dlm, res, name, namelen);
623  return res;
624 
625 error:
626  if (res && res->lockname.name)
627  kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
628 
629  if (res)
630  kmem_cache_free(dlm_lockres_cache, res);
631  return NULL;
632 }
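/*
 * Allocation/refcount sketch (illustrative; function name and lock name are
 * hypothetical).  A lockres from dlm_new_lockres() starts with one
 * reference; the last dlm_lockres_put() ends up in dlm_lockres_release(),
 * which frees both the name buffer and the resource.
 */
static int dlm_lockres_alloc_sketch(struct dlm_ctxt *dlm)
{
	struct dlm_lock_resource *res;

	res = dlm_new_lockres(dlm, "example", 7);	/* hypothetical name */
	if (!res)
		return -ENOMEM;

	/* ... use res; extra references go through kref_get(&res->refs) ... */

	dlm_lockres_put(res);	/* last put triggers dlm_lockres_release() */
	return 0;
}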
633 
634 void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
635  struct dlm_lock_resource *res, int bit)
636 {
637  assert_spin_locked(&res->spinlock);
638 
639  mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
640  res->lockname.name, bit, __builtin_return_address(0));
641 
642  set_bit(bit, res->refmap);
643 }
644 
645 void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
646  struct dlm_lock_resource *res, int bit)
647 {
648  assert_spin_locked(&res->spinlock);
649 
650  mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
651  res->lockname.name, bit, __builtin_return_address(0));
652 
653  clear_bit(bit, res->refmap);
654 }
655 
656 
657 void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
658  struct dlm_lock_resource *res)
659 {
660  assert_spin_locked(&res->spinlock);
661 
662  res->inflight_locks++;
663 
664  mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
665  res->lockname.len, res->lockname.name, res->inflight_locks,
666  __builtin_return_address(0));
667 }
668 
669 void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
670  struct dlm_lock_resource *res)
671 {
672  assert_spin_locked(&res->spinlock);
673 
674  BUG_ON(res->inflight_locks == 0);
675 
676  res->inflight_locks--;
677 
678  mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
679  res->lockname.len, res->lockname.name, res->inflight_locks,
680  __builtin_return_address(0));
681 
682  wake_up(&res->wq);
683 }
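/*
 * Inflight-reference sketch (illustrative; function name hypothetical).
 * Both helpers above expect res->spinlock to be held; an inflight
 * reference keeps the lockres from being purged while a lock operation is
 * still in progress against it.
 */
static void dlm_inflight_sketch(struct dlm_ctxt *dlm,
				struct dlm_lock_resource *res)
{
	spin_lock(&res->spinlock);
	dlm_lockres_grab_inflight_ref(dlm, res);
	spin_unlock(&res->spinlock);

	/* ... lock creation / mastery work happens here ... */

	spin_lock(&res->spinlock);
	dlm_lockres_drop_inflight_ref(dlm, res);	/* also wakes res->wq */
	spin_unlock(&res->spinlock);
}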
684 
685 /*
686  * lookup a lock resource by name.
687  * may already exist in the hashtable.
688  * lockid is null terminated
689  *
690  * if not, allocate enough for the lockres and for
691  * the temporary structure used in doing the mastering.
692  *
693  * also, do a lookup in the dlm->master_list to see
694  * if another node has begun mastering the same lock.
695  * if so, there should be a block entry in there
696  * for this name, and we should *not* attempt to master
697  * the lock here. need to wait around for that node
698  * to assert_master (or die).
699  *
700  */
701 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
702  const char *lockid,
703  int namelen,
704  int flags)
705 {
706  struct dlm_lock_resource *tmpres=NULL, *res=NULL;
707  struct dlm_master_list_entry *mle = NULL;
708  struct dlm_master_list_entry *alloc_mle = NULL;
709  int blocked = 0;
710  int ret, nodenum;
711  struct dlm_node_iter iter;
712  unsigned int hash;
713  int tries = 0;
714  int bit, wait_on_recovery = 0;
715 
716  BUG_ON(!lockid);
717 
718  hash = dlm_lockid_hash(lockid, namelen);
719 
720  mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
721 
722 lookup:
723  spin_lock(&dlm->spinlock);
724  tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
725  if (tmpres) {
726  spin_unlock(&dlm->spinlock);
727  spin_lock(&tmpres->spinlock);
728  /* Wait on the thread that is mastering the resource */
729  if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
730  __dlm_wait_on_lockres(tmpres);
731  BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
732  spin_unlock(&tmpres->spinlock);
733  dlm_lockres_put(tmpres);
734  tmpres = NULL;
735  goto lookup;
736  }
737 
738  /* Wait on the resource purge to complete before continuing */
739  if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
740  BUG_ON(tmpres->owner == dlm->node_num);
741  __dlm_wait_on_lockres_flags(tmpres,
742  DLM_LOCK_RES_DROPPING_REF);
743  spin_unlock(&tmpres->spinlock);
744  dlm_lockres_put(tmpres);
745  tmpres = NULL;
746  goto lookup;
747  }
748 
749  /* Grab inflight ref to pin the resource */
750  dlm_lockres_grab_inflight_ref(dlm, tmpres);
751 
752  spin_unlock(&tmpres->spinlock);
753  if (res)
754  dlm_lockres_put(res);
755  res = tmpres;
756  goto leave;
757  }
758 
759  if (!res) {
760  spin_unlock(&dlm->spinlock);
761  mlog(0, "allocating a new resource\n");
762  /* nothing found and we need to allocate one. */
763  alloc_mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
764  if (!alloc_mle)
765  goto leave;
766  res = dlm_new_lockres(dlm, lockid, namelen);
767  if (!res)
768  goto leave;
769  goto lookup;
770  }
771 
772  mlog(0, "no lockres found, allocated our own: %p\n", res);
773 
774  if (flags & LKM_LOCAL) {
775  /* caller knows it's safe to assume it's not mastered elsewhere
776  * DONE! return right away */
777  spin_lock(&res->spinlock);
778  dlm_change_lockres_owner(dlm, res, dlm->node_num);
779  __dlm_insert_lockres(dlm, res);
780  dlm_lockres_grab_inflight_ref(dlm, res);
781  spin_unlock(&res->spinlock);
782  spin_unlock(&dlm->spinlock);
783  /* lockres still marked IN_PROGRESS */
784  goto wake_waiters;
785  }
786 
787  /* check master list to see if another node has started mastering it */
788  spin_lock(&dlm->master_lock);
789 
790  /* if we found a block, wait for lock to be mastered by another node */
791  blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
792  if (blocked) {
793  int mig;
794  if (mle->type == DLM_MLE_MASTER) {
795  mlog(ML_ERROR, "master entry for nonexistent lock!\n");
796  BUG();
797  }
798  mig = (mle->type == DLM_MLE_MIGRATION);
799  /* if there is a migration in progress, let the migration
800  * finish before continuing. we can wait for the absence
801  * of the MIGRATION mle: either the migrate finished or
802  * one of the nodes died and the mle was cleaned up.
803  * if there is a BLOCK here, but it already has a master
804  * set, we are too late. the master does not have a ref
805  * for us in the refmap. detach the mle and drop it.
806  * either way, go back to the top and start over. */
807  if (mig || mle->master != O2NM_MAX_NODES) {
808  BUG_ON(mig && mle->master == dlm->node_num);
809  /* we arrived too late. the master does not
810  * have a ref for us. retry. */
811  mlog(0, "%s:%.*s: late on %s\n",
812  dlm->name, namelen, lockid,
813  mig ? "MIGRATION" : "BLOCK");
814  spin_unlock(&dlm->master_lock);
815  spin_unlock(&dlm->spinlock);
816 
817  /* master is known, detach */
818  if (!mig)
819  dlm_mle_detach_hb_events(dlm, mle);
820  dlm_put_mle(mle);
821  mle = NULL;
822  /* this is lame, but we can't wait on either
823  * the mle or lockres waitqueue here */
824  if (mig)
825  msleep(100);
826  goto lookup;
827  }
828  } else {
829  /* go ahead and try to master lock on this node */
830  mle = alloc_mle;
831  /* make sure this does not get freed below */
832  alloc_mle = NULL;
833  dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
834  set_bit(dlm->node_num, mle->maybe_map);
835  __dlm_insert_mle(dlm, mle);
836 
837  /* still holding the dlm spinlock, check the recovery map
838  * to see if there are any nodes that still need to be
839  * considered. these will not appear in the mle nodemap
840  * but they might own this lockres. wait on them. */
841  bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
842  if (bit < O2NM_MAX_NODES) {
843  mlog(0, "%s: res %.*s, At least one node (%d) "
844  "to recover before lock mastery can begin\n",
845  dlm->name, namelen, (char *)lockid, bit);
846  wait_on_recovery = 1;
847  }
848  }
849 
850  /* at this point there is either a DLM_MLE_BLOCK or a
851  * DLM_MLE_MASTER on the master list, so it's safe to add the
852  * lockres to the hashtable. anyone who finds the lock will
853  * still have to wait on the IN_PROGRESS. */
854 
855  /* finally add the lockres to its hash bucket */
856  __dlm_insert_lockres(dlm, res);
857 
858  /* Grab inflight ref to pin the resource */
859  spin_lock(&res->spinlock);
860  dlm_lockres_grab_inflight_ref(dlm, res);
861  spin_unlock(&res->spinlock);
862 
863  /* get an extra ref on the mle in case this is a BLOCK
864  * if so, the creator of the BLOCK may try to put the last
865  * ref at this time in the assert master handler, so we
866  * need an extra one to keep from a bad ptr deref. */
867  dlm_get_mle_inuse(mle);
868  spin_unlock(&dlm->master_lock);
869  spin_unlock(&dlm->spinlock);
870 
871 redo_request:
872  while (wait_on_recovery) {
873  /* any cluster changes that occurred after dropping the
874  * dlm spinlock would be detectable by a change on the mle,
875  * so we only need to clear out the recovery map once. */
876  if (dlm_is_recovery_lock(lockid, namelen)) {
877  mlog(0, "%s: Recovery map is not empty, but must "
878  "master $RECOVERY lock now\n", dlm->name);
879  if (!dlm_pre_master_reco_lockres(dlm, res))
880  wait_on_recovery = 0;
881  else {
882  mlog(0, "%s: waiting 500ms for heartbeat state "
883  "change\n", dlm->name);
884  msleep(500);
885  }
886  continue;
887  }
888 
889  dlm_kick_recovery_thread(dlm);
890  msleep(1000);
891  dlm_wait_for_recovery(dlm);
892 
893  spin_lock(&dlm->spinlock);
894  bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
895  if (bit < O2NM_MAX_NODES) {
896  mlog(0, "%s: res %.*s, At least one node (%d) "
897  "to recover before lock mastery can begin\n",
898  dlm->name, namelen, (char *)lockid, bit);
899  wait_on_recovery = 1;
900  } else
901  wait_on_recovery = 0;
902  spin_unlock(&dlm->spinlock);
903 
904  if (wait_on_recovery)
905  dlm_wait_for_node_recovery(dlm, bit, 10000);
906  }
907 
908  /* must wait for lock to be mastered elsewhere */
909  if (blocked)
910  goto wait;
911 
912  ret = -EINVAL;
913  dlm_node_iter_init(mle->vote_map, &iter);
914  while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
915  ret = dlm_do_master_request(res, mle, nodenum);
916  if (ret < 0)
917  mlog_errno(ret);
918  if (mle->master != O2NM_MAX_NODES) {
919  /* found a master ! */
920  if (mle->master <= nodenum)
921  break;
922  /* if our master request has not reached the master
923  * yet, keep going until it does. this is how the
924  * master will know that asserts are needed back to
925  * the lower nodes. */
926  mlog(0, "%s: res %.*s, Requests only up to %u but "
927  "master is %u, keep going\n", dlm->name, namelen,
928  lockid, nodenum, mle->master);
929  }
930  }
931 
932 wait:
933  /* keep going until the response map includes all nodes */
934  ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
935  if (ret < 0) {
936  wait_on_recovery = 1;
937  mlog(0, "%s: res %.*s, Node map changed, redo the master "
938  "request now, blocked=%d\n", dlm->name, res->lockname.len,
939  res->lockname.name, blocked);
940  if (++tries > 20) {
941  mlog(ML_ERROR, "%s: res %.*s, Spinning on "
942  "dlm_wait_for_lock_mastery, blocked = %d\n",
943  dlm->name, res->lockname.len,
944  res->lockname.name, blocked);
945  dlm_print_one_lock_resource(res);
946  dlm_print_one_mle(mle);
947  tries = 0;
948  }
949  goto redo_request;
950  }
951 
952  mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
953  res->lockname.name, res->owner);
954  /* make sure we never continue without this */
955  BUG_ON(res->owner == O2NM_MAX_NODES);
956 
957  /* master is known, detach if not already detached */
958  dlm_mle_detach_hb_events(dlm, mle);
959  dlm_put_mle(mle);
960  /* put the extra ref */
961  dlm_put_mle_inuse(mle);
962 
963 wake_waiters:
964  spin_lock(&res->spinlock);
965  res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
966  spin_unlock(&res->spinlock);
967  wake_up(&res->wq);
968 
969 leave:
970  /* need to free the unused mle */
971  if (alloc_mle)
972  kmem_cache_free(dlm_mle_cache, alloc_mle);
973 
974  return res;
975 }
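/*
 * Caller sketch for dlm_get_lock_resource() (illustrative; the function
 * name and parameters below are hypothetical).  On return the mastery is
 * settled: res->owner is valid, the lockres is hashed, and it carries an
 * inflight reference that the locking path is expected to drop later via
 * dlm_lockres_drop_inflight_ref().
 */
static int dlm_lookup_or_master_sketch(struct dlm_ctxt *dlm,
				       const char *lockid, int namelen)
{
	struct dlm_lock_resource *res;

	res = dlm_get_lock_resource(dlm, lockid, namelen, 0);
	if (!res)
		return -ENOMEM;

	/* ... attach a lock to one of res' lists ... */

	dlm_lockres_put(res);	/* drop this caller's reference when done */
	return 0;
}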
976 
977 
978 #define DLM_MASTERY_TIMEOUT_MS 5000
979 
980 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
981  struct dlm_lock_resource *res,
982  struct dlm_master_list_entry *mle,
983  int *blocked)
984 {
985  u8 m;
986  int ret, bit;
987  int map_changed, voting_done;
988  int assert, sleep;
989 
990 recheck:
991  ret = 0;
992  assert = 0;
993 
994  /* check if another node has already become the owner */
995  spin_lock(&res->spinlock);
996  if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
997  mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
998  res->lockname.len, res->lockname.name, res->owner);
999  spin_unlock(&res->spinlock);
1000  /* this will cause the master to re-assert across
1001  * the whole cluster, freeing up mles */
1002  if (res->owner != dlm->node_num) {
1003  ret = dlm_do_master_request(res, mle, res->owner);
1004  if (ret < 0) {
1005  /* give recovery a chance to run */
1006  mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
1007  msleep(500);
1008  goto recheck;
1009  }
1010  }
1011  ret = 0;
1012  goto leave;
1013  }
1014  spin_unlock(&res->spinlock);
1015 
1016  spin_lock(&mle->spinlock);
1017  m = mle->master;
1018  map_changed = (memcmp(mle->vote_map, mle->node_map,
1019  sizeof(mle->vote_map)) != 0);
1020  voting_done = (memcmp(mle->vote_map, mle->response_map,
1021  sizeof(mle->vote_map)) == 0);
1022 
1023  /* restart if we hit any errors */
1024  if (map_changed) {
1025  int b;
1026  mlog(0, "%s: %.*s: node map changed, restarting\n",
1027  dlm->name, res->lockname.len, res->lockname.name);
1028  ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
1029  b = (mle->type == DLM_MLE_BLOCK);
1030  if ((*blocked && !b) || (!*blocked && b)) {
1031  mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
1032  dlm->name, res->lockname.len, res->lockname.name,
1033  *blocked, b);
1034  *blocked = b;
1035  }
1036  spin_unlock(&mle->spinlock);
1037  if (ret < 0) {
1038  mlog_errno(ret);
1039  goto leave;
1040  }
1041  mlog(0, "%s:%.*s: restart lock mastery succeeded, "
1042  "rechecking now\n", dlm->name, res->lockname.len,
1043  res->lockname.name);
1044  goto recheck;
1045  } else {
1046  if (!voting_done) {
1047  mlog(0, "map not changed and voting not done "
1048  "for %s:%.*s\n", dlm->name, res->lockname.len,
1049  res->lockname.name);
1050  }
1051  }
1052 
1053  if (m != O2NM_MAX_NODES) {
1054  /* another node has done an assert!
1055  * all done! */
1056  sleep = 0;
1057  } else {
1058  sleep = 1;
1059  /* have all nodes responded? */
1060  if (voting_done && !*blocked) {
1061  bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1062  if (dlm->node_num <= bit) {
1063  /* my node number is lowest.
1064  * now tell other nodes that I am
1065  * mastering this. */
1066  mle->master = dlm->node_num;
1067  /* ref was grabbed in get_lock_resource
1068  * will be dropped in dlmlock_master */
1069  assert = 1;
1070  sleep = 0;
1071  }
1072  /* if voting is done, but we have not received
1073  * an assert master yet, we must sleep */
1074  }
1075  }
1076 
1077  spin_unlock(&mle->spinlock);
1078 
1079  /* sleep if we haven't finished voting yet */
1080  if (sleep) {
1081  unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
1082 
1083  /*
1084  if (atomic_read(&mle->mle_refs.refcount) < 2)
1085  mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
1086  atomic_read(&mle->mle_refs.refcount),
1087  res->lockname.len, res->lockname.name);
1088  */
1089  atomic_set(&mle->woken, 0);
1090  (void)wait_event_timeout(mle->wq,
1091  (atomic_read(&mle->woken) == 1),
1092  timeo);
1093  if (res->owner == O2NM_MAX_NODES) {
1094  mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1095  res->lockname.len, res->lockname.name);
1096  goto recheck;
1097  }
1098  mlog(0, "done waiting, master is %u\n", res->owner);
1099  ret = 0;
1100  goto leave;
1101  }
1102 
1103  ret = 0; /* done */
1104  if (assert) {
1105  m = dlm->node_num;
1106  mlog(0, "about to master %.*s here, this=%u\n",
1107  res->lockname.len, res->lockname.name, m);
1108  ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1109  if (ret) {
1110  /* This is a failure in the network path,
1111  * not in the response to the assert_master
1112  * (any nonzero response is a BUG on this node).
1113  * Most likely a socket just got disconnected
1114  * due to node death. */
1115  mlog_errno(ret);
1116  }
1117  /* no longer need to restart lock mastery.
1118  * all living nodes have been contacted. */
1119  ret = 0;
1120  }
1121 
1122  /* set the lockres owner */
1123  spin_lock(&res->spinlock);
1124  /* mastery reference obtained either during
1125  * assert_master_handler or in get_lock_resource */
1126  dlm_change_lockres_owner(dlm, res, m);
1127  spin_unlock(&res->spinlock);
1128 
1129 leave:
1130  return ret;
1131 }
1132 
1133 struct dlm_bitmap_diff_iter
1134 {
1135  int curnode;
1136  unsigned long *orig_bm;
1137  unsigned long *cur_bm;
1138  unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
1139 };
1140 
1141 enum dlm_node_state_change
1142 {
1143  NODE_DOWN = -1,
1144  NODE_NO_CHANGE = 0,
1145  NODE_UP
1146 };
1147 
1148 static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
1149  unsigned long *orig_bm,
1150  unsigned long *cur_bm)
1151 {
1152  unsigned long p1, p2;
1153  int i;
1154 
1155  iter->curnode = -1;
1156  iter->orig_bm = orig_bm;
1157  iter->cur_bm = cur_bm;
1158 
1159  for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
1160  p1 = *(iter->orig_bm + i);
1161  p2 = *(iter->cur_bm + i);
1162  iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
1163  }
1164 }
1165 
1166 static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
1167  enum dlm_node_state_change *state)
1168 {
1169  int bit;
1170 
1171  if (iter->curnode >= O2NM_MAX_NODES)
1172  return -ENOENT;
1173 
1174  bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
1175  iter->curnode+1);
1176  if (bit >= O2NM_MAX_NODES) {
1177  iter->curnode = O2NM_MAX_NODES;
1178  return -ENOENT;
1179  }
1180 
1181  /* if it was there in the original then this node died */
1182  if (test_bit(bit, iter->orig_bm))
1183  *state = NODE_DOWN;
1184  else
1185  *state = NODE_UP;
1186 
1187  iter->curnode = bit;
1188  return bit;
1189 }
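/*
 * Usage sketch for the bitmap diff iterator above (illustrative; the
 * function name is hypothetical).  It walks every node whose state differs
 * between the vote-time snapshot and the current node map and reports
 * whether that node came up or went down, which is exactly how
 * dlm_restart_lock_mastery() below consumes it.
 */
static void dlm_diff_iter_sketch(struct dlm_master_list_entry *mle)
{
	struct dlm_bitmap_diff_iter bdi;
	enum dlm_node_state_change sc;
	int node;

	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
	while (node >= 0) {
		if (sc == NODE_UP)
			mlog(0, "node %d came up since we voted\n", node);
		else
			mlog(0, "node %d went down since we voted\n", node);
		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
	}
}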
1190 
1191 
1192 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1193  struct dlm_lock_resource *res,
1194  struct dlm_master_list_entry *mle,
1195  int blocked)
1196 {
1197  struct dlm_bitmap_diff_iter bdi;
1198  enum dlm_node_state_change sc;
1199  int node;
1200  int ret = 0;
1201 
1202  mlog(0, "something happened such that the "
1203  "master process may need to be restarted!\n");
1204 
1205  assert_spin_locked(&mle->spinlock);
1206 
1207  dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
1208  node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1209  while (node >= 0) {
1210  if (sc == NODE_UP) {
1211  /* a node came up. clear any old vote from
1212  * the response map and set it in the vote map
1213  * then restart the mastery. */
1214  mlog(ML_NOTICE, "node %d up while restarting\n", node);
1215 
1216  /* redo the master request, but only for the new node */
1217  mlog(0, "sending request to new node\n");
1218  clear_bit(node, mle->response_map);
1219  set_bit(node, mle->vote_map);
1220  } else {
1221  mlog(ML_ERROR, "node down! %d\n", node);
1222  if (blocked) {
1223  int lowest = find_next_bit(mle->maybe_map,
1224  O2NM_MAX_NODES, 0);
1225 
1226  /* act like it was never there */
1227  clear_bit(node, mle->maybe_map);
1228 
1229  if (node == lowest) {
1230  mlog(0, "expected master %u died"
1231  " while this node was blocked "
1232  "waiting on it!\n", node);
1233  lowest = find_next_bit(mle->maybe_map,
1234  O2NM_MAX_NODES,
1235  lowest+1);
1236  if (lowest < O2NM_MAX_NODES) {
1237  mlog(0, "%s:%.*s:still "
1238  "blocked. waiting on %u "
1239  "now\n", dlm->name,
1240  res->lockname.len,
1241  res->lockname.name,
1242  lowest);
1243  } else {
1244  /* mle is an MLE_BLOCK, but
1245  * there is now nothing left to
1246  * block on. we need to return
1247  * all the way back out and try
1248  * again with an MLE_MASTER.
1249  * dlm_do_local_recovery_cleanup
1250  * has already run, so the mle
1251  * refcount is ok */
1252  mlog(0, "%s:%.*s: no "
1253  "longer blocking. try to "
1254  "master this here\n",
1255  dlm->name,
1256  res->lockname.len,
1257  res->lockname.name);
1258  mle->type = DLM_MLE_MASTER;
1259  mle->mleres = res;
1260  }
1261  }
1262  }
1263 
1264  /* now blank out everything, as if we had never
1265  * contacted anyone */
1266  memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
1267  memset(mle->response_map, 0, sizeof(mle->response_map));
1268  /* reset the vote_map to the current node_map */
1269  memcpy(mle->vote_map, mle->node_map,
1270  sizeof(mle->node_map));
1271  /* put myself into the maybe map */
1272  if (mle->type != DLM_MLE_BLOCK)
1273  set_bit(dlm->node_num, mle->maybe_map);
1274  }
1275  ret = -EAGAIN;
1276  node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1277  }
1278  return ret;
1279 }
1280 
1281 
1282 /*
1283  * DLM_MASTER_REQUEST_MSG
1284  *
1285  * returns: 0 on success,
1286  * -errno on a network error
1287  *
1288  * on error, the caller should assume the target node is "dead"
1289  *
1290  */
1291 
1292 static int dlm_do_master_request(struct dlm_lock_resource *res,
1293  struct dlm_master_list_entry *mle, int to)
1294 {
1295  struct dlm_ctxt *dlm = mle->dlm;
1296  struct dlm_master_request request;
1297  int ret, response=0, resend;
1298 
1299  memset(&request, 0, sizeof(request));
1300  request.node_idx = dlm->node_num;
1301 
1302  BUG_ON(mle->type == DLM_MLE_MIGRATION);
1303 
1304  request.namelen = (u8)mle->mnamelen;
1305  memcpy(request.name, mle->mname, request.namelen);
1306 
1307 again:
1308  ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1309  sizeof(request), to, &response);
1310  if (ret < 0) {
1311  if (ret == -ESRCH) {
1312  /* should never happen */
1313  mlog(ML_ERROR, "TCP stack not ready!\n");
1314  BUG();
1315  } else if (ret == -EINVAL) {
1316  mlog(ML_ERROR, "bad args passed to o2net!\n");
1317  BUG();
1318  } else if (ret == -ENOMEM) {
1319  mlog(ML_ERROR, "out of memory while trying to send "
1320  "network message! retrying\n");
1321  /* this is totally crude */
1322  msleep(50);
1323  goto again;
1324  } else if (!dlm_is_host_down(ret)) {
1325  /* not a network error. bad. */
1326  mlog_errno(ret);
1327  mlog(ML_ERROR, "unhandled error!");
1328  BUG();
1329  }
1330  /* all other errors should be network errors,
1331  * and likely indicate node death */
1332  mlog(ML_ERROR, "link to %d went down!\n", to);
1333  goto out;
1334  }
1335 
1336  ret = 0;
1337  resend = 0;
1338  spin_lock(&mle->spinlock);
1339  switch (response) {
1340  case DLM_MASTER_RESP_YES:
1341  set_bit(to, mle->response_map);
1342  mlog(0, "node %u is the master, response=YES\n", to);
1343  mlog(0, "%s:%.*s: master node %u now knows I have a "
1344  "reference\n", dlm->name, res->lockname.len,
1345  res->lockname.name, to);
1346  mle->master = to;
1347  break;
1348  case DLM_MASTER_RESP_NO:
1349  mlog(0, "node %u not master, response=NO\n", to);
1350  set_bit(to, mle->response_map);
1351  break;
1352  case DLM_MASTER_RESP_MAYBE:
1353  mlog(0, "node %u not master, response=MAYBE\n", to);
1354  set_bit(to, mle->response_map);
1355  set_bit(to, mle->maybe_map);
1356  break;
1357  case DLM_MASTER_RESP_ERROR:
1358  mlog(0, "node %u hit an error, resending\n", to);
1359  resend = 1;
1360  response = 0;
1361  break;
1362  default:
1363  mlog(ML_ERROR, "bad response! %u\n", response);
1364  BUG();
1365  }
1366  spin_unlock(&mle->spinlock);
1367  if (resend) {
1368  /* this is also totally crude */
1369  msleep(50);
1370  goto again;
1371  }
1372 
1373 out:
1374  return ret;
1375 }
1376 
1377 /*
1378  * locks that can be taken here:
1379  * dlm->spinlock
1380  * res->spinlock
1381  * mle->spinlock
1382  * dlm->master_list
1383  *
1384  * if possible, TRIM THIS DOWN!!!
1385  */
1386 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1387  void **ret_data)
1388 {
1389  u8 response = DLM_MASTER_RESP_MAYBE;
1390  struct dlm_ctxt *dlm = data;
1391  struct dlm_lock_resource *res = NULL;
1392  struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1393  struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1394  char *name;
1395  unsigned int namelen, hash;
1396  int found, ret;
1397  int set_maybe;
1398  int dispatch_assert = 0;
1399 
1400  if (!dlm_grab(dlm))
1401  return DLM_MASTER_RESP_NO;
1402 
1403  if (!dlm_domain_fully_joined(dlm)) {
1404  response = DLM_MASTER_RESP_NO;
1405  goto send_response;
1406  }
1407 
1408  name = request->name;
1409  namelen = request->namelen;
1410  hash = dlm_lockid_hash(name, namelen);
1411 
1412  if (namelen > DLM_LOCKID_NAME_MAX) {
1413  response = DLM_IVBUFLEN;
1414  goto send_response;
1415  }
1416 
1417 way_up_top:
1418  spin_lock(&dlm->spinlock);
1419  res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1420  if (res) {
1421  spin_unlock(&dlm->spinlock);
1422 
1423  /* take care of the easy cases up front */
1424  spin_lock(&res->spinlock);
1425  if (res->state & (DLM_LOCK_RES_RECOVERING|
1426  DLM_LOCK_RES_MIGRATING)) {
1427  spin_unlock(&res->spinlock);
1428  mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1429  "being recovered/migrated\n");
1430  response = DLM_MASTER_RESP_ERROR;
1431  if (mle)
1432  kmem_cache_free(dlm_mle_cache, mle);
1433  goto send_response;
1434  }
1435 
1436  if (res->owner == dlm->node_num) {
1437  dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
1438  spin_unlock(&res->spinlock);
1439  response = DLM_MASTER_RESP_YES;
1440  if (mle)
1441  kmem_cache_free(dlm_mle_cache, mle);
1442 
1443  /* this node is the owner.
1444  * there is some extra work that needs to
1445  * happen now. the requesting node has
1446  * caused all nodes up to this one to
1447  * create mles. this node now needs to
1448  * go back and clean those up. */
1449  dispatch_assert = 1;
1450  goto send_response;
1451  } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1452  spin_unlock(&res->spinlock);
1453  // mlog(0, "node %u is the master\n", res->owner);
1454  response = DLM_MASTER_RESP_NO;
1455  if (mle)
1456  kmem_cache_free(dlm_mle_cache, mle);
1457  goto send_response;
1458  }
1459 
1460  /* ok, there is no owner. either this node is
1461  * being blocked, or it is actively trying to
1462  * master this lock. */
1463  if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1464  mlog(ML_ERROR, "lock with no owner should be "
1465  "in-progress!\n");
1466  BUG();
1467  }
1468 
1469  // mlog(0, "lockres is in progress...\n");
1470  spin_lock(&dlm->master_lock);
1471  found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1472  if (!found) {
1473  mlog(ML_ERROR, "no mle found for this lock!\n");
1474  BUG();
1475  }
1476  set_maybe = 1;
1477  spin_lock(&tmpmle->spinlock);
1478  if (tmpmle->type == DLM_MLE_BLOCK) {
1479  // mlog(0, "this node is waiting for "
1480  // "lockres to be mastered\n");
1481  response = DLM_MASTER_RESP_NO;
1482  } else if (tmpmle->type == DLM_MLE_MIGRATION) {
1483  mlog(0, "node %u is master, but trying to migrate to "
1484  "node %u.\n", tmpmle->master, tmpmle->new_master);
1485  if (tmpmle->master == dlm->node_num) {
1486  mlog(ML_ERROR, "no owner on lockres, but this "
1487  "node is trying to migrate it to %u?!\n",
1488  tmpmle->new_master);
1489  BUG();
1490  } else {
1491  /* the real master can respond on its own */
1492  response = DLM_MASTER_RESP_NO;
1493  }
1494  } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1495  set_maybe = 0;
1496  if (tmpmle->master == dlm->node_num) {
1497  response = DLM_MASTER_RESP_YES;
1498  /* this node will be the owner.
1499  * go back and clean the mles on any
1500  * other nodes */
1501  dispatch_assert = 1;
1502  dlm_lockres_set_refmap_bit(dlm, res,
1503  request->node_idx);
1504  } else
1505  response = DLM_MASTER_RESP_NO;
1506  } else {
1507  // mlog(0, "this node is attempting to "
1508  // "master lockres\n");
1509  response = DLM_MASTER_RESP_MAYBE;
1510  }
1511  if (set_maybe)
1512  set_bit(request->node_idx, tmpmle->maybe_map);
1513  spin_unlock(&tmpmle->spinlock);
1514 
1515  spin_unlock(&dlm->master_lock);
1516  spin_unlock(&res->spinlock);
1517 
1518  /* keep the mle attached to heartbeat events */
1519  dlm_put_mle(tmpmle);
1520  if (mle)
1521  kmem_cache_free(dlm_mle_cache, mle);
1522  goto send_response;
1523  }
1524 
1525  /*
1526  * lockres doesn't exist on this node
1527  * if there is an MLE_BLOCK, return NO
1528  * if there is an MLE_MASTER, return MAYBE
1529  * otherwise, add an MLE_BLOCK, return NO
1530  */
1531  spin_lock(&dlm->master_lock);
1532  found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1533  if (!found) {
1534  /* this lockid has never been seen on this node yet */
1535  // mlog(0, "no mle found\n");
1536  if (!mle) {
1537  spin_unlock(&dlm->master_lock);
1538  spin_unlock(&dlm->spinlock);
1539 
1540  mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1541  if (!mle) {
1542  response = DLM_MASTER_RESP_ERROR;
1543  mlog_errno(-ENOMEM);
1544  goto send_response;
1545  }
1546  goto way_up_top;
1547  }
1548 
1549  // mlog(0, "this is second time thru, already allocated, "
1550  // "add the block.\n");
1551  dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1552  set_bit(request->node_idx, mle->maybe_map);
1553  __dlm_insert_mle(dlm, mle);
1554  response = DLM_MASTER_RESP_NO;
1555  } else {
1556  // mlog(0, "mle was found\n");
1557  set_maybe = 1;
1558  spin_lock(&tmpmle->spinlock);
1559  if (tmpmle->master == dlm->node_num) {
1560  mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1561  BUG();
1562  }
1563  if (tmpmle->type == DLM_MLE_BLOCK)
1564  response = DLM_MASTER_RESP_NO;
1565  else if (tmpmle->type == DLM_MLE_MIGRATION) {
1566  mlog(0, "migration mle was found (%u->%u)\n",
1567  tmpmle->master, tmpmle->new_master);
1568  /* real master can respond on its own */
1569  response = DLM_MASTER_RESP_NO;
1570  } else
1571  response = DLM_MASTER_RESP_MAYBE;
1572  if (set_maybe)
1573  set_bit(request->node_idx, tmpmle->maybe_map);
1574  spin_unlock(&tmpmle->spinlock);
1575  }
1576  spin_unlock(&dlm->master_lock);
1577  spin_unlock(&dlm->spinlock);
1578 
1579  if (found) {
1580  /* keep the mle attached to heartbeat events */
1581  dlm_put_mle(tmpmle);
1582  }
1583 send_response:
1584  /*
1585  * __dlm_lookup_lockres() grabbed a reference to this lockres.
1586  * The reference is released by dlm_assert_master_worker() under
1587  * the call to dlm_dispatch_assert_master(). If
1588  * dlm_assert_master_worker() isn't called, we drop it here.
1589  */
1590  if (dispatch_assert) {
1591  if (response != DLM_MASTER_RESP_YES)
1592  mlog(ML_ERROR, "invalid response %d\n", response);
1593  if (!res) {
1594  mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1595  BUG();
1596  }
1597  mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1598  dlm->node_num, res->lockname.len, res->lockname.name);
1599  ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1600  DLM_ASSERT_MASTER_MLE_CLEANUP);
1601  if (ret < 0) {
1602  mlog(ML_ERROR, "failed to dispatch assert master work\n");
1603  response = DLM_MASTER_RESP_ERROR;
1604  dlm_lockres_put(res);
1605  }
1606  } else {
1607  if (res)
1608  dlm_lockres_put(res);
1609  }
1610 
1611  dlm_put(dlm);
1612  return response;
1613 }
1614 
1615 /*
1616  * DLM_ASSERT_MASTER_MSG
1617  */
1618 
1619 
1620 /*
1621  * NOTE: this can be used for debugging
1622  * can periodically run all locks owned by this node
1623  * and re-assert across the cluster...
1624  */
1625 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1626  struct dlm_lock_resource *res,
1627  void *nodemap, u32 flags)
1628 {
1629  struct dlm_assert_master assert;
1630  int to, tmpret;
1631  struct dlm_node_iter iter;
1632  int ret = 0;
1633  int reassert;
1634  const char *lockname = res->lockname.name;
1635  unsigned int namelen = res->lockname.len;
1636 
1637  BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1638 
1639  spin_lock(&res->spinlock);
1640  res->state |= DLM_LOCK_RES_SETREF_INPROG;
1641  spin_unlock(&res->spinlock);
1642 
1643 again:
1644  reassert = 0;
1645 
1646  /* note that if this nodemap is empty, it returns 0 */
1647  dlm_node_iter_init(nodemap, &iter);
1648  while ((to = dlm_node_iter_next(&iter)) >= 0) {
1649  int r = 0;
1650  struct dlm_master_list_entry *mle = NULL;
1651 
1652  mlog(0, "sending assert master to %d (%.*s)\n", to,
1653  namelen, lockname);
1654  memset(&assert, 0, sizeof(assert));
1655  assert.node_idx = dlm->node_num;
1656  assert.namelen = namelen;
1657  memcpy(assert.name, lockname, namelen);
1658  assert.flags = cpu_to_be32(flags);
1659 
1660  tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1661  &assert, sizeof(assert), to, &r);
1662  if (tmpret < 0) {
1663  mlog(ML_ERROR, "Error %d when sending message %u (key "
1664  "0x%x) to node %u\n", tmpret,
1665  DLM_ASSERT_MASTER_MSG, dlm->key, to);
1666  if (!dlm_is_host_down(tmpret)) {
1667  mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1668  BUG();
1669  }
1670  /* a node died. finish out the rest of the nodes. */
1671  mlog(0, "link to %d went down!\n", to);
1672  /* any nonzero status return will do */
1673  ret = tmpret;
1674  r = 0;
1675  } else if (r < 0) {
1676  /* ok, something horribly messed. kill thyself. */
1677  mlog(ML_ERROR,"during assert master of %.*s to %u, "
1678  "got %d.\n", namelen, lockname, to, r);
1679  spin_lock(&dlm->spinlock);
1680  spin_lock(&dlm->master_lock);
1681  if (dlm_find_mle(dlm, &mle, (char *)lockname,
1682  namelen)) {
1683  dlm_print_one_mle(mle);
1684  __dlm_put_mle(mle);
1685  }
1686  spin_unlock(&dlm->master_lock);
1687  spin_unlock(&dlm->spinlock);
1688  BUG();
1689  }
1690 
1691  if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1692  !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1693  mlog(ML_ERROR, "%.*s: very strange, "
1694  "master MLE but no lockres on %u\n",
1695  namelen, lockname, to);
1696  }
1697 
1698  if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1699  mlog(0, "%.*s: node %u create mles on other "
1700  "nodes and requests a re-assert\n",
1701  namelen, lockname, to);
1702  reassert = 1;
1703  }
1704  if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1705  mlog(0, "%.*s: node %u has a reference to this "
1706  "lockres, set the bit in the refmap\n",
1707  namelen, lockname, to);
1708  spin_lock(&res->spinlock);
1709  dlm_lockres_set_refmap_bit(dlm, res, to);
1710  spin_unlock(&res->spinlock);
1711  }
1712  }
1713 
1714  if (reassert)
1715  goto again;
1716 
1717  spin_lock(&res->spinlock);
1718  res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1719  spin_unlock(&res->spinlock);
1720  wake_up(&res->wq);
1721 
1722  return ret;
1723 }
1724 
1725 /*
1726  * locks that can be taken here:
1727  * dlm->spinlock
1728  * res->spinlock
1729  * mle->spinlock
1730  * dlm->master_list
1731  *
1732  * if possible, TRIM THIS DOWN!!!
1733  */
1734 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1735  void **ret_data)
1736 {
1737  struct dlm_ctxt *dlm = data;
1738  struct dlm_master_list_entry *mle = NULL;
1739  struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1740  struct dlm_lock_resource *res = NULL;
1741  char *name;
1742  unsigned int namelen, hash;
1743  u32 flags;
1744  int master_request = 0, have_lockres_ref = 0;
1745  int ret = 0;
1746 
1747  if (!dlm_grab(dlm))
1748  return 0;
1749 
1750  name = assert->name;
1751  namelen = assert->namelen;
1752  hash = dlm_lockid_hash(name, namelen);
1753  flags = be32_to_cpu(assert->flags);
1754 
1755  if (namelen > DLM_LOCKID_NAME_MAX) {
1756  mlog(ML_ERROR, "Invalid name length!");
1757  goto done;
1758  }
1759 
1760  spin_lock(&dlm->spinlock);
1761 
1762  if (flags)
1763  mlog(0, "assert_master with flags: %u\n", flags);
1764 
1765  /* find the MLE */
1766  spin_lock(&dlm->master_lock);
1767  if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1768  /* not an error, could be master just re-asserting */
1769  mlog(0, "just got an assert_master from %u, but no "
1770  "MLE for it! (%.*s)\n", assert->node_idx,
1771  namelen, name);
1772  } else {
1773  int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0);
1774  if (bit >= O2NM_MAX_NODES) {
1775  /* not necessarily an error, though less likely.
1776  * could be master just re-asserting. */
1777  mlog(0, "no bits set in the maybe_map, but %u "
1778  "is asserting! (%.*s)\n", assert->node_idx,
1779  namelen, name);
1780  } else if (bit != assert->node_idx) {
1781  if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1782  mlog(0, "master %u was found, %u should "
1783  "back off\n", assert->node_idx, bit);
1784  } else {
1785  /* with the fix for bug 569, a higher node
1786  * number winning the mastery will respond
1787  * YES to mastery requests, but this node
1788  * had no way of knowing. let it pass. */
1789  mlog(0, "%u is the lowest node, "
1790  "%u is asserting. (%.*s) %u must "
1791  "have begun after %u won.\n", bit,
1792  assert->node_idx, namelen, name, bit,
1793  assert->node_idx);
1794  }
1795  }
1796  if (mle->type == DLM_MLE_MIGRATION) {
1797  if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1798  mlog(0, "%s:%.*s: got cleanup assert"
1799  " from %u for migration\n",
1800  dlm->name, namelen, name,
1801  assert->node_idx);
1802  } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1803  mlog(0, "%s:%.*s: got unrelated assert"
1804  " from %u for migration, ignoring\n",
1805  dlm->name, namelen, name,
1806  assert->node_idx);
1807  __dlm_put_mle(mle);
1808  spin_unlock(&dlm->master_lock);
1809  spin_unlock(&dlm->spinlock);
1810  goto done;
1811  }
1812  }
1813  }
1814  spin_unlock(&dlm->master_lock);
1815 
1816  /* ok everything checks out with the MLE
1817  * now check to see if there is a lockres */
1818  res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1819  if (res) {
1820  spin_lock(&res->spinlock);
1821  if (res->state & DLM_LOCK_RES_RECOVERING) {
1822  mlog(ML_ERROR, "%u asserting but %.*s is "
1823  "RECOVERING!\n", assert->node_idx, namelen, name);
1824  goto kill;
1825  }
1826  if (!mle) {
1827  if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1828  res->owner != assert->node_idx) {
1829  mlog(ML_ERROR, "DIE! Mastery assert from %u, "
1830  "but current owner is %u! (%.*s)\n",
1831  assert->node_idx, res->owner, namelen,
1832  name);
1833  __dlm_print_one_lock_resource(res);
1834  BUG();
1835  }
1836  } else if (mle->type != DLM_MLE_MIGRATION) {
1837  if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1838  /* owner is just re-asserting */
1839  if (res->owner == assert->node_idx) {
1840  mlog(0, "owner %u re-asserting on "
1841  "lock %.*s\n", assert->node_idx,
1842  namelen, name);
1843  goto ok;
1844  }
1845  mlog(ML_ERROR, "got assert_master from "
1846  "node %u, but %u is the owner! "
1847  "(%.*s)\n", assert->node_idx,
1848  res->owner, namelen, name);
1849  goto kill;
1850  }
1851  if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1852  mlog(ML_ERROR, "got assert from %u, but lock "
1853  "with no owner should be "
1854  "in-progress! (%.*s)\n",
1855  assert->node_idx,
1856  namelen, name);
1857  goto kill;
1858  }
1859  } else /* mle->type == DLM_MLE_MIGRATION */ {
1860  /* should only be getting an assert from new master */
1861  if (assert->node_idx != mle->new_master) {
1862  mlog(ML_ERROR, "got assert from %u, but "
1863  "new master is %u, and old master "
1864  "was %u (%.*s)\n",
1865  assert->node_idx, mle->new_master,
1866  mle->master, namelen, name);
1867  goto kill;
1868  }
1869 
1870  }
1871 ok:
1872  spin_unlock(&res->spinlock);
1873  }
1874 
1875  // mlog(0, "woo! got an assert_master from node %u!\n",
1876  // assert->node_idx);
1877  if (mle) {
1878  int extra_ref = 0;
1879  int nn = -1;
1880  int rr, err = 0;
1881 
1882  spin_lock(&mle->spinlock);
1883  if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1884  extra_ref = 1;
1885  else {
1886  /* MASTER mle: if any bits set in the response map
1887  * then the calling node needs to re-assert to clear
1888  * up nodes that this node contacted */
1889  while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
1890  nn+1)) < O2NM_MAX_NODES) {
1891  if (nn != dlm->node_num && nn != assert->node_idx)
1892  master_request = 1;
1893  }
1894  }
1895  mle->master = assert->node_idx;
1896  atomic_set(&mle->woken, 1);
1897  wake_up(&mle->wq);
1898  spin_unlock(&mle->spinlock);
1899 
1900  if (res) {
1901  int wake = 0;
1902  spin_lock(&res->spinlock);
1903  if (mle->type == DLM_MLE_MIGRATION) {
1904  mlog(0, "finishing off migration of lockres %.*s, "
1905  "from %u to %u\n",
1906  res->lockname.len, res->lockname.name,
1907  dlm->node_num, mle->new_master);
1908  res->state &= ~DLM_LOCK_RES_MIGRATING;
1909  wake = 1;
1910  dlm_change_lockres_owner(dlm, res, mle->new_master);
1911  BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1912  } else {
1913  dlm_change_lockres_owner(dlm, res, mle->master);
1914  }
1915  spin_unlock(&res->spinlock);
1916  have_lockres_ref = 1;
1917  if (wake)
1918  wake_up(&res->wq);
1919  }
1920 
1921  /* master is known, detach if not already detached.
1922  * ensures that only one assert_master call will happen
1923  * on this mle. */
1924  spin_lock(&dlm->master_lock);
1925 
1926  rr = atomic_read(&mle->mle_refs.refcount);
1927  if (mle->inuse > 0) {
1928  if (extra_ref && rr < 3)
1929  err = 1;
1930  else if (!extra_ref && rr < 2)
1931  err = 1;
1932  } else {
1933  if (extra_ref && rr < 2)
1934  err = 1;
1935  else if (!extra_ref && rr < 1)
1936  err = 1;
1937  }
1938  if (err) {
1939  mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1940  "that will mess up this node, refs=%d, extra=%d, "
1941  "inuse=%d\n", dlm->name, namelen, name,
1942  assert->node_idx, rr, extra_ref, mle->inuse);
1943  dlm_print_one_mle(mle);
1944  }
1945  __dlm_unlink_mle(dlm, mle);
1946  __dlm_mle_detach_hb_events(dlm, mle);
1947  __dlm_put_mle(mle);
1948  if (extra_ref) {
1949  /* the assert master message now balances the extra
1950  * ref given by the master / migration request message.
1951  * if this is the last put, it will be removed
1952  * from the list. */
1953  __dlm_put_mle(mle);
1954  }
1955  spin_unlock(&dlm->master_lock);
1956  } else if (res) {
1957  if (res->owner != assert->node_idx) {
1958  mlog(0, "assert_master from %u, but current "
1959  "owner is %u (%.*s), no mle\n", assert->node_idx,
1960  res->owner, namelen, name);
1961  }
1962  }
1963  spin_unlock(&dlm->spinlock);
1964 
1965 done:
1966  ret = 0;
1967  if (res) {
1968  spin_lock(&res->spinlock);
1969  res->state |= DLM_LOCK_RES_SETREF_INPROG;
1970  spin_unlock(&res->spinlock);
1971  *ret_data = (void *)res;
1972  }
1973  dlm_put(dlm);
1974  if (master_request) {
1975  mlog(0, "need to tell master to reassert\n");
1976  /* positive. negative would shoot down the node. */
1977  ret |= DLM_ASSERT_RESPONSE_REASSERT;
1978  if (!have_lockres_ref) {
1979  mlog(ML_ERROR, "strange, got assert from %u, MASTER "
1980  "mle present here for %s:%.*s, but no lockres!\n",
1981  assert->node_idx, dlm->name, namelen, name);
1982  }
1983  }
1984  if (have_lockres_ref) {
1985  /* let the master know we have a reference to the lockres */
1986  ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
1987  mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
1988  dlm->name, namelen, name, assert->node_idx);
1989  }
1990  return ret;
1991 
1992 kill:
1993  /* kill the caller! */
1994  mlog(ML_ERROR, "Bad message received from another node. Dumping state "
1995  "and killing the other node now! This node is OK and can continue.\n");
1996  __dlm_print_one_lock_resource(res);
1997  spin_unlock(&res->spinlock);
1998  spin_unlock(&dlm->spinlock);
1999  *ret_data = (void *)res;
2000  dlm_put(dlm);
2001  return -EINVAL;
2002 }
2003 
2004 void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2005 {
2006  struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2007 
2008  if (ret_data) {
2009  spin_lock(&res->spinlock);
2010  res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2011  spin_unlock(&res->spinlock);
2012  wake_up(&res->wq);
2013  dlm_lockres_put(res);
2014  }
2015  return;
2016 }
2017 
2018 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2019  struct dlm_lock_resource *res,
2020  int ignore_higher, u8 request_from, u32 flags)
2021 {
2022  struct dlm_work_item *item;
2023  item = kzalloc(sizeof(*item), GFP_NOFS);
2024  if (!item)
2025  return -ENOMEM;
2026 
2027 
2028  /* queue up work for dlm_assert_master_worker */
2029  dlm_grab(dlm); /* get an extra ref for the work item */
2030  dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2031  item->u.am.lockres = res; /* already have a ref */
2032  /* can optionally ignore node numbers higher than this node */
2033  item->u.am.ignore_higher = ignore_higher;
2034  item->u.am.request_from = request_from;
2035  item->u.am.flags = flags;
2036 
2037  if (ignore_higher)
2038  mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
2039  res->lockname.name);
2040 
2041  spin_lock(&dlm->work_lock);
2042  list_add_tail(&item->list, &dlm->work_list);
2043  spin_unlock(&dlm->work_lock);
2044 
2045  queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2046  return 0;
2047 }
2048 
2049 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2050 {
2051  struct dlm_ctxt *dlm = data;
2052  int ret = 0;
2053  struct dlm_lock_resource *res;
2054  unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
2055  int ignore_higher;
2056  int bit;
2057  u8 request_from;
2058  u32 flags;
2059 
2060  dlm = item->dlm;
2061  res = item->u.am.lockres;
2062  ignore_higher = item->u.am.ignore_higher;
2063  request_from = item->u.am.request_from;
2064  flags = item->u.am.flags;
2065 
2066  spin_lock(&dlm->spinlock);
2067  memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
2068  spin_unlock(&dlm->spinlock);
2069 
2070  clear_bit(dlm->node_num, nodemap);
2071  if (ignore_higher) {
2072  /* if this is just to clear up mles for nodes below
2073  * this node, do not send the message to the original
2074  * caller or any node number higher than this */
2075  clear_bit(request_from, nodemap);
2076  bit = dlm->node_num;
2077  while (1) {
2078  bit = find_next_bit(nodemap, O2NM_MAX_NODES,
2079  bit+1);
2080  if (bit >= O2NM_MAX_NODES)
2081  break;
2082  clear_bit(bit, nodemap);
2083  }
2084  }
2085 
2086  /*
2087  * If we're migrating this lock to someone else, we are no
 2088  * longer allowed to assert our own mastery. OTOH, we need to
2089  * prevent migration from starting while we're still asserting
2090  * our dominance. The reserved ast delays migration.
2091  */
2092  spin_lock(&res->spinlock);
2093  if (res->state & DLM_LOCK_RES_MIGRATING) {
2094  mlog(0, "Someone asked us to assert mastery, but we're "
2095  "in the middle of migration. Skipping assert, "
2096  "the new master will handle that.\n");
2097  spin_unlock(&res->spinlock);
2098  goto put;
2099  } else
2100  __dlm_lockres_reserve_ast(res);
2101  spin_unlock(&res->spinlock);
2102 
2103  /* this call now finishes out the nodemap
2104  * even if one or more nodes die */
2105  mlog(0, "worker about to master %.*s here, this=%u\n",
2106  res->lockname.len, res->lockname.name, dlm->node_num);
2107  ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2108  if (ret < 0) {
2109  /* no need to restart, we are done */
2110  if (!dlm_is_host_down(ret))
2111  mlog_errno(ret);
2112  }
2113 
2114  /* Ok, we've asserted ourselves. Let's let migration start. */
2115  dlm_lockres_release_ast(dlm, res);
2116 
2117 put:
2118  dlm_lockres_put(res);
2119 
2120  mlog(0, "finished with dlm_assert_master_worker\n");
2121 }
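The worker above copies dlm->domain_map into a local nodemap, then drops its own bit and, when ignore_higher is set, also drops the requesting node and every node numbered above itself before broadcasting the assert. A minimal userspace sketch of that same pruning step, using an invented byte-per-node map and made-up names (prune_nodemap, MAX_NODES) rather than the kernel's bitmap helpers:

#include <stdio.h>
#include <string.h>

#define MAX_NODES 32            /* stand-in for O2NM_MAX_NODES */

/* Prune a copy of the domain map the way the worker does: drop self,
 * drop the requesting node, and drop every node numbered above self. */
static void prune_nodemap(unsigned char *map, int self, int request_from,
                          int ignore_higher)
{
        int i;

        map[self] = 0;
        if (!ignore_higher)
                return;
        map[request_from] = 0;
        for (i = self + 1; i < MAX_NODES; i++)
                map[i] = 0;
}

int main(void)
{
        unsigned char map[MAX_NODES];
        int i;

        memset(map, 1, sizeof(map));   /* pretend every node is in the domain */
        prune_nodemap(map, 3, 7, 1);

        for (i = 0; i < MAX_NODES; i++)
                if (map[i])
                        printf("would send assert_master to node %d\n", i);
        return 0;
}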
2122 
2123 /* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2124  * We cannot wait for node recovery to complete to begin mastering this
2125  * lockres because this lockres is used to kick off recovery! ;-)
2126  * So, do a pre-check on all living nodes to see if any of those nodes
2127  * think that $RECOVERY is currently mastered by a dead node. If so,
2128  * we wait a short time to allow that node to get notified by its own
2129  * heartbeat stack, then check again. All $RECOVERY lock resources
 2130  * mastered by dead nodes are purged when the heartbeat callback is
2131  * fired, so we can know for sure that it is safe to continue once
2132  * the node returns a live node or no node. */
2133 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2134  struct dlm_lock_resource *res)
2135 {
2136  struct dlm_node_iter iter;
2137  int nodenum;
2138  int ret = 0;
2139  u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2140 
2141  spin_lock(&dlm->spinlock);
2142  dlm_node_iter_init(dlm->domain_map, &iter);
2143  spin_unlock(&dlm->spinlock);
2144 
2145  while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2146  /* do not send to self */
2147  if (nodenum == dlm->node_num)
2148  continue;
2149  ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2150  if (ret < 0) {
2151  mlog_errno(ret);
2152  if (!dlm_is_host_down(ret))
2153  BUG();
2154  /* host is down, so answer for that node would be
2155  * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */
2156  ret = 0;
2157  }
2158 
2159  if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2160  /* check to see if this master is in the recovery map */
2161  spin_lock(&dlm->spinlock);
2162  if (test_bit(master, dlm->recovery_map)) {
2163  mlog(ML_NOTICE, "%s: node %u has not seen "
2164  "node %u go down yet, and thinks the "
2165  "dead node is mastering the recovery "
2166  "lock. must wait.\n", dlm->name,
2167  nodenum, master);
2168  ret = -EAGAIN;
2169  }
2170  spin_unlock(&dlm->spinlock);
2171  mlog(0, "%s: reco lock master is %u\n", dlm->name,
2172  master);
2173  break;
2174  }
2175  }
2176  return ret;
2177 }
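The pre-check above polls every other live node and backs off with -EAGAIN as long as any of them still believes a node in the recovery map masters $RECOVERY. A rough userspace sketch of that poll-and-back-off shape; query_owner(), owner_still_recovering() and the live-node array are invented stand-ins for dlm_do_master_requery() and the domain/recovery maps:

#include <errno.h>
#include <stdio.h>

#define OWNER_UNKNOWN 255

/* Stand-ins for the per-node owner query and the recovery map check. */
static int query_owner(int node)         { return node == 2 ? 5 : OWNER_UNKNOWN; }
static int owner_still_recovering(int n) { return n == 5; }

/* Return 0 when it is safe to master the recovery lock, -EAGAIN otherwise. */
static int pre_check(const int *live, int nlive)
{
        for (int i = 0; i < nlive; i++) {
                int owner = query_owner(live[i]);
                if (owner == OWNER_UNKNOWN)
                        continue;
                if (owner_still_recovering(owner))
                        return -EAGAIN;  /* that node has not seen the death yet */
                break;                   /* a live, known owner: nothing to wait for */
        }
        return 0;
}

int main(void)
{
        int live[] = { 1, 2, 3 };

        if (pre_check(live, 3) == -EAGAIN)
                printf("a node still thinks a dead node masters $RECOVERY; retry later\n");
        return 0;
}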
2178 
2179 /*
2180  * DLM_DEREF_LOCKRES_MSG
2181  */
2182 
2183 int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2184 {
2185  struct dlm_deref_lockres deref;
2186  int ret = 0, r;
2187  const char *lockname;
2188  unsigned int namelen;
2189 
2190  lockname = res->lockname.name;
2191  namelen = res->lockname.len;
2192  BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2193 
2194  memset(&deref, 0, sizeof(deref));
2195  deref.node_idx = dlm->node_num;
2196  deref.namelen = namelen;
2197  memcpy(deref.name, lockname, namelen);
2198 
2199  ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2200  &deref, sizeof(deref), res->owner, &r);
2201  if (ret < 0)
2202  mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n",
2203  dlm->name, namelen, lockname, ret, res->owner);
2204  else if (r < 0) {
2205  /* BAD. other node says I did not have a ref. */
2206  mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
2207  dlm->name, namelen, lockname, res->owner, r);
2208  dlm_print_one_lock_resource(res);
2209  BUG();
2210  }
2211  return ret;
2212 }
2213 
2214 int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2215  void **ret_data)
2216 {
2217  struct dlm_ctxt *dlm = data;
2218  struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2219  struct dlm_lock_resource *res = NULL;
2220  char *name;
2221  unsigned int namelen;
2222  int ret = -EINVAL;
2223  u8 node;
2224  unsigned int hash;
2225  struct dlm_work_item *item;
2226  int cleared = 0;
2227  int dispatch = 0;
2228 
2229  if (!dlm_grab(dlm))
2230  return 0;
2231 
2232  name = deref->name;
2233  namelen = deref->namelen;
2234  node = deref->node_idx;
2235 
2236  if (namelen > DLM_LOCKID_NAME_MAX) {
2237  mlog(ML_ERROR, "Invalid name length!");
2238  goto done;
2239  }
2240  if (deref->node_idx >= O2NM_MAX_NODES) {
2241  mlog(ML_ERROR, "Invalid node number: %u\n", node);
2242  goto done;
2243  }
2244 
2245  hash = dlm_lockid_hash(name, namelen);
2246 
2247  spin_lock(&dlm->spinlock);
2248  res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2249  if (!res) {
2250  spin_unlock(&dlm->spinlock);
2251  mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2252  dlm->name, namelen, name);
2253  goto done;
2254  }
2255  spin_unlock(&dlm->spinlock);
2256 
2257  spin_lock(&res->spinlock);
2258  if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2259  dispatch = 1;
2260  else {
2261  BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2262  if (test_bit(node, res->refmap)) {
2263  dlm_lockres_clear_refmap_bit(dlm, res, node);
2264  cleared = 1;
2265  }
2266  }
2267  spin_unlock(&res->spinlock);
2268 
2269  if (!dispatch) {
2270  if (cleared)
2271  dlm_lockres_calc_usage(dlm, res);
2272  else {
2273  mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2274  "but it is already dropped!\n", dlm->name,
2275  res->lockname.len, res->lockname.name, node);
2276  dlm_print_one_lock_resource(res);
2277  }
2278  ret = 0;
2279  goto done;
2280  }
2281 
2282  item = kzalloc(sizeof(*item), GFP_NOFS);
2283  if (!item) {
2284  ret = -ENOMEM;
2285  mlog_errno(ret);
2286  goto done;
2287  }
2288 
2289  dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2290  item->u.dl.deref_res = res;
2291  item->u.dl.deref_node = node;
2292 
2293  spin_lock(&dlm->work_lock);
2294  list_add_tail(&item->list, &dlm->work_list);
2295  spin_unlock(&dlm->work_lock);
2296 
2297  queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2298  return 0;
2299 
2300 done:
2301  if (res)
2302  dlm_lockres_put(res);
2303  dlm_put(dlm);
2304 
2305  return ret;
2306 }
2307 
2308 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2309 {
2310  struct dlm_ctxt *dlm;
2311  struct dlm_lock_resource *res;
2312  u8 node;
2313  u8 cleared = 0;
2314 
2315  dlm = item->dlm;
2316  res = item->u.dl.deref_res;
2317  node = item->u.dl.deref_node;
2318 
2319  spin_lock(&res->spinlock);
2320  BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2321  if (test_bit(node, res->refmap)) {
2322  __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2323  dlm_lockres_clear_refmap_bit(dlm, res, node);
2324  cleared = 1;
2325  }
2326  spin_unlock(&res->spinlock);
2327 
2328  if (cleared) {
2329  mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2330  dlm->name, res->lockname.len, res->lockname.name, node);
2331  dlm_lockres_calc_usage(dlm, res);
2332  } else {
2333  mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2334  "but it is already dropped!\n", dlm->name,
2335  res->lockname.len, res->lockname.name, node);
2337  }
2338 
2339  dlm_lockres_put(res);
2340 }
2341 
2342 /*
 2343  * A migrateable resource is one that is:
2344  * 1. locally mastered, and,
2345  * 2. zero local locks, and,
2346  * 3. one or more non-local locks, or, one or more references
2347  * Returns 1 if yes, 0 if not.
2348  */
2349 static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
2350  struct dlm_lock_resource *res)
2351 {
2352  enum dlm_lockres_list idx;
2353  int nonlocal = 0, node_ref;
2354  struct list_head *queue;
2355  struct dlm_lock *lock;
2356  u64 cookie;
2357 
2358  assert_spin_locked(&res->spinlock);
2359 
2360  if (res->owner != dlm->node_num)
2361  return 0;
2362 
2363  for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2364  queue = dlm_list_idx_to_ptr(res, idx);
2365  list_for_each_entry(lock, queue, list) {
2366  if (lock->ml.node != dlm->node_num) {
2367  nonlocal++;
2368  continue;
2369  }
2370  cookie = be64_to_cpu(lock->ml.cookie);
2371  mlog(0, "%s: Not migrateable res %.*s, lock %u:%llu on "
2372  "%s list\n", dlm->name, res->lockname.len,
2373  res->lockname.name,
2374  dlm_get_lock_cookie_node(cookie),
2375  dlm_get_lock_cookie_seq(cookie),
2376  dlm_list_in_text(idx));
2377  return 0;
2378  }
2379  }
2380 
2381  if (!nonlocal) {
2382  node_ref = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
2383  if (node_ref >= O2NM_MAX_NODES)
2384  return 0;
2385  }
2386 
2387  mlog(0, "%s: res %.*s, Migrateable\n", dlm->name, res->lockname.len,
2388  res->lockname.name);
2389 
2390  return 1;
2391 }
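The three rules in the comment above condense into a small predicate: locally mastered, zero local locks, and at least one non-local lock or remote reference. A stand-alone rendering under those assumptions; the fake_lock array and remote_refs count are invented substitutes for the granted/converting/blocked queues and the refmap:

#include <stdio.h>

struct fake_lock { int owner_node; };

static int is_migrateable(int my_node, int res_owner,
                          const struct fake_lock *locks, int nlocks,
                          int remote_refs)
{
        int nonlocal = 0;

        if (res_owner != my_node)
                return 0;                 /* not locally mastered */

        for (int i = 0; i < nlocks; i++) {
                if (locks[i].owner_node == my_node)
                        return 0;         /* a local lock pins it here */
                nonlocal++;
        }

        if (!nonlocal && !remote_refs)
                return 0;                 /* nobody else cares about it */
        return 1;
}

int main(void)
{
        struct fake_lock locks[] = { { .owner_node = 4 } };

        printf("migrateable: %d\n", is_migrateable(1, 1, locks, 1, 0));
        return 0;
}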
2392 
2393 /*
2394  * DLM_MIGRATE_LOCKRES
2395  */
2396 
2397 
2398 static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2399  struct dlm_lock_resource *res, u8 target)
2400 {
2401  struct dlm_master_list_entry *mle = NULL;
2402  struct dlm_master_list_entry *oldmle = NULL;
2403  struct dlm_migratable_lockres *mres = NULL;
2404  int ret = 0;
2405  const char *name;
2406  unsigned int namelen;
2407  int mle_added = 0;
2408  int wake = 0;
2409 
2410  if (!dlm_grab(dlm))
2411  return -EINVAL;
2412 
2413  BUG_ON(target == O2NM_MAX_NODES);
2414 
2415  name = res->lockname.name;
2416  namelen = res->lockname.len;
2417 
2418  mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name,
2419  target);
2420 
2421  /* preallocate up front. if this fails, abort */
2422  ret = -ENOMEM;
2423  mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2424  if (!mres) {
2425  mlog_errno(ret);
2426  goto leave;
2427  }
2428 
2429  mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
2430  if (!mle) {
2431  mlog_errno(ret);
2432  goto leave;
2433  }
2434  ret = 0;
2435 
2436  /*
2437  * clear any existing master requests and
2438  * add the migration mle to the list
2439  */
2440  spin_lock(&dlm->spinlock);
2441  spin_lock(&dlm->master_lock);
2442  ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2443  namelen, target, dlm->node_num);
2444  spin_unlock(&dlm->master_lock);
2445  spin_unlock(&dlm->spinlock);
2446 
2447  if (ret == -EEXIST) {
2448  mlog(0, "another process is already migrating it\n");
2449  goto fail;
2450  }
2451  mle_added = 1;
2452 
2453  /*
2454  * set the MIGRATING flag and flush asts
2455  * if we fail after this we need to re-dirty the lockres
2456  */
2457  if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2458  mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2459  "the target went down.\n", res->lockname.len,
2460  res->lockname.name, target);
2461  spin_lock(&res->spinlock);
2462  res->state &= ~DLM_LOCK_RES_MIGRATING;
2463  wake = 1;
2464  spin_unlock(&res->spinlock);
2465  ret = -EINVAL;
2466  }
2467 
2468 fail:
2469  if (oldmle) {
2470  /* master is known, detach if not already detached */
2471  dlm_mle_detach_hb_events(dlm, oldmle);
2472  dlm_put_mle(oldmle);
2473  }
2474 
2475  if (ret < 0) {
2476  if (mle_added) {
2477  dlm_mle_detach_hb_events(dlm, mle);
2478  dlm_put_mle(mle);
2479  } else if (mle) {
2480  kmem_cache_free(dlm_mle_cache, mle);
2481  mle = NULL;
2482  }
2483  goto leave;
2484  }
2485 
2486  /*
2487  * at this point, we have a migration target, an mle
2488  * in the master list, and the MIGRATING flag set on
2489  * the lockres
2490  */
2491 
2492  /* now that remote nodes are spinning on the MIGRATING flag,
2493  * ensure that all assert_master work is flushed. */
2494  flush_workqueue(dlm->dlm_worker);
2495 
2496  /* get an extra reference on the mle.
2497  * otherwise the assert_master from the new
2498  * master will destroy this.
2499  * also, make sure that all callers of dlm_get_mle
2500  * take both dlm->spinlock and dlm->master_lock */
2501  spin_lock(&dlm->spinlock);
2502  spin_lock(&dlm->master_lock);
2503  dlm_get_mle_inuse(mle);
2504  spin_unlock(&dlm->master_lock);
2505  spin_unlock(&dlm->spinlock);
2506 
2507  /* notify new node and send all lock state */
2508  /* call send_one_lockres with migration flag.
2509  * this serves as notice to the target node that a
2510  * migration is starting. */
2511  ret = dlm_send_one_lockres(dlm, res, mres, target,
2512  DLM_MRES_MIGRATION);
2513 
2514  if (ret < 0) {
2515  mlog(0, "migration to node %u failed with %d\n",
2516  target, ret);
2517  /* migration failed, detach and clean up mle */
2518  dlm_mle_detach_hb_events(dlm, mle);
2519  dlm_put_mle(mle);
2520  dlm_put_mle_inuse(mle);
2521  spin_lock(&res->spinlock);
2522  res->state &= ~DLM_LOCK_RES_MIGRATING;
2523  wake = 1;
2524  spin_unlock(&res->spinlock);
2525  if (dlm_is_host_down(ret))
2526  dlm_wait_for_node_death(dlm, target,
2527  DLM_NODE_DEATH_WAIT_MAX);
2528  goto leave;
2529  }
2530 
2531  /* at this point, the target sends a message to all nodes,
2532  * (using dlm_do_migrate_request). this node is skipped since
2533  * we had to put an mle in the list to begin the process. this
2534  * node now waits for target to do an assert master. this node
2535  * will be the last one notified, ensuring that the migration
2536  * is complete everywhere. if the target dies while this is
2537  * going on, some nodes could potentially see the target as the
2538  * master, so it is important that my recovery finds the migration
2539  * mle and sets the master to UNKNOWN. */
2540 
2541 
2542  /* wait for new node to assert master */
2543  while (1) {
2544  ret = wait_event_interruptible_timeout(mle->wq,
2545  (atomic_read(&mle->woken) == 1),
2546  msecs_to_jiffies(5000));
2547 
2548  if (ret >= 0) {
2549  if (atomic_read(&mle->woken) == 1 ||
2550  res->owner == target)
2551  break;
2552 
2553  mlog(0, "%s:%.*s: timed out during migration\n",
2554  dlm->name, res->lockname.len, res->lockname.name);
2555  /* avoid hang during shutdown when migrating lockres
2556  * to a node which also goes down */
2557  if (dlm_is_node_dead(dlm, target)) {
2558  mlog(0, "%s:%.*s: expected migration "
2559  "target %u is no longer up, restarting\n",
2560  dlm->name, res->lockname.len,
2561  res->lockname.name, target);
2562  ret = -EINVAL;
2563  /* migration failed, detach and clean up mle */
2564  dlm_mle_detach_hb_events(dlm, mle);
2565  dlm_put_mle(mle);
2566  dlm_put_mle_inuse(mle);
2567  spin_lock(&res->spinlock);
2568  res->state &= ~DLM_LOCK_RES_MIGRATING;
2569  wake = 1;
2570  spin_unlock(&res->spinlock);
2571  goto leave;
2572  }
2573  } else
2574  mlog(0, "%s:%.*s: caught signal during migration\n",
2575  dlm->name, res->lockname.len, res->lockname.name);
2576  }
2577 
2578  /* all done, set the owner, clear the flag */
2579  spin_lock(&res->spinlock);
2580  dlm_set_lockres_owner(dlm, res, target);
2581  res->state &= ~DLM_LOCK_RES_MIGRATING;
2582  dlm_remove_nonlocal_locks(dlm, res);
2583  spin_unlock(&res->spinlock);
2584  wake_up(&res->wq);
2585 
2586  /* master is known, detach if not already detached */
2587  dlm_mle_detach_hb_events(dlm, mle);
2588  dlm_put_mle_inuse(mle);
2589  ret = 0;
2590 
2591  dlm_lockres_calc_usage(dlm, res);
2592 
2593 leave:
2594  /* re-dirty the lockres if we failed */
2595  if (ret < 0)
2596  dlm_kick_thread(dlm, res);
2597 
2598  /* wake up waiters if the MIGRATING flag got set
2599  * but migration failed */
2600  if (wake)
2601  wake_up(&res->wq);
2602 
2603  if (mres)
2604  free_page((unsigned long)mres);
2605 
2606  dlm_put(dlm);
2607 
2608  mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen,
2609  name, target, ret);
2610  return ret;
2611 }
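The migration path above parks in a 5-second timed wait until either the mle is woken by the target's assert or the owner flips to the target, and it aborts if the target dies in the meantime. A pthread-based sketch of that wait-with-timeout-and-recheck shape; the woken flag and target_alive() are illustrative, not the kernel's wait queue API:

#include <pthread.h>
#include <stdio.h>
#include <time.h>

static pthread_mutex_t lk = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cv = PTHREAD_COND_INITIALIZER;
static int woken;                              /* set by the "assert master" path */

static int target_alive(void) { return 1; }    /* stand-in for !dlm_is_node_dead() */

/* Wait until woken, re-arming a 5s timeout and re-checking target health
 * on every timeout, the same shape as the migration wait loop above. */
static int wait_for_assert(void)
{
        struct timespec ts;
        int rc = 0;

        pthread_mutex_lock(&lk);
        while (!woken) {
                clock_gettime(CLOCK_REALTIME, &ts);
                ts.tv_sec += 5;
                pthread_cond_timedwait(&cv, &lk, &ts);
                if (!woken && !target_alive()) {
                        rc = -1;               /* target died: abort the migration */
                        break;
                }
        }
        pthread_mutex_unlock(&lk);
        return rc;
}

int main(void)
{
        woken = 1;                             /* pretend the assert already arrived */
        printf("wait_for_assert -> %d\n", wait_for_assert());
        return 0;
}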
2612 
2613 #define DLM_MIGRATION_RETRY_MS 100
2614 
2615 /*
2616  * Should be called only after beginning the domain leave process.
2617  * There should not be any remaining locks on nonlocal lock resources,
2618  * and there should be no local locks left on locally mastered resources.
2619  *
2620  * Called with the dlm spinlock held, may drop it to do migration, but
2621  * will re-acquire before exit.
2622  *
2623  * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped
2624  */
2625 int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2626 {
2627  int ret;
2628  int lock_dropped = 0;
2629  u8 target = O2NM_MAX_NODES;
2630 
2631  assert_spin_locked(&dlm->spinlock);
2632 
2633  spin_lock(&res->spinlock);
2634  if (dlm_is_lockres_migrateable(dlm, res))
2635  target = dlm_pick_migration_target(dlm, res);
2636  spin_unlock(&res->spinlock);
2637 
2638  if (target == O2NM_MAX_NODES)
2639  goto leave;
2640 
2641  /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2642  spin_unlock(&dlm->spinlock);
2643  lock_dropped = 1;
2644  ret = dlm_migrate_lockres(dlm, res, target);
2645  if (ret)
2646  mlog(0, "%s: res %.*s, Migrate to node %u failed with %d\n",
2647  dlm->name, res->lockname.len, res->lockname.name,
2648  target, ret);
2649  spin_lock(&dlm->spinlock);
2650 leave:
2651  return lock_dropped;
2652 }
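dlm_empty_lockres() documents its locking contract in its return value: the caller holds dlm->spinlock, the function may drop it to sleep in migration, and the caller must learn whether that happened so it can revalidate its scan. A toy version of that "may drop the lock, report if it did" contract, using a pthread mutex and invented helper names:

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t domain_lock = PTHREAD_MUTEX_INITIALIZER;

static void slow_migration_step(void) { /* would sleep, send messages, ... */ }

/* Called with domain_lock held.  Returns 1 if the lock was dropped and
 * re-taken (caller must revalidate whatever it was iterating), 0 otherwise. */
static int maybe_migrate(int has_target)
{
        if (!has_target)
                return 0;

        pthread_mutex_unlock(&domain_lock);    /* cannot sleep under the lock */
        slow_migration_step();
        pthread_mutex_lock(&domain_lock);
        return 1;
}

int main(void)
{
        pthread_mutex_lock(&domain_lock);
        if (maybe_migrate(1))
                printf("lock was dropped: restart the scan\n");
        pthread_mutex_unlock(&domain_lock);
        return 0;
}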
2653 
2654 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2655 {
2656  int ret;
2657  spin_lock(&dlm->ast_lock);
2658  spin_lock(&lock->spinlock);
2659  ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2660  spin_unlock(&lock->spinlock);
2661  spin_unlock(&dlm->ast_lock);
2662  return ret;
2663 }
2664 
2665 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2666  struct dlm_lock_resource *res,
2667  u8 mig_target)
2668 {
2669  int can_proceed;
2670  spin_lock(&res->spinlock);
2671  can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2672  spin_unlock(&res->spinlock);
2673 
2674  /* target has died, so make the caller break out of the
2675  * wait_event, but caller must recheck the domain_map */
2676  spin_lock(&dlm->spinlock);
2677  if (!test_bit(mig_target, dlm->domain_map))
2678  can_proceed = 1;
2679  spin_unlock(&dlm->spinlock);
2680  return can_proceed;
2681 }
2682 
2683 static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2684  struct dlm_lock_resource *res)
2685 {
2686  int ret;
2687  spin_lock(&res->spinlock);
2688  ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2689  spin_unlock(&res->spinlock);
2690  return ret;
2691 }
2692 
2693 
2694 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2695  struct dlm_lock_resource *res,
2696  u8 target)
2697 {
2698  int ret = 0;
2699 
2700  mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2701  res->lockname.len, res->lockname.name, dlm->node_num,
2702  target);
2703  /* need to set MIGRATING flag on lockres. this is done by
2704  * ensuring that all asts have been flushed for this lockres. */
2705  spin_lock(&res->spinlock);
2706  BUG_ON(res->migration_pending);
2707  res->migration_pending = 1;
2708  /* strategy is to reserve an extra ast then release
2709  * it below, letting the release do all of the work */
2710  __dlm_lockres_reserve_ast(res);
2711  spin_unlock(&res->spinlock);
2712 
2713  /* now flush all the pending asts */
2714  dlm_kick_thread(dlm, res);
2715  /* before waiting on DIRTY, block processes which may
2716  * try to dirty the lockres before MIGRATING is set */
2717  spin_lock(&res->spinlock);
2718  BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2719  res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2720  spin_unlock(&res->spinlock);
2721  /* now wait on any pending asts and the DIRTY state */
2722  wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2723  dlm_lockres_release_ast(dlm, res);
2724 
2725  mlog(0, "about to wait on migration_wq, dirty=%s\n",
2726  res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2727  /* if the extra ref we just put was the final one, this
2728  * will pass thru immediately. otherwise, we need to wait
2729  * for the last ast to finish. */
2730 again:
2731  ret = wait_event_interruptible_timeout(dlm->migration_wq,
2732  dlm_migration_can_proceed(dlm, res, target),
2733  msecs_to_jiffies(1000));
2734  if (ret < 0) {
2735  mlog(0, "woken again: migrating? %s, dead? %s\n",
2736  res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2737  test_bit(target, dlm->domain_map) ? "no":"yes");
2738  } else {
2739  mlog(0, "all is well: migrating? %s, dead? %s\n",
2740  res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2741  test_bit(target, dlm->domain_map) ? "no":"yes");
2742  }
2743  if (!dlm_migration_can_proceed(dlm, res, target)) {
2744  mlog(0, "trying again...\n");
2745  goto again;
2746  }
2747 
2748  ret = 0;
2749  /* did the target go down or die? */
2750  spin_lock(&dlm->spinlock);
2751  if (!test_bit(target, dlm->domain_map)) {
2752  mlog(ML_ERROR, "aha. migration target %u just went down\n",
2753  target);
2754  ret = -EHOSTDOWN;
2755  }
2756  spin_unlock(&dlm->spinlock);
2757 
2758  /*
2759  * if target is down, we need to clear DLM_LOCK_RES_BLOCK_DIRTY for
2760  * another try; otherwise, we are sure the MIGRATING state is there,
 2761  * drop the unneeded state which blocked threads trying to DIRTY
2762  */
2763  spin_lock(&res->spinlock);
2764  BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2765  res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2766  if (!ret)
2767  BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2768  spin_unlock(&res->spinlock);
2769 
2770  /*
2771  * at this point:
2772  *
2773  * o the DLM_LOCK_RES_MIGRATING flag is set if target not down
2774  * o there are no pending asts on this lockres
2775  * o all processes trying to reserve an ast on this
2776  * lockres must wait for the MIGRATING flag to clear
2777  */
2778  return ret;
2779 }
2780 
2781 /* last step in the migration process.
2782  * original master calls this to free all of the dlm_lock
2783  * structures that used to be for other nodes. */
2784 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2785  struct dlm_lock_resource *res)
2786 {
2787  struct list_head *queue = &res->granted;
2788  int i, bit;
2789  struct dlm_lock *lock, *next;
2790 
2791  assert_spin_locked(&res->spinlock);
2792 
2793  BUG_ON(res->owner == dlm->node_num);
2794 
2795  for (i=0; i<3; i++) {
2796  list_for_each_entry_safe(lock, next, queue, list) {
2797  if (lock->ml.node != dlm->node_num) {
2798  mlog(0, "putting lock for node %u\n",
2799  lock->ml.node);
2800  /* be extra careful */
2801  BUG_ON(!list_empty(&lock->ast_list));
2802  BUG_ON(!list_empty(&lock->bast_list));
2803  BUG_ON(lock->ast_pending);
2804  BUG_ON(lock->bast_pending);
2805  dlm_lockres_clear_refmap_bit(dlm, res,
2806  lock->ml.node);
2807  list_del_init(&lock->list);
2808  dlm_lock_put(lock);
2809  /* In a normal unlock, we would have added a
2810  * DLM_UNLOCK_FREE_LOCK action. Force it. */
2811  dlm_lock_put(lock);
2812  }
2813  }
2814  queue++;
2815  }
2816  bit = 0;
2817  while (1) {
2818  bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2819  if (bit >= O2NM_MAX_NODES)
2820  break;
2821  /* do not clear the local node reference, if there is a
2822  * process holding this, let it drop the ref itself */
2823  if (bit != dlm->node_num) {
2824  mlog(0, "%s:%.*s: node %u had a ref to this "
2825  "migrating lockres, clearing\n", dlm->name,
2826  res->lockname.len, res->lockname.name, bit);
2827  dlm_lockres_clear_refmap_bit(dlm, res, bit);
2828  }
2829  bit++;
2830  }
2831 }
2832 
2833 /*
2834  * Pick a node to migrate the lock resource to. This function selects a
2835  * potential target based first on the locks and then on refmap. It skips
2836  * nodes that are in the process of exiting the domain.
2837  */
2838 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2839  struct dlm_lock_resource *res)
2840 {
2841  enum dlm_lockres_list idx;
2842  struct list_head *queue = &res->granted;
2843  struct dlm_lock *lock;
2844  int noderef;
2845  u8 nodenum = O2NM_MAX_NODES;
2846 
2847  assert_spin_locked(&dlm->spinlock);
2848  assert_spin_locked(&res->spinlock);
2849 
2850  /* Go through all the locks */
2851  for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
2852  queue = dlm_list_idx_to_ptr(res, idx);
2853  list_for_each_entry(lock, queue, list) {
2854  if (lock->ml.node == dlm->node_num)
2855  continue;
2856  if (test_bit(lock->ml.node, dlm->exit_domain_map))
2857  continue;
2858  nodenum = lock->ml.node;
2859  goto bail;
2860  }
2861  }
2862 
2863  /* Go thru the refmap */
2864  noderef = -1;
2865  while (1) {
2866  noderef = find_next_bit(res->refmap, O2NM_MAX_NODES,
2867  noderef + 1);
2868  if (noderef >= O2NM_MAX_NODES)
2869  break;
2870  if (noderef == dlm->node_num)
2871  continue;
2872  if (test_bit(noderef, dlm->exit_domain_map))
2873  continue;
2874  nodenum = noderef;
2875  goto bail;
2876  }
2877 
2878 bail:
2879  return nodenum;
2880 }
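The picker above prefers a node that already holds a lock on the resource, then falls back to any node with a refmap reference, and skips nodes leaving the domain in both passes. A compressed stand-alone version of that two-pass selection; plain arrays replace the lock queues, the refmap and the exit_domain_map:

#include <stdio.h>

#define MAX_NODES 32
#define NO_NODE   MAX_NODES

static int pick_target(int self, const int *lock_holders, int nholders,
                       const unsigned char *refmap, const unsigned char *exiting)
{
        /* First pass: someone who already holds a lock here. */
        for (int i = 0; i < nholders; i++) {
                int n = lock_holders[i];
                if (n != self && !exiting[n])
                        return n;
        }
        /* Second pass: anyone who still holds a reference. */
        for (int n = 0; n < MAX_NODES; n++)
                if (refmap[n] && n != self && !exiting[n])
                        return n;
        return NO_NODE;
}

int main(void)
{
        int holders[] = { 0 };                          /* only we hold a lock */
        unsigned char refmap[MAX_NODES]  = { [5] = 1 };
        unsigned char exiting[MAX_NODES] = { 0 };

        printf("target = %d\n", pick_target(0, holders, 1, refmap, exiting));
        return 0;
}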
2881 
2882 /* this is called by the new master once all lockres
2883  * data has been received */
2884 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2885  struct dlm_lock_resource *res,
2886  u8 master, u8 new_master,
2887  struct dlm_node_iter *iter)
2888 {
2889  struct dlm_migrate_request migrate;
2890  int ret, skip, status = 0;
2891  int nodenum;
2892 
2893  memset(&migrate, 0, sizeof(migrate));
2894  migrate.namelen = res->lockname.len;
2895  memcpy(migrate.name, res->lockname.name, migrate.namelen);
2896  migrate.new_master = new_master;
2897  migrate.master = master;
2898 
2899  ret = 0;
2900 
2901  /* send message to all nodes, except the master and myself */
2902  while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
2903  if (nodenum == master ||
2904  nodenum == new_master)
2905  continue;
2906 
2907  /* We could race exit domain. If exited, skip. */
2908  spin_lock(&dlm->spinlock);
2909  skip = (!test_bit(nodenum, dlm->domain_map));
2910  spin_unlock(&dlm->spinlock);
2911  if (skip) {
2912  clear_bit(nodenum, iter->node_map);
2913  continue;
2914  }
2915 
2916  ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
2917  &migrate, sizeof(migrate), nodenum,
2918  &status);
2919  if (ret < 0) {
2920  mlog(ML_ERROR, "%s: res %.*s, Error %d send "
2921  "MIGRATE_REQUEST to node %u\n", dlm->name,
2922  migrate.namelen, migrate.name, ret, nodenum);
2923  if (!dlm_is_host_down(ret)) {
2924  mlog(ML_ERROR, "unhandled error=%d!\n", ret);
2925  BUG();
2926  }
2927  clear_bit(nodenum, iter->node_map);
2928  ret = 0;
2929  } else if (status < 0) {
2930  mlog(0, "migrate request (node %u) returned %d!\n",
2931  nodenum, status);
2932  ret = status;
2933  } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
2934  /* during the migration request we short-circuited
2935  * the mastery of the lockres. make sure we have
2936  * a mastery ref for nodenum */
2937  mlog(0, "%s:%.*s: need ref for node %u\n",
2938  dlm->name, res->lockname.len, res->lockname.name,
2939  nodenum);
2940  spin_lock(&res->spinlock);
2941  dlm_lockres_set_refmap_bit(dlm, res, nodenum);
2942  spin_unlock(&res->spinlock);
2943  }
2944  }
2945 
2946  if (ret < 0)
2947  mlog_errno(ret);
2948 
2949  mlog(0, "returning ret=%d\n", ret);
2950  return ret;
2951 }
2952 
2953 
2954 /* if there is an existing mle for this lockres, we now know who the master is.
2955  * (the one who sent us *this* message) we can clear it up right away.
2956  * since the process that put the mle on the list still has a reference to it,
2957  * we can unhash it now, set the master and wake the process. as a result,
2958  * we will have no mle in the list to start with. now we can add an mle for
2959  * the migration and this should be the only one found for those scanning the
2960  * list. */
2961 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
2962  void **ret_data)
2963 {
2964  struct dlm_ctxt *dlm = data;
2965  struct dlm_lock_resource *res = NULL;
2966  struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
2967  struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
2968  const char *name;
2969  unsigned int namelen, hash;
2970  int ret = 0;
2971 
2972  if (!dlm_grab(dlm))
2973  return -EINVAL;
2974 
2975  name = migrate->name;
2976  namelen = migrate->namelen;
2977  hash = dlm_lockid_hash(name, namelen);
2978 
2979  /* preallocate.. if this fails, abort */
2980  mle = kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
2981 
2982  if (!mle) {
2983  ret = -ENOMEM;
2984  goto leave;
2985  }
2986 
2987  /* check for pre-existing lock */
2988  spin_lock(&dlm->spinlock);
2989  res = __dlm_lookup_lockres(dlm, name, namelen, hash);
2990  if (res) {
2991  spin_lock(&res->spinlock);
2992  if (res->state & DLM_LOCK_RES_RECOVERING) {
2993  /* if all is working ok, this can only mean that we got
2994  * a migrate request from a node that we now see as
2995  * dead. what can we do here? drop it to the floor? */
2996  spin_unlock(&res->spinlock);
2997  mlog(ML_ERROR, "Got a migrate request, but the "
2998  "lockres is marked as recovering!");
2999  kmem_cache_free(dlm_mle_cache, mle);
3000  ret = -EINVAL; /* need a better solution */
3001  goto unlock;
3002  }
3003  res->state |= DLM_LOCK_RES_MIGRATING;
3004  spin_unlock(&res->spinlock);
3005  }
3006 
3007  spin_lock(&dlm->master_lock);
3008  /* ignore status. only nonzero status would BUG. */
3009  ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3010  name, namelen,
3011  migrate->new_master,
3012  migrate->master);
3013 
3014  spin_unlock(&dlm->master_lock);
3015 unlock:
3016  spin_unlock(&dlm->spinlock);
3017 
3018  if (oldmle) {
3019  /* master is known, detach if not already detached */
3020  dlm_mle_detach_hb_events(dlm, oldmle);
3021  dlm_put_mle(oldmle);
3022  }
3023 
3024  if (res)
3025  dlm_lockres_put(res);
3026 leave:
3027  dlm_put(dlm);
3028  return ret;
3029 }
3030 
3031 /* must be holding dlm->spinlock and dlm->master_lock
3032  * when adding a migration mle, we can clear any other mles
3033  * in the master list because we know with certainty that
3034  * the master is "master". so we remove any old mle from
 3035  * the list after setting its master field, and then add
3036  * the new migration mle. this way we can hold with the rule
3037  * of having only one mle for a given lock name at all times. */
3038 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3039  struct dlm_lock_resource *res,
3040  struct dlm_master_list_entry *mle,
3041  struct dlm_master_list_entry **oldmle,
3042  const char *name, unsigned int namelen,
3043  u8 new_master, u8 master)
3044 {
3045  int found;
3046  int ret = 0;
3047 
3048  *oldmle = NULL;
3049 
3050  assert_spin_locked(&dlm->spinlock);
3051  assert_spin_locked(&dlm->master_lock);
3052 
3053  /* caller is responsible for any ref taken here on oldmle */
3054  found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3055  if (found) {
3056  struct dlm_master_list_entry *tmp = *oldmle;
3057  spin_lock(&tmp->spinlock);
3058  if (tmp->type == DLM_MLE_MIGRATION) {
3059  if (master == dlm->node_num) {
3060  /* ah another process raced me to it */
3061  mlog(0, "tried to migrate %.*s, but some "
3062  "process beat me to it\n",
3063  namelen, name);
3064  ret = -EEXIST;
3065  } else {
3066  /* bad. 2 NODES are trying to migrate! */
3067  mlog(ML_ERROR, "migration error mle: "
3068  "master=%u new_master=%u // request: "
3069  "master=%u new_master=%u // "
3070  "lockres=%.*s\n",
3071  tmp->master, tmp->new_master,
3072  master, new_master,
3073  namelen, name);
3074  BUG();
3075  }
3076  } else {
3077  /* this is essentially what assert_master does */
3078  tmp->master = master;
3079  atomic_set(&tmp->woken, 1);
3080  wake_up(&tmp->wq);
3081  /* remove it so that only one mle will be found */
3082  __dlm_unlink_mle(dlm, tmp);
3083  __dlm_mle_detach_hb_events(dlm, tmp);
3084  ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3085  mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3086  "telling master to get ref for cleared out mle "
3087  "during migration\n", dlm->name, namelen, name,
3088  master, new_master);
3089  }
3090  spin_unlock(&tmp->spinlock);
3091  }
3092 
3093  /* now add a migration mle to the tail of the list */
3094  dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3095  mle->new_master = new_master;
3096  /* the new master will be sending an assert master for this.
3097  * at that point we will get the refmap reference */
3098  mle->master = master;
3099  /* do this for consistency with other mle types */
3100  set_bit(new_master, mle->maybe_map);
3101  __dlm_insert_mle(dlm, mle);
3102 
3103  return ret;
3104 }
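dlm_add_migration_mle() keeps the "one mle per lock name" rule by resolving and unlinking any existing entry before inserting the migration entry, so a scan can only ever find one. A toy table version of that replace-then-insert step; the entry struct and flat table are invented (the kernel hashes mles by name):

#include <stdio.h>
#include <string.h>

#define MAX_ENTRIES 8

struct entry { char name[32]; int master; int in_use; };

static struct entry table[MAX_ENTRIES];

/* Drop any existing entry for @name, then claim a free slot for the new
 * one, so scanners can only ever see a single entry per name. */
static int replace_entry(const char *name, int new_master)
{
        struct entry *slot = NULL;

        for (int i = 0; i < MAX_ENTRIES; i++) {
                if (table[i].in_use && !strcmp(table[i].name, name))
                        table[i].in_use = 0;             /* old entry resolved */
                if (!table[i].in_use && !slot)
                        slot = &table[i];
        }
        if (!slot)
                return -1;

        snprintf(slot->name, sizeof(slot->name), "%s", name);
        slot->master = new_master;
        slot->in_use = 1;
        return 0;
}

int main(void)
{
        replace_entry("lock-A", 3);
        replace_entry("lock-A", 7);      /* replaces, never duplicates */

        for (int i = 0; i < MAX_ENTRIES; i++)
                if (table[i].in_use)
                        printf("%s -> master %d\n", table[i].name, table[i].master);
        return 0;
}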
3105 
3106 /*
3107  * Sets the owner of the lockres, associated to the mle, to UNKNOWN
3108  */
3109 static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3110  struct dlm_master_list_entry *mle)
3111 {
3112  struct dlm_lock_resource *res;
3113 
3114  /* Find the lockres associated to the mle and set its owner to UNK */
3115  res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
3116  mle->mnamehash);
3117  if (res) {
3118  spin_unlock(&dlm->master_lock);
3119 
3120  /* move lockres onto recovery list */
3121  spin_lock(&res->spinlock);
3122  dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3123  dlm_move_lockres_to_recovery_list(dlm, res);
3124  spin_unlock(&res->spinlock);
3125  dlm_lockres_put(res);
3126 
3127  /* about to get rid of mle, detach from heartbeat */
3128  __dlm_mle_detach_hb_events(dlm, mle);
3129 
3130  /* dump the mle */
3131  spin_lock(&dlm->master_lock);
3132  __dlm_put_mle(mle);
3133  spin_unlock(&dlm->master_lock);
3134  }
3135 
3136  return res;
3137 }
3138 
3139 static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3140  struct dlm_master_list_entry *mle)
3141 {
3142  __dlm_mle_detach_hb_events(dlm, mle);
3143 
3144  spin_lock(&mle->spinlock);
3145  __dlm_unlink_mle(dlm, mle);
3146  atomic_set(&mle->woken, 1);
3147  spin_unlock(&mle->spinlock);
3148 
3149  wake_up(&mle->wq);
3150 }
3151 
3152 static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3153  struct dlm_master_list_entry *mle, u8 dead_node)
3154 {
3155  int bit;
3156 
3157  BUG_ON(mle->type != DLM_MLE_BLOCK);
3158 
3159  spin_lock(&mle->spinlock);
3160  bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
3161  if (bit != dead_node) {
3162  mlog(0, "mle found, but dead node %u would not have been "
3163  "master\n", dead_node);
3164  spin_unlock(&mle->spinlock);
3165  } else {
3166  /* Must drop the refcount by one since the assert_master will
3167  * never arrive. This may result in the mle being unlinked and
3168  * freed, but there may still be a process waiting in the
3169  * dlmlock path which is fine. */
3170  mlog(0, "node %u was expected master\n", dead_node);
3171  atomic_set(&mle->woken, 1);
3172  spin_unlock(&mle->spinlock);
3173  wake_up(&mle->wq);
3174 
3175  /* Do not need events any longer, so detach from heartbeat */
3176  __dlm_mle_detach_hb_events(dlm, mle);
3177  __dlm_put_mle(mle);
3178  }
3179 }
3180 
3181 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3182 {
3183  struct dlm_master_list_entry *mle;
3184  struct dlm_lock_resource *res;
3185  struct hlist_head *bucket;
3186  struct hlist_node *list;
3187  unsigned int i;
3188 
3189  mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node);
3190 top:
3191  assert_spin_locked(&dlm->spinlock);
3192 
3193  /* clean the master list */
3194  spin_lock(&dlm->master_lock);
3195  for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3196  bucket = dlm_master_hash(dlm, i);
3197  hlist_for_each(list, bucket) {
3198  mle = hlist_entry(list, struct dlm_master_list_entry,
3199  master_hash_node);
3200 
3201  BUG_ON(mle->type != DLM_MLE_BLOCK &&
3202  mle->type != DLM_MLE_MASTER &&
3203  mle->type != DLM_MLE_MIGRATION);
3204 
3205  /* MASTER mles are initiated locally. The waiting
3206  * process will notice the node map change shortly.
3207  * Let that happen as normal. */
3208  if (mle->type == DLM_MLE_MASTER)
3209  continue;
3210 
3211  /* BLOCK mles are initiated by other nodes. Need to
3212  * clean up if the dead node would have been the
3213  * master. */
3214  if (mle->type == DLM_MLE_BLOCK) {
3215  dlm_clean_block_mle(dlm, mle, dead_node);
3216  continue;
3217  }
3218 
3219  /* Everything else is a MIGRATION mle */
3220 
3221  /* The rule for MIGRATION mles is that the master
3222  * becomes UNKNOWN if *either* the original or the new
3223  * master dies. All UNKNOWN lockres' are sent to
3224  * whichever node becomes the recovery master. The new
3225  * master is responsible for determining if there is
3226  * still a master for this lockres, or if he needs to
3227  * take over mastery. Either way, this node should
3228  * expect another message to resolve this. */
3229 
3230  if (mle->master != dead_node &&
3231  mle->new_master != dead_node)
3232  continue;
3233 
3234  /* If we have reached this point, this mle needs to be
3235  * removed from the list and freed. */
3236  dlm_clean_migration_mle(dlm, mle);
3237 
3238  mlog(0, "%s: node %u died during migration from "
3239  "%u to %u!\n", dlm->name, dead_node, mle->master,
3240  mle->new_master);
3241 
3242  /* If we find a lockres associated with the mle, we've
3243  * hit this rare case that messes up our lock ordering.
3244  * If so, we need to drop the master lock so that we can
3245  * take the lockres lock, meaning that we will have to
3246  * restart from the head of list. */
3247  res = dlm_reset_mleres_owner(dlm, mle);
3248  if (res)
3249  /* restart */
3250  goto top;
3251 
3252  /* This may be the last reference */
3253  __dlm_put_mle(mle);
3254  }
3255  }
3256  spin_unlock(&dlm->master_lock);
3257 }
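Whenever handling a migration mle forces dlm_clean_master_list() to drop the master lock, it restarts the hash scan from the top, because the list may have changed underneath it. The same "drop the lock, redo the scan" discipline in miniature, with a flat array playing the part of the hash buckets and invented helper names:

#include <pthread.h>
#include <stdio.h>

#define N 4

static pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
static int needs_slow_cleanup[N] = { 0, 1, 0, 0 };
static int cleaned[N];

static void slow_cleanup(int i)
{
        /* Must not hold list_lock while doing this. */
        cleaned[i] = 1;
        needs_slow_cleanup[i] = 0;
}

static void clean_all(void)
{
restart:
        pthread_mutex_lock(&list_lock);
        for (int i = 0; i < N; i++) {
                if (!needs_slow_cleanup[i])
                        continue;
                pthread_mutex_unlock(&list_lock);  /* lock ordering forces a drop */
                slow_cleanup(i);
                goto restart;                      /* list may have changed: rescan */
        }
        pthread_mutex_unlock(&list_lock);
}

int main(void)
{
        clean_all();
        printf("entry 1 cleaned: %d\n", cleaned[1]);
        return 0;
}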
3258 
3259 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3260  u8 old_master)
3261 {
3262  struct dlm_node_iter iter;
3263  int ret = 0;
3264 
3265  spin_lock(&dlm->spinlock);
3266  dlm_node_iter_init(dlm->domain_map, &iter);
3267  clear_bit(old_master, iter.node_map);
3268  clear_bit(dlm->node_num, iter.node_map);
3269  spin_unlock(&dlm->spinlock);
3270 
3271  /* ownership of the lockres is changing. account for the
3272  * mastery reference here since old_master will briefly have
3273  * a reference after the migration completes */
3274  spin_lock(&res->spinlock);
3275  dlm_lockres_set_refmap_bit(dlm, res, old_master);
3276  spin_unlock(&res->spinlock);
3277 
3278  mlog(0, "now time to do a migrate request to other nodes\n");
3279  ret = dlm_do_migrate_request(dlm, res, old_master,
3280  dlm->node_num, &iter);
3281  if (ret < 0) {
3282  mlog_errno(ret);
3283  goto leave;
3284  }
3285 
3286  mlog(0, "doing assert master of %.*s to all except the original node\n",
3287  res->lockname.len, res->lockname.name);
3288  /* this call now finishes out the nodemap
3289  * even if one or more nodes die */
3290  ret = dlm_do_assert_master(dlm, res, iter.node_map,
3291  DLM_ASSERT_MASTER_FINISH_MIGRATION);
3292  if (ret < 0) {
3293  /* no longer need to retry. all living nodes contacted. */
3294  mlog_errno(ret);
3295  ret = 0;
3296  }
3297 
3298  memset(iter.node_map, 0, sizeof(iter.node_map));
3299  set_bit(old_master, iter.node_map);
3300  mlog(0, "doing assert master of %.*s back to %u\n",
3301  res->lockname.len, res->lockname.name, old_master);
3302  ret = dlm_do_assert_master(dlm, res, iter.node_map,
3303  DLM_ASSERT_MASTER_FINISH_MIGRATION);
3304  if (ret < 0) {
3305  mlog(0, "assert master to original master failed "
3306  "with %d.\n", ret);
3307  /* the only nonzero status here would be because of
3308  * a dead original node. we're done. */
3309  ret = 0;
3310  }
3311 
3312  /* all done, set the owner, clear the flag */
3313  spin_lock(&res->spinlock);
3314  dlm_set_lockres_owner(dlm, res, dlm->node_num);
3315  res->state &= ~DLM_LOCK_RES_MIGRATING;
3316  spin_unlock(&res->spinlock);
3317  /* re-dirty it on the new master */
3318  dlm_kick_thread(dlm, res);
3319  wake_up(&res->wq);
3320 leave:
3321  return ret;
3322 }
3323 
3324 /*
3325  * LOCKRES AST REFCOUNT
3326  * this is integral to migration
3327  */
3328 
3329 /* for future intent to call an ast, reserve one ahead of time.
3330  * this should be called only after waiting on the lockres
3331  * with dlm_wait_on_lockres, and while still holding the
3332  * spinlock after the call. */
 3333 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
 3334 {
 3335  assert_spin_locked(&res->spinlock);
 3336  if (res->state & DLM_LOCK_RES_MIGRATING) {
 3337  __dlm_print_one_lock_resource(res);
 3338  }
 3339  BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3340 
3341  atomic_inc(&res->asts_reserved);
3342 }
3343 
3344 /*
3345  * used to drop the reserved ast, either because it went unused,
3346  * or because the ast/bast was actually called.
3347  *
3348  * also, if there is a pending migration on this lockres,
3349  * and this was the last pending ast on the lockres,
3350  * atomically set the MIGRATING flag before we drop the lock.
3351  * this is how we ensure that migration can proceed with no
3352  * asts in progress. note that it is ok if the state of the
3353  * queues is such that a lock should be granted in the future
3354  * or that a bast should be fired, because the new master will
3355  * shuffle the lists on this lockres as soon as it is migrated.
3356  */
3357 void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3358  struct dlm_lock_resource *res)
3359 {
3360  if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
3361  return;
3362 
3363  if (!res->migration_pending) {
3364  spin_unlock(&res->spinlock);
3365  return;
3366  }
3367 
3368  BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3369  res->migration_pending = 0;
3370  res->state |= DLM_LOCK_RES_MIGRATING;
3371  spin_unlock(&res->spinlock);
3372  wake_up(&res->wq);
3373  wake_up(&dlm->migration_wq);
3374 }
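The reserve/release pair above is in essence a counter: migration may only set MIGRATING once the last reserved ast has been released, which is why the release path does the decrement and the flag flip atomically under the resource lock. A userspace rendering of that dec-and-test handoff, using a mutex plus a plain counter instead of atomic_dec_and_lock(); all names here are illustrative:

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t res_lock = PTHREAD_MUTEX_INITIALIZER;
static int asts_reserved;
static int migration_pending;
static int migrating;

static void reserve_ast(void)
{
        pthread_mutex_lock(&res_lock);
        asts_reserved++;               /* an ast/bast will fire later */
        pthread_mutex_unlock(&res_lock);
}

/* Drop one reservation; the last one to drop while a migration is
 * pending is the one that flips the MIGRATING flag. */
static void release_ast(void)
{
        pthread_mutex_lock(&res_lock);
        if (--asts_reserved == 0 && migration_pending) {
                migration_pending = 0;
                migrating = 1;         /* migration may now proceed */
        }
        pthread_mutex_unlock(&res_lock);
}

int main(void)
{
        reserve_ast();
        reserve_ast();
        migration_pending = 1;
        release_ast();
        release_ast();                 /* last release starts the migration */
        printf("migrating = %d\n", migrating);
        return 0;
}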
3375 
3376 void dlm_force_free_mles(struct dlm_ctxt *dlm)
3377 {
3378  int i;
3379  struct hlist_head *bucket;
3380  struct dlm_master_list_entry *mle;
3381  struct hlist_node *tmp, *list;
3382 
3383  /*
3384  * We notified all other nodes that we are exiting the domain and
3385  * marked the dlm state to DLM_CTXT_LEAVING. If any mles are still
3386  * around we force free them and wake any processes that are waiting
3387  * on the mles
3388  */
3389  spin_lock(&dlm->spinlock);
3390  spin_lock(&dlm->master_lock);
3391 
3392  BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
3393  BUG_ON((find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 0) < O2NM_MAX_NODES));
3394 
3395  for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3396  bucket = dlm_master_hash(dlm, i);
3397  hlist_for_each_safe(list, tmp, bucket) {
3398  mle = hlist_entry(list, struct dlm_master_list_entry,
3399  master_hash_node);
3400  if (mle->type != DLM_MLE_BLOCK) {
3401  mlog(ML_ERROR, "bad mle: %p\n", mle);
3402  dlm_print_one_mle(mle);
3403  }
3404  atomic_set(&mle->woken, 1);
3405  wake_up(&mle->wq);
3406 
3407  __dlm_unlink_mle(dlm, mle);
3408  __dlm_mle_detach_hb_events(dlm, mle);
3409  __dlm_put_mle(mle);
3410  }
3411  }
3412  spin_unlock(&dlm->master_lock);
3413  spin_unlock(&dlm->spinlock);
3414 }