Linux Kernel  3.7.1
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
dlmdomain.c
Go to the documentation of this file.
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmdomain.c
5  *
6  * defines domain join / leave apis
7  *
8  * Copyright (C) 2004 Oracle. All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  */
26 
27 #include <linux/module.h>
28 #include <linux/types.h>
29 #include <linux/slab.h>
30 #include <linux/highmem.h>
31 #include <linux/init.h>
32 #include <linux/spinlock.h>
33 #include <linux/delay.h>
34 #include <linux/err.h>
35 #include <linux/debugfs.h>
36 
37 #include "cluster/heartbeat.h"
38 #include "cluster/nodemanager.h"
39 #include "cluster/tcp.h"
40 
41 #include "dlmapi.h"
42 #include "dlmcommon.h"
43 #include "dlmdomain.h"
44 #include "dlmdebug.h"
45 
46 #include "dlmver.h"
47 
48 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
49 #include "cluster/masklog.h"
50 
51 /*
52  * ocfs2 node maps are arrays of long int, which cannot be sent freely
53  * across the wire due to endianness issues. To work around this, we convert
54  * the long ints to byte arrays. The following 3 routines are helper functions
55  * to set/test/copy bits within those arrays of bytes.
56  */
57 static inline void byte_set_bit(u8 nr, u8 map[])
58 {
59  map[nr >> 3] |= (1UL << (nr & 7));
60 }
61 
62 static inline int byte_test_bit(u8 nr, u8 map[])
63 {
64  return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
65 }
66 
67 static inline void byte_copymap(u8 dmap[], unsigned long smap[],
68  unsigned int sz)
69 {
70  unsigned int nn;
71 
72  if (!sz)
73  return;
74 
75  memset(dmap, 0, ((sz + 7) >> 3));
76  for (nn = 0 ; nn < sz; nn++)
77  if (test_bit(nn, smap))
78  byte_set_bit(nn, dmap);
79 }
80 
/*
 * Release the pages held in vec[0..pages-1] (last slot first) and then
 * the vector itself.
 */
static void dlm_free_pagevec(void **vec, int pages)
{
	int remaining = pages;

	while (remaining != 0) {
		remaining--;
		free_page((unsigned long)vec[remaining]);
	}

	kfree(vec);
}
87 
88 static void **dlm_alloc_pagevec(int pages)
89 {
90  void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
91  int i;
92 
93  if (!vec)
94  return NULL;
95 
96  for (i = 0; i < pages; i++)
97  if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
98  goto out_free;
99 
100  mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
101  pages, (unsigned long)DLM_HASH_PAGES,
102  (unsigned long)DLM_BUCKETS_PER_PAGE);
103  return vec;
104 out_free:
105  dlm_free_pagevec(vec, i);
106  return NULL;
107 }
108 
109 /*
110  *
111  * spinlock lock ordering: if multiple locks are needed, obey this ordering:
112  * dlm_domain_lock
113  * struct dlm_ctxt->spinlock
114  * struct dlm_lock_resource->spinlock
115  * struct dlm_ctxt->master_lock
116  * struct dlm_ctxt->ast_lock
117  * dlm_master_list_entry->spinlock
118  * dlm_lock->spinlock
119  *
120  */
121 
122 DEFINE_SPINLOCK(dlm_domain_lock);
123 LIST_HEAD(dlm_domains);
124 static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
125 
126 /*
127  * The supported protocol version for DLM communication. Running domains
128  * will have a negotiated version with the same major number and a minor
129  * number equal or smaller. The dlm_ctxt->dlm_locking_proto field should
130  * be used to determine what a running domain is actually using.
131  *
132  * New in version 1.1:
133  * - Message DLM_QUERY_REGION added to support global heartbeat
134  * - Message DLM_QUERY_NODEINFO added to allow online node removes
135  * New in version 1.2:
136  * - Message DLM_BEGIN_EXIT_DOMAIN_MSG added to mark start of exit domain
137  */
138 static const struct dlm_protocol_version dlm_protocol = {
139  .pv_major = 1,
140  .pv_minor = 2,
141 };
142 
143 #define DLM_DOMAIN_BACKOFF_MS 200
144 
145 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
146  void **ret_data);
147 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
148  void **ret_data);
149 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
150  void **ret_data);
151 static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
152  void *data, void **ret_data);
153 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
154  void **ret_data);
155 static int dlm_protocol_compare(struct dlm_protocol_version *existing,
156  struct dlm_protocol_version *request);
157 
158 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
159 
161 {
162  if (hlist_unhashed(&res->hash_node))
163  return;
164 
165  mlog(0, "%s: Unhash res %.*s\n", dlm->name, res->lockname.len,
166  res->lockname.name);
167  hlist_del_init(&res->hash_node);
168  dlm_lockres_put(res);
169 }
170 
172 {
173  struct hlist_head *bucket;
174  struct qstr *q;
175 
177 
178  q = &res->lockname;
179  bucket = dlm_lockres_hash(dlm, q->hash);
180 
181  /* get a reference for our hashtable */
182  dlm_lockres_get(res);
183 
184  hlist_add_head(&res->hash_node, bucket);
185 
186  mlog(0, "%s: Hash res %.*s\n", dlm->name, res->lockname.len,
187  res->lockname.name);
188 }
189 
191  const char *name,
192  unsigned int len,
193  unsigned int hash)
194 {
195  struct hlist_head *bucket;
196  struct hlist_node *list;
197 
198  mlog(0, "%.*s\n", len, name);
199 
201 
202  bucket = dlm_lockres_hash(dlm, hash);
203 
204  hlist_for_each(list, bucket) {
205  struct dlm_lock_resource *res = hlist_entry(list,
206  struct dlm_lock_resource, hash_node);
207  if (res->lockname.name[0] != name[0])
208  continue;
209  if (unlikely(res->lockname.len != len))
210  continue;
211  if (memcmp(res->lockname.name + 1, name + 1, len - 1))
212  continue;
213  dlm_lockres_get(res);
214  return res;
215  }
216  return NULL;
217 }
218 
219 /* intended to be called by functions which do not care about lock
220  * resources which are being purged (most net _handler functions).
221  * this will return NULL for any lock resource which is found but
222  * currently in the process of dropping its mastery reference.
223  * use __dlm_lookup_lockres_full when you need the lock resource
224  * regardless (e.g. dlm_get_lock_resource) */
226  const char *name,
227  unsigned int len,
228  unsigned int hash)
229 {
230  struct dlm_lock_resource *res = NULL;
231 
232  mlog(0, "%.*s\n", len, name);
233 
235 
236  res = __dlm_lookup_lockres_full(dlm, name, len, hash);
237  if (res) {
238  spin_lock(&res->spinlock);
239  if (res->state & DLM_LOCK_RES_DROPPING_REF) {
240  spin_unlock(&res->spinlock);
241  dlm_lockres_put(res);
242  return NULL;
243  }
244  spin_unlock(&res->spinlock);
245  }
246 
247  return res;
248 }
249 
251  const char *name,
252  unsigned int len)
253 {
254  struct dlm_lock_resource *res;
255  unsigned int hash = dlm_lockid_hash(name, len);
256 
257  spin_lock(&dlm->spinlock);
258  res = __dlm_lookup_lockres(dlm, name, len, hash);
259  spin_unlock(&dlm->spinlock);
260  return res;
261 }
262 
263 static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
264 {
265  struct dlm_ctxt *tmp = NULL;
266  struct list_head *iter;
267 
268  assert_spin_locked(&dlm_domain_lock);
269 
270  /* tmp->name here is always NULL terminated,
271  * but domain may not be! */
272  list_for_each(iter, &dlm_domains) {
273  tmp = list_entry (iter, struct dlm_ctxt, list);
274  if (strlen(tmp->name) == len &&
275  memcmp(tmp->name, domain, len)==0)
276  break;
277  tmp = NULL;
278  }
279 
280  return tmp;
281 }
282 
283 /* For null terminated domain strings ONLY */
284 static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
285 {
286  assert_spin_locked(&dlm_domain_lock);
287 
288  return __dlm_lookup_domain_full(domain, strlen(domain));
289 }
290 
291 
292 /* returns true on one of two conditions:
293  * 1) the domain does not exist
294  * 2) the domain exists and it's state is "joined" */
295 static int dlm_wait_on_domain_helper(const char *domain)
296 {
297  int ret = 0;
298  struct dlm_ctxt *tmp = NULL;
299 
300  spin_lock(&dlm_domain_lock);
301 
302  tmp = __dlm_lookup_domain(domain);
303  if (!tmp)
304  ret = 1;
305  else if (tmp->dlm_state == DLM_CTXT_JOINED)
306  ret = 1;
307 
308  spin_unlock(&dlm_domain_lock);
309  return ret;
310 }
311 
312 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
313 {
314  dlm_destroy_debugfs_subroot(dlm);
315 
316  if (dlm->lockres_hash)
317  dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
318 
319  if (dlm->master_hash)
320  dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);
321 
322  if (dlm->name)
323  kfree(dlm->name);
324 
325  kfree(dlm);
326 }
327 
328 /* A little strange - this function will be called while holding
329  * dlm_domain_lock and is expected to be holding it on the way out. We
330  * will however drop and reacquire it multiple times */
331 static void dlm_ctxt_release(struct kref *kref)
332 {
333  struct dlm_ctxt *dlm;
334 
335  dlm = container_of(kref, struct dlm_ctxt, dlm_refs);
336 
337  BUG_ON(dlm->num_joins);
339 
340  /* we may still be in the list if we hit an error during join. */
341  list_del_init(&dlm->list);
342 
343  spin_unlock(&dlm_domain_lock);
344 
345  mlog(0, "freeing memory from domain %s\n", dlm->name);
346 
347  wake_up(&dlm_domain_events);
348 
349  dlm_free_ctxt_mem(dlm);
350 
351  spin_lock(&dlm_domain_lock);
352 }
353 
354 void dlm_put(struct dlm_ctxt *dlm)
355 {
356  spin_lock(&dlm_domain_lock);
357  kref_put(&dlm->dlm_refs, dlm_ctxt_release);
358  spin_unlock(&dlm_domain_lock);
359 }
360 
361 static void __dlm_get(struct dlm_ctxt *dlm)
362 {
363  kref_get(&dlm->dlm_refs);
364 }
365 
366 /* given a questionable reference to a dlm object, gets a reference if
367  * it can find it in the list, otherwise returns NULL in which case
368  * you shouldn't trust your pointer. */
369 struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
370 {
371  struct list_head *iter;
372  struct dlm_ctxt *target = NULL;
373 
374  spin_lock(&dlm_domain_lock);
375 
376  list_for_each(iter, &dlm_domains) {
377  target = list_entry (iter, struct dlm_ctxt, list);
378 
379  if (target == dlm) {
380  __dlm_get(target);
381  break;
382  }
383 
384  target = NULL;
385  }
386 
387  spin_unlock(&dlm_domain_lock);
388 
389  return target;
390 }
391 
393 {
394  int ret;
395 
396  spin_lock(&dlm_domain_lock);
397  ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
399  spin_unlock(&dlm_domain_lock);
400 
401  return ret;
402 }
403 
404 static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
405 {
406  if (dlm->dlm_worker) {
409  dlm->dlm_worker = NULL;
410  }
411 }
412 
413 static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
414 {
415  dlm_unregister_domain_handlers(dlm);
416  dlm_debug_shutdown(dlm);
417  dlm_complete_thread(dlm);
419  dlm_destroy_dlm_worker(dlm);
420 
421  /* We've left the domain. Now we can take ourselves out of the
422  * list and allow the kref stuff to help us free the
423  * memory. */
424  spin_lock(&dlm_domain_lock);
425  list_del_init(&dlm->list);
426  spin_unlock(&dlm_domain_lock);
427 
428  /* Wake up anyone waiting for us to remove this domain */
429  wake_up(&dlm_domain_events);
430 }
431 
432 static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
433 {
434  int i, num, n, ret = 0;
435  struct dlm_lock_resource *res;
436  struct hlist_node *iter;
437  struct hlist_head *bucket;
438  int dropped;
439 
440  mlog(0, "Migrating locks from domain %s\n", dlm->name);
441 
442  num = 0;
443  spin_lock(&dlm->spinlock);
444  for (i = 0; i < DLM_HASH_BUCKETS; i++) {
445 redo_bucket:
446  n = 0;
447  bucket = dlm_lockres_hash(dlm, i);
448  iter = bucket->first;
449  while (iter) {
450  n++;
451  res = hlist_entry(iter, struct dlm_lock_resource,
452  hash_node);
453  dlm_lockres_get(res);
454  /* migrate, if necessary. this will drop the dlm
455  * spinlock and retake it if it does migration. */
456  dropped = dlm_empty_lockres(dlm, res);
457 
458  spin_lock(&res->spinlock);
459  if (dropped)
460  __dlm_lockres_calc_usage(dlm, res);
461  else
462  iter = res->hash_node.next;
463  spin_unlock(&res->spinlock);
464 
465  dlm_lockres_put(res);
466 
467  if (dropped) {
469  goto redo_bucket;
470  }
471  }
473  num += n;
474  }
475  spin_unlock(&dlm->spinlock);
476  wake_up(&dlm->dlm_thread_wq);
477 
478  /* let the dlm thread take care of purging, keep scanning until
479  * nothing remains in the hash */
480  if (num) {
481  mlog(0, "%s: %d lock resources in hash last pass\n",
482  dlm->name, num);
483  ret = -EAGAIN;
484  }
485  mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
486  return ret;
487 }
488 
489 static int dlm_no_joining_node(struct dlm_ctxt *dlm)
490 {
491  int ret;
492 
493  spin_lock(&dlm->spinlock);
495  spin_unlock(&dlm->spinlock);
496 
497  return ret;
498 }
499 
500 static int dlm_begin_exit_domain_handler(struct o2net_msg *msg, u32 len,
501  void *data, void **ret_data)
502 {
503  struct dlm_ctxt *dlm = data;
504  unsigned int node;
505  struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
506 
507  if (!dlm_grab(dlm))
508  return 0;
509 
510  node = exit_msg->node_idx;
511  mlog(0, "%s: Node %u sent a begin exit domain message\n", dlm->name, node);
512 
513  spin_lock(&dlm->spinlock);
514  set_bit(node, dlm->exit_domain_map);
515  spin_unlock(&dlm->spinlock);
516 
517  dlm_put(dlm);
518 
519  return 0;
520 }
521 
522 static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
523 {
524  /* Yikes, a double spinlock! I need domain_lock for the dlm
525  * state and the dlm spinlock for join state... Sorry! */
526 again:
527  spin_lock(&dlm_domain_lock);
528  spin_lock(&dlm->spinlock);
529 
531  mlog(0, "Node %d is joining, we wait on it.\n",
532  dlm->joining_node);
533  spin_unlock(&dlm->spinlock);
534  spin_unlock(&dlm_domain_lock);
535 
536  wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
537  goto again;
538  }
539 
541  spin_unlock(&dlm->spinlock);
542  spin_unlock(&dlm_domain_lock);
543 }
544 
545 static void __dlm_print_nodes(struct dlm_ctxt *dlm)
546 {
547  int node = -1, num = 0;
548 
550 
551  printk("( ");
552  while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
553  node + 1)) < O2NM_MAX_NODES) {
554  printk("%d ", node);
555  ++num;
556  }
557  printk(") %u nodes\n", num);
558 }
559 
560 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
561  void **ret_data)
562 {
563  struct dlm_ctxt *dlm = data;
564  unsigned int node;
565  struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
566 
567  mlog(0, "%p %u %p", msg, len, data);
568 
569  if (!dlm_grab(dlm))
570  return 0;
571 
572  node = exit_msg->node_idx;
573 
574  spin_lock(&dlm->spinlock);
575  clear_bit(node, dlm->domain_map);
576  clear_bit(node, dlm->exit_domain_map);
577  printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s ", node, dlm->name);
578  __dlm_print_nodes(dlm);
579 
580  /* notify anything attached to the heartbeat events */
581  dlm_hb_event_notify_attached(dlm, node, 0);
582 
583  spin_unlock(&dlm->spinlock);
584 
585  dlm_put(dlm);
586 
587  return 0;
588 }
589 
590 static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm, u32 msg_type,
591  unsigned int node)
592 {
593  int status;
594  struct dlm_exit_domain leave_msg;
595 
596  mlog(0, "%s: Sending domain exit message %u to node %u\n", dlm->name,
597  msg_type, node);
598 
599  memset(&leave_msg, 0, sizeof(leave_msg));
600  leave_msg.node_idx = dlm->node_num;
601 
602  status = o2net_send_message(msg_type, dlm->key, &leave_msg,
603  sizeof(leave_msg), node, NULL);
604  if (status < 0)
605  mlog(ML_ERROR, "Error %d sending domain exit message %u "
606  "to node %u on domain %s\n", status, msg_type, node,
607  dlm->name);
608 
609  return status;
610 }
611 
612 static void dlm_begin_exit_domain(struct dlm_ctxt *dlm)
613 {
614  int node = -1;
615 
616  /* Support for begin exit domain was added in 1.2 */
617  if (dlm->dlm_locking_proto.pv_major == 1 &&
618  dlm->dlm_locking_proto.pv_minor < 2)
619  return;
620 
621  /*
622  * Unlike DLM_EXIT_DOMAIN_MSG, DLM_BEGIN_EXIT_DOMAIN_MSG is purely
623  * informational. Meaning if a node does not receive the message,
624  * so be it.
625  */
626  spin_lock(&dlm->spinlock);
627  while (1) {
628  node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, node + 1);
629  if (node >= O2NM_MAX_NODES)
630  break;
631  if (node == dlm->node_num)
632  continue;
633 
634  spin_unlock(&dlm->spinlock);
635  dlm_send_one_domain_exit(dlm, DLM_BEGIN_EXIT_DOMAIN_MSG, node);
636  spin_lock(&dlm->spinlock);
637  }
638  spin_unlock(&dlm->spinlock);
639 }
640 
641 static void dlm_leave_domain(struct dlm_ctxt *dlm)
642 {
643  int node, clear_node, status;
644 
645  /* At this point we've migrated away all our locks and won't
646  * accept mastership of new ones. The dlm is responsible for
647  * almost nothing now. We make sure not to confuse any joining
648  * nodes and then commence shutdown procedure. */
649 
650  spin_lock(&dlm->spinlock);
651  /* Clear ourselves from the domain map */
652  clear_bit(dlm->node_num, dlm->domain_map);
653  while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
654  0)) < O2NM_MAX_NODES) {
655  /* Drop the dlm spinlock. This is safe wrt the domain_map.
656  * -nodes cannot be added now as the
657  * query_join_handlers knows to respond with OK_NO_MAP
658  * -we catch the right network errors if a node is
659  * removed from the map while we're sending him the
660  * exit message. */
661  spin_unlock(&dlm->spinlock);
662 
663  clear_node = 1;
664 
665  status = dlm_send_one_domain_exit(dlm, DLM_EXIT_DOMAIN_MSG,
666  node);
667  if (status < 0 &&
668  status != -ENOPROTOOPT &&
669  status != -ENOTCONN) {
670  mlog(ML_NOTICE, "Error %d sending domain exit message "
671  "to node %d\n", status, node);
672 
673  /* Not sure what to do here but lets sleep for
674  * a bit in case this was a transient
675  * error... */
677  clear_node = 0;
678  }
679 
680  spin_lock(&dlm->spinlock);
681  /* If we're not clearing the node bit then we intend
682  * to loop back around to try again. */
683  if (clear_node)
684  clear_bit(node, dlm->domain_map);
685  }
686  spin_unlock(&dlm->spinlock);
687 }
688 
689 int dlm_joined(struct dlm_ctxt *dlm)
690 {
691  int ret = 0;
692 
693  spin_lock(&dlm_domain_lock);
694 
695  if (dlm->dlm_state == DLM_CTXT_JOINED)
696  ret = 1;
697 
698  spin_unlock(&dlm_domain_lock);
699 
700  return ret;
701 }
702 
703 int dlm_shutting_down(struct dlm_ctxt *dlm)
704 {
705  int ret = 0;
706 
707  spin_lock(&dlm_domain_lock);
708 
709  if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
710  ret = 1;
711 
712  spin_unlock(&dlm_domain_lock);
713 
714  return ret;
715 }
716 
718 {
719  int leave = 0;
720  struct dlm_lock_resource *res;
721 
722  spin_lock(&dlm_domain_lock);
724  BUG_ON(!dlm->num_joins);
725 
726  dlm->num_joins--;
727  if (!dlm->num_joins) {
728  /* We mark it "in shutdown" now so new register
729  * requests wait until we've completely left the
730  * domain. Don't use DLM_CTXT_LEAVING yet as we still
731  * want new domain joins to communicate with us at
732  * least until we've completed migration of our
733  * resources. */
735  leave = 1;
736  }
737  spin_unlock(&dlm_domain_lock);
738 
739  if (leave) {
740  mlog(0, "shutting down domain %s\n", dlm->name);
741  dlm_begin_exit_domain(dlm);
742 
743  /* We changed dlm state, notify the thread */
744  dlm_kick_thread(dlm, NULL);
745 
746  while (dlm_migrate_all_locks(dlm)) {
747  /* Give dlm_thread time to purge the lockres' */
748  msleep(500);
749  mlog(0, "%s: more migration to do\n", dlm->name);
750  }
751 
752  /* This list should be empty. If not, print remaining lockres */
753  if (!list_empty(&dlm->tracking_list)) {
754  mlog(ML_ERROR, "Following lockres' are still on the "
755  "tracking list:\n");
758  }
759 
760  dlm_mark_domain_leaving(dlm);
761  dlm_leave_domain(dlm);
762  printk(KERN_NOTICE "o2dlm: Leaving domain %s\n", dlm->name);
763  dlm_force_free_mles(dlm);
764  dlm_complete_dlm_shutdown(dlm);
765  }
766  dlm_put(dlm);
767 }
769 
770 static int dlm_query_join_proto_check(char *proto_type, int node,
771  struct dlm_protocol_version *ours,
773 {
774  int rc;
776 
777  if (!dlm_protocol_compare(ours, &proto)) {
778  mlog(0,
779  "node %u wanted to join with %s locking protocol "
780  "%u.%u, we respond with %u.%u\n",
781  node, proto_type,
782  request->pv_major,
783  request->pv_minor,
784  proto.pv_major, proto.pv_minor);
785  request->pv_minor = proto.pv_minor;
786  rc = 0;
787  } else {
788  mlog(ML_NOTICE,
789  "Node %u wanted to join with %s locking "
790  "protocol %u.%u, but we have %u.%u, disallowing\n",
791  node, proto_type,
792  request->pv_major,
793  request->pv_minor,
794  ours->pv_major,
795  ours->pv_minor);
796  rc = 1;
797  }
798 
799  return rc;
800 }
801 
802 /*
803  * struct dlm_query_join_packet is made up of four one-byte fields. They
804  * are effectively in big-endian order already. However, little-endian
805  * machines swap them before putting the packet on the wire (because
806  * query_join's response is a status, and that status is treated as a u32
807  * on the wire). Thus, a big-endian and little-endian machines will treat
808  * this structure differently.
809  *
810  * The solution is to have little-endian machines swap the structure when
811  * converting from the structure to the u32 representation. This will
812  * result in the structure having the correct format on the wire no matter
813  * the host endian format.
814  */
815 static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet,
816  u32 *wire)
817 {
819 
820  response.packet = *packet;
821  *wire = be32_to_cpu(response.intval);
822 }
823 
824 static void dlm_query_join_wire_to_packet(u32 wire,
826 {
828 
829  response.intval = cpu_to_be32(wire);
830  *packet = response.packet;
831 }
832 
833 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
834  void **ret_data)
835 {
837  struct dlm_query_join_packet packet = {
838  .code = JOIN_DISALLOW,
839  };
840  struct dlm_ctxt *dlm = NULL;
841  u32 response;
842  u8 nodenum;
843 
844  query = (struct dlm_query_join_request *) msg->buf;
845 
846  mlog(0, "node %u wants to join domain %s\n", query->node_idx,
847  query->domain);
848 
849  /*
850  * If heartbeat doesn't consider the node live, tell it
851  * to back off and try again. This gives heartbeat a chance
852  * to catch up.
853  */
854  if (!o2hb_check_node_heartbeating(query->node_idx)) {
855  mlog(0, "node %u is not in our live map yet\n",
856  query->node_idx);
857 
858  packet.code = JOIN_DISALLOW;
859  goto respond;
860  }
861 
862  packet.code = JOIN_OK_NO_MAP;
863 
864  spin_lock(&dlm_domain_lock);
865  dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
866  if (!dlm)
867  goto unlock_respond;
868 
869  /*
870  * There is a small window where the joining node may not see the
871  * node(s) that just left but still part of the cluster. DISALLOW
872  * join request if joining node has different node map.
873  */
874  nodenum=0;
875  while (nodenum < O2NM_MAX_NODES) {
876  if (test_bit(nodenum, dlm->domain_map)) {
877  if (!byte_test_bit(nodenum, query->node_map)) {
878  mlog(0, "disallow join as node %u does not "
879  "have node %u in its nodemap\n",
880  query->node_idx, nodenum);
881  packet.code = JOIN_DISALLOW;
882  goto unlock_respond;
883  }
884  }
885  nodenum++;
886  }
887 
888  /* Once the dlm ctxt is marked as leaving then we don't want
889  * to be put in someone's domain map.
890  * Also, explicitly disallow joining at certain troublesome
891  * times (ie. during recovery). */
892  if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
893  int bit = query->node_idx;
894  spin_lock(&dlm->spinlock);
895 
896  if (dlm->dlm_state == DLM_CTXT_NEW &&
898  /*If this is a brand new context and we
899  * haven't started our join process yet, then
900  * the other node won the race. */
901  packet.code = JOIN_OK_NO_MAP;
902  } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
903  /* Disallow parallel joins. */
904  packet.code = JOIN_DISALLOW;
905  } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
906  mlog(0, "node %u trying to join, but recovery "
907  "is ongoing.\n", bit);
908  packet.code = JOIN_DISALLOW;
909  } else if (test_bit(bit, dlm->recovery_map)) {
910  mlog(0, "node %u trying to join, but it "
911  "still needs recovery.\n", bit);
912  packet.code = JOIN_DISALLOW;
913  } else if (test_bit(bit, dlm->domain_map)) {
914  mlog(0, "node %u trying to join, but it "
915  "is still in the domain! needs recovery?\n",
916  bit);
917  packet.code = JOIN_DISALLOW;
918  } else {
919  /* Alright we're fully a part of this domain
920  * so we keep some state as to who's joining
921  * and indicate to him that needs to be fixed
922  * up. */
923 
924  /* Make sure we speak compatible locking protocols. */
925  if (dlm_query_join_proto_check("DLM", bit,
926  &dlm->dlm_locking_proto,
927  &query->dlm_proto)) {
928  packet.code = JOIN_PROTOCOL_MISMATCH;
929  } else if (dlm_query_join_proto_check("fs", bit,
930  &dlm->fs_locking_proto,
931  &query->fs_proto)) {
932  packet.code = JOIN_PROTOCOL_MISMATCH;
933  } else {
934  packet.dlm_minor = query->dlm_proto.pv_minor;
935  packet.fs_minor = query->fs_proto.pv_minor;
936  packet.code = JOIN_OK;
937  __dlm_set_joining_node(dlm, query->node_idx);
938  }
939  }
940 
941  spin_unlock(&dlm->spinlock);
942  }
943 unlock_respond:
944  spin_unlock(&dlm_domain_lock);
945 
946 respond:
947  mlog(0, "We respond with %u\n", packet.code);
948 
949  dlm_query_join_packet_to_wire(&packet, &response);
950  return response;
951 }
952 
953 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
954  void **ret_data)
955 {
956  struct dlm_assert_joined *assert;
957  struct dlm_ctxt *dlm = NULL;
958 
959  assert = (struct dlm_assert_joined *) msg->buf;
960 
961  mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
962  assert->domain);
963 
964  spin_lock(&dlm_domain_lock);
965  dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
966  /* XXX should we consider no dlm ctxt an error? */
967  if (dlm) {
968  spin_lock(&dlm->spinlock);
969 
970  /* Alright, this node has officially joined our
971  * domain. Set him in the map and clean up our
972  * leftover join state. */
973  BUG_ON(dlm->joining_node != assert->node_idx);
974  set_bit(assert->node_idx, dlm->domain_map);
975  clear_bit(assert->node_idx, dlm->exit_domain_map);
976  __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
977 
978  printk(KERN_NOTICE "o2dlm: Node %u joins domain %s ",
979  assert->node_idx, dlm->name);
980  __dlm_print_nodes(dlm);
981 
982  /* notify anything attached to the heartbeat events */
983  dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);
984 
985  spin_unlock(&dlm->spinlock);
986  }
987  spin_unlock(&dlm_domain_lock);
988 
989  return 0;
990 }
991 
992 static int dlm_match_regions(struct dlm_ctxt *dlm,
993  struct dlm_query_region *qr,
994  char *local, int locallen)
995 {
996  char *remote = qr->qr_regions;
997  char *l, *r;
998  int localnr, i, j, foundit;
999  int status = 0;
1000 
1002  if (qr->qr_numregions) {
1003  mlog(ML_ERROR, "Domain %s: Joining node %d has global "
1004  "heartbeat enabled but local node %d does not\n",
1005  qr->qr_domain, qr->qr_node, dlm->node_num);
1006  status = -EINVAL;
1007  }
1008  goto bail;
1009  }
1010 
1012  mlog(ML_ERROR, "Domain %s: Local node %d has global "
1013  "heartbeat enabled but joining node %d does not\n",
1014  qr->qr_domain, dlm->node_num, qr->qr_node);
1015  status = -EINVAL;
1016  goto bail;
1017  }
1018 
1019  r = remote;
1020  for (i = 0; i < qr->qr_numregions; ++i) {
1021  mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r);
1023  }
1024 
1025  localnr = min(O2NM_MAX_REGIONS, locallen/O2HB_MAX_REGION_NAME_LEN);
1026  localnr = o2hb_get_all_regions(local, (u8)localnr);
1027 
1028  /* compare local regions with remote */
1029  l = local;
1030  for (i = 0; i < localnr; ++i) {
1031  foundit = 0;
1032  r = remote;
1033  for (j = 0; j <= qr->qr_numregions; ++j) {
1034  if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) {
1035  foundit = 1;
1036  break;
1037  }
1039  }
1040  if (!foundit) {
1041  status = -EINVAL;
1042  mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
1043  "in local node %d but not in joining node %d\n",
1045  dlm->node_num, qr->qr_node);
1046  goto bail;
1047  }
1049  }
1050 
1051  /* compare remote with local regions */
1052  r = remote;
1053  for (i = 0; i < qr->qr_numregions; ++i) {
1054  foundit = 0;
1055  l = local;
1056  for (j = 0; j < localnr; ++j) {
1057  if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) {
1058  foundit = 1;
1059  break;
1060  }
1062  }
1063  if (!foundit) {
1064  status = -EINVAL;
1065  mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
1066  "in joining node %d but not in local node %d\n",
1068  qr->qr_node, dlm->node_num);
1069  goto bail;
1070  }
1072  }
1073 
1074 bail:
1075  return status;
1076 }
1077 
1078 static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map)
1079 {
1080  struct dlm_query_region *qr = NULL;
1081  int status, ret = 0, i;
1082  char *p;
1083 
1084  if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1085  goto bail;
1086 
1087  qr = kzalloc(sizeof(struct dlm_query_region), GFP_KERNEL);
1088  if (!qr) {
1089  ret = -ENOMEM;
1090  mlog_errno(ret);
1091  goto bail;
1092  }
1093 
1094  qr->qr_node = dlm->node_num;
1095  qr->qr_namelen = strlen(dlm->name);
1096  memcpy(qr->qr_domain, dlm->name, qr->qr_namelen);
1097  /* if local hb, the numregions will be zero */
1101 
1102  p = qr->qr_regions;
1103  for (i = 0; i < qr->qr_numregions; ++i, p += O2HB_MAX_REGION_NAME_LEN)
1104  mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, p);
1105 
1106  i = -1;
1107  while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1108  i + 1)) < O2NM_MAX_NODES) {
1109  if (i == dlm->node_num)
1110  continue;
1111 
1112  mlog(0, "Sending regions to node %d\n", i);
1113 
1115  sizeof(struct dlm_query_region),
1116  i, &status);
1117  if (ret >= 0)
1118  ret = status;
1119  if (ret) {
1120  mlog(ML_ERROR, "Region mismatch %d, node %d\n",
1121  ret, i);
1122  break;
1123  }
1124  }
1125 
1126 bail:
1127  kfree(qr);
1128  return ret;
1129 }
1130 
1131 static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
1132  void *data, void **ret_data)
1133 {
1134  struct dlm_query_region *qr;
1135  struct dlm_ctxt *dlm = NULL;
1136  char *local = NULL;
1137  int status = 0;
1138  int locked = 0;
1139 
1140  qr = (struct dlm_query_region *) msg->buf;
1141 
1142  mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node,
1143  qr->qr_domain);
1144 
1145  /* buffer used in dlm_mast_regions() */
1146  local = kmalloc(sizeof(qr->qr_regions), GFP_KERNEL);
1147  if (!local) {
1148  status = -ENOMEM;
1149  goto bail;
1150  }
1151 
1152  status = -EINVAL;
1153 
1154  spin_lock(&dlm_domain_lock);
1155  dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen);
1156  if (!dlm) {
1157  mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1158  "before join domain\n", qr->qr_node, qr->qr_domain);
1159  goto bail;
1160  }
1161 
1162  spin_lock(&dlm->spinlock);
1163  locked = 1;
1164  if (dlm->joining_node != qr->qr_node) {
1165  mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1166  "but joining node is %d\n", qr->qr_node, qr->qr_domain,
1167  dlm->joining_node);
1168  goto bail;
1169  }
1170 
1171  /* Support for global heartbeat was added in 1.1 */
1172  if (dlm->dlm_locking_proto.pv_major == 1 &&
1173  dlm->dlm_locking_proto.pv_minor == 0) {
1174  mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1175  "but active dlm protocol is %d.%d\n", qr->qr_node,
1176  qr->qr_domain, dlm->dlm_locking_proto.pv_major,
1177  dlm->dlm_locking_proto.pv_minor);
1178  goto bail;
1179  }
1180 
1181  status = dlm_match_regions(dlm, qr, local, sizeof(qr->qr_regions));
1182 
1183 bail:
1184  if (locked)
1185  spin_unlock(&dlm->spinlock);
1186  spin_unlock(&dlm_domain_lock);
1187 
1188  kfree(local);
1189 
1190  return status;
1191 }
1192 
1193 static int dlm_match_nodes(struct dlm_ctxt *dlm, struct dlm_query_nodeinfo *qn)
1194 {
1195  struct o2nm_node *local;
1196  struct dlm_node_info *remote;
1197  int i, j;
1198  int status = 0;
1199 
1200  for (j = 0; j < qn->qn_numnodes; ++j)
1201  mlog(0, "Node %3d, %pI4:%u\n", qn->qn_nodes[j].ni_nodenum,
1202  &(qn->qn_nodes[j].ni_ipv4_address),
1203  ntohs(qn->qn_nodes[j].ni_ipv4_port));
1204 
1205  for (i = 0; i < O2NM_MAX_NODES && !status; ++i) {
1206  local = o2nm_get_node_by_num(i);
1207  remote = NULL;
1208  for (j = 0; j < qn->qn_numnodes; ++j) {
1209  if (qn->qn_nodes[j].ni_nodenum == i) {
1210  remote = &(qn->qn_nodes[j]);
1211  break;
1212  }
1213  }
1214 
1215  if (!local && !remote)
1216  continue;
1217 
1218  if ((local && !remote) || (!local && remote))
1219  status = -EINVAL;
1220 
1221  if (!status &&
1222  ((remote->ni_nodenum != local->nd_num) ||
1223  (remote->ni_ipv4_port != local->nd_ipv4_port) ||
1224  (remote->ni_ipv4_address != local->nd_ipv4_address)))
1225  status = -EINVAL;
1226 
1227  if (status) {
1228  if (remote && !local)
1229  mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1230  "registered in joining node %d but not in "
1231  "local node %d\n", qn->qn_domain,
1232  remote->ni_nodenum,
1233  &(remote->ni_ipv4_address),
1234  ntohs(remote->ni_ipv4_port),
1235  qn->qn_nodenum, dlm->node_num);
1236  if (local && !remote)
1237  mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1238  "registered in local node %d but not in "
1239  "joining node %d\n", qn->qn_domain,
1240  local->nd_num, &(local->nd_ipv4_address),
1241  ntohs(local->nd_ipv4_port),
1242  dlm->node_num, qn->qn_nodenum);
1243  BUG_ON((!local && !remote));
1244  }
1245 
1246  if (local)
1247  o2nm_node_put(local);
1248  }
1249 
1250  return status;
1251 }
1252 
/*
 * dlm_send_nodeinfo - send the locally configured node table to every node
 * set in node_map.
 *
 * One struct dlm_query_nodeinfo is filled from every node that
 * o2nm_get_node_by_num() returns, then node_map is walked, skipping
 * dlm->node_num.  Returns 0 straight away when node_map has no bit set and
 * -ENOMEM when the message cannot be allocated; the walk stops at the first
 * non-zero result.
 *
 * NOTE(review): the embedded listing numbers jump from 1294 to 1296, so the
 * line that begins the send call (and assigns ret) appears to have been
 * dropped from this copy -- restore it from the upstream file.
 */
1253 static int dlm_send_nodeinfo(struct dlm_ctxt *dlm, unsigned long *node_map)
1254 {
1255  struct dlm_query_nodeinfo *qn = NULL;
1256  struct o2nm_node *node;
1257  int ret = 0, status, count, i;
1258 
/* Nothing to do when no bit is set in node_map. */
1259  if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1260  goto bail;
1261 
1262  qn = kzalloc(sizeof(struct dlm_query_nodeinfo), GFP_KERNEL);
1263  if (!qn) {
1264  ret = -ENOMEM;
1265  mlog_errno(ret);
1266  goto bail;
1267  }
1268 
/* Pack every configured node into consecutive qn_nodes[] slots. */
1269  for (i = 0, count = 0; i < O2NM_MAX_NODES; ++i) {
1270  node = o2nm_get_node_by_num(i);
1271  if (!node)
1272  continue;
1273  qn->qn_nodes[count].ni_nodenum = node->nd_num;
1274  qn->qn_nodes[count].ni_ipv4_port = node->nd_ipv4_port;
1275  qn->qn_nodes[count].ni_ipv4_address = node->nd_ipv4_address;
1276  mlog(0, "Node %3d, %pI4:%u\n", node->nd_num,
1277  &(node->nd_ipv4_address), ntohs(node->nd_ipv4_port));
1278  ++count;
1279  o2nm_node_put(node);
1280  }
1281 
1282  qn->qn_nodenum = dlm->node_num;
1283  qn->qn_numnodes = count;
1284  qn->qn_namelen = strlen(dlm->name);
1285  memcpy(qn->qn_domain, dlm->name, qn->qn_namelen);
1286 
/* i starts at -1 so the first search begins at bit 0. */
1287  i = -1;
1288  while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1289  i + 1)) < O2NM_MAX_NODES) {
1290  if (i == dlm->node_num)
1291  continue;
1292 
1293  mlog(0, "Sending nodeinfo to node %d\n", i);
1294 
/* NOTE(review): the head of the send call is missing here (see above). */
1296  qn, sizeof(struct dlm_query_nodeinfo),
1297  i, &status);
/* A non-negative ret is replaced by the status written through &status. */
1298  if (ret >= 0)
1299  ret = status;
1300  if (ret) {
1301  mlog(ML_ERROR, "node mismatch %d, node %d\n", ret, i);
1302  break;
1303  }
1304  }
1305 
1306 bail:
/* qn is NULL on the early-exit paths; kfree() is still called on it. */
1307  kfree(qn);
1308  return ret;
1309 }
1310 
1311 static int dlm_query_nodeinfo_handler(struct o2net_msg *msg, u32 len,
1312  void *data, void **ret_data)
1313 {
1314  struct dlm_query_nodeinfo *qn;
1315  struct dlm_ctxt *dlm = NULL;
1316  int locked = 0, status = -EINVAL;
1317 
1318  qn = (struct dlm_query_nodeinfo *) msg->buf;
1319 
1320  mlog(0, "Node %u queries nodes on domain %s\n", qn->qn_nodenum,
1321  qn->qn_domain);
1322 
1323  spin_lock(&dlm_domain_lock);
1324  dlm = __dlm_lookup_domain_full(qn->qn_domain, qn->qn_namelen);
1325  if (!dlm) {
1326  mlog(ML_ERROR, "Node %d queried nodes on domain %s before "
1327  "join domain\n", qn->qn_nodenum, qn->qn_domain);
1328  goto bail;
1329  }
1330 
1331  spin_lock(&dlm->spinlock);
1332  locked = 1;
1333  if (dlm->joining_node != qn->qn_nodenum) {
1334  mlog(ML_ERROR, "Node %d queried nodes on domain %s but "
1335  "joining node is %d\n", qn->qn_nodenum, qn->qn_domain,
1336  dlm->joining_node);
1337  goto bail;
1338  }
1339 
1340  /* Support for node query was added in 1.1 */
1341  if (dlm->dlm_locking_proto.pv_major == 1 &&
1342  dlm->dlm_locking_proto.pv_minor == 0) {
1343  mlog(ML_ERROR, "Node %d queried nodes on domain %s "
1344  "but active dlm protocol is %d.%d\n", qn->qn_nodenum,
1345  qn->qn_domain, dlm->dlm_locking_proto.pv_major,
1346  dlm->dlm_locking_proto.pv_minor);
1347  goto bail;
1348  }
1349 
1350  status = dlm_match_nodes(dlm, qn);
1351 
1352 bail:
1353  if (locked)
1354  spin_unlock(&dlm->spinlock);
1355  spin_unlock(&dlm_domain_lock);
1356 
1357  return status;
1358 }
1359 
1360 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
1361  void **ret_data)
1362 {
1363  struct dlm_cancel_join *cancel;
1364  struct dlm_ctxt *dlm = NULL;
1365 
1366  cancel = (struct dlm_cancel_join *) msg->buf;
1367 
1368  mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
1369  cancel->domain);
1370 
1371  spin_lock(&dlm_domain_lock);
1372  dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
1373 
1374  if (dlm) {
1375  spin_lock(&dlm->spinlock);
1376 
1377  /* Yikes, this guy wants to cancel his join. No
1378  * problem, we simply cleanup our join state. */
1379  BUG_ON(dlm->joining_node != cancel->node_idx);
1380  __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1381 
1382  spin_unlock(&dlm->spinlock);
1383  }
1384  spin_unlock(&dlm_domain_lock);
1385 
1386  return 0;
1387 }
1388 
1389 static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
1390  unsigned int node)
1391 {
1392  int status;
1393  struct dlm_cancel_join cancel_msg;
1394 
1395  memset(&cancel_msg, 0, sizeof(cancel_msg));
1396  cancel_msg.node_idx = dlm->node_num;
1397  cancel_msg.name_len = strlen(dlm->name);
1398  memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
1399 
1401  &cancel_msg, sizeof(cancel_msg), node,
1402  NULL);
1403  if (status < 0) {
1404  mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1405  "node %u\n", status, DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1406  node);
1407  goto bail;
1408  }
1409 
1410 bail:
1411  return status;
1412 }
1413 
1414 /* map_size should be in bytes. */
1415 static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
1416  unsigned long *node_map,
1417  unsigned int map_size)
1418 {
1419  int status, tmpstat;
1420  unsigned int node;
1421 
1422  if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
1423  sizeof(unsigned long))) {
1424  mlog(ML_ERROR,
1425  "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
1426  map_size, (unsigned)BITS_TO_LONGS(O2NM_MAX_NODES));
1427  return -EINVAL;
1428  }
1429 
1430  status = 0;
1431  node = -1;
1432  while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1433  node + 1)) < O2NM_MAX_NODES) {
1434  if (node == dlm->node_num)
1435  continue;
1436 
1437  tmpstat = dlm_send_one_join_cancel(dlm, node);
1438  if (tmpstat) {
1439  mlog(ML_ERROR, "Error return %d cancelling join on "
1440  "node %d\n", tmpstat, node);
1441  if (!status)
1442  status = tmpstat;
1443  }
1444  }
1445 
1446  if (status)
1447  mlog_errno(status);
1448  return status;
1449 }
1450 
1451 static int dlm_request_join(struct dlm_ctxt *dlm,
1452  int node,
1453  enum dlm_query_join_response_code *response)
1454 {
1455  int status;
1456  struct dlm_query_join_request join_msg;
1457  struct dlm_query_join_packet packet;
1458  u32 join_resp;
1459 
1460  mlog(0, "querying node %d\n", node);
1461 
1462  memset(&join_msg, 0, sizeof(join_msg));
1463  join_msg.node_idx = dlm->node_num;
1464  join_msg.name_len = strlen(dlm->name);
1465  memcpy(join_msg.domain, dlm->name, join_msg.name_len);
1466  join_msg.dlm_proto = dlm->dlm_locking_proto;
1467  join_msg.fs_proto = dlm->fs_locking_proto;
1468 
1469  /* copy live node map to join message */
1470  byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
1471 
1472  status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
1473  sizeof(join_msg), node, &join_resp);
1474  if (status < 0 && status != -ENOPROTOOPT) {
1475  mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1476  "node %u\n", status, DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1477  node);
1478  goto bail;
1479  }
1480  dlm_query_join_wire_to_packet(join_resp, &packet);
1481 
1482  /* -ENOPROTOOPT from the net code means the other side isn't
1483  listening for our message type -- that's fine, it means
1484  his dlm isn't up, so we can consider him a 'yes' but not
1485  joined into the domain. */
1486  if (status == -ENOPROTOOPT) {
1487  status = 0;
1488  *response = JOIN_OK_NO_MAP;
1489  } else if (packet.code == JOIN_DISALLOW ||
1490  packet.code == JOIN_OK_NO_MAP) {
1491  *response = packet.code;
1492  } else if (packet.code == JOIN_PROTOCOL_MISMATCH) {
1493  mlog(ML_NOTICE,
1494  "This node requested DLM locking protocol %u.%u and "
1495  "filesystem locking protocol %u.%u. At least one of "
1496  "the protocol versions on node %d is not compatible, "
1497  "disconnecting\n",
1498  dlm->dlm_locking_proto.pv_major,
1499  dlm->dlm_locking_proto.pv_minor,
1500  dlm->fs_locking_proto.pv_major,
1501  dlm->fs_locking_proto.pv_minor,
1502  node);
1503  status = -EPROTO;
1504  *response = packet.code;
1505  } else if (packet.code == JOIN_OK) {
1506  *response = packet.code;
1507  /* Use the same locking protocol as the remote node */
1508  dlm->dlm_locking_proto.pv_minor = packet.dlm_minor;
1509  dlm->fs_locking_proto.pv_minor = packet.fs_minor;
1510  mlog(0,
1511  "Node %d responds JOIN_OK with DLM locking protocol "
1512  "%u.%u and fs locking protocol %u.%u\n",
1513  node,
1514  dlm->dlm_locking_proto.pv_major,
1515  dlm->dlm_locking_proto.pv_minor,
1516  dlm->fs_locking_proto.pv_major,
1517  dlm->fs_locking_proto.pv_minor);
1518  } else {
1519  status = -EINVAL;
1520  mlog(ML_ERROR, "invalid response %d from node %u\n",
1521  packet.code, node);
1522  }
1523 
1524  mlog(0, "status %d, node %d response is %d\n", status, node,
1525  *response);
1526 
1527 bail:
1528  return status;
1529 }
1530 
1531 static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
1532  unsigned int node)
1533 {
1534  int status;
1535  struct dlm_assert_joined assert_msg;
1536 
1537  mlog(0, "Sending join assert to node %u\n", node);
1538 
1539  memset(&assert_msg, 0, sizeof(assert_msg));
1540  assert_msg.node_idx = dlm->node_num;
1541  assert_msg.name_len = strlen(dlm->name);
1542  memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
1543 
1545  &assert_msg, sizeof(assert_msg), node,
1546  NULL);
1547  if (status < 0)
1548  mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1549  "node %u\n", status, DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1550  node);
1551 
1552  return status;
1553 }
1554 
1555 static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
1556  unsigned long *node_map)
1557 {
1558  int status, node, live;
1559 
1560  status = 0;
1561  node = -1;
1562  while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1563  node + 1)) < O2NM_MAX_NODES) {
1564  if (node == dlm->node_num)
1565  continue;
1566 
1567  do {
1568  /* It is very important that this message be
1569  * received so we spin until either the node
1570  * has died or it gets the message. */
1571  status = dlm_send_one_join_assert(dlm, node);
1572 
1573  spin_lock(&dlm->spinlock);
1574  live = test_bit(node, dlm->live_nodes_map);
1575  spin_unlock(&dlm->spinlock);
1576 
1577  if (status) {
1578  mlog(ML_ERROR, "Error return %d asserting "
1579  "join on node %d\n", status, node);
1580 
1581  /* give us some time between errors... */
1582  if (live)
1584  }
1585  } while (status && live);
1586  }
1587 }
1588 
1590  unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1591  unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1592 };
1593 
1594 static int dlm_should_restart_join(struct dlm_ctxt *dlm,
1595  struct domain_join_ctxt *ctxt,
1596  enum dlm_query_join_response_code response)
1597 {
1598  int ret;
1599 
1600  if (response == JOIN_DISALLOW) {
1601  mlog(0, "Latest response of disallow -- should restart\n");
1602  return 1;
1603  }
1604 
1605  spin_lock(&dlm->spinlock);
1606  /* For now, we restart the process if the node maps have
1607  * changed at all */
1608  ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
1609  sizeof(dlm->live_nodes_map));
1610  spin_unlock(&dlm->spinlock);
1611 
1612  if (ret)
1613  mlog(0, "Node maps changed -- should restart\n");
1614 
1615  return ret;
1616 }
1617 
1618 static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1619 {
1620  int status = 0, tmpstat, node;
1621  struct domain_join_ctxt *ctxt;
1623 
1624  mlog(0, "%p", dlm);
1625 
1626  ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL);
1627  if (!ctxt) {
1628  status = -ENOMEM;
1629  mlog_errno(status);
1630  goto bail;
1631  }
1632 
1633  /* group sem locking should work for us here -- we're already
1634  * registered for heartbeat events so filling this should be
1635  * atomic wrt getting those handlers called. */
1636  o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
1637 
1638  spin_lock(&dlm->spinlock);
1639  memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
1640 
1641  __dlm_set_joining_node(dlm, dlm->node_num);
1642 
1643  spin_unlock(&dlm->spinlock);
1644 
1645  node = -1;
1646  while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
1647  node + 1)) < O2NM_MAX_NODES) {
1648  if (node == dlm->node_num)
1649  continue;
1650 
1651  status = dlm_request_join(dlm, node, &response);
1652  if (status < 0) {
1653  mlog_errno(status);
1654  goto bail;
1655  }
1656 
1657  /* Ok, either we got a response or the node doesn't have a
1658  * dlm up. */
1659  if (response == JOIN_OK)
1660  set_bit(node, ctxt->yes_resp_map);
1661 
1662  if (dlm_should_restart_join(dlm, ctxt, response)) {
1663  status = -EAGAIN;
1664  goto bail;
1665  }
1666  }
1667 
1668  mlog(0, "Yay, done querying nodes!\n");
1669 
1670  /* Yay, everyone agree's we can join the domain. My domain is
1671  * comprised of all nodes who were put in the
1672  * yes_resp_map. Copy that into our domain map and send a join
1673  * assert message to clean up everyone elses state. */
1674  spin_lock(&dlm->spinlock);
1675  memcpy(dlm->domain_map, ctxt->yes_resp_map,
1676  sizeof(ctxt->yes_resp_map));
1677  set_bit(dlm->node_num, dlm->domain_map);
1678  spin_unlock(&dlm->spinlock);
1679 
1680  /* Support for global heartbeat and node info was added in 1.1 */
1681  if (dlm->dlm_locking_proto.pv_major > 1 ||
1682  dlm->dlm_locking_proto.pv_minor > 0) {
1683  status = dlm_send_nodeinfo(dlm, ctxt->yes_resp_map);
1684  if (status) {
1685  mlog_errno(status);
1686  goto bail;
1687  }
1688  status = dlm_send_regions(dlm, ctxt->yes_resp_map);
1689  if (status) {
1690  mlog_errno(status);
1691  goto bail;
1692  }
1693  }
1694 
1695  dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
1696 
1697  /* Joined state *must* be set before the joining node
1698  * information, otherwise the query_join handler may read no
1699  * current joiner but a state of NEW and tell joining nodes
1700  * we're not in the domain. */
1701  spin_lock(&dlm_domain_lock);
1702  dlm->dlm_state = DLM_CTXT_JOINED;
1703  dlm->num_joins++;
1704  spin_unlock(&dlm_domain_lock);
1705 
1706 bail:
1707  spin_lock(&dlm->spinlock);
1708  __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1709  if (!status) {
1710  printk(KERN_NOTICE "o2dlm: Joining domain %s ", dlm->name);
1711  __dlm_print_nodes(dlm);
1712  }
1713  spin_unlock(&dlm->spinlock);
1714 
1715  if (ctxt) {
1716  /* Do we need to send a cancel message to any nodes? */
1717  if (status < 0) {
1718  tmpstat = dlm_send_join_cancels(dlm,
1719  ctxt->yes_resp_map,
1720  sizeof(ctxt->yes_resp_map));
1721  if (tmpstat < 0)
1722  mlog_errno(tmpstat);
1723  }
1724  kfree(ctxt);
1725  }
1726 
1727  mlog(0, "returning %d\n", status);
1728  return status;
1729 }
1730 
/*
 * dlm_unregister_domain_handlers - called on the failure paths of
 * dlm_register_domain_handlers() and dlm_join_domain().
 *
 * NOTE(review): the embedded listing numbers jump from 1732 to 1736, so the
 * whole body appears to have been dropped from this copy -- restore it from
 * the upstream file.
 */
1731 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
1732 {
1736 }
1737 
/*
 * dlm_register_domain_handlers - register the heartbeat callbacks and the
 * per-domain message handlers for dlm.
 *
 * The two heartbeat callbacks (dlm->dlm_hb_down, dlm->dlm_hb_up) are
 * registered first, then one handler per message type; each handler is
 * added to dlm->dlm_domain_handlers.  The first failure stops the sequence
 * and dlm_unregister_domain_handlers() is called.
 *
 * Returns 0 on success or the first non-zero registration status.
 *
 * NOTE(review): the embedded listing numbers have many gaps in this
 * function (1744-1745, 1750-1751, and the first line -- often also the
 * handler-name line -- of every message handler registration).  Those lines
 * appear to have been dropped from this copy; restore them from the
 * upstream file before building.
 */
1738 static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1739 {
1740  int status;
1741 
1742  mlog(0, "registering handlers.\n");
1743 
/* Heartbeat "down" callback; its setup lines are missing (see above). */
1746  status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_down);
1747  if (status)
1748  goto bail;
1749 
/* Heartbeat "up" callback; its setup lines are missing (see above). */
1752  status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_up);
1753  if (status)
1754  goto bail;
1755 
/* Handler whose maximum payload is a struct dlm_master_request. */
1757  sizeof(struct dlm_master_request),
1759  dlm, NULL, &dlm->dlm_domain_handlers);
1760  if (status)
1761  goto bail;
1762 
/* Handler whose maximum payload is a struct dlm_assert_master. */
1764  sizeof(struct dlm_assert_master),
1767  &dlm->dlm_domain_handlers);
1768  if (status)
1769  goto bail;
1770 
/* Handler whose maximum payload is a struct dlm_create_lock. */
1772  sizeof(struct dlm_create_lock),
1774  dlm, NULL, &dlm->dlm_domain_handlers);
1775  if (status)
1776  goto bail;
1777 
/* Message type, size and handler are not visible for this registration. */
1781  dlm, NULL, &dlm->dlm_domain_handlers);
1782  if (status)
1783  goto bail;
1784 
/* Message type, size and handler are not visible for this registration. */
1788  dlm, NULL, &dlm->dlm_domain_handlers);
1789  if (status)
1790  goto bail;
1791 
/* Message type, size and handler are not visible for this registration. */
1795  dlm, NULL, &dlm->dlm_domain_handlers);
1796  if (status)
1797  goto bail;
1798 
/* Exit-domain message, handled by dlm_exit_domain_handler. */
1800  sizeof(struct dlm_exit_domain),
1801  dlm_exit_domain_handler,
1802  dlm, NULL, &dlm->dlm_domain_handlers);
1803  if (status)
1804  goto bail;
1805 
/* Handler whose maximum payload is a struct dlm_deref_lockres. */
1807  sizeof(struct dlm_deref_lockres),
1809  dlm, NULL, &dlm->dlm_domain_handlers);
1810  if (status)
1811  goto bail;
1812 
/* Handler whose maximum payload is a struct dlm_migrate_request. */
1814  sizeof(struct dlm_migrate_request),
1816  dlm, NULL, &dlm->dlm_domain_handlers);
1817  if (status)
1818  goto bail;
1819 
/* Message type, size and handler are not visible for this registration. */
1823  dlm, NULL, &dlm->dlm_domain_handlers);
1824  if (status)
1825  goto bail;
1826 
/* Handler whose maximum payload is a struct dlm_master_requery. */
1828  sizeof(struct dlm_master_requery),
1830  dlm, NULL, &dlm->dlm_domain_handlers);
1831  if (status)
1832  goto bail;
1833 
/* Handler whose maximum payload is a struct dlm_lock_request. */
1835  sizeof(struct dlm_lock_request),
1837  dlm, NULL, &dlm->dlm_domain_handlers);
1838  if (status)
1839  goto bail;
1840 
/* Handler whose maximum payload is a struct dlm_reco_data_done. */
1842  sizeof(struct dlm_reco_data_done),
1844  dlm, NULL, &dlm->dlm_domain_handlers);
1845  if (status)
1846  goto bail;
1847 
/* Handler whose maximum payload is a struct dlm_begin_reco. */
1849  sizeof(struct dlm_begin_reco),
1851  dlm, NULL, &dlm->dlm_domain_handlers);
1852  if (status)
1853  goto bail;
1854 
/* Handler whose maximum payload is a struct dlm_finalize_reco. */
1856  sizeof(struct dlm_finalize_reco),
1858  dlm, NULL, &dlm->dlm_domain_handlers);
1859  if (status)
1860  goto bail;
1861 
/* Second exit-domain-sized message, handled by dlm_begin_exit_domain_handler. */
1863  sizeof(struct dlm_exit_domain),
1864  dlm_begin_exit_domain_handler,
1865  dlm, NULL, &dlm->dlm_domain_handlers);
1866  if (status)
1867  goto bail;
1868 
1869 bail:
/* Any failure triggers dlm_unregister_domain_handlers(). */
1870  if (status)
1871  dlm_unregister_domain_handlers(dlm);
1872 
1873  return status;
1874 }
1875 
1876 static int dlm_join_domain(struct dlm_ctxt *dlm)
1877 {
1878  int status;
1879  unsigned int backoff;
1880  unsigned int total_backoff = 0;
1881 
1882  BUG_ON(!dlm);
1883 
1884  mlog(0, "Join domain %s\n", dlm->name);
1885 
1886  status = dlm_register_domain_handlers(dlm);
1887  if (status) {
1888  mlog_errno(status);
1889  goto bail;
1890  }
1891 
1892  status = dlm_debug_init(dlm);
1893  if (status < 0) {
1894  mlog_errno(status);
1895  goto bail;
1896  }
1897 
1898  status = dlm_launch_thread(dlm);
1899  if (status < 0) {
1900  mlog_errno(status);
1901  goto bail;
1902  }
1903 
1904  status = dlm_launch_recovery_thread(dlm);
1905  if (status < 0) {
1906  mlog_errno(status);
1907  goto bail;
1908  }
1909 
1910  dlm->dlm_worker = create_singlethread_workqueue("dlm_wq");
1911  if (!dlm->dlm_worker) {
1912  status = -ENOMEM;
1913  mlog_errno(status);
1914  goto bail;
1915  }
1916 
1917  do {
1918  status = dlm_try_to_join_domain(dlm);
1919 
1920  /* If we're racing another node to the join, then we
1921  * need to back off temporarily and let them
1922  * complete. */
1923 #define DLM_JOIN_TIMEOUT_MSECS 90000
1924  if (status == -EAGAIN) {
1925  if (signal_pending(current)) {
1926  status = -ERESTARTSYS;
1927  goto bail;
1928  }
1929 
1930  if (total_backoff >
1931  msecs_to_jiffies(DLM_JOIN_TIMEOUT_MSECS)) {
1932  status = -ERESTARTSYS;
1933  mlog(ML_NOTICE, "Timed out joining dlm domain "
1934  "%s after %u msecs\n", dlm->name,
1935  jiffies_to_msecs(total_backoff));
1936  goto bail;
1937  }
1938 
1939  /*
1940  * <chip> After you!
1941  * <dale> No, after you!
1942  * <chip> I insist!
1943  * <dale> But you first!
1944  * ...
1945  */
1946  backoff = (unsigned int)(jiffies & 0x3);
1947  backoff *= DLM_DOMAIN_BACKOFF_MS;
1948  total_backoff += backoff;
1949  mlog(0, "backoff %d\n", backoff);
1950  msleep(backoff);
1951  }
1952  } while (status == -EAGAIN);
1953 
1954  if (status < 0) {
1955  mlog_errno(status);
1956  goto bail;
1957  }
1958 
1959  status = 0;
1960 bail:
1961  wake_up(&dlm_domain_events);
1962 
1963  if (status) {
1964  dlm_unregister_domain_handlers(dlm);
1965  dlm_debug_shutdown(dlm);
1966  dlm_complete_thread(dlm);
1968  dlm_destroy_dlm_worker(dlm);
1969  }
1970 
1971  return status;
1972 }
1973 
/*
 * dlm_alloc_ctxt - allocate and initialise a domain context for domain.
 *
 * Allocates the zeroed struct dlm_ctxt, a copy of the domain name, and the
 * lockres and master hash page vectors, creates the debugfs subroot, and
 * initialises the locks, lists, wait queues, counters and the kref.  The
 * new context starts in state DLM_CTXT_NEW with key as given and node_num
 * taken from o2nm_this_node().
 *
 * Returns the new context, or NULL if any allocation or the debugfs setup
 * fails (everything allocated up to that point is freed first).
 *
 * NOTE(review): the embedded listing numbers skip 2061-2062, 2065,
 * 2068-2069 and 2083, so some initialisation lines appear to have been
 * dropped from this copy -- restore them from the upstream file.
 */
1974 static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1975  u32 key)
1976 {
1977  int i;
1978  int ret;
1979  struct dlm_ctxt *dlm = NULL;
1980 
1981  dlm = kzalloc(sizeof(*dlm), GFP_KERNEL);
1982  if (!dlm) {
1983  mlog_errno(-ENOMEM);
1984  goto leave;
1985  }
1986 
1987  dlm->name = kstrdup(domain, GFP_KERNEL);
1988  if (dlm->name == NULL) {
1989  mlog_errno(-ENOMEM);
1990  kfree(dlm);
1991  dlm = NULL;
1992  goto leave;
1993  }
1994 
1995  dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
1996  if (!dlm->lockres_hash) {
1997  mlog_errno(-ENOMEM);
1998  kfree(dlm->name);
1999  kfree(dlm);
2000  dlm = NULL;
2001  goto leave;
2002  }
2003 
2004  for (i = 0; i < DLM_HASH_BUCKETS; i++)
2005  INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
2006 
2007  dlm->master_hash = (struct hlist_head **)
2008  dlm_alloc_pagevec(DLM_HASH_PAGES);
2009  if (!dlm->master_hash) {
2010  mlog_errno(-ENOMEM);
2011  dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
2012  kfree(dlm->name);
2013  kfree(dlm);
2014  dlm = NULL;
2015  goto leave;
2016  }
2017 
2018  for (i = 0; i < DLM_HASH_BUCKETS; i++)
2019  INIT_HLIST_HEAD(dlm_master_hash(dlm, i));
2020 
2021  dlm->key = key;
2022  dlm->node_num = o2nm_this_node();
2023 
/* From here on a failure must release both hash page vectors as well. */
2024  ret = dlm_create_debugfs_subroot(dlm);
2025  if (ret < 0) {
2026  dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);
2027  dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
2028  kfree(dlm->name);
2029  kfree(dlm);
2030  dlm = NULL;
2031  goto leave;
2032  }
2033 
/* Nothing below this point can fail. */
2034  spin_lock_init(&dlm->spinlock);
2035  spin_lock_init(&dlm->master_lock);
2036  spin_lock_init(&dlm->ast_lock);
2037  spin_lock_init(&dlm->track_lock);
2038  INIT_LIST_HEAD(&dlm->list);
2039  INIT_LIST_HEAD(&dlm->dirty_list);
2040  INIT_LIST_HEAD(&dlm->reco.resources);
2041  INIT_LIST_HEAD(&dlm->reco.received);
2042  INIT_LIST_HEAD(&dlm->reco.node_data);
2043  INIT_LIST_HEAD(&dlm->purge_list);
2044  INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
2045  INIT_LIST_HEAD(&dlm->tracking_list);
2046  dlm->reco.state = 0;
2047 
2048  INIT_LIST_HEAD(&dlm->pending_asts);
2049  INIT_LIST_HEAD(&dlm->pending_basts);
2050 
2051  mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
2052  dlm->recovery_map, &(dlm->recovery_map[0]));
2053 
2054  memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
2055  memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
2056  memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
2057 
2058  dlm->dlm_thread_task = NULL;
2059  dlm->dlm_reco_thread_task = NULL;
2060  dlm->dlm_worker = NULL;
/* NOTE(review): listing lines 2061-2062 are missing here. */
2063  init_waitqueue_head(&dlm->reco.event);
2064  init_waitqueue_head(&dlm->ast_wq);
/* NOTE(review): listing line 2065 is missing here. */
2066  INIT_LIST_HEAD(&dlm->mle_hb_events);
2067 
/* NOTE(review): listing lines 2068-2069 are missing here. */
2070 
2071  dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
2072  dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
2073 
2074  atomic_set(&dlm->res_tot_count, 0);
2075  atomic_set(&dlm->res_cur_count, 0);
2076  for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) {
2077  atomic_set(&dlm->mle_tot_count[i], 0);
2078  atomic_set(&dlm->mle_cur_count[i], 0);
2079  }
2080 
2081  spin_lock_init(&dlm->work_lock);
2082  INIT_LIST_HEAD(&dlm->work_list);
/* NOTE(review): listing line 2083 is missing here. */
2084 
2085  kref_init(&dlm->dlm_refs);
2086  dlm->dlm_state = DLM_CTXT_NEW;
2087 
2088  INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
2089 
2090  mlog(0, "context init: refcount %u\n",
2091  atomic_read(&dlm->dlm_refs.refcount));
2092 
2093 leave:
2094  return dlm;
2095 }
2096 
2097 /*
2098  * Compare a requested locking protocol version against the current one.
2099  *
2100  * If the major numbers are different, they are incompatible.
2101  * If the current minor is greater than the request, they are incompatible.
2102  * If the current minor is less than or equal to the request, they are
2103  * compatible, and the requester should run at the current minor version.
2104  */
2105 static int dlm_protocol_compare(struct dlm_protocol_version *existing,
2106  struct dlm_protocol_version *request)
2107 {
2108  if (existing->pv_major != request->pv_major)
2109  return 1;
2110 
2111  if (existing->pv_minor > request->pv_minor)
2112  return 1;
2113 
2114  if (existing->pv_minor < request->pv_minor)
2115  request->pv_minor = existing->pv_minor;
2116 
2117  return 0;
2118 }
2119 
2120 /*
2121  * dlm_register_domain: one-time setup per "domain".
2122  *
2123  * The filesystem passes in the requested locking version via proto.
2124  * If registration was successful, proto will contain the negotiated
2125  * locking protocol.
2126  */
/*
 * Returns the domain context on success, or an ERR_PTR():
 * -ENAMETOOLONG if strlen(domain) >= O2NM_MAX_NAME_LEN, -ERESTARTSYS if a
 * signal is pending, -EPROTO if fs_proto is not compatible with an already
 * joined domain of the same name, -ENOMEM if a new context cannot be
 * allocated, or the error returned by dlm_join_domain().
 *
 * If the domain already exists and is joined, a reference is taken and
 * num_joins is incremented; otherwise a new context is allocated, added to
 * dlm_domains and joined.
 */
2127 struct dlm_ctxt * dlm_register_domain(const char *domain,
2128  u32 key,
2129  struct dlm_protocol_version *fs_proto)
2130 {
2131  int ret;
2132  struct dlm_ctxt *dlm = NULL;
2133  struct dlm_ctxt *new_ctxt = NULL;
2134 
2135  if (strlen(domain) >= O2NM_MAX_NAME_LEN) {
2136  ret = -ENAMETOOLONG;
2137  mlog(ML_ERROR, "domain name length too long\n");
2138  goto leave;
2139  }
2140 
2141  mlog(0, "register called for domain \"%s\"\n", domain);
2142 
/*
 * The lookup is repeated from here after waiting for another joiner and
 * after allocating new_ctxt (which is done with dlm_domain_lock dropped).
 */
2143 retry:
2144  dlm = NULL;
2145  if (signal_pending(current)) {
2146  ret = -ERESTARTSYS;
2147  mlog_errno(ret);
2148  goto leave;
2149  }
2150 
2151  spin_lock(&dlm_domain_lock);
2152 
2153  dlm = __dlm_lookup_domain(domain);
2154  if (dlm) {
2155  if (dlm->dlm_state != DLM_CTXT_JOINED) {
2156  spin_unlock(&dlm_domain_lock);
2157 
2158  mlog(0, "This ctxt is not joined yet!\n");
/* The wait's return value is not used; retry re-checks for signals. */
2159  wait_event_interruptible(dlm_domain_events,
2160  dlm_wait_on_domain_helper(
2161  domain));
2162  goto retry;
2163  }
2164 
2165  if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) {
2166  spin_unlock(&dlm_domain_lock);
2167  mlog(ML_ERROR,
2168  "Requested locking protocol version is not "
2169  "compatible with already registered domain "
2170  "\"%s\"\n", domain);
2171  ret = -EPROTO;
2172  goto leave;
2173  }
2174 
2175  __dlm_get(dlm);
2176  dlm->num_joins++;
2177 
2178  spin_unlock(&dlm_domain_lock);
2179 
2180  ret = 0;
2181  goto leave;
2182  }
2183 
2184  /* doesn't exist */
2185  if (!new_ctxt) {
2186  spin_unlock(&dlm_domain_lock);
2187 
2188  new_ctxt = dlm_alloc_ctxt(domain, key);
2189  if (new_ctxt)
2190  goto retry;
2191 
2192  ret = -ENOMEM;
2193  mlog_errno(ret);
2194  goto leave;
2195  }
2196 
2197  /* a little variable switch-a-roo here... */
/* Clearing new_ctxt keeps the leave path from freeing the live context. */
2198  dlm = new_ctxt;
2199  new_ctxt = NULL;
2200 
2201  /* add the new domain */
2202  list_add_tail(&dlm->list, &dlm_domains);
2203  spin_unlock(&dlm_domain_lock);
2204 
2205  /*
2206  * Pass the locking protocol version into the join. If the join
2207  * succeeds, it will have the negotiated protocol set.
2208  */
2209  dlm->dlm_locking_proto = dlm_protocol;
2210  dlm->fs_locking_proto = *fs_proto;
2211 
2212  ret = dlm_join_domain(dlm);
2213  if (ret) {
2214  mlog_errno(ret);
2215  dlm_put(dlm);
2216  goto leave;
2217  }
2218 
2219  /* Tell the caller what locking protocol we negotiated */
2220  *fs_proto = dlm->fs_locking_proto;
2221 
2222  ret = 0;
2223 leave:
/* A context allocated here but never added to dlm_domains is freed. */
2224  if (new_ctxt)
2225  dlm_free_ctxt_mem(new_ctxt);
2226 
2227  if (ret < 0)
2228  dlm = ERR_PTR(ret);
2229 
2230  return dlm;
2231 }
2233 
/* List that collects the handlers registered by dlm_register_net_handlers(). */
2234 static LIST_HEAD(dlm_join_handlers);
2235 
/* Unregister every handler currently on dlm_join_handlers. */
2236 static void dlm_unregister_net_handlers(void)
2237 {
2238  o2net_unregister_handler_list(&dlm_join_handlers);
2239 }
2240 
/*
 * dlm_register_net_handlers - register the module-wide join-related message
 * handlers (query join, assert joined, cancel join, query region, query
 * nodeinfo) on dlm_join_handlers.
 *
 * Registration stops at the first non-zero status.  Returns 0 on success or
 * that status.
 *
 * NOTE(review): the embedded listing numbers skip 2245, 2252, 2259, 2266
 * and 2274, i.e. the first line of every registration call appears to have
 * been dropped from this copy -- restore them from the upstream file.
 * NOTE(review): the loop exits on any non-zero status but cleanup only runs
 * for status < 0 -- confirm registrations never return a positive value.
 */
2241 static int dlm_register_net_handlers(void)
2242 {
2243  int status = 0;
2244 
2246  sizeof(struct dlm_query_join_request),
2247  dlm_query_join_handler,
2248  NULL, NULL, &dlm_join_handlers);
2249  if (status)
2250  goto bail;
2251 
2253  sizeof(struct dlm_assert_joined),
2254  dlm_assert_joined_handler,
2255  NULL, NULL, &dlm_join_handlers);
2256  if (status)
2257  goto bail;
2258 
2260  sizeof(struct dlm_cancel_join),
2261  dlm_cancel_join_handler,
2262  NULL, NULL, &dlm_join_handlers);
2263  if (status)
2264  goto bail;
2265 
2267  sizeof(struct dlm_query_region),
2268  dlm_query_region_handler,
2269  NULL, NULL, &dlm_join_handlers);
2270 
2271  if (status)
2272  goto bail;
2273 
2275  sizeof(struct dlm_query_nodeinfo),
2276  dlm_query_nodeinfo_handler,
2277  NULL, NULL, &dlm_join_handlers);
2278 bail:
/* Undo whatever was registered before the failure. */
2279  if (status < 0)
2280  dlm_unregister_net_handlers();
2281 
2282  return status;
2283 }
2284 
2285 /* Domain eviction callback handling.
2286  *
2287  * The file system requires notification of node death *before* the
2288  * dlm completes its recovery work, otherwise it may be able to
2289  * acquire locks on resources requiring recovery. Since the dlm can
2290  * evict a node from its domain *before* heartbeat fires, a similar
2291  * mechanism is required. */
2292 
2293 /* Eviction is not expected to happen often, so a per-domain lock is
2294  * not necessary. Eviction callbacks are allowed to sleep for short
2295  * periods of time. */
2296 static DECLARE_RWSEM(dlm_callback_sem);
2297 
/*
 * Call every eviction callback queued on dlm->dlm_eviction_callbacks,
 * passing node_num and the callback's own ec_data.  dlm_callback_sem is
 * held for reading while the list is walked.
 *
 * NOTE(review): the embedded listing numbers jump from 2297 to 2299, so the
 * first line of this definition (return type, name, first parameter)
 * appears to have been dropped from this copy -- restore it from the
 * upstream file.
 */
2299  int node_num)
2300 {
2301  struct list_head *iter;
2302  struct dlm_eviction_cb *cb;
2303 
2304  down_read(&dlm_callback_sem);
2305  list_for_each(iter, &dlm->dlm_eviction_callbacks) {
2306  cb = list_entry(iter, struct dlm_eviction_cb, ec_item);
2307 
2308  cb->ec_func(node_num, cb->ec_data);
2309  }
2310  up_read(&dlm_callback_sem);
2311 }
2312 
/*
 * Initialise an eviction callback record: empty list linkage, callback
 * function f and its private data pointer.
 *
 * NOTE(review): the embedded listing numbers jump from 2312 to 2315, so the
 * first lines of this definition (return type, name, leading parameters)
 * appear to have been dropped from this copy -- restore them from the
 * upstream file.
 */
2315  void *data)
2316 {
2317  INIT_LIST_HEAD(&cb->ec_item);
2318  cb->ec_func = f;
2319  cb->ec_data = data;
2320 }
2322 
/*
 * Takes dlm_callback_sem for writing around an update involving cb.
 *
 * NOTE(review): the embedded listing numbers skip 2323 (the first line of
 * the definition) and 2327 (the statement executed under the semaphore);
 * both appear to have been dropped from this copy -- restore them from the
 * upstream file.
 */
2324  struct dlm_eviction_cb *cb)
2325 {
2326  down_write(&dlm_callback_sem);
2328  up_write(&dlm_callback_sem);
2329 }
2331 
/*
 * Unlink cb from whatever list it is on and reinitialise its linkage, with
 * dlm_callback_sem held for writing.
 *
 * NOTE(review): the embedded listing numbers skip 2332, so the signature
 * line of this definition appears to have been dropped from this copy --
 * restore it from the upstream file.
 */
2333 {
2334  down_write(&dlm_callback_sem);
2335  list_del_init(&cb->ec_item);
2336  up_write(&dlm_callback_sem);
2337 }
2339 
2340 static int __init dlm_init(void)
2341 {
2342  int status;
2343 
2345 
2346  status = dlm_init_mle_cache();
2347  if (status) {
2348  mlog(ML_ERROR, "Could not create o2dlm_mle slabcache\n");
2349  goto error;
2350  }
2351 
2352  status = dlm_init_master_caches();
2353  if (status) {
2354  mlog(ML_ERROR, "Could not create o2dlm_lockres and "
2355  "o2dlm_lockname slabcaches\n");
2356  goto error;
2357  }
2358 
2359  status = dlm_init_lock_cache();
2360  if (status) {
2361  mlog(ML_ERROR, "Count not create o2dlm_lock slabcache\n");
2362  goto error;
2363  }
2364 
2365  status = dlm_register_net_handlers();
2366  if (status) {
2367  mlog(ML_ERROR, "Unable to register network handlers\n");
2368  goto error;
2369  }
2370 
2371  status = dlm_create_debugfs_root();
2372  if (status)
2373  goto error;
2374 
2375  return 0;
2376 error:
2377  dlm_unregister_net_handlers();
2381  return -1;
2382 }
2383 
/*
 * dlm_exit - module exit point: removes the debugfs root and unregisters
 * the join-related network handlers.
 *
 * NOTE(review): the embedded listing numbers jump from 2387 to 2391, so
 * further teardown statements appear to have been dropped from this copy --
 * restore them from the upstream file.
 */
2384 static void __exit dlm_exit (void)
2385 {
2386  dlm_destroy_debugfs_root();
2387  dlm_unregister_net_handlers();
2391 }
2392 
/* Module metadata, and the init/exit entry points defined above. */
2393 MODULE_AUTHOR("Oracle");
2394 MODULE_LICENSE("GPL");
2395 
2396 module_init(dlm_init);
2397 module_exit(dlm_exit);