lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 ** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved.
5 **
6 ** This copyrighted material is made available to anyone wishing to use,
7 ** modify, copy, or redistribute it subject to the terms and conditions
8 ** of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12 
13 /* Central locking logic has four stages:
14 
15  dlm_lock()
16  dlm_unlock()
17 
18  request_lock(ls, lkb)
19  convert_lock(ls, lkb)
20  unlock_lock(ls, lkb)
21  cancel_lock(ls, lkb)
22 
23  _request_lock(r, lkb)
24  _convert_lock(r, lkb)
25  _unlock_lock(r, lkb)
26  _cancel_lock(r, lkb)
27 
28  do_request(r, lkb)
29  do_convert(r, lkb)
30  do_unlock(r, lkb)
31  do_cancel(r, lkb)
32 
33  Stage 1 (lock, unlock) is mainly about checking input args and
34  splitting into one of the four main operations:
35 
36  dlm_lock = request_lock
37  dlm_lock+CONVERT = convert_lock
38  dlm_unlock = unlock_lock
39  dlm_unlock+CANCEL = cancel_lock
40 
41  Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42  provided to the next stage.
43 
44  Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45  When remote, it calls send_xxxx(), when local it calls do_xxxx().
46 
47  Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48  given rsb and lkb and queues callbacks.
49 
50  For remote operations, send_xxxx() results in the corresponding do_xxxx()
51  function being executed on the remote node. The connecting send/receive
52  calls on local (L) and remote (R) nodes:
53 
54  L: send_xxxx() -> R: receive_xxxx()
55  R: do_xxxx()
56  L: receive_xxxx_reply() <- R: send_xxxx_reply()
57 */
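/* Illustrative trace (added commentary, not part of the original source):
   a dlm_lock() request for a remotely mastered resource moves through the
   stages like this:

	dlm_lock()
	  request_lock()              stage 2: find and lock the rsb
	    _request_lock()           stage 3: master is remote, so
	      send_request()            master runs receive_request(),
	                                do_request(), send_request_reply()
	  receive_request_reply()     stage 4 result arrives; the
	                              completion ast is queued for the caller
*/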
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "lowcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77 
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89  struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93 static void toss_rsb(struct kref *kref);
94 
95 /*
96  * Lock compatibility matrix - thanks Steve
97  * UN = Unlocked state. Not really a state, used as a flag
98  * PD = Padding. Used to make the matrix a nice power of two in size
99  * Other states are the same as the VMS DLM.
100  * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
101  */
102 
103 static const int __dlm_compat_matrix[8][8] = {
104  /* UN NL CR CW PR PW EX PD */
105  {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */
106  {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */
107  {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */
108  {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */
109  {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */
110  {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */
111  {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */
112  {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
113 };
114 
115 /*
116  * This defines the direction of transfer of LVB data.
117  * Granted mode is the row; requested mode is the column.
118  * Usage: matrix[grmode+1][rqmode+1]
119  * 1 = LVB is returned to the caller
120  * 0 = LVB is written to the resource
121  * -1 = nothing happens to the LVB
122  */
123 
124 const int dlm_lvb_operations[8][8] = {
125  /* UN NL CR CW PR PW EX PD*/
126  { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */
127  { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */
128  { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */
129  { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */
130  { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */
131  { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */
132  { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
133  { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
134 };
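/* Example reading of the table (added commentary): converting down from
 * PW to NL gives row PW, column NL = 0, so the caller's LVB is written to
 * the resource; converting up from CR to EX gives row CR, column EX = 1,
 * so the resource's LVB is copied back to the caller's buffer. */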
135 
136 #define modes_compat(gr, rq) \
137  __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
138 
139 int dlm_modes_compat(int mode1, int mode2)
140 {
141  return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
142 }
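/* For example (added commentary):
 *
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR);	returns 1
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX);	returns 0
 *	dlm_modes_compat(DLM_LOCK_IV, DLM_LOCK_EX);	returns 1 (IV maps
 *							to the UN row)
 */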
143 
144 /*
145  * Compatibility matrix for conversions with QUECVT set.
146  * Granted mode is the row; requested mode is the column.
147  * Usage: matrix[grmode+1][rqmode+1]
148  */
149 
150 static const int __quecvt_compat_matrix[8][8] = {
151  /* UN NL CR CW PR PW EX PD */
152  {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */
153  {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */
154  {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */
155  {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */
156  {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
157  {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
158  {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
159  {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
160 };
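/* Example (added commentary): a QUECVT conversion must move to a strictly
 * "larger" mode: row NL, column CR is 1 (NL->CR may queue), while the EX
 * row is all zeroes because nothing is above EX. Row PR, column CW is 1
 * because PR and CW are incomparable, so PR->CW still counts as upward. */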
161 
162 void dlm_print_lkb(struct dlm_lkb *lkb)
163 {
164  printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165  "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
166  lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
167  lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
168  lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
169  (unsigned long long)lkb->lkb_recover_seq);
170 }
171 
172 static void dlm_print_rsb(struct dlm_rsb *r)
173 {
174  printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
175  "rlc %d name %s\n",
176  r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
177  r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
178  r->res_name);
179 }
180 
181 void dlm_dump_rsb(struct dlm_rsb *r)
182 {
183  struct dlm_lkb *lkb;
184 
185  dlm_print_rsb(r);
186 
187  printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
188  list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
189  printk(KERN_ERR "rsb lookup list\n");
190  list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
191  dlm_print_lkb(lkb);
192  printk(KERN_ERR "rsb grant queue:\n");
193  list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
194  dlm_print_lkb(lkb);
195  printk(KERN_ERR "rsb convert queue:\n");
196  list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
197  dlm_print_lkb(lkb);
198  printk(KERN_ERR "rsb wait queue:\n");
199  list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
200  dlm_print_lkb(lkb);
201 }
202 
203 /* Threads cannot use the lockspace while it's being recovered */
204 
205 static inline void dlm_lock_recovery(struct dlm_ls *ls)
206 {
207  down_read(&ls->ls_in_recovery);
208 }
209 
210 void dlm_unlock_recovery(struct dlm_ls *ls)
211 {
212  up_read(&ls->ls_in_recovery);
213 }
214 
215 int dlm_lock_recovery_try(struct dlm_ls *ls)
216 {
217  return down_read_trylock(&ls->ls_in_recovery);
218 }
219 
220 static inline int can_be_queued(struct dlm_lkb *lkb)
221 {
222  return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
223 }
224 
225 static inline int force_blocking_asts(struct dlm_lkb *lkb)
226 {
227  return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
228 }
229 
230 static inline int is_demoted(struct dlm_lkb *lkb)
231 {
232  return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
233 }
234 
235 static inline int is_altmode(struct dlm_lkb *lkb)
236 {
237  return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
238 }
239 
240 static inline int is_granted(struct dlm_lkb *lkb)
241 {
242  return (lkb->lkb_status == DLM_LKSTS_GRANTED);
243 }
244 
245 static inline int is_remote(struct dlm_rsb *r)
246 {
247  DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
248  return !!r->res_nodeid;
249 }
250 
251 static inline int is_process_copy(struct dlm_lkb *lkb)
252 {
253  return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
254 }
255 
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258  return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
259 }
260 
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263  if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264  (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265  return 1;
266  return 0;
267 }
268 
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271  return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
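/* Examples (added commentary): EX->PR is a down-conversion (PR < EX and
 * the modes are comparable). PR->CW and CW->PR are the "middle" cases:
 * neither mode includes the other (each is compatible with CR but not
 * with the other), so such a conversion can't simply be granted in place
 * the way a true down-conversion can. */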
273 
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276  return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
277 }
278 
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281  return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
282 }
283 
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286  return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
287  DLM_IFL_OVERLAP_CANCEL));
288 }
289 
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292  if (is_master_copy(lkb))
293  return;
294 
295  del_timeout(lkb);
296 
297  DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
298 
299  /* if the operation was a cancel, then return -DLM_ECANCEL, if a
300  timeout caused the cancel then return -ETIMEDOUT */
301  if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
302  lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
303  rv = -ETIMEDOUT;
304  }
305 
306  if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
307  lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
308  rv = -EDEADLK;
309  }
310 
311  dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
312 }
313 
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
315 {
316  queue_cast(r, lkb,
317  is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 }
319 
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 {
322  if (is_master_copy(lkb)) {
323  send_bast(r, lkb, rqmode);
324  } else {
325  dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
326  }
327 }
328 
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332 
333 /* This is only called to add a reference when the code already holds
334  a valid reference to the rsb, so there's no need for locking. */
335 
336 static inline void hold_rsb(struct dlm_rsb *r)
337 {
338  kref_get(&r->res_ref);
339 }
340 
341 void dlm_hold_rsb(struct dlm_rsb *r)
342 {
343  hold_rsb(r);
344 }
345 
346 /* When all references to the rsb are gone it's transferred to
347  the tossed list for later disposal. */
348 
349 static void put_rsb(struct dlm_rsb *r)
350 {
351  struct dlm_ls *ls = r->res_ls;
352  uint32_t bucket = r->res_bucket;
353 
354  spin_lock(&ls->ls_rsbtbl[bucket].lock);
355  kref_put(&r->res_ref, toss_rsb);
356  spin_unlock(&ls->ls_rsbtbl[bucket].lock);
357 }
358 
359 void dlm_put_rsb(struct dlm_rsb *r)
360 {
361  put_rsb(r);
362 }
363 
364 static int pre_rsb_struct(struct dlm_ls *ls)
365 {
366  struct dlm_rsb *r1, *r2;
367  int count = 0;
368 
369  spin_lock(&ls->ls_new_rsb_spin);
370  if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
371  spin_unlock(&ls->ls_new_rsb_spin);
372  return 0;
373  }
374  spin_unlock(&ls->ls_new_rsb_spin);
375 
376  r1 = dlm_allocate_rsb(ls);
377  r2 = dlm_allocate_rsb(ls);
378 
379  spin_lock(&ls->ls_new_rsb_spin);
380  if (r1) {
381  list_add(&r1->res_hashchain, &ls->ls_new_rsb);
382  ls->ls_new_rsb_count++;
383  }
384  if (r2) {
385  list_add(&r2->res_hashchain, &ls->ls_new_rsb);
386  ls->ls_new_rsb_count++;
387  }
388  count = ls->ls_new_rsb_count;
389  spin_unlock(&ls->ls_new_rsb_spin);
390 
391  if (!count)
392  return -ENOMEM;
393  return 0;
394 }
395 
396 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
397  unlock any spinlocks, go back and call pre_rsb_struct again.
398  Otherwise, take an rsb off the list and return it. */
399 
400 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
401  struct dlm_rsb **r_ret)
402 {
403  struct dlm_rsb *r;
404  int count;
405 
406  spin_lock(&ls->ls_new_rsb_spin);
407  if (list_empty(&ls->ls_new_rsb)) {
408  count = ls->ls_new_rsb_count;
409  spin_unlock(&ls->ls_new_rsb_spin);
410  log_debug(ls, "find_rsb retry %d %d %s",
411  count, dlm_config.ci_new_rsb_count, name);
412  return -EAGAIN;
413  }
414 
415  r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
416  list_del(&r->res_hashchain);
417  /* Convert the empty list_head to a NULL rb_node for tree usage: */
418  memset(&r->res_hashnode, 0, sizeof(struct rb_node));
419  ls->ls_new_rsb_count--;
420  spin_unlock(&ls->ls_new_rsb_spin);
421 
422  r->res_ls = ls;
423  r->res_length = len;
424  memcpy(r->res_name, name, len);
425  mutex_init(&r->res_mutex);
426 
427  INIT_LIST_HEAD(&r->res_lookup);
428  INIT_LIST_HEAD(&r->res_grantqueue);
429  INIT_LIST_HEAD(&r->res_convertqueue);
430  INIT_LIST_HEAD(&r->res_waitqueue);
431  INIT_LIST_HEAD(&r->res_root_list);
432  INIT_LIST_HEAD(&r->res_recover_list);
433 
434  *r_ret = r;
435  return 0;
436 }
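/* Sketch of the caller pattern these helpers assume (added commentary,
 * mirroring find_rsb_dir() below):
 *
 *   retry:
 *	error = pre_rsb_struct(ls);	allocation done outside spinlocks
 *	if (error < 0)
 *		goto out;
 *	spin_lock(&ls->ls_rsbtbl[b].lock);
 *	...
 *	error = get_rsb_struct(ls, name, len, &r);
 *	if (error == -EAGAIN) {		preallocated list ran dry; refill
 *		spin_unlock(&ls->ls_rsbtbl[b].lock);
 *		goto retry;
 *	}
 */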
437 
438 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
439 {
440  char maxname[DLM_RESNAME_MAXLEN];
441 
442  memset(maxname, 0, DLM_RESNAME_MAXLEN);
443  memcpy(maxname, name, nlen);
444  return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
445 }
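/* Note (added commentary): zero-padding both names to DLM_RESNAME_MAXLEN
 * turns memcmp() into a total order over variable-length names, e.g.
 * "foo" (len 3) sorts before "foobar", and two names compare equal only
 * when both their bytes and their lengths match. */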
446 
447 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
448  struct dlm_rsb **r_ret)
449 {
450  struct rb_node *node = tree->rb_node;
451  struct dlm_rsb *r;
452  int rc;
453 
454  while (node) {
455  r = rb_entry(node, struct dlm_rsb, res_hashnode);
456  rc = rsb_cmp(r, name, len);
457  if (rc < 0)
458  node = node->rb_left;
459  else if (rc > 0)
460  node = node->rb_right;
461  else
462  goto found;
463  }
464  *r_ret = NULL;
465  return -EBADR;
466 
467  found:
468  *r_ret = r;
469  return 0;
470 }
471 
472 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
473 {
474  struct rb_node **newn = &tree->rb_node;
475  struct rb_node *parent = NULL;
476  int rc;
477 
478  while (*newn) {
479  struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
480  res_hashnode);
481 
482  parent = *newn;
483  rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
484  if (rc < 0)
485  newn = &parent->rb_left;
486  else if (rc > 0)
487  newn = &parent->rb_right;
488  else {
489  log_print("rsb_insert match");
490  dlm_dump_rsb(rsb);
491  dlm_dump_rsb(cur);
492  return -EEXIST;
493  }
494  }
495 
496  rb_link_node(&rsb->res_hashnode, parent, newn);
497  rb_insert_color(&rsb->res_hashnode, tree);
498  return 0;
499 }
500 
501 /*
502  * Find rsb in rsbtbl and potentially create/add one
503  *
504  * Delaying the release of rsb's has a similar benefit to applications keeping
505  * NL locks on an rsb, but without the guarantee that the cached master value
506  * will still be valid when the rsb is reused. Apps aren't always smart enough
507  * to keep NL locks on an rsb that they may lock again shortly; this can lead
508  * to excessive master lookups and removals if we don't delay the release.
509  *
510  * Searching for an rsb means looking through both the normal list and toss
511  * list. When found on the toss list the rsb is moved to the normal list with
512  * ref count of 1; when found on normal list the ref count is incremented.
513  *
514  * rsb's on the keep list are being used locally and refcounted.
515  * rsb's on the toss list are not being used locally, and are not refcounted.
516  *
517  * The toss list rsb's were either
518  * - previously used locally but not any more (were on keep list, then
519  * moved to toss list when last refcount dropped)
520  * - created and put on toss list as a directory record for a lookup
521  * (we are the dir node for the res, but are not using the res right now,
522  * but some other node is)
523  *
524  * The purpose of find_rsb() is to return a refcounted rsb for local use.
525  * So, if the given rsb is on the toss list, it is moved to the keep list
526  * before being returned.
527  *
528  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
529  * more refcounts exist, so the rsb is moved from the keep list to the
530  * toss list.
531  *
532  * rsb's on both keep and toss lists are used for doing a name to master
533  * lookups. rsb's that are in use locally (and being refcounted) are on
534  * the keep list, rsb's that are not in use locally (not refcounted) and
535  * only exist for name/master lookups are on the toss list.
536  *
537  * rsb's on the toss list whose dir_nodeid is not local can have stale
538  * name/master mappings. So, remote requests on such rsb's can potentially
539  * return with an error, which means the mapping is stale and needs to
540  * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and
541  * first_lkid is to keep only a single outstanding request on an rsb
542  * while that rsb has a potentially stale master.)
543  */
544 
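/* Lifecycle sketch of the above (added commentary):
 *
 *	create -> keep list --(last ref dropped: toss_rsb)--> toss list
 *	toss list --(found again by find_rsb)--> keep list
 *	toss list --(aged out by shrink_bucket)--> freed
 */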
545 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
546  uint32_t hash, uint32_t b,
547  int dir_nodeid, int from_nodeid,
548  unsigned int flags, struct dlm_rsb **r_ret)
549 {
550  struct dlm_rsb *r = NULL;
551  int our_nodeid = dlm_our_nodeid();
552  int from_local = 0;
553  int from_other = 0;
554  int from_dir = 0;
555  int create = 0;
556  int error;
557 
558  if (flags & R_RECEIVE_REQUEST) {
559  if (from_nodeid == dir_nodeid)
560  from_dir = 1;
561  else
562  from_other = 1;
563  } else if (flags & R_REQUEST) {
564  from_local = 1;
565  }
566 
567  /*
568  * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
569  * from_nodeid has sent us a lock in dlm_recover_locks, believing
570  * we're the new master. Our local recovery may not have set
571  * res_master_nodeid to our_nodeid yet, so allow either. Don't
572  * create the rsb; dlm_recover_process_copy() will handle EBADR
573  * by resending.
574  *
575  * If someone sends us a request, we are the dir node, and we do
576  * not find the rsb anywhere, then recreate it. This happens if
577  * someone sends us a request after we have removed/freed an rsb
578  * from our toss list. (They sent a request instead of lookup
579  * because they are using an rsb from their toss list.)
580  */
581 
582  if (from_local || from_dir ||
583  (from_other && (dir_nodeid == our_nodeid))) {
584  create = 1;
585  }
586 
587  retry:
588  if (create) {
589  error = pre_rsb_struct(ls);
590  if (error < 0)
591  goto out;
592  }
593 
594  spin_lock(&ls->ls_rsbtbl[b].lock);
595 
596  error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
597  if (error)
598  goto do_toss;
599 
600  /*
601  * rsb is active, so we can't check master_nodeid without lock_rsb.
602  */
603 
604  kref_get(&r->res_ref);
605  error = 0;
606  goto out_unlock;
607 
608 
609  do_toss:
610  error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
611  if (error)
612  goto do_new;
613 
614  /*
615  * rsb found inactive (master_nodeid may be out of date unless
616  * we are the dir_nodeid or were the master) No other thread
617  * is using this rsb because it's on the toss list, so we can
618  * look at or update res_master_nodeid without lock_rsb.
619  */
620 
621  if ((r->res_master_nodeid != our_nodeid) && from_other) {
622  /* our rsb was not master, and another node (not the dir node)
623  has sent us a request */
624  log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
625  from_nodeid, r->res_master_nodeid, dir_nodeid,
626  r->res_name);
627  error = -ENOTBLK;
628  goto out_unlock;
629  }
630 
631  if ((r->res_master_nodeid != our_nodeid) && from_dir) {
632  /* don't think this should ever happen */
633  log_error(ls, "find_rsb toss from_dir %d master %d",
634  from_nodeid, r->res_master_nodeid);
635  dlm_print_rsb(r);
636  /* fix it and go on */
637  r->res_master_nodeid = our_nodeid;
638  r->res_nodeid = 0;
639  rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
640  r->res_first_lkid = 0;
641  }
642 
643  if (from_local && (r->res_master_nodeid != our_nodeid)) {
644  /* Because we have held no locks on this rsb,
645  res_master_nodeid could have become stale. */
646  rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
647  r->res_first_lkid = 0;
648  }
649 
650  rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
651  error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
652  goto out_unlock;
653 
654 
655  do_new:
656  /*
657  * rsb not found
658  */
659 
660  if (error == -EBADR && !create)
661  goto out_unlock;
662 
663  error = get_rsb_struct(ls, name, len, &r);
664  if (error == -EAGAIN) {
665  spin_unlock(&ls->ls_rsbtbl[b].lock);
666  goto retry;
667  }
668  if (error)
669  goto out_unlock;
670 
671  r->res_hash = hash;
672  r->res_bucket = b;
673  r->res_dir_nodeid = dir_nodeid;
674  kref_init(&r->res_ref);
675 
676  if (from_dir) {
677  /* want to see how often this happens */
678  log_debug(ls, "find_rsb new from_dir %d recreate %s",
679  from_nodeid, r->res_name);
680  r->res_master_nodeid = our_nodeid;
681  r->res_nodeid = 0;
682  goto out_add;
683  }
684 
685  if (from_other && (dir_nodeid != our_nodeid)) {
686  /* should never happen */
687  log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
688  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
689  dlm_free_rsb(r);
690  error = -ENOTBLK;
691  goto out_unlock;
692  }
693 
694  if (from_other) {
695  log_debug(ls, "find_rsb new from_other %d dir %d %s",
696  from_nodeid, dir_nodeid, r->res_name);
697  }
698 
699  if (dir_nodeid == our_nodeid) {
700  /* When we are the dir nodeid, we can set the master
701  node immediately */
702  r->res_master_nodeid = our_nodeid;
703  r->res_nodeid = 0;
704  } else {
705  /* set_master will send_lookup to dir_nodeid */
706  r->res_master_nodeid = 0;
707  r->res_nodeid = -1;
708  }
709 
710  out_add:
711  error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
712  out_unlock:
713  spin_unlock(&ls->ls_rsbtbl[b].lock);
714  out:
715  *r_ret = r;
716  return error;
717 }
718 
719 /* During recovery, other nodes can send us new MSTCPY locks (from
720  dlm_recover_locks) before we've made ourselves master (in
721  dlm_recover_masters). */
722 
723 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
724  uint32_t hash, uint32_t b,
725  int dir_nodeid, int from_nodeid,
726  unsigned int flags, struct dlm_rsb **r_ret)
727 {
728  struct dlm_rsb *r = NULL;
729  int our_nodeid = dlm_our_nodeid();
730  int recover = (flags & R_RECEIVE_RECOVER);
731  int error;
732 
733  retry:
734  error = pre_rsb_struct(ls);
735  if (error < 0)
736  goto out;
737 
738  spin_lock(&ls->ls_rsbtbl[b].lock);
739 
740  error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
741  if (error)
742  goto do_toss;
743 
744  /*
745  * rsb is active, so we can't check master_nodeid without lock_rsb.
746  */
747 
748  kref_get(&r->res_ref);
749  goto out_unlock;
750 
751 
752  do_toss:
753  error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
754  if (error)
755  goto do_new;
756 
757  /*
758  * rsb found inactive. No other thread is using this rsb because
759  * it's on the toss list, so we can look at or update
760  * res_master_nodeid without lock_rsb.
761  */
762 
763  if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
764  /* our rsb is not master, and another node has sent us a
765  request; this should never happen */
766  log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
767  from_nodeid, r->res_master_nodeid, dir_nodeid);
768  dlm_print_rsb(r);
769  error = -ENOTBLK;
770  goto out_unlock;
771  }
772 
773  if (!recover && (r->res_master_nodeid != our_nodeid) &&
774  (dir_nodeid == our_nodeid)) {
775  /* our rsb is not master, and we are dir; may as well fix it;
776  this should never happen */
777  log_error(ls, "find_rsb toss our %d master %d dir %d",
778  our_nodeid, r->res_master_nodeid, dir_nodeid);
779  dlm_print_rsb(r);
780  r->res_master_nodeid = our_nodeid;
781  r->res_nodeid = 0;
782  }
783 
784  rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
785  error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
786  goto out_unlock;
787 
788 
789  do_new:
790  /*
791  * rsb not found
792  */
793 
794  error = get_rsb_struct(ls, name, len, &r);
795  if (error == -EAGAIN) {
796  spin_unlock(&ls->ls_rsbtbl[b].lock);
797  goto retry;
798  }
799  if (error)
800  goto out_unlock;
801 
802  r->res_hash = hash;
803  r->res_bucket = b;
804  r->res_dir_nodeid = dir_nodeid;
805  r->res_master_nodeid = dir_nodeid;
806  r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
807  kref_init(&r->res_ref);
808 
809  error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
810  out_unlock:
811  spin_unlock(&ls->ls_rsbtbl[b].lock);
812  out:
813  *r_ret = r;
814  return error;
815 }
816 
817 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
818  unsigned int flags, struct dlm_rsb **r_ret)
819 {
820  uint32_t hash, b;
821  int dir_nodeid;
822 
823  if (len > DLM_RESNAME_MAXLEN)
824  return -EINVAL;
825 
826  hash = jhash(name, len, 0);
827  b = hash & (ls->ls_rsbtbl_size - 1);
828 
829  dir_nodeid = dlm_hash2nodeid(ls, hash);
830 
831  if (dlm_no_directory(ls))
832  return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
833  from_nodeid, flags, r_ret);
834  else
835  return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
836  from_nodeid, flags, r_ret);
837 }
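/* Example of the bucket arithmetic (added commentary, hypothetical
 * numbers): ls_rsbtbl_size must be a power of two for the mask to work;
 * with 1024 buckets, hash 0x12345678 & 0x3ff selects bucket 0x278. */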
838 
839 /* we have received a request and found that res_master_nodeid != our_nodeid,
840  so we need to return an error or make ourselves the master */
841 
842 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
843  int from_nodeid)
844 {
845  if (dlm_no_directory(ls)) {
846  log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
847  from_nodeid, r->res_master_nodeid,
848  r->res_dir_nodeid);
849  dlm_print_rsb(r);
850  return -ENOTBLK;
851  }
852 
853  if (from_nodeid != r->res_dir_nodeid) {
854  /* our rsb is not master, and another node (not the dir node)
855  has sent us a request. this is much more common when our
856  master_nodeid is zero, so limit debug to non-zero. */
857 
858  if (r->res_master_nodeid) {
859  log_debug(ls, "validate master from_other %d master %d "
860  "dir %d first %x %s", from_nodeid,
861  r->res_master_nodeid, r->res_dir_nodeid,
862  r->res_first_lkid, r->res_name);
863  }
864  return -ENOTBLK;
865  } else {
866  /* our rsb is not master, but the dir nodeid has sent us a
867  request; this could happen with master 0 / res_nodeid -1 */
868 
869  if (r->res_master_nodeid) {
870  log_error(ls, "validate master from_dir %d master %d "
871  "first %x %s",
872  from_nodeid, r->res_master_nodeid,
873  r->res_first_lkid, r->res_name);
874  }
875 
876  r->res_master_nodeid = dlm_our_nodeid();
877  r->res_nodeid = 0;
878  return 0;
879  }
880 }
881 
882 /*
883  * We're the dir node for this res and another node wants to know the
884  * master nodeid. During normal operation (non recovery) this is only
885  * called from receive_lookup(); master lookups when the local node is
886  * the dir node are done by find_rsb().
887  *
888  * normal operation, we are the dir node for a resource
889  * . _request_lock
890  * . set_master
891  * . send_lookup
892  * . receive_lookup
893  * . dlm_master_lookup flags 0
894  *
895  * recover directory, we are rebuilding dir for all resources
896  * . dlm_recover_directory
897  * . dlm_rcom_names
898  * remote node sends back the rsb names it is master of and we are dir of
899  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
900  * we either create new rsb setting remote node as master, or find existing
901  * rsb and set master to be the remote node.
902  *
903  * recover masters, we are finding the new master for resources
904  * . dlm_recover_masters
905  * . recover_master
906  * . dlm_send_rcom_lookup
907  * . receive_rcom_lookup
908  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
909  */
910 
911 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
912  unsigned int flags, int *r_nodeid, int *result)
913 {
914  struct dlm_rsb *r = NULL;
915  uint32_t hash, b;
916  int from_master = (flags & DLM_LU_RECOVER_DIR);
917  int fix_master = (flags & DLM_LU_RECOVER_MASTER);
918  int our_nodeid = dlm_our_nodeid();
919  int dir_nodeid, error, toss_list = 0;
920 
921  if (len > DLM_RESNAME_MAXLEN)
922  return -EINVAL;
923 
924  if (from_nodeid == our_nodeid) {
925  log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
926  our_nodeid, flags);
927  return -EINVAL;
928  }
929 
930  hash = jhash(name, len, 0);
931  b = hash & (ls->ls_rsbtbl_size - 1);
932 
933  dir_nodeid = dlm_hash2nodeid(ls, hash);
934  if (dir_nodeid != our_nodeid) {
935  log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
936  from_nodeid, dir_nodeid, our_nodeid, hash,
937  ls->ls_num_nodes);
938  *r_nodeid = -1;
939  return -EINVAL;
940  }
941 
942  retry:
943  error = pre_rsb_struct(ls);
944  if (error < 0)
945  return error;
946 
947  spin_lock(&ls->ls_rsbtbl[b].lock);
948  error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
949  if (!error) {
950  /* because the rsb is active, we need to lock_rsb before
951  checking/changing res_master_nodeid */
952 
953  hold_rsb(r);
954  spin_unlock(&ls->ls_rsbtbl[b].lock);
955  lock_rsb(r);
956  goto found;
957  }
958 
959  error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
960  if (error)
961  goto not_found;
962 
963  /* because the rsb is inactive (on toss list), it's not refcounted
964  and lock_rsb is not used, but is protected by the rsbtbl lock */
965 
966  toss_list = 1;
967  found:
968  if (r->res_dir_nodeid != our_nodeid) {
969  /* should not happen, but may as well fix it and carry on */
970  log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
971  r->res_dir_nodeid, our_nodeid, r->res_name);
972  r->res_dir_nodeid = our_nodeid;
973  }
974 
975  if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
976  /* Recovery uses this function to set a new master when
977  the previous master failed. Setting NEW_MASTER will
978  force dlm_recover_masters to call recover_master on this
979  rsb even though its new master nodeid is no longer a removed node. */
980 
981  r->res_master_nodeid = from_nodeid;
982  r->res_nodeid = from_nodeid;
983  rsb_set_flag(r, RSB_NEW_MASTER);
984 
985  if (toss_list) {
986  /* I don't think we should ever find it on toss list. */
987  log_error(ls, "dlm_master_lookup fix_master on toss");
988  dlm_dump_rsb(r);
989  }
990  }
991 
992  if (from_master && (r->res_master_nodeid != from_nodeid)) {
993  /* this will happen if from_nodeid became master during
994  a previous recovery cycle, and we aborted the previous
995  cycle before recovering this master value */
996 
997  log_limit(ls, "dlm_master_lookup from_master %d "
998  "master_nodeid %d res_nodeid %d first %x %s",
999  from_nodeid, r->res_master_nodeid, r->res_nodeid,
1000  r->res_first_lkid, r->res_name);
1001 
1002  if (r->res_master_nodeid == our_nodeid) {
1003  log_error(ls, "from_master %d our_master", from_nodeid);
1004  dlm_dump_rsb(r);
1005  dlm_send_rcom_lookup_dump(r, from_nodeid);
1006  goto out_found;
1007  }
1008 
1009  r->res_master_nodeid = from_nodeid;
1010  r->res_nodeid = from_nodeid;
1011  rsb_set_flag(r, RSB_NEW_MASTER);
1012  }
1013 
1014  if (!r->res_master_nodeid) {
1015  /* this will happen if recovery happens while we're looking
1016  up the master for this rsb */
1017 
1018  log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1019  from_nodeid, r->res_first_lkid, r->res_name);
1020  r->res_master_nodeid = from_nodeid;
1021  r->res_nodeid = from_nodeid;
1022  }
1023 
1024  if (!from_master && !fix_master &&
1025  (r->res_master_nodeid == from_nodeid)) {
1026  /* this can happen when the master sends remove, the dir node
1027  finds the rsb on the keep list and ignores the remove,
1028  and the former master sends a lookup */
1029 
1030  log_limit(ls, "dlm_master_lookup from master %d flags %x "
1031  "first %x %s", from_nodeid, flags,
1032  r->res_first_lkid, r->res_name);
1033  }
1034 
1035  out_found:
1036  *r_nodeid = r->res_master_nodeid;
1037  if (result)
1038  *result = DLM_LU_MATCH;
1039 
1040  if (toss_list) {
1041  r->res_toss_time = jiffies;
1042  /* the rsb was inactive (on toss list) */
1043  spin_unlock(&ls->ls_rsbtbl[b].lock);
1044  } else {
1045  /* the rsb was active */
1046  unlock_rsb(r);
1047  put_rsb(r);
1048  }
1049  return 0;
1050 
1051  not_found:
1052  error = get_rsb_struct(ls, name, len, &r);
1053  if (error == -EAGAIN) {
1054  spin_unlock(&ls->ls_rsbtbl[b].lock);
1055  goto retry;
1056  }
1057  if (error)
1058  goto out_unlock;
1059 
1060  r->res_hash = hash;
1061  r->res_bucket = b;
1062  r->res_dir_nodeid = our_nodeid;
1063  r->res_master_nodeid = from_nodeid;
1064  r->res_nodeid = from_nodeid;
1065  kref_init(&r->res_ref);
1066  r->res_toss_time = jiffies;
1067 
1068  error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1069  if (error) {
1070  /* should never happen */
1071  dlm_free_rsb(r);
1072  spin_unlock(&ls->ls_rsbtbl[b].lock);
1073  goto retry;
1074  }
1075 
1076  if (result)
1077  *result = DLM_LU_ADD;
1078  *r_nodeid = from_nodeid;
1079  error = 0;
1080  out_unlock:
1081  spin_unlock(&ls->ls_rsbtbl[b].lock);
1082  return error;
1083 }
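/* Example outcomes (added commentary, hypothetical nodeids): a lookup
 * from node 3 for a name that node 2 already masters returns
 * *r_nodeid = 2 with DLM_LU_MATCH; if no rsb exists at all, the code
 * above creates a toss-list rsb mastered by the requester and returns
 * *r_nodeid = 3 with DLM_LU_ADD. */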
1084 
1085 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1086 {
1087  struct rb_node *n;
1088  struct dlm_rsb *r;
1089  int i;
1090 
1091  for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1092  spin_lock(&ls->ls_rsbtbl[i].lock);
1093  for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1094  r = rb_entry(n, struct dlm_rsb, res_hashnode);
1095  if (r->res_hash == hash)
1096  dlm_dump_rsb(r);
1097  }
1098  spin_unlock(&ls->ls_rsbtbl[i].lock);
1099  }
1100 }
1101 
1102 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1103 {
1104  struct dlm_rsb *r = NULL;
1105  uint32_t hash, b;
1106  int error;
1107 
1108  hash = jhash(name, len, 0);
1109  b = hash & (ls->ls_rsbtbl_size - 1);
1110 
1111  spin_lock(&ls->ls_rsbtbl[b].lock);
1112  error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1113  if (!error)
1114  goto out_dump;
1115 
1116  error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1117  if (error)
1118  goto out;
1119  out_dump:
1120  dlm_dump_rsb(r);
1121  out:
1122  spin_unlock(&ls->ls_rsbtbl[b].lock);
1123 }
1124 
1125 static void toss_rsb(struct kref *kref)
1126 {
1127  struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1128  struct dlm_ls *ls = r->res_ls;
1129 
1130  DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1131  kref_init(&r->res_ref);
1132  rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1133  rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1134  r->res_toss_time = jiffies;
1135  if (r->res_lvbptr) {
1136  dlm_free_lvb(r->res_lvbptr);
1137  r->res_lvbptr = NULL;
1138  }
1139 }
1140 
1141 /* See comment for unhold_lkb */
1142 
1143 static void unhold_rsb(struct dlm_rsb *r)
1144 {
1145  int rv;
1146  rv = kref_put(&r->res_ref, toss_rsb);
1147  DLM_ASSERT(!rv, dlm_dump_rsb(r););
1148 }
1149 
1150 static void kill_rsb(struct kref *kref)
1151 {
1152  struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1153 
1154  /* All work is done after the return from kref_put() so we
1155  can release the write_lock before the remove and free. */
1156 
1157  DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1158  DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1159  DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1160  DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1161  DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1162  DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1163 }
1164 
1165 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1166  The rsb must exist as long as any lkb's for it do. */
1167 
1168 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1169 {
1170  hold_rsb(r);
1171  lkb->lkb_resource = r;
1172 }
1173 
1174 static void detach_lkb(struct dlm_lkb *lkb)
1175 {
1176  if (lkb->lkb_resource) {
1177  put_rsb(lkb->lkb_resource);
1178  lkb->lkb_resource = NULL;
1179  }
1180 }
1181 
1182 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1183 {
1184  struct dlm_lkb *lkb;
1185  int rv, id;
1186 
1187  lkb = dlm_allocate_lkb(ls);
1188  if (!lkb)
1189  return -ENOMEM;
1190 
1191  lkb->lkb_nodeid = -1;
1192  lkb->lkb_grmode = DLM_LOCK_IV;
1193  kref_init(&lkb->lkb_ref);
1194  INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1195  INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1196  INIT_LIST_HEAD(&lkb->lkb_time_list);
1197  INIT_LIST_HEAD(&lkb->lkb_cb_list);
1198  mutex_init(&lkb->lkb_cb_mutex);
1200 
1201  retry:
1202  rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
1203  if (!rv)
1204  return -ENOMEM;
1205 
1206  spin_lock(&ls->ls_lkbidr_spin);
1207  rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
1208  if (!rv)
1209  lkb->lkb_id = id;
1210  spin_unlock(&ls->ls_lkbidr_spin);
1211 
1212  if (rv == -EAGAIN)
1213  goto retry;
1214 
1215  if (rv < 0) {
1216  log_error(ls, "create_lkb idr error %d", rv);
1217  return rv;
1218  }
1219 
1220  *lkb_ret = lkb;
1221  return 0;
1222 }
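/* Note (added commentary): idr_pre_get()/idr_get_new_above() is the
 * old-style IDR idiom: preallocate outside the spinlock, take an id
 * under it, and retry on -EAGAIN if another thread consumed the
 * preallocated node first. Starting above 0 keeps lkid 0 invalid, so
 * "no lock id" is always distinguishable. */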
1223 
1224 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1225 {
1226  struct dlm_lkb *lkb;
1227 
1228  spin_lock(&ls->ls_lkbidr_spin);
1229  lkb = idr_find(&ls->ls_lkbidr, lkid);
1230  if (lkb)
1231  kref_get(&lkb->lkb_ref);
1232  spin_unlock(&ls->ls_lkbidr_spin);
1233 
1234  *lkb_ret = lkb;
1235  return lkb ? 0 : -ENOENT;
1236 }
1237 
1238 static void kill_lkb(struct kref *kref)
1239 {
1240  struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1241 
1242  /* All work is done after the return from kref_put() so we
1243  can release the write_lock before the detach_lkb */
1244 
1245  DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1246 }
1247 
1248 /* __put_lkb() is used when an lkb may not have an rsb attached to
1249  it so we need to provide the lockspace explicitly */
1250 
1251 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1252 {
1253  uint32_t lkid = lkb->lkb_id;
1254 
1255  spin_lock(&ls->ls_lkbidr_spin);
1256  if (kref_put(&lkb->lkb_ref, kill_lkb)) {
1257  idr_remove(&ls->ls_lkbidr, lkid);
1258  spin_unlock(&ls->ls_lkbidr_spin);
1259 
1260  detach_lkb(lkb);
1261 
1262  /* for local/process lkbs, lvbptr points to caller's lksb */
1263  if (lkb->lkb_lvbptr && is_master_copy(lkb))
1264  dlm_free_lvb(lkb->lkb_lvbptr);
1265  dlm_free_lkb(lkb);
1266  return 1;
1267  } else {
1268  spin_unlock(&ls->ls_lkbidr_spin);
1269  return 0;
1270  }
1271 }
1272 
1273 int dlm_put_lkb(struct dlm_lkb *lkb)
1274 {
1275  struct dlm_ls *ls;
1276 
1277  DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1278  DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1279 
1280  ls = lkb->lkb_resource->res_ls;
1281  return __put_lkb(ls, lkb);
1282 }
1283 
1284 /* This is only called to add a reference when the code already holds
1285  a valid reference to the lkb, so there's no need for locking. */
1286 
1287 static inline void hold_lkb(struct dlm_lkb *lkb)
1288 {
1289  kref_get(&lkb->lkb_ref);
1290 }
1291 
1292 /* This is called when we need to remove a reference and are certain
1293  it's not the last ref. e.g. del_lkb is always called between a
1294  find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1295  put_lkb would work fine, but would involve unnecessary locking */
1296 
1297 static inline void unhold_lkb(struct dlm_lkb *lkb)
1298 {
1299  int rv;
1300  rv = kref_put(&lkb->lkb_ref, kill_lkb);
1301  DLM_ASSERT(!rv, dlm_print_lkb(lkb););
1302 }
1303 
1304 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1305  int mode)
1306 {
1307  struct dlm_lkb *lkb = NULL;
1308 
1309  list_for_each_entry(lkb, head, lkb_statequeue)
1310  if (lkb->lkb_rqmode < mode)
1311  break;
1312 
1313  __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
1314 }
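/* Example (added commentary): with NL(0) < CR(1) < CW(2) < PR(3) <
 * PW(4) < EX(5), adding a mode PR entry to a queue ordered
 * [EX, CW, NL] stops at CW (2 < 3) and links in before it, giving
 * [EX, PR, CW, NL]. */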
1315 
1316 /* add/remove lkb to rsb's grant/convert/wait queue */
1317 
1318 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1319 {
1320  kref_get(&lkb->lkb_ref);
1321 
1322  DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1323 
1324  lkb->lkb_timestamp = ktime_get();
1325 
1326  lkb->lkb_status = status;
1327 
1328  switch (status) {
1329  case DLM_LKSTS_WAITING:
1330  if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1331  list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1332  else
1333  list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1334  break;
1335  case DLM_LKSTS_GRANTED:
1336  /* convention says granted locks kept in order of grmode */
1337  lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1338  lkb->lkb_grmode);
1339  break;
1340  case DLM_LKSTS_CONVERT:
1341  if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1342  list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1343  else
1344  list_add_tail(&lkb->lkb_statequeue,
1345  &r->res_convertqueue);
1346  break;
1347  default:
1348  DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1349  }
1350 }
1351 
1352 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1353 {
1354  lkb->lkb_status = 0;
1355  list_del(&lkb->lkb_statequeue);
1356  unhold_lkb(lkb);
1357 }
1358 
1359 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1360 {
1361  hold_lkb(lkb);
1362  del_lkb(r, lkb);
1363  add_lkb(r, lkb, sts);
1364  unhold_lkb(lkb);
1365 }
1366 
1367 static int msg_reply_type(int mstype)
1368 {
1369  switch (mstype) {
1370  case DLM_MSG_REQUEST:
1371  return DLM_MSG_REQUEST_REPLY;
1372  case DLM_MSG_CONVERT:
1373  return DLM_MSG_CONVERT_REPLY;
1374  case DLM_MSG_UNLOCK:
1375  return DLM_MSG_UNLOCK_REPLY;
1376  case DLM_MSG_CANCEL:
1377  return DLM_MSG_CANCEL_REPLY;
1378  case DLM_MSG_LOOKUP:
1379  return DLM_MSG_LOOKUP_REPLY;
1380  }
1381  return -1;
1382 }
1383 
1384 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
1385 {
1386  int i;
1387 
1388  for (i = 0; i < num_nodes; i++) {
1389  if (!warned[i]) {
1390  warned[i] = nodeid;
1391  return 0;
1392  }
1393  if (warned[i] == nodeid)
1394  return 1;
1395  }
1396  return 0;
1397 }
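/* Example (added commentary): warned[] behaves as a small append-only
 * set, so successive calls with nodeids 4, 7, 4 return 0, 0, 1 and each
 * node is reported at most once per scan. */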
1398 
1399 void dlm_scan_waiters(struct dlm_ls *ls)
1400 {
1401  struct dlm_lkb *lkb;
1402  ktime_t zero = ktime_set(0, 0);
1403  s64 us;
1404  s64 debug_maxus = 0;
1405  u32 debug_scanned = 0;
1406  u32 debug_expired = 0;
1407  int num_nodes = 0;
1408  int *warned = NULL;
1409 
1410  if (!dlm_config.ci_waitwarn_us)
1411  return;
1412 
1413  mutex_lock(&ls->ls_waiters_mutex);
1414 
1415  list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1416  if (ktime_equal(lkb->lkb_wait_time, zero))
1417  continue;
1418 
1419  debug_scanned++;
1420 
1421  us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1422 
1423  if (us < dlm_config.ci_waitwarn_us)
1424  continue;
1425 
1426  lkb->lkb_wait_time = zero;
1427 
1428  debug_expired++;
1429  if (us > debug_maxus)
1430  debug_maxus = us;
1431 
1432  if (!num_nodes) {
1433  num_nodes = ls->ls_num_nodes;
1434  warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
1435  }
1436  if (!warned)
1437  continue;
1438  if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1439  continue;
1440 
1441  log_error(ls, "waitwarn %x %lld %d us check connection to "
1442  "node %d", lkb->lkb_id, (long long)us,
1443  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1444  }
1445  mutex_unlock(&ls->ls_waiters_mutex);
1446  kfree(warned);
1447 
1448  if (debug_expired)
1449  log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1450  debug_scanned, debug_expired,
1451  dlm_config.ci_waitwarn_us, (long long)debug_maxus);
1452 }
1453 
1454 /* add/remove lkb from global waiters list of lkb's waiting for
1455  a reply from a remote node */
1456 
1457 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1458 {
1459  struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1460  int error = 0;
1461 
1461 
1462  mutex_lock(&ls->ls_waiters_mutex);
1463 
1464  if (is_overlap_unlock(lkb) ||
1465  (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1466  error = -EINVAL;
1467  goto out;
1468  }
1469 
1470  if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1471  switch (mstype) {
1472  case DLM_MSG_UNLOCK:
1473  lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1474  break;
1475  case DLM_MSG_CANCEL:
1476  lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1477  break;
1478  default:
1479  error = -EBUSY;
1480  goto out;
1481  }
1482  lkb->lkb_wait_count++;
1483  hold_lkb(lkb);
1484 
1485  log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1486  lkb->lkb_id, lkb->lkb_wait_type, mstype,
1487  lkb->lkb_wait_count, lkb->lkb_flags);
1488  goto out;
1489  }
1490 
1491  DLM_ASSERT(!lkb->lkb_wait_count,
1492  dlm_print_lkb(lkb);
1493  printk("wait_count %d\n", lkb->lkb_wait_count););
1494 
1495  lkb->lkb_wait_count++;
1496  lkb->lkb_wait_type = mstype;
1497  lkb->lkb_wait_time = ktime_get();
1498  lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1499  hold_lkb(lkb);
1500  list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1501  out:
1502  if (error)
1503  log_error(ls, "addwait error %x %d flags %x %d %d %s",
1504  lkb->lkb_id, error, lkb->lkb_flags, mstype,
1505  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1506  mutex_unlock(&ls->ls_waiters_mutex);
1507  return error;
1508 }
1509 
1510 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1511  list as part of process_requestqueue (e.g. a lookup that has an optimized
1512  request reply on the requestqueue) between dlm_recover_waiters_pre() which
1513  set RESEND and dlm_recover_waiters_post() */
1514 
1515 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1516  struct dlm_message *ms)
1517 {
1518  struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1519  int overlap_done = 0;
1520 
1521  if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1522  log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1523  lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1524  overlap_done = 1;
1525  goto out_del;
1526  }
1527 
1528  if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1529  log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1530  lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1531  overlap_done = 1;
1532  goto out_del;
1533  }
1534 
1535  /* Cancel state was preemptively cleared by a successful convert,
1536  see next comment, nothing to do. */
1537 
1538  if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1539  (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1540  log_debug(ls, "remwait %x cancel_reply wait_type %d",
1541  lkb->lkb_id, lkb->lkb_wait_type);
1542  return -1;
1543  }
1544 
1545  /* Remove for the convert reply, and preemptively remove for the
1546  cancel reply. A convert has been granted while there's still
1547  an outstanding cancel on it (the cancel is moot and the result
1548  in the cancel reply should be 0). We preempt the cancel reply
1549  because the app gets the convert result and then can follow up
1550  with another op, like convert. This subsequent op would see the
1551  lingering state of the cancel and fail with -EBUSY. */
1552 
1553  if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1554  (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1555  is_overlap_cancel(lkb) && ms && !ms->m_result) {
1556  log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1557  lkb->lkb_id);
1558  lkb->lkb_wait_type = 0;
1559  lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1560  lkb->lkb_wait_count--;
1561  goto out_del;
1562  }
1563 
1564  /* N.B. type of reply may not always correspond to type of original
1565  msg due to lookup->request optimization, verify others? */
1566 
1567  if (lkb->lkb_wait_type) {
1568  lkb->lkb_wait_type = 0;
1569  goto out_del;
1570  }
1571 
1572  log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1573  lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1574  mstype, lkb->lkb_flags);
1575  return -1;
1576 
1577  out_del:
1578  /* the force-unlock/cancel has completed and we haven't recvd a reply
1579  to the op that was in progress prior to the unlock/cancel; we
1580  give up on any reply to the earlier op. FIXME: not sure when/how
1581  this would happen */
1582 
1583  if (overlap_done && lkb->lkb_wait_type) {
1584  log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1585  lkb->lkb_id, mstype, lkb->lkb_wait_type);
1586  lkb->lkb_wait_count--;
1587  lkb->lkb_wait_type = 0;
1588  }
1589 
1590  DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1591 
1592  lkb->lkb_flags &= ~DLM_IFL_RESEND;
1593  lkb->lkb_wait_count--;
1594  if (!lkb->lkb_wait_count)
1595  list_del_init(&lkb->lkb_wait_reply);
1596  unhold_lkb(lkb);
1597  return 0;
1598 }
1599 
1600 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1601 {
1602  struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1603  int error;
1604 
1605  mutex_lock(&ls->ls_waiters_mutex);
1606  error = _remove_from_waiters(lkb, mstype, NULL);
1607  mutex_unlock(&ls->ls_waiters_mutex);
1608  return error;
1609 }
1610 
1611 /* Handles situations where we might be processing a "fake" or "stub" reply in
1612  which case we must not take waiters_mutex again (it is already held). */
1613 
1614 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1615 {
1616  struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1617  int error;
1618 
1619  if (ms->m_flags != DLM_IFL_STUB_MS)
1620  mutex_lock(&ls->ls_waiters_mutex);
1621  error = _remove_from_waiters(lkb, ms->m_type, ms);
1622  if (ms->m_flags != DLM_IFL_STUB_MS)
1623  mutex_unlock(&ls->ls_waiters_mutex);
1624  return error;
1625 }
1626 
1627 /* If there's an rsb for the same resource being removed, ensure
1628  that the remove message is sent before the new lookup message.
1629  It should be rare to need a delay here, but if not, then it may
1630  be worthwhile to add a proper wait mechanism rather than a delay. */
1631 
1632 static void wait_pending_remove(struct dlm_rsb *r)
1633 {
1634  struct dlm_ls *ls = r->res_ls;
1635  restart:
1636  spin_lock(&ls->ls_remove_spin);
1637  if (ls->ls_remove_len &&
1638  !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
1639  log_debug(ls, "delay lookup for remove dir %d %s",
1640  r->res_dir_nodeid, r->res_name);
1641  spin_unlock(&ls->ls_remove_spin);
1642  msleep(1);
1643  goto restart;
1644  }
1645  spin_unlock(&ls->ls_remove_spin);
1646 }
1647 
1648 /*
1649  * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1650  * read by other threads in wait_pending_remove. ls_remove_names
1651  * and ls_remove_lens are only used by the scan thread, so they do
1652  * not need protection.
1653  */
1654 
1655 static void shrink_bucket(struct dlm_ls *ls, int b)
1656 {
1657  struct rb_node *n, *next;
1658  struct dlm_rsb *r;
1659  char *name;
1660  int our_nodeid = dlm_our_nodeid();
1661  int remote_count = 0;
1662  int i, len, rv;
1663 
1664  memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1665 
1666  spin_lock(&ls->ls_rsbtbl[b].lock);
1667  for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1668  next = rb_next(n);
1669  r = rb_entry(n, struct dlm_rsb, res_hashnode);
1670 
1671  /* If we're the directory record for this rsb, and
1672  we're not the master of it, then we need to wait
1673  for the master node to send us a dir remove for it
1674  before removing the dir record. */
1675 
1676  if (!dlm_no_directory(ls) &&
1677  (r->res_master_nodeid != our_nodeid) &&
1678  (dlm_dir_nodeid(r) == our_nodeid)) {
1679  continue;
1680  }
1681 
1682  if (!time_after_eq(jiffies, r->res_toss_time +
1683  dlm_config.ci_toss_secs * HZ)) {
1684  continue;
1685  }
1686 
1687  if (!dlm_no_directory(ls) &&
1688  (r->res_master_nodeid == our_nodeid) &&
1689  (dlm_dir_nodeid(r) != our_nodeid)) {
1690 
1691  /* We're the master of this rsb but we're not
1692  the directory record, so we need to tell the
1693  dir node to remove the dir record. */
1694 
1695  ls->ls_remove_lens[remote_count] = r->res_length;
1696  memcpy(ls->ls_remove_names[remote_count], r->res_name,
1697  DLM_RESNAME_MAXLEN);
1698  remote_count++;
1699 
1700  if (remote_count >= DLM_REMOVE_NAMES_MAX)
1701  break;
1702  continue;
1703  }
1704 
1705  if (!kref_put(&r->res_ref, kill_rsb)) {
1706  log_error(ls, "tossed rsb in use %s", r->res_name);
1707  continue;
1708  }
1709 
1710  rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1711  dlm_free_rsb(r);
1712  }
1713  spin_unlock(&ls->ls_rsbtbl[b].lock);
1714 
1715  /*
1716  * While searching for rsb's to free, we found some that require
1717  * remote removal. We leave them in place and find them again here
1718  * so there is a very small gap between removing them from the toss
1719  * list and sending the removal. Keeping this gap small is
1720  * important to keep us (the master node) from being out of sync
1721  * with the remote dir node for very long.
1722  *
1723  * From the time the rsb is removed from toss until just after
1724  * send_remove, the rsb name is saved in ls_remove_name. A new
1725  * lookup checks this to ensure that a new lookup message for the
1726  * same resource name is not sent just before the remove message.
1727  */
1728 
1729  for (i = 0; i < remote_count; i++) {
1730  name = ls->ls_remove_names[i];
1731  len = ls->ls_remove_lens[i];
1732 
1733  spin_lock(&ls->ls_rsbtbl[b].lock);
1734  rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1735  if (rv) {
1736  spin_unlock(&ls->ls_rsbtbl[b].lock);
1737  log_debug(ls, "remove_name not toss %s", name);
1738  continue;
1739  }
1740 
1741  if (r->res_master_nodeid != our_nodeid) {
1742  spin_unlock(&ls->ls_rsbtbl[b].lock);
1743  log_debug(ls, "remove_name master %d dir %d our %d %s",
1744  r->res_master_nodeid, r->res_dir_nodeid,
1745  our_nodeid, name);
1746  continue;
1747  }
1748 
1749  if (r->res_dir_nodeid == our_nodeid) {
1750  /* should never happen */
1751  spin_unlock(&ls->ls_rsbtbl[b].lock);
1752  log_error(ls, "remove_name dir %d master %d our %d %s",
1753  r->res_dir_nodeid, r->res_master_nodeid,
1754  our_nodeid, name);
1755  continue;
1756  }
1757 
1758  if (!time_after_eq(jiffies, r->res_toss_time +
1759  dlm_config.ci_toss_secs * HZ)) {
1760  spin_unlock(&ls->ls_rsbtbl[b].lock);
1761  log_debug(ls, "remove_name toss_time %lu now %lu %s",
1762  r->res_toss_time, jiffies, name);
1763  continue;
1764  }
1765 
1766  if (!kref_put(&r->res_ref, kill_rsb)) {
1767  spin_unlock(&ls->ls_rsbtbl[b].lock);
1768  log_error(ls, "remove_name in use %s", name);
1769  continue;
1770  }
1771 
1772  rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1773 
1774  /* block lookup of same name until we've sent remove */
1775  spin_lock(&ls->ls_remove_spin);
1776  ls->ls_remove_len = len;
1777  memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1778  spin_unlock(&ls->ls_remove_spin);
1779  spin_unlock(&ls->ls_rsbtbl[b].lock);
1780 
1781  send_remove(r);
1782 
1783  /* allow lookup of name again */
1784  spin_lock(&ls->ls_remove_spin);
1785  ls->ls_remove_len = 0;
1786  memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1787  spin_unlock(&ls->ls_remove_spin);
1788 
1789  dlm_free_rsb(r);
1790  }
1791 }
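/* Example of the aging test above (added commentary, hypothetical
 * config): with ci_toss_secs = 10 and HZ = 250, a tossed rsb becomes
 * eligible for freeing (or for a remote remove) once jiffies has
 * advanced 2500 ticks past res_toss_time. */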
1792 
1793 void dlm_scan_rsbs(struct dlm_ls *ls)
1794 {
1795  int i;
1796 
1797  for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1798  shrink_bucket(ls, i);
1799  if (dlm_locking_stopped(ls))
1800  break;
1801  cond_resched();
1802  }
1803 }
1804 
1805 static void add_timeout(struct dlm_lkb *lkb)
1806 {
1807  struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1808 
1809  if (is_master_copy(lkb))
1810  return;
1811 
1812  if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1813  !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1814  lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1815  goto add_it;
1816  }
1817  if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1818  goto add_it;
1819  return;
1820 
1821  add_it:
1822  DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1823  mutex_lock(&ls->ls_timeout_mutex);
1824  hold_lkb(lkb);
1825  list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1826  mutex_unlock(&ls->ls_timeout_mutex);
1827 }
1828 
1829 static void del_timeout(struct dlm_lkb *lkb)
1830 {
1831  struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1832 
1833  mutex_lock(&ls->ls_timeout_mutex);
1834  if (!list_empty(&lkb->lkb_time_list)) {
1835  list_del_init(&lkb->lkb_time_list);
1836  unhold_lkb(lkb);
1837  }
1838  mutex_unlock(&ls->ls_timeout_mutex);
1839 }
1840 
1841 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1842  lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
1843  and then lock rsb because of lock ordering in add_timeout. We may need
1844  to specify some special timeout-related bits in the lkb that are just to
1845  be accessed under the timeout_mutex. */
1846 
1847 void dlm_scan_timeout(struct dlm_ls *ls)
1848 {
1849  struct dlm_rsb *r;
1850  struct dlm_lkb *lkb;
1851  int do_cancel, do_warn;
1852  s64 wait_us;
1853 
1854  for (;;) {
1855  if (dlm_locking_stopped(ls))
1856  break;
1857 
1858  do_cancel = 0;
1859  do_warn = 0;
1860  mutex_lock(&ls->ls_timeout_mutex);
1861  list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1862 
1863  wait_us = ktime_to_us(ktime_sub(ktime_get(),
1864  lkb->lkb_timestamp));
1865 
1866  if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1867  wait_us >= (lkb->lkb_timeout_cs * 10000))
1868  do_cancel = 1;
1869 
1870  if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1871  wait_us >= dlm_config.ci_timewarn_cs * 10000)
1872  do_warn = 1;
1873 
1874  if (!do_cancel && !do_warn)
1875  continue;
1876  hold_lkb(lkb);
1877  break;
1878  }
1879  mutex_unlock(&ls->ls_timeout_mutex);
1880 
1881  if (!do_cancel && !do_warn)
1882  break;
1883 
1884  r = lkb->lkb_resource;
1885  hold_rsb(r);
1886  lock_rsb(r);
1887 
1888  if (do_warn) {
1889  /* clear flag so we only warn once */
1890  lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1891  if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1892  del_timeout(lkb);
1893  dlm_timeout_warn(lkb);
1894  }
1895 
1896  if (do_cancel) {
1897  log_debug(ls, "timeout cancel %x node %d %s",
1898  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1899  lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1900  lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1901  del_timeout(lkb);
1902  _cancel_lock(r, lkb);
1903  }
1904 
1905  unlock_rsb(r);
1906  unhold_rsb(r);
1907  dlm_put_lkb(lkb);
1908  }
1909 }
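
/* Illustrative sketch (not part of lock.c): lkb_timeout_cs and
   dlm_config.ci_timewarn_cs are centiseconds, so dlm_scan_timeout() above
   multiplies them by 10000 to compare against a wait time measured in
   microseconds. The hypothetical helper below restates that unit
   conversion. */

static int example_lock_timed_out(s64 wait_us, int timeout_cs)
{
	/* 1 centisecond = 10 ms = 10000 us */
	return wait_us >= (s64)timeout_cs * 10000;
}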
1910 
1911 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1912  dlm_recoverd before checking/setting ls_recover_begin. */
1913 
1914 void dlm_adjust_timeouts(struct dlm_ls *ls)
1915 {
1916  struct dlm_lkb *lkb;
1917  u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1918 
1919  ls->ls_recover_begin = 0;
1920  mutex_lock(&ls->ls_timeout_mutex);
1921  list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1922  lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1923  mutex_unlock(&ls->ls_timeout_mutex);
1924 
1925  if (!dlm_config.ci_waitwarn_us)
1926  return;
1927 
1928  mutex_lock(&ls->ls_waiters_mutex);
1929  list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1930  if (ktime_to_us(lkb->lkb_wait_time))
1931  lkb->lkb_wait_time = ktime_get();
1932  }
1933  mutex_unlock(&ls->ls_waiters_mutex);
1934 }
1935 
1936 /* lkb is master or local copy */
1937 
1938 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1939 {
1940  int b, len = r->res_ls->ls_lvblen;
1941 
1942  /* b=1 lvb returned to caller
1943  b=0 lvb written to rsb or invalidated
1944  b=-1 do nothing */
1945 
1946  b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1947 
1948  if (b == 1) {
1949  if (!lkb->lkb_lvbptr)
1950  return;
1951 
1952  if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1953  return;
1954 
1955  if (!r->res_lvbptr)
1956  return;
1957 
1958  memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1959  lkb->lkb_lvbseq = r->res_lvbseq;
1960 
1961  } else if (b == 0) {
1962  if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1963  rsb_set_flag(r, RSB_VALNOTVALID);
1964  return;
1965  }
1966 
1967  if (!lkb->lkb_lvbptr)
1968  return;
1969 
1970  if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1971  return;
1972 
1973  if (!r->res_lvbptr)
1974  r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1975 
1976  if (!r->res_lvbptr)
1977  return;
1978 
1979  memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1980  r->res_lvbseq++;
1981  lkb->lkb_lvbseq = r->res_lvbseq;
1982  rsb_clear_flag(r, RSB_VALNOTVALID);
1983  }
1984 
1985  if (rsb_flag(r, RSB_VALNOTVALID))
1986  lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1987 }
1988 
1989 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1990 {
1991  if (lkb->lkb_grmode < DLM_LOCK_PW)
1992  return;
1993 
1994  if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1995  rsb_set_flag(r, RSB_VALNOTVALID);
1996  return;
1997  }
1998 
1999  if (!lkb->lkb_lvbptr)
2000  return;
2001 
2002  if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2003  return;
2004 
2005  if (!r->res_lvbptr)
2006  r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2007 
2008  if (!r->res_lvbptr)
2009  return;
2010 
2011  memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2012  r->res_lvbseq++;
2013  rsb_clear_flag(r, RSB_VALNOTVALID);
2014 }
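
/* Illustrative sketch (not part of lock.c): set_lvb_lock() and
   set_lvb_unlock() only move a value block if the caller asked for one
   with DLM_LKF_VALBLK and supplied a buffer in the lksb. A hedged
   caller-side setup; the "my_" names are hypothetical and the buffer
   must match the lockspace lvblen (32 by default). */

static char my_lvb[32];

static struct dlm_lksb my_lksb = {
	.sb_lvbptr = my_lvb,	/* without this, the lvb branches above bail out */
};

/* e.g. dlm_lock(ls, DLM_LOCK_EX, &my_lksb, DLM_LKF_VALBLK, name, len,
   0, my_ast, NULL, NULL); a later unlock from PW/EX writes my_lvb back
   to the resource, as set_lvb_unlock() shows. */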
2015 
2016 /* lkb is process copy (pc) */
2017 
2018 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2019  struct dlm_message *ms)
2020 {
2021  int b;
2022 
2023  if (!lkb->lkb_lvbptr)
2024  return;
2025 
2026  if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2027  return;
2028 
2029  b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2030  if (b == 1) {
2031  int len = receive_extralen(ms);
2032  if (len > DLM_RESNAME_MAXLEN)
2033  len = DLM_RESNAME_MAXLEN;
2034  memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2035  lkb->lkb_lvbseq = ms->m_lvbseq;
2036  }
2037 }
2038 
2039 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2040  remove_lock -- used for unlock, removes lkb from granted
2041  revert_lock -- used for cancel, moves lkb from convert to granted
2042  grant_lock -- used for request and convert, adds lkb to granted or
2043  moves lkb from convert or waiting to granted
2044 
2045  Each of these is used for master or local copy lkb's. There is
2046  also a _pc() variation used to make the corresponding change on
2047  a process copy (pc) lkb. */
2048 
2049 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2050 {
2051  del_lkb(r, lkb);
2052  lkb->lkb_grmode = DLM_LOCK_IV;
2053  /* this unhold undoes the original ref from create_lkb()
2054  so this leads to the lkb being freed */
2055  unhold_lkb(lkb);
2056 }
2057 
2058 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2059 {
2060  set_lvb_unlock(r, lkb);
2061  _remove_lock(r, lkb);
2062 }
2063 
2064 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2065 {
2066  _remove_lock(r, lkb);
2067 }
2068 
2069 /* returns: 0 did nothing
2070  1 moved lock to granted
2071  -1 removed lock */
2072 
2073 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2074 {
2075  int rv = 0;
2076 
2077  lkb->lkb_rqmode = DLM_LOCK_IV;
2078 
2079  switch (lkb->lkb_status) {
2080  case DLM_LKSTS_GRANTED:
2081  break;
2082  case DLM_LKSTS_CONVERT:
2083  move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2084  rv = 1;
2085  break;
2086  case DLM_LKSTS_WAITING:
2087  del_lkb(r, lkb);
2088  lkb->lkb_grmode = DLM_LOCK_IV;
2089  /* this unhold undoes the original ref from create_lkb()
2090  so this leads to the lkb being freed */
2091  unhold_lkb(lkb);
2092  rv = -1;
2093  break;
2094  default:
2095  log_print("invalid status for revert %d", lkb->lkb_status);
2096  }
2097  return rv;
2098 }
2099 
2100 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2101 {
2102  return revert_lock(r, lkb);
2103 }
2104 
2105 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2106 {
2107  if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2108  lkb->lkb_grmode = lkb->lkb_rqmode;
2109  if (lkb->lkb_status)
2110  move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2111  else
2112  add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2113  }
2114 
2115  lkb->lkb_rqmode = DLM_LOCK_IV;
2116  lkb->lkb_highbast = 0;
2117 }
2118 
2119 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2120 {
2121  set_lvb_lock(r, lkb);
2122  _grant_lock(r, lkb);
2123 }
2124 
2125 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2126  struct dlm_message *ms)
2127 {
2128  set_lvb_lock_pc(r, lkb, ms);
2129  _grant_lock(r, lkb);
2130 }
2131 
2132 /* called by grant_pending_locks() which means an async grant message must
2133  be sent to the requesting node in addition to granting the lock if the
2134  lkb belongs to a remote node. */
2135 
2136 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2137 {
2138  grant_lock(r, lkb);
2139  if (is_master_copy(lkb))
2140  send_grant(r, lkb);
2141  else
2142  queue_cast(r, lkb, 0);
2143 }
2144 
2145 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2146  change the granted/requested modes. We're munging things accordingly in
2147  the process copy.
2148  CONVDEADLK: our grmode may have been forced down to NL to resolve a
2149  conversion deadlock
2150  ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2151  compatible with other granted locks */
2152 
2153 static void munge_demoted(struct dlm_lkb *lkb)
2154 {
2155  if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2156  log_print("munge_demoted %x invalid modes gr %d rq %d",
2157  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2158  return;
2159  }
2160 
2161  lkb->lkb_grmode = DLM_LOCK_NL;
2162 }
2163 
2164 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2165 {
2166  if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
2167  ms->m_type != DLM_MSG_GRANT) {
2168  log_print("munge_altmode %x invalid reply type %d",
2169  lkb->lkb_id, ms->m_type);
2170  return;
2171  }
2172 
2173  if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2174  lkb->lkb_rqmode = DLM_LOCK_PR;
2175  else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2176  lkb->lkb_rqmode = DLM_LOCK_CW;
2177  else {
2178  log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2179  dlm_print_lkb(lkb);
2180  }
2181 }
2182 
2183 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2184 {
2185  struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2186  lkb_statequeue);
2187  if (lkb->lkb_id == first->lkb_id)
2188  return 1;
2189 
2190  return 0;
2191 }
2192 
2193 /* Check if the given lkb conflicts with another lkb on the queue. */
2194 
2195 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2196 {
2197  struct dlm_lkb *this;
2198 
2199  list_for_each_entry(this, head, lkb_statequeue) {
2200  if (this == lkb)
2201  continue;
2202  if (!modes_compat(this, lkb))
2203  return 1;
2204  }
2205  return 0;
2206 }
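
/* Illustrative sketch (not part of lock.c): modes_compat() used above
   indexes __dlm_compat_matrix (defined earlier in this file) by granted
   and requested mode. The hypothetical table below restates the standard
   VMS compatibility rules it encodes (1 = compatible); the real matrix
   also carries rows for DLM_LOCK_IV. */

static const int example_compat[6][6] = {
	/* rq:  NL CR CW PR PW EX       gr: */
	      { 1, 1, 1, 1, 1, 1 },	/* NL */
	      { 1, 1, 1, 1, 1, 0 },	/* CR */
	      { 1, 1, 1, 0, 0, 0 },	/* CW */
	      { 1, 1, 0, 1, 0, 0 },	/* PR */
	      { 1, 1, 0, 0, 0, 0 },	/* PW */
	      { 1, 0, 0, 0, 0, 0 },	/* EX */
};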
2207 
2208 /*
2209  * "A conversion deadlock arises with a pair of lock requests in the converting
2210  * queue for one resource. The granted mode of each lock blocks the requested
2211  * mode of the other lock."
2212  *
2213  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2214  * convert queue from being granted, then deadlk/demote lkb.
2215  *
2216  * Example:
2217  * Granted Queue: empty
2218  * Convert Queue: NL->EX (first lock)
2219  * PR->EX (second lock)
2220  *
2221  * The first lock can't be granted because of the granted mode of the second
2222  * lock and the second lock can't be granted because it's not first in the
2223  * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2224  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2225  * flag set and return DEMOTED in the lksb flags.
2226  *
2227  * Originally, this function detected conv-deadlk in a more limited scope:
2228  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2229  * - if lkb1 was the first entry in the queue (not just earlier), and was
2230  * blocked by the granted mode of lkb2, and there was nothing on the
2231  * granted queue preventing lkb1 from being granted immediately, i.e.
2232  * lkb2 was the only thing preventing lkb1 from being granted.
2233  *
2234  * That second condition meant we'd only say there was conv-deadlk if
2235  * resolving it (by demotion) would lead to the first lock on the convert
2236  * queue being granted right away. It allowed conversion deadlocks to exist
2237  * between locks on the convert queue while they couldn't be granted anyway.
2238  *
2239  * Now, we detect and take action on conversion deadlocks immediately when
2240  * they're created, even if they may not be immediately consequential. If
2241  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2242  * mode that would prevent lkb1's conversion from being granted, we do a
2243  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2244  * I think this means that the lkb_is_ahead condition below should always
2245  * be zero, i.e. there will never be conv-deadlk between two locks that are
2246  * both already on the convert queue.
2247  */
2248 
2249 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2250 {
2251  struct dlm_lkb *lkb1;
2252  int lkb_is_ahead = 0;
2253 
2254  list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2255  if (lkb1 == lkb2) {
2256  lkb_is_ahead = 1;
2257  continue;
2258  }
2259 
2260  if (!lkb_is_ahead) {
2261  if (!modes_compat(lkb2, lkb1))
2262  return 1;
2263  } else {
2264  if (!modes_compat(lkb2, lkb1) &&
2265  !modes_compat(lkb1, lkb2))
2266  return 1;
2267  }
2268  }
2269  return 0;
2270 }
2271 
2272 /*
2273  * Return 1 if the lock can be granted, 0 otherwise.
2274  * Also detect and resolve conversion deadlocks.
2275  *
2276  * lkb is the lock to be granted
2277  *
2278  * now is 1 if the function is being called in the context of the
2279  * immediate request, it is 0 if called later, after the lock has been
2280  * queued.
2281  *
2282  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2283  * after recovery.
2284  *
2285  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2286  */
2287 
2288 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2289  int recover)
2290 {
2291  int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2292 
2293  /*
2294  * 6-10: Version 5.4 introduced an option to address the phenomenon of
2295  * a new request for a NL mode lock being blocked.
2296  *
2297  * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2298  * request, then it would be granted. In essence, the use of this flag
2299  * tells the Lock Manager to expedite this request by not considering
2300  * what may be in the CONVERTING or WAITING queues... As of this
2301  * writing, the EXPEDITE flag can be used only with new requests for NL
2302  * mode locks. This flag is not valid for conversion requests.
2303  *
2304  * A shortcut. Earlier checks return an error if EXPEDITE is used in a
2305  * conversion or used with a non-NL requested mode. We also know an
2306  * EXPEDITE request is always granted immediately, so now must always
2307  * be 1. The full condition to grant an expedite request: (now &&
2308  * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2309  * therefore be shortened to just checking the flag.
2310  */
2311 
2312  if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2313  return 1;
2314 
2315  /*
2316  * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2317  * added to the remaining conditions.
2318  */
2319 
2320  if (queue_conflict(&r->res_grantqueue, lkb))
2321  return 0;
2322 
2323  /*
2324  * 6-3: By default, a conversion request is immediately granted if the
2325  * requested mode is compatible with the modes of all other granted
2326  * locks
2327  */
2328 
2329  if (queue_conflict(&r->res_convertqueue, lkb))
2330  return 0;
2331 
2332  /*
2333  * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2334  * locks for a recovered rsb, on which lkb's have been rebuilt.
2335  * The lkb's may have been rebuilt on the queues in a different
2336  * order than they were in on the previous master. So, granting
2337  * queued conversions in order after recovery doesn't make sense
2338  * since the order hasn't been preserved anyway. The new order
2339  * could also have created a new "in place" conversion deadlock.
2340  * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2341  * After recovery, there would be no granted locks, and possibly
2342  * NL->EX, PR->EX, an in-place conversion deadlock.) So, after
2343  * recovery, grant conversions without considering order.
2344  */
2345 
2346  if (conv && recover)
2347  return 1;
2348 
2349  /*
2350  * 6-5: But the default algorithm for deciding whether to grant or
2351  * queue conversion requests does not by itself guarantee that such
2352  * requests are serviced on a "first come first serve" basis. This, in
2353  * turn, can lead to a phenomenon known as "indefinite postponement".
2354  *
2355  * 6-7: This issue is dealt with by using the optional QUECVT flag with
2356  * the system service employed to request a lock conversion. This flag
2357  * forces certain conversion requests to be queued, even if they are
2358  * compatible with the granted modes of other locks on the same
2359  * resource. Thus, the use of this flag results in conversion requests
2360  * being ordered on a "first come first serve" basis.
2361  *
2362  * DCT: This condition is all about new conversions being able to occur
2363  * "in place" while the lock remains on the granted queue (assuming
2364  * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
2365  * doesn't _have_ to go onto the convert queue where it's processed in
2366  * order. The "now" variable is necessary to distinguish converts
2367  * being received and processed for the first time now, because once a
2368  * convert is moved to the conversion queue the condition below applies
2369  * requiring fifo granting.
2370  */
2371 
2372  if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2373  return 1;
2374 
2375  /*
2376  * Even if the convert is compat with all granted locks,
2377  * QUECVT forces it behind other locks on the convert queue.
2378  */
2379 
2380  if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2381  if (list_empty(&r->res_convertqueue))
2382  return 1;
2383  else
2384  return 0;
2385  }
2386 
2387  /*
2388  * The NOORDER flag is set to avoid the standard vms rules on grant
2389  * order.
2390  */
2391 
2392  if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2393  return 1;
2394 
2395  /*
2396  * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2397  * granted until all other conversion requests ahead of it are granted
2398  * and/or canceled.
2399  */
2400 
2401  if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2402  return 1;
2403 
2404  /*
2405  * 6-4: By default, a new request is immediately granted only if all
2406  * three of the following conditions are satisfied when the request is
2407  * issued:
2408  * - The queue of ungranted conversion requests for the resource is
2409  * empty.
2410  * - The queue of ungranted new requests for the resource is empty.
2411  * - The mode of the new request is compatible with the most
2412  * restrictive mode of all granted locks on the resource.
2413  */
2414 
2415  if (now && !conv && list_empty(&r->res_convertqueue) &&
2416  list_empty(&r->res_waitqueue))
2417  return 1;
2418 
2419  /*
2420  * 6-4: Once a lock request is in the queue of ungranted new requests,
2421  * it cannot be granted until the queue of ungranted conversion
2422  * requests is empty, all ungranted new requests ahead of it are
2423  * granted and/or canceled, and it is compatible with the granted mode
2424  * of the most restrictive lock granted on the resource.
2425  */
2426 
2427  if (!now && !conv && list_empty(&r->res_convertqueue) &&
2428  first_in_list(lkb, &r->res_waitqueue))
2429  return 1;
2430 
2431  return 0;
2432 }
2433 
2434 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2435  int recover, int *err)
2436 {
2437  int rv;
2438  int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2439  int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2440 
2441  if (err)
2442  *err = 0;
2443 
2444  rv = _can_be_granted(r, lkb, now, recover);
2445  if (rv)
2446  goto out;
2447 
2448  /*
2449  * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2450  * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2451  * cancels one of the locks.
2452  */
2453 
2454  if (is_convert && can_be_queued(lkb) &&
2455  conversion_deadlock_detect(r, lkb)) {
2456  if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2457  lkb->lkb_grmode = DLM_LOCK_NL;
2458  lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2459  } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2460  if (err)
2461  *err = -EDEADLK;
2462  else {
2463  log_print("can_be_granted deadlock %x now %d",
2464  lkb->lkb_id, now);
2465  dlm_dump_rsb(r);
2466  }
2467  }
2468  goto out;
2469  }
2470 
2471  /*
2472  * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2473  * to grant a request in a mode other than the normal rqmode. It's a
2474  * simple way to provide a big optimization to applications that can
2475  * use them.
2476  */
2477 
2478  if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2479  alt = DLM_LOCK_PR;
2480  else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2481  alt = DLM_LOCK_CW;
2482 
2483  if (alt) {
2484  lkb->lkb_rqmode = alt;
2485  rv = _can_be_granted(r, lkb, now, 0);
2486  if (rv)
2487  lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2488  else
2489  lkb->lkb_rqmode = rqmode;
2490  }
2491  out:
2492  return rv;
2493 }
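
/* Illustrative sketch (not part of lock.c): a caller using the ALTPR
   fallback that can_be_granted() implements above. The request asks for
   PW but accepts PR; a grant in the alternate mode is reported through
   DLM_SBF_ALTMODE in the lksb. "my_" names are hypothetical.

   dlm_lock(ls, DLM_LOCK_PW, &my_lksb, DLM_LKF_ALTPR, name, len,
	    0, my_ast, NULL, my_bast); */

static void my_check_altmode(struct dlm_lksb *lksb)
{
	if (lksb->sb_flags & DLM_SBF_ALTMODE)
		pr_info("granted in alternate mode (PR), not PW\n");
}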
2494 
2495 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
2496  for locks pending on the convert list. Once verified (watch for these
2497  log_prints), we should be able to just call _can_be_granted() and not
2498  bother with the demote/deadlk cases here (and there's no easy way to deal
2499  with a deadlk here, we'd have to generate something like grant_lock with
2500  the deadlk error.) */
2501 
2502 /* Returns the highest requested mode of all blocked conversions; sets
2503  cw if there's a blocked conversion to DLM_LOCK_CW. */
2504 
2505 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2506  unsigned int *count)
2507 {
2508  struct dlm_lkb *lkb, *s;
2509  int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2510  int hi, demoted, quit, grant_restart, demote_restart;
2511  int deadlk;
2512 
2513  quit = 0;
2514  restart:
2515  grant_restart = 0;
2516  demote_restart = 0;
2517  hi = DLM_LOCK_IV;
2518 
2519  list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2520  demoted = is_demoted(lkb);
2521  deadlk = 0;
2522 
2523  if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2524  grant_lock_pending(r, lkb);
2525  grant_restart = 1;
2526  if (count)
2527  (*count)++;
2528  continue;
2529  }
2530 
2531  if (!demoted && is_demoted(lkb)) {
2532  log_print("WARN: pending demoted %x node %d %s",
2533  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2534  demote_restart = 1;
2535  continue;
2536  }
2537 
2538  if (deadlk) {
2539  log_print("WARN: pending deadlock %x node %d %s",
2540  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2541  dlm_dump_rsb(r);
2542  continue;
2543  }
2544 
2545  hi = max_t(int, lkb->lkb_rqmode, hi);
2546 
2547  if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2548  *cw = 1;
2549  }
2550 
2551  if (grant_restart)
2552  goto restart;
2553  if (demote_restart && !quit) {
2554  quit = 1;
2555  goto restart;
2556  }
2557 
2558  return max_t(int, high, hi);
2559 }
2560 
2561 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2562  unsigned int *count)
2563 {
2564  struct dlm_lkb *lkb, *s;
2565 
2566  list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2567  if (can_be_granted(r, lkb, 0, 0, NULL)) {
2568  grant_lock_pending(r, lkb);
2569  if (count)
2570  (*count)++;
2571  } else {
2572  high = max_t(int, lkb->lkb_rqmode, high);
2573  if (lkb->lkb_rqmode == DLM_LOCK_CW)
2574  *cw = 1;
2575  }
2576  }
2577 
2578  return high;
2579 }
2580 
2581 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2582  on either the convert or waiting queue.
2583  high is the largest rqmode of all locks blocked on the convert or
2584  waiting queue. */
2585 
2586 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2587 {
2588  if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2589  if (gr->lkb_highbast < DLM_LOCK_EX)
2590  return 1;
2591  return 0;
2592  }
2593 
2594  if (gr->lkb_highbast < high &&
2595  !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2596  return 1;
2597  return 0;
2598 }
2599 
2600 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2601 {
2602  struct dlm_lkb *lkb, *s;
2603  int high = DLM_LOCK_IV;
2604  int cw = 0;
2605 
2606  if (!is_master(r)) {
2607  log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2608  dlm_dump_rsb(r);
2609  return;
2610  }
2611 
2612  high = grant_pending_convert(r, high, &cw, count);
2613  high = grant_pending_wait(r, high, &cw, count);
2614 
2615  if (high == DLM_LOCK_IV)
2616  return;
2617 
2618  /*
2619  * If there are locks left on the wait/convert queue then send blocking
2620  * ASTs to granted locks based on the largest requested mode (high)
2621  * found above.
2622  */
2623 
2624  list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2625  if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2626  if (cw && high == DLM_LOCK_PR &&
2627  lkb->lkb_grmode == DLM_LOCK_PR)
2628  queue_bast(r, lkb, DLM_LOCK_CW);
2629  else
2630  queue_bast(r, lkb, high);
2631  lkb->lkb_highbast = high;
2632  }
2633  }
2634 }
2635 
2636 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2637 {
2638  if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2639  (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2640  if (gr->lkb_highbast < DLM_LOCK_EX)
2641  return 1;
2642  return 0;
2643  }
2644 
2645  if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2646  return 1;
2647  return 0;
2648 }
2649 
2650 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2651  struct dlm_lkb *lkb)
2652 {
2653  struct dlm_lkb *gr;
2654 
2655  list_for_each_entry(gr, head, lkb_statequeue) {
2656  /* skip self when sending basts to convertqueue */
2657  if (gr == lkb)
2658  continue;
2659  if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2660  queue_bast(r, gr, lkb->lkb_rqmode);
2661  gr->lkb_highbast = lkb->lkb_rqmode;
2662  }
2663  }
2664 }
2665 
2666 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2667 {
2668  send_bast_queue(r, &r->res_grantqueue, lkb);
2669 }
2670 
2671 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2672 {
2673  send_bast_queue(r, &r->res_grantqueue, lkb);
2674  send_bast_queue(r, &r->res_convertqueue, lkb);
2675 }
2676 
2677 /* set_master(r, lkb) -- set the master nodeid of a resource
2678 
2679  The purpose of this function is to set the nodeid field in the given
2680  lkb using the nodeid field in the given rsb. If the rsb's nodeid is
2681  known, it can just be copied to the lkb and the function will return
2682  0. If the rsb's nodeid is _not_ known, it needs to be looked up
2683  before it can be copied to the lkb.
2684 
2685  When the rsb nodeid is being looked up remotely, the initial lkb
2686  causing the lookup is kept on the ls_waiters list waiting for the
2687  lookup reply. Other lkb's waiting for the same rsb lookup are kept
2688  on the rsb's res_lookup list until the master is verified.
2689 
2690  Return values:
2691  0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2692  1: the rsb master is not available and the lkb has been placed on
2693  a wait queue
2694 */
2695 
2696 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2697 {
2698  int our_nodeid = dlm_our_nodeid();
2699 
2700  if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2701  rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2702  r->res_first_lkid = lkb->lkb_id;
2703  lkb->lkb_nodeid = r->res_nodeid;
2704  return 0;
2705  }
2706 
2707  if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2708  list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2709  return 1;
2710  }
2711 
2712  if (r->res_master_nodeid == our_nodeid) {
2713  lkb->lkb_nodeid = 0;
2714  return 0;
2715  }
2716 
2717  if (r->res_master_nodeid) {
2718  lkb->lkb_nodeid = r->res_master_nodeid;
2719  return 0;
2720  }
2721 
2722  if (dlm_dir_nodeid(r) == our_nodeid) {
2723  /* This is a somewhat unusual case; find_rsb will usually
2724  have set res_master_nodeid when dir nodeid is local, but
2725  there are cases where we become the dir node after we've
2726  passed find_rsb and gone through _request_lock again.
2727  confirm_master() or process_lookup_list() needs to be
2728  called after this. */
2729  log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2730  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2731  r->res_name);
2732  r->res_master_nodeid = our_nodeid;
2733  r->res_nodeid = 0;
2734  lkb->lkb_nodeid = 0;
2735  return 0;
2736  }
2737 
2738  wait_pending_remove(r);
2739 
2740  r->res_first_lkid = lkb->lkb_id;
2741  send_lookup(r, lkb);
2742  return 1;
2743 }
2744 
2745 static void process_lookup_list(struct dlm_rsb *r)
2746 {
2747  struct dlm_lkb *lkb, *safe;
2748 
2750  list_del_init(&lkb->lkb_rsb_lookup);
2751  _request_lock(r, lkb);
2752  schedule();
2753  }
2754 }
2755 
2756 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2757 
2758 static void confirm_master(struct dlm_rsb *r, int error)
2759 {
2760  struct dlm_lkb *lkb;
2761 
2762  if (!r->res_first_lkid)
2763  return;
2764 
2765  switch (error) {
2766  case 0:
2767  case -EINPROGRESS:
2768  r->res_first_lkid = 0;
2769  process_lookup_list(r);
2770  break;
2771 
2772  case -EAGAIN:
2773  case -EBADR:
2774  case -ENOTBLK:
2775  /* the remote request failed and won't be retried (it was
2776  a NOQUEUE, or has been canceled/unlocked); make a waiting
2777  lkb the first_lkid */
2778 
2779  r->res_first_lkid = 0;
2780 
2781  if (!list_empty(&r->res_lookup)) {
2782  lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2783  lkb_rsb_lookup);
2784  list_del_init(&lkb->lkb_rsb_lookup);
2785  r->res_first_lkid = lkb->lkb_id;
2786  _request_lock(r, lkb);
2787  }
2788  break;
2789 
2790  default:
2791  log_error(r->res_ls, "confirm_master unknown error %d", error);
2792  }
2793 }
2794 
2795 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2796  int namelen, unsigned long timeout_cs,
2797  void (*ast) (void *astparam),
2798  void *astparam,
2799  void (*bast) (void *astparam, int mode),
2800  struct dlm_args *args)
2801 {
2802  int rv = -EINVAL;
2803 
2804  /* check for invalid arg usage */
2805 
2806  if (mode < 0 || mode > DLM_LOCK_EX)
2807  goto out;
2808 
2809  if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2810  goto out;
2811 
2812  if (flags & DLM_LKF_CANCEL)
2813  goto out;
2814 
2815  if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2816  goto out;
2817 
2818  if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2819  goto out;
2820 
2821  if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2822  goto out;
2823 
2824  if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2825  goto out;
2826 
2827  if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2828  goto out;
2829 
2830  if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2831  goto out;
2832 
2833  if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2834  goto out;
2835 
2836  if (!ast || !lksb)
2837  goto out;
2838 
2839  if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2840  goto out;
2841 
2842  if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2843  goto out;
2844 
2845  /* these args will be copied to the lkb in validate_lock_args,
2846  it cannot be done now because when converting locks, fields in
2847  an active lkb cannot be modified before locking the rsb */
2848 
2849  args->flags = flags;
2850  args->astfn = ast;
2851  args->astparam = astparam;
2852  args->bastfn = bast;
2853  args->timeout = timeout_cs;
2854  args->mode = mode;
2855  args->lksb = lksb;
2856  rv = 0;
2857  out:
2858  return rv;
2859 }
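
/* Illustrative sketch (not part of lock.c): the EXPEDITE checks above
   mean only a brand-new NL request may carry the flag. Hypothetical call
   sites and their outcomes:

   dlm_lock(ls, DLM_LOCK_NL, &lksb, DLM_LKF_EXPEDITE, ...)        OK
   dlm_lock(ls, DLM_LOCK_PR, &lksb, DLM_LKF_EXPEDITE, ...)        -EINVAL
   dlm_lock(ls, DLM_LOCK_NL, &lksb,
	    DLM_LKF_EXPEDITE | DLM_LKF_CONVERT, ...)               -EINVAL */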
2860 
2861 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2862 {
2863  if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2864  DLM_LKF_FORCEUNLOCK))
2865  return -EINVAL;
2866 
2867  if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2868  return -EINVAL;
2869 
2870  args->flags = flags;
2871  args->astparam = astarg;
2872  return 0;
2873 }
2874 
2875 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2876  struct dlm_args *args)
2877 {
2878  int rv = -EINVAL;
2879 
2880  if (args->flags & DLM_LKF_CONVERT) {
2881  if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2882  goto out;
2883 
2884  if (args->flags & DLM_LKF_QUECVT &&
2885  !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2886  goto out;
2887 
2888  rv = -EBUSY;
2889  if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2890  goto out;
2891 
2892  if (lkb->lkb_wait_type)
2893  goto out;
2894 
2895  if (is_overlap(lkb))
2896  goto out;
2897  }
2898 
2899  lkb->lkb_exflags = args->flags;
2900  lkb->lkb_sbflags = 0;
2901  lkb->lkb_astfn = args->astfn;
2902  lkb->lkb_astparam = args->astparam;
2903  lkb->lkb_bastfn = args->bastfn;
2904  lkb->lkb_rqmode = args->mode;
2905  lkb->lkb_lksb = args->lksb;
2906  lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2907  lkb->lkb_ownpid = (int) current->pid;
2908  lkb->lkb_timeout_cs = args->timeout;
2909  rv = 0;
2910  out:
2911  if (rv)
2912  log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2913  rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2914  lkb->lkb_status, lkb->lkb_wait_type,
2915  lkb->lkb_resource->res_name);
2916  return rv;
2917 }
2918 
2919 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2920  for success */
2921 
2922 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2923  because there may be a lookup in progress and it's valid to do
2924  cancel/unlockf on it */
2925 
2926 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2927 {
2928  struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2929  int rv = -EINVAL;
2930 
2931  if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2932  log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2933  dlm_print_lkb(lkb);
2934  goto out;
2935  }
2936 
2937  /* an lkb may still exist even though the lock is EOL'ed due to a
2938  cancel, unlock or failed noqueue request; an app can't use these
2939  locks; return same error as if the lkid had not been found at all */
2940 
2941  if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2942  log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2943  rv = -ENOENT;
2944  goto out;
2945  }
2946 
2947  /* an lkb may be waiting for an rsb lookup to complete where the
2948  lookup was initiated by another lock */
2949 
2950  if (!list_empty(&lkb->lkb_rsb_lookup)) {
2951  if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2952  log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2953  list_del_init(&lkb->lkb_rsb_lookup);
2954  queue_cast(lkb->lkb_resource, lkb,
2955  args->flags & DLM_LKF_CANCEL ?
2956  -DLM_ECANCEL : -DLM_EUNLOCK);
2957  unhold_lkb(lkb); /* undoes create_lkb() */
2958  }
2959  /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2960  rv = -EBUSY;
2961  goto out;
2962  }
2963 
2964  /* cancel not allowed with another cancel/unlock in progress */
2965 
2966  if (args->flags & DLM_LKF_CANCEL) {
2967  if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2968  goto out;
2969 
2970  if (is_overlap(lkb))
2971  goto out;
2972 
2973  /* don't let scand try to do a cancel */
2974  del_timeout(lkb);
2975 
2976  if (lkb->lkb_flags & DLM_IFL_RESEND) {
2977  lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2978  rv = -EBUSY;
2979  goto out;
2980  }
2981 
2982  /* there's nothing to cancel */
2983  if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2984  !lkb->lkb_wait_type) {
2985  rv = -EBUSY;
2986  goto out;
2987  }
2988 
2989  switch (lkb->lkb_wait_type) {
2990  case DLM_MSG_LOOKUP:
2991  case DLM_MSG_REQUEST:
2992  lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2993  rv = -EBUSY;
2994  goto out;
2995  case DLM_MSG_UNLOCK:
2996  case DLM_MSG_CANCEL:
2997  goto out;
2998  }
2999  /* add_to_waiters() will set OVERLAP_CANCEL */
3000  goto out_ok;
3001  }
3002 
3003  /* do we need to allow a force-unlock if there's a normal unlock
3004  already in progress? in what conditions could the normal unlock
3005  fail such that we'd want to send a force-unlock to be sure? */
3006 
3007  if (args->flags & DLM_LKF_FORCEUNLOCK) {
3008  if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3009  goto out;
3010 
3011  if (is_overlap_unlock(lkb))
3012  goto out;
3013 
3014  /* don't let scand try to do a cancel */
3015  del_timeout(lkb);
3016 
3017  if (lkb->lkb_flags & DLM_IFL_RESEND) {
3018  lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3019  rv = -EBUSY;
3020  goto out;
3021  }
3022 
3023  switch (lkb->lkb_wait_type) {
3024  case DLM_MSG_LOOKUP:
3025  case DLM_MSG_REQUEST:
3026  lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3027  rv = -EBUSY;
3028  goto out;
3029  case DLM_MSG_UNLOCK:
3030  goto out;
3031  }
3032  /* add_to_waiters() will set OVERLAP_UNLOCK */
3033  goto out_ok;
3034  }
3035 
3036  /* normal unlock not allowed if there's any op in progress */
3037  rv = -EBUSY;
3038  if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3039  goto out;
3040 
3041  out_ok:
3042  /* an overlapping op shouldn't blow away exflags from other op */
3043  lkb->lkb_exflags |= args->flags;
3044  lkb->lkb_sbflags = 0;
3045  lkb->lkb_astparam = args->astparam;
3046  rv = 0;
3047  out:
3048  if (rv)
3049  log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3050  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3051  args->flags, lkb->lkb_wait_type,
3052  lkb->lkb_resource->res_name);
3053  return rv;
3054 }
3055 
3056 /*
3057  * Four stage 4 varieties:
3058  * do_request(), do_convert(), do_unlock(), do_cancel()
3059  * These are called on the master node for the given lock and
3060  * from the central locking logic.
3061  */
3062 
3063 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3064 {
3065  int error = 0;
3066 
3067  if (can_be_granted(r, lkb, 1, 0, NULL)) {
3068  grant_lock(r, lkb);
3069  queue_cast(r, lkb, 0);
3070  goto out;
3071  }
3072 
3073  if (can_be_queued(lkb)) {
3074  error = -EINPROGRESS;
3075  add_lkb(r, lkb, DLM_LKSTS_WAITING);
3076  add_timeout(lkb);
3077  goto out;
3078  }
3079 
3080  error = -EAGAIN;
3081  queue_cast(r, lkb, -EAGAIN);
3082  out:
3083  return error;
3084 }
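
/* Illustrative sketch (not part of lock.c): can_be_queued() is false when
   the caller passed DLM_LKF_NOQUEUE, so do_request() above fails such a
   request with -EAGAIN instead of queueing it, and the completion ast
   sees -EAGAIN in sb_status. A hypothetical caller-side ast: */

static void my_trylock_ast(void *arg)
{
	struct dlm_lksb *lksb = arg;

	if (lksb->sb_status == -EAGAIN)
		pr_info("resource busy, NOQUEUE request refused\n");
	else if (lksb->sb_status == 0)
		pr_info("lock granted\n");
}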
3085 
3086 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3087  int error)
3088 {
3089  switch (error) {
3090  case -EAGAIN:
3091  if (force_blocking_asts(lkb))
3092  send_blocking_asts_all(r, lkb);
3093  break;
3094  case -EINPROGRESS:
3095  send_blocking_asts(r, lkb);
3096  break;
3097  }
3098 }
3099 
3100 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3101 {
3102  int error = 0;
3103  int deadlk = 0;
3104 
3105  /* changing an existing lock may allow others to be granted */
3106 
3107  if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3108  grant_lock(r, lkb);
3109  queue_cast(r, lkb, 0);
3110  goto out;
3111  }
3112 
3113  /* can_be_granted() detected that this lock would block in a conversion
3114  deadlock, so we leave it on the granted queue and return EDEADLK in
3115  the ast for the convert. */
3116 
3117  if (deadlk) {
3118  /* it's left on the granted queue */
3119  revert_lock(r, lkb);
3120  queue_cast(r, lkb, -EDEADLK);
3121  error = -EDEADLK;
3122  goto out;
3123  }
3124 
3125  /* is_demoted() means the can_be_granted() above set the grmode
3126  to NL, and left us on the granted queue. This auto-demotion
3127  (due to CONVDEADLK) might mean other locks, and/or this lock, are
3128  now grantable. We have to try to grant other converting locks
3129  before we try again to grant this one. */
3130 
3131  if (is_demoted(lkb)) {
3132  grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3133  if (_can_be_granted(r, lkb, 1, 0)) {
3134  grant_lock(r, lkb);
3135  queue_cast(r, lkb, 0);
3136  goto out;
3137  }
3138  /* else fall through and move to convert queue */
3139  }
3140 
3141  if (can_be_queued(lkb)) {
3142  error = -EINPROGRESS;
3143  del_lkb(r, lkb);
3144  add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3145  add_timeout(lkb);
3146  goto out;
3147  }
3148 
3149  error = -EAGAIN;
3150  queue_cast(r, lkb, -EAGAIN);
3151  out:
3152  return error;
3153 }
3154 
3155 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3156  int error)
3157 {
3158  switch (error) {
3159  case 0:
3160  grant_pending_locks(r, NULL);
3161  /* grant_pending_locks also sends basts */
3162  break;
3163  case -EAGAIN:
3164  if (force_blocking_asts(lkb))
3165  send_blocking_asts_all(r, lkb);
3166  break;
3167  case -EINPROGRESS:
3168  send_blocking_asts(r, lkb);
3169  break;
3170  }
3171 }
3172 
3173 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3174 {
3175  remove_lock(r, lkb);
3176  queue_cast(r, lkb, -DLM_EUNLOCK);
3177  return -DLM_EUNLOCK;
3178 }
3179 
3180 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3181  int error)
3182 {
3183  grant_pending_locks(r, NULL);
3184 }
3185 
3186 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3187 
3188 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3189 {
3190  int error;
3191 
3192  error = revert_lock(r, lkb);
3193  if (error) {
3194  queue_cast(r, lkb, -DLM_ECANCEL);
3195  return -DLM_ECANCEL;
3196  }
3197  return 0;
3198 }
3199 
3200 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3201  int error)
3202 {
3203  if (error)
3204  grant_pending_locks(r, NULL);
3205 }
3206 
3207 /*
3208  * Four stage 3 varieties:
3209  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3210  */
3211 
3212 /* add a new lkb to a possibly new rsb, called by requesting process */
3213 
3214 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3215 {
3216  int error;
3217 
3218  /* set_master: sets lkb nodeid from r */
3219 
3220  error = set_master(r, lkb);
3221  if (error < 0)
3222  goto out;
3223  if (error) {
3224  error = 0;
3225  goto out;
3226  }
3227 
3228  if (is_remote(r)) {
3229  /* receive_request() calls do_request() on remote node */
3230  error = send_request(r, lkb);
3231  } else {
3232  error = do_request(r, lkb);
3233  /* for remote locks the request_reply is sent
3234  between do_request and do_request_effects */
3235  do_request_effects(r, lkb, error);
3236  }
3237  out:
3238  return error;
3239 }
3240 
3241 /* change some property of an existing lkb, e.g. mode */
3242 
3243 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3244 {
3245  int error;
3246 
3247  if (is_remote(r)) {
3248  /* receive_convert() calls do_convert() on remote node */
3249  error = send_convert(r, lkb);
3250  } else {
3251  error = do_convert(r, lkb);
3252  /* for remote locks the convert_reply is sent
3253  between do_convert and do_convert_effects */
3254  do_convert_effects(r, lkb, error);
3255  }
3256 
3257  return error;
3258 }
3259 
3260 /* remove an existing lkb from the granted queue */
3261 
3262 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3263 {
3264  int error;
3265 
3266  if (is_remote(r)) {
3267  /* receive_unlock() calls do_unlock() on remote node */
3268  error = send_unlock(r, lkb);
3269  } else {
3270  error = do_unlock(r, lkb);
3271  /* for remote locks the unlock_reply is sent
3272  between do_unlock and do_unlock_effects */
3273  do_unlock_effects(r, lkb, error);
3274  }
3275 
3276  return error;
3277 }
3278 
3279 /* remove an existing lkb from the convert or wait queue */
3280 
3281 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3282 {
3283  int error;
3284 
3285  if (is_remote(r)) {
3286  /* receive_cancel() calls do_cancel() on remote node */
3287  error = send_cancel(r, lkb);
3288  } else {
3289  error = do_cancel(r, lkb);
3290  /* for remote locks the cancel_reply is sent
3291  between do_cancel and do_cancel_effects */
3292  do_cancel_effects(r, lkb, error);
3293  }
3294 
3295  return error;
3296 }
3297 
3298 /*
3299  * Four stage 2 varieties:
3300  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3301  */
3302 
3303 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3304  int len, struct dlm_args *args)
3305 {
3306  struct dlm_rsb *r;
3307  int error;
3308 
3309  error = validate_lock_args(ls, lkb, args);
3310  if (error)
3311  return error;
3312 
3313  error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3314  if (error)
3315  return error;
3316 
3317  lock_rsb(r);
3318 
3319  attach_lkb(r, lkb);
3320  lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3321 
3322  error = _request_lock(r, lkb);
3323 
3324  unlock_rsb(r);
3325  put_rsb(r);
3326  return error;
3327 }
3328 
3329 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3330  struct dlm_args *args)
3331 {
3332  struct dlm_rsb *r;
3333  int error;
3334 
3335  r = lkb->lkb_resource;
3336 
3337  hold_rsb(r);
3338  lock_rsb(r);
3339 
3340  error = validate_lock_args(ls, lkb, args);
3341  if (error)
3342  goto out;
3343 
3344  error = _convert_lock(r, lkb);
3345  out:
3346  unlock_rsb(r);
3347  put_rsb(r);
3348  return error;
3349 }
3350 
3351 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3352  struct dlm_args *args)
3353 {
3354  struct dlm_rsb *r;
3355  int error;
3356 
3357  r = lkb->lkb_resource;
3358 
3359  hold_rsb(r);
3360  lock_rsb(r);
3361 
3362  error = validate_unlock_args(lkb, args);
3363  if (error)
3364  goto out;
3365 
3366  error = _unlock_lock(r, lkb);
3367  out:
3368  unlock_rsb(r);
3369  put_rsb(r);
3370  return error;
3371 }
3372 
3373 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3374  struct dlm_args *args)
3375 {
3376  struct dlm_rsb *r;
3377  int error;
3378 
3379  r = lkb->lkb_resource;
3380 
3381  hold_rsb(r);
3382  lock_rsb(r);
3383 
3384  error = validate_unlock_args(lkb, args);
3385  if (error)
3386  goto out;
3387 
3388  error = _cancel_lock(r, lkb);
3389  out:
3390  unlock_rsb(r);
3391  put_rsb(r);
3392  return error;
3393 }
3394 
3395 /*
3396  * Two stage 1 varieties: dlm_lock() and dlm_unlock()
3397  */
3398 
3399 int dlm_lock(dlm_lockspace_t *lockspace,
3400  int mode,
3401  struct dlm_lksb *lksb,
3402  uint32_t flags,
3403  void *name,
3404  unsigned int namelen,
3405  uint32_t parent_lkid,
3406  void (*ast) (void *astarg),
3407  void *astarg,
3408  void (*bast) (void *astarg, int mode))
3409 {
3410  struct dlm_ls *ls;
3411  struct dlm_lkb *lkb;
3412  struct dlm_args args;
3413  int error, convert = flags & DLM_LKF_CONVERT;
3414 
3415  ls = dlm_find_lockspace_local(lockspace);
3416  if (!ls)
3417  return -EINVAL;
3418 
3419  dlm_lock_recovery(ls);
3420 
3421  if (convert)
3422  error = find_lkb(ls, lksb->sb_lkid, &lkb);
3423  else
3424  error = create_lkb(ls, &lkb);
3425 
3426  if (error)
3427  goto out;
3428 
3429  error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3430  astarg, bast, &args);
3431  if (error)
3432  goto out_put;
3433 
3434  if (convert)
3435  error = convert_lock(ls, lkb, &args);
3436  else
3437  error = request_lock(ls, lkb, name, namelen, &args);
3438 
3439  if (error == -EINPROGRESS)
3440  error = 0;
3441  out_put:
3442  if (convert || error)
3443  __put_lkb(ls, lkb);
3444  if (error == -EAGAIN || error == -EDEADLK)
3445  error = 0;
3446  out:
3447  dlm_unlock_recovery(ls);
3448  dlm_put_lockspace(ls);
3449  return error;
3450 }
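
/* Illustrative sketch (not part of lock.c): minimal kernel-side use of
   dlm_lock(). The completion-based waiting and all "my_" names are
   hypothetical; a lockspace "ls" is assumed to exist already (from
   dlm_new_lockspace()). */

#include <linux/completion.h>
#include <linux/dlm.h>

static struct dlm_lksb my_lksb;
static DECLARE_COMPLETION(my_done);

static void my_ast(void *arg)
{
	complete(&my_done);	/* request, convert or unlock finished */
}

static int my_take_lock(dlm_lockspace_t *ls)
{
	int error;

	error = dlm_lock(ls, DLM_LOCK_EX, &my_lksb, 0, "my_res", 6,
			 0, my_ast, NULL, NULL);
	if (error)
		return error;	/* the request was never queued */

	wait_for_completion(&my_done);
	return my_lksb.sb_status;	/* 0 once the EX lock is granted */
}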
3451 
3452 int dlm_unlock(dlm_lockspace_t *lockspace,
3453  uint32_t lkid,
3454  uint32_t flags,
3455  struct dlm_lksb *lksb,
3456  void *astarg)
3457 {
3458  struct dlm_ls *ls;
3459  struct dlm_lkb *lkb;
3460  struct dlm_args args;
3461  int error;
3462 
3463  ls = dlm_find_lockspace_local(lockspace);
3464  if (!ls)
3465  return -EINVAL;
3466 
3467  dlm_lock_recovery(ls);
3468 
3469  error = find_lkb(ls, lkid, &lkb);
3470  if (error)
3471  goto out;
3472 
3473  error = set_unlock_args(flags, astarg, &args);
3474  if (error)
3475  goto out_put;
3476 
3477  if (flags & DLM_LKF_CANCEL)
3478  error = cancel_lock(ls, lkb, &args);
3479  else
3480  error = unlock_lock(ls, lkb, &args);
3481 
3482  if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3483  error = 0;
3484  if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3485  error = 0;
3486  out_put:
3487  dlm_put_lkb(lkb);
3488  out:
3489  dlm_unlock_recovery(ls);
3490  dlm_put_lockspace(ls);
3491  return error;
3492 }
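
/* Illustrative sketch (not part of lock.c): releasing the lock taken in
   the previous sketch. As the translation above shows, dlm_unlock()
   itself returns 0 for -DLM_EUNLOCK/-DLM_ECANCEL; the final status,
   -DLM_EUNLOCK, arrives in sb_status via the ast. "my_" names are
   hypothetical. */

static int my_drop_lock(dlm_lockspace_t *ls)
{
	int error;

	INIT_COMPLETION(my_done);	/* reuse the completion from above */
	error = dlm_unlock(ls, my_lksb.sb_lkid, 0, &my_lksb, NULL);
	if (error)
		return error;

	wait_for_completion(&my_done);
	return my_lksb.sb_status == -DLM_EUNLOCK ? 0 : my_lksb.sb_status;
}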
3493 
3494 /*
3495  * send/receive routines for remote operations and replies
3496  *
3497  * send_args
3498  * send_common
3499  * send_request receive_request
3500  * send_convert receive_convert
3501  * send_unlock receive_unlock
3502  * send_cancel receive_cancel
3503  * send_grant receive_grant
3504  * send_bast receive_bast
3505  * send_lookup receive_lookup
3506  * send_remove receive_remove
3507  *
3508  * send_common_reply
3509  * receive_request_reply send_request_reply
3510  * receive_convert_reply send_convert_reply
3511  * receive_unlock_reply send_unlock_reply
3512  * receive_cancel_reply send_cancel_reply
3513  * receive_lookup_reply send_lookup_reply
3514  */
3515 
3516 static int _create_message(struct dlm_ls *ls, int mb_len,
3517  int to_nodeid, int mstype,
3518  struct dlm_message **ms_ret,
3519  struct dlm_mhandle **mh_ret)
3520 {
3521  struct dlm_message *ms;
3522  struct dlm_mhandle *mh;
3523  char *mb;
3524 
3525  /* get_buffer gives us a message handle (mh) that we need to
3526  pass into lowcomms_commit and a message buffer (mb) that we
3527  write our data into */
3528 
3529  mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
3530  if (!mh)
3531  return -ENOBUFS;
3532 
3533  memset(mb, 0, mb_len);
3534 
3535  ms = (struct dlm_message *) mb;
3536 
3537  ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3538  ms->m_header.h_lockspace = ls->ls_global_id;
3539  ms->m_header.h_nodeid = dlm_our_nodeid();
3540  ms->m_header.h_length = mb_len;
3541  ms->m_header.h_cmd = DLM_MSG;
3542 
3543  ms->m_type = mstype;
3544 
3545  *mh_ret = mh;
3546  *ms_ret = ms;
3547  return 0;
3548 }
3549 
3550 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3551  int to_nodeid, int mstype,
3552  struct dlm_message **ms_ret,
3553  struct dlm_mhandle **mh_ret)
3554 {
3555  int mb_len = sizeof(struct dlm_message);
3556 
3557  switch (mstype) {
3558  case DLM_MSG_REQUEST:
3559  case DLM_MSG_LOOKUP:
3560  case DLM_MSG_REMOVE:
3561  mb_len += r->res_length;
3562  break;
3563  case DLM_MSG_CONVERT:
3564  case DLM_MSG_UNLOCK:
3565  case DLM_MSG_REQUEST_REPLY:
3566  case DLM_MSG_CONVERT_REPLY:
3567  case DLM_MSG_GRANT:
3568  if (lkb && lkb->lkb_lvbptr)
3569  mb_len += r->res_ls->ls_lvblen;
3570  break;
3571  }
3572 
3573  return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3574  ms_ret, mh_ret);
3575 }
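
/* Illustrative sketch (not part of lock.c): for name-carrying messages,
   create_message() sizes the buffer as the fixed header plus the resource
   name; receive_extralen() further below recovers that extra length from
   h_length on the receiving side. Restated as a hypothetical helper: */

static int example_request_msg_len(struct dlm_rsb *r)
{
	/* extra = h_length - sizeof(struct dlm_message) = r->res_length */
	return sizeof(struct dlm_message) + r->res_length;
}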
3576 
3577 /* further lowcomms enhancements or alternate implementations may make
3578  the return value from this function useful at some point */
3579 
3580 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3581 {
3582  dlm_message_out(ms);
3583  dlm_lowcomms_commit_buffer(mh);
3584  return 0;
3585 }
3586 
3587 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3588  struct dlm_message *ms)
3589 {
3590  ms->m_nodeid = lkb->lkb_nodeid;
3591  ms->m_pid = lkb->lkb_ownpid;
3592  ms->m_lkid = lkb->lkb_id;
3593  ms->m_remid = lkb->lkb_remid;
3594  ms->m_exflags = lkb->lkb_exflags;
3595  ms->m_sbflags = lkb->lkb_sbflags;
3596  ms->m_flags = lkb->lkb_flags;
3597  ms->m_lvbseq = lkb->lkb_lvbseq;
3598  ms->m_status = lkb->lkb_status;
3599  ms->m_grmode = lkb->lkb_grmode;
3600  ms->m_rqmode = lkb->lkb_rqmode;
3601  ms->m_hash = r->res_hash;
3602 
3603  /* m_result and m_bastmode are set from function args,
3604  not from lkb fields */
3605 
3606  if (lkb->lkb_bastfn)
3607  ms->m_asts |= DLM_CB_BAST;
3608  if (lkb->lkb_astfn)
3609  ms->m_asts |= DLM_CB_CAST;
3610 
3611  /* compare with switch in create_message; send_remove() doesn't
3612  use send_args() */
3613 
3614  switch (ms->m_type) {
3615  case DLM_MSG_REQUEST:
3616  case DLM_MSG_LOOKUP:
3617  memcpy(ms->m_extra, r->res_name, r->res_length);
3618  break;
3619  case DLM_MSG_CONVERT:
3620  case DLM_MSG_UNLOCK:
3621  case DLM_MSG_REQUEST_REPLY:
3622  case DLM_MSG_CONVERT_REPLY:
3623  case DLM_MSG_GRANT:
3624  if (!lkb->lkb_lvbptr)
3625  break;
3626  memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3627  break;
3628  }
3629 }
3630 
3631 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3632 {
3633  struct dlm_message *ms;
3634  struct dlm_mhandle *mh;
3635  int to_nodeid, error;
3636 
3637  to_nodeid = r->res_nodeid;
3638 
3639  error = add_to_waiters(lkb, mstype, to_nodeid);
3640  if (error)
3641  return error;
3642 
3643  error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3644  if (error)
3645  goto fail;
3646 
3647  send_args(r, lkb, ms);
3648 
3649  error = send_message(mh, ms);
3650  if (error)
3651  goto fail;
3652  return 0;
3653 
3654  fail:
3655  remove_from_waiters(lkb, msg_reply_type(mstype));
3656  return error;
3657 }
3658 
3659 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3660 {
3661  return send_common(r, lkb, DLM_MSG_REQUEST);
3662 }
3663 
3664 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3665 {
3666  int error;
3667 
3668  error = send_common(r, lkb, DLM_MSG_CONVERT);
3669 
3670  /* down conversions go without a reply from the master */
3671  if (!error && down_conversion(lkb)) {
3672  remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3673  r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3674  r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3675  r->res_ls->ls_stub_ms.m_result = 0;
3676  __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3677  }
3678 
3679  return error;
3680 }
3681 
3682 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3683  MASTER_UNCERTAIN to force the next request on the rsb to confirm
3684  that the master is still correct. */
3685 
3686 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3687 {
3688  return send_common(r, lkb, DLM_MSG_UNLOCK);
3689 }
3690 
3691 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3692 {
3693  return send_common(r, lkb, DLM_MSG_CANCEL);
3694 }
3695 
3696 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3697 {
3698  struct dlm_message *ms;
3699  struct dlm_mhandle *mh;
3700  int to_nodeid, error;
3701 
3702  to_nodeid = lkb->lkb_nodeid;
3703 
3704  error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3705  if (error)
3706  goto out;
3707 
3708  send_args(r, lkb, ms);
3709 
3710  ms->m_result = 0;
3711 
3712  error = send_message(mh, ms);
3713  out:
3714  return error;
3715 }
3716 
3717 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3718 {
3719  struct dlm_message *ms;
3720  struct dlm_mhandle *mh;
3721  int to_nodeid, error;
3722 
3723  to_nodeid = lkb->lkb_nodeid;
3724 
3725  error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3726  if (error)
3727  goto out;
3728 
3729  send_args(r, lkb, ms);
3730 
3731  ms->m_bastmode = mode;
3732 
3733  error = send_message(mh, ms);
3734  out:
3735  return error;
3736 }
3737 
3738 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3739 {
3740  struct dlm_message *ms;
3741  struct dlm_mhandle *mh;
3742  int to_nodeid, error;
3743 
3744  to_nodeid = dlm_dir_nodeid(r);
3745 
3746  error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3747  if (error)
3748  return error;
3749 
3750  error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3751  if (error)
3752  goto fail;
3753 
3754  send_args(r, lkb, ms);
3755 
3756  error = send_message(mh, ms);
3757  if (error)
3758  goto fail;
3759  return 0;
3760 
3761  fail:
3762  remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3763  return error;
3764 }
3765 
3766 static int send_remove(struct dlm_rsb *r)
3767 {
3768  struct dlm_message *ms;
3769  struct dlm_mhandle *mh;
3770  int to_nodeid, error;
3771 
3772  to_nodeid = dlm_dir_nodeid(r);
3773 
3774  error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3775  if (error)
3776  goto out;
3777 
3778  memcpy(ms->m_extra, r->res_name, r->res_length);
3779  ms->m_hash = r->res_hash;
3780 
3781  error = send_message(mh, ms);
3782  out:
3783  return error;
3784 }
3785 
3786 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3787  int mstype, int rv)
3788 {
3789  struct dlm_message *ms;
3790  struct dlm_mhandle *mh;
3791  int to_nodeid, error;
3792 
3793  to_nodeid = lkb->lkb_nodeid;
3794 
3795  error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3796  if (error)
3797  goto out;
3798 
3799  send_args(r, lkb, ms);
3800 
3801  ms->m_result = rv;
3802 
3803  error = send_message(mh, ms);
3804  out:
3805  return error;
3806 }
3807 
3808 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3809 {
3810  return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3811 }
3812 
3813 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3814 {
3815  return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3816 }
3817 
3818 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3819 {
3820  return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3821 }
3822 
3823 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3824 {
3825  return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3826 }
3827 
3828 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3829  int ret_nodeid, int rv)
3830 {
3831  struct dlm_rsb *r = &ls->ls_stub_rsb;
3832  struct dlm_message *ms;
3833  struct dlm_mhandle *mh;
3834  int error, nodeid = ms_in->m_header.h_nodeid;
3835 
3836  error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3837  if (error)
3838  goto out;
3839 
3840  ms->m_lkid = ms_in->m_lkid;
3841  ms->m_result = rv;
3842  ms->m_nodeid = ret_nodeid;
3843 
3844  error = send_message(mh, ms);
3845  out:
3846  return error;
3847 }
3848 
3849 /* which args we save from a received message depends heavily on the type
3850  of message, unlike the send side where we can safely send everything about
3851  the lkb for any type of message */
3852 
3853 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3854 {
3855  lkb->lkb_exflags = ms->m_exflags;
3856  lkb->lkb_sbflags = ms->m_sbflags;
3857  lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3858  (ms->m_flags & 0x0000FFFF);
3859 }
3860 
3861 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3862 {
3863  if (ms->m_flags == DLM_IFL_STUB_MS)
3864  return;
3865 
3866  lkb->lkb_sbflags = ms->m_sbflags;
3867  lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3868  (ms->m_flags & 0x0000FFFF);
3869 }
3870 
3871 static int receive_extralen(struct dlm_message *ms)
3872 {
3873  return (ms->m_header.h_length - sizeof(struct dlm_message));
3874 }
3875 
3876 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3877  struct dlm_message *ms)
3878 {
3879  int len;
3880 
3881  if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3882  if (!lkb->lkb_lvbptr)
3883  lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3884  if (!lkb->lkb_lvbptr)
3885  return -ENOMEM;
3886  len = receive_extralen(ms);
3887  if (len > DLM_RESNAME_MAXLEN)
3888  len = DLM_RESNAME_MAXLEN;
3889  memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3890  }
3891  return 0;
3892 }
3893 
3894 static void fake_bastfn(void *astparam, int mode)
3895 {
3896  log_print("fake_bastfn should not be called");
3897 }
3898 
3899 static void fake_astfn(void *astparam)
3900 {
3901  log_print("fake_astfn should not be called");
3902 }
3903 
3904 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3905  struct dlm_message *ms)
3906 {
3907  lkb->lkb_nodeid = ms->m_header.h_nodeid;
3908  lkb->lkb_ownpid = ms->m_pid;
3909  lkb->lkb_remid = ms->m_lkid;
3910  lkb->lkb_grmode = DLM_LOCK_IV;
3911  lkb->lkb_rqmode = ms->m_rqmode;
3912 
3913  lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3914  lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3915 
3916  if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3917  /* lkb was just created so there won't be an lvb yet */
3918  lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3919  if (!lkb->lkb_lvbptr)
3920  return -ENOMEM;
3921  }
3922 
3923  return 0;
3924 }
3925 
3926 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3927  struct dlm_message *ms)
3928 {
3929  if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3930  return -EBUSY;
3931 
3932  if (receive_lvb(ls, lkb, ms))
3933  return -ENOMEM;
3934 
3935  lkb->lkb_rqmode = ms->m_rqmode;
3936  lkb->lkb_lvbseq = ms->m_lvbseq;
3937 
3938  return 0;
3939 }
3940 
3941 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3942  struct dlm_message *ms)
3943 {
3944  if (receive_lvb(ls, lkb, ms))
3945  return -ENOMEM;
3946  return 0;
3947 }
3948 
3949 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3950  uses to send a reply and that the remote end uses to process the reply. */
3951 
3952 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3953 {
3954  struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3955  lkb->lkb_nodeid = ms->m_header.h_nodeid;
3956  lkb->lkb_remid = ms->m_lkid;
3957 }
3958 
3959 /* This is called after the rsb is locked so that we can safely inspect
3960  fields in the lkb. */
3961 
3962 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3963 {
3964  int from = ms->m_header.h_nodeid;
3965  int error = 0;
3966 
3967  switch (ms->m_type) {
3968  case DLM_MSG_CONVERT:
3969  case DLM_MSG_UNLOCK:
3970  case DLM_MSG_CANCEL:
3971  if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3972  error = -EINVAL;
3973  break;
3974 
3975  case DLM_MSG_CONVERT_REPLY:
3976  case DLM_MSG_UNLOCK_REPLY:
3977  case DLM_MSG_CANCEL_REPLY:
3978  case DLM_MSG_GRANT:
3979  case DLM_MSG_BAST:
3980  if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3981  error = -EINVAL;
3982  break;
3983 
3984  case DLM_MSG_REQUEST_REPLY:
3985  if (!is_process_copy(lkb))
3986  error = -EINVAL;
3987  else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3988  error = -EINVAL;
3989  break;
3990 
3991  default:
3992  error = -EINVAL;
3993  }
3994 
3995  if (error)
3996  log_error(lkb->lkb_resource->res_ls,
3997  "ignore invalid message %d from %d %x %x %x %d",
3998  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3999  lkb->lkb_flags, lkb->lkb_nodeid);
4000  return error;
4001 }
4002 
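/* Summary (derived from the checks above, not part of lock.c):

	message type arriving              local lkb must be     sender (h_nodeid)
	CONVERT / UNLOCK / CANCEL          master copy (MSTCPY)  == lkb_nodeid
	CONVERT/UNLOCK/CANCEL _REPLY,
	GRANT, BAST                        process copy          == lkb_nodeid
	REQUEST_REPLY                      process copy          == lkb_nodeid, or
	                                                         any node if
	                                                         lkb_nodeid is -1
*/
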
4003 static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4004 {
4005  char name[DLM_RESNAME_MAXLEN + 1];
4006  struct dlm_message *ms;
4007  struct dlm_mhandle *mh;
4008  struct dlm_rsb *r;
4009  uint32_t hash, b;
4010  int rv, dir_nodeid;
4011 
4012  memset(name, 0, sizeof(name));
4013  memcpy(name, ms_name, len);
4014 
4015  hash = jhash(name, len, 0);
4016  b = hash & (ls->ls_rsbtbl_size - 1);
4017 
4018  dir_nodeid = dlm_hash2nodeid(ls, hash);
4019 
4020  log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4021 
4022  spin_lock(&ls->ls_rsbtbl[b].lock);
4023  rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4024  if (!rv) {
4025  spin_unlock(&ls->ls_rsbtbl[b].lock);
4026  log_error(ls, "repeat_remove on keep %s", name);
4027  return;
4028  }
4029 
4030  rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4031  if (!rv) {
4032  spin_unlock(&ls->ls_rsbtbl[b].lock);
4033  log_error(ls, "repeat_remove on toss %s", name);
4034  return;
4035  }
4036 
4037  /* use ls->remove_name2 to avoid conflict with shrink? */
4038 
4039  spin_lock(&ls->ls_remove_spin);
4040  ls->ls_remove_len = len;
4041  memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4042  spin_unlock(&ls->ls_remove_spin);
4043  spin_unlock(&ls->ls_rsbtbl[b].lock);
4044 
4045  rv = _create_message(ls, sizeof(struct dlm_message) + len,
4046  dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
4047  if (rv)
4048  return;
4049 
4050  memcpy(ms->m_extra, name, len);
4051  ms->m_hash = hash;
4052 
4053  send_message(mh, ms);
4054 
4055  spin_lock(&ls->ls_remove_spin);
4056  ls->ls_remove_len = 0;
4057  memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4058  spin_unlock(&ls->ls_remove_spin);
4059 }
4060 
4061 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
4062 {
4063  struct dlm_lkb *lkb;
4064  struct dlm_rsb *r;
4065  int from_nodeid;
4066  int error, namelen = 0;
4067 
4068  from_nodeid = ms->m_header.h_nodeid;
4069 
4070  error = create_lkb(ls, &lkb);
4071  if (error)
4072  goto fail;
4073 
4074  receive_flags(lkb, ms);
4075  lkb->lkb_flags |= DLM_IFL_MSTCPY;
4076  error = receive_request_args(ls, lkb, ms);
4077  if (error) {
4078  __put_lkb(ls, lkb);
4079  goto fail;
4080  }
4081 
4082  /* The dir node is the authority on whether we are the master
4083  for this rsb or not, so if the master sends us a request, we should
4084  recreate the rsb if we've destroyed it. This race happens when we
4085  send a remove message to the dir node at the same time that the dir
4086  node sends us a request for the rsb. */
4087 
4088  namelen = receive_extralen(ms);
4089 
4090  error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4091  R_RECEIVE_REQUEST, &r);
4092  if (error) {
4093  __put_lkb(ls, lkb);
4094  goto fail;
4095  }
4096 
4097  lock_rsb(r);
4098 
4099  if (r->res_master_nodeid != dlm_our_nodeid()) {
4100  error = validate_master_nodeid(ls, r, from_nodeid);
4101  if (error) {
4102  unlock_rsb(r);
4103  put_rsb(r);
4104  __put_lkb(ls, lkb);
4105  goto fail;
4106  }
4107  }
4108 
4109  attach_lkb(r, lkb);
4110  error = do_request(r, lkb);
4111  send_request_reply(r, lkb, error);
4112  do_request_effects(r, lkb, error);
4113 
4114  unlock_rsb(r);
4115  put_rsb(r);
4116 
4117  if (error == -EINPROGRESS)
4118  error = 0;
4119  if (error)
4120  dlm_put_lkb(lkb);
4121  return 0;
4122 
4123  fail:
4124  /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4125  and do this receive_request again from process_lookup_list once
4126  we get the lookup reply. This would avoid many repeated
4127  ENOTBLK request failures when the lookup reply designating us
4128  as master is delayed. */
4129 
4130  /* We could repeatedly return -EBADR here if our send_remove() is
4131  delayed in being sent/arriving/being processed on the dir node.
4132  Another node would repeatedly look up the master, and the dir
4133  node would continue returning our nodeid until our send_remove
4134  took effect.
4135 
4136  We send another remove message in case our previous send_remove
4137  was lost/ignored/missed somehow. */
4138 
4139  if (error != -ENOTBLK) {
4140  log_limit(ls, "receive_request %x from %d %d",
4141  ms->m_lkid, from_nodeid, error);
4142  }
4143 
4144  if (namelen && error == -EBADR) {
4145  send_repeat_remove(ls, ms->m_extra, namelen);
4146  msleep(1000);
4147  }
4148 
4149  setup_stub_lkb(ls, ms);
4150  send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4151  return error;
4152 }
4153 
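/* Timeline (a sketch, not part of lock.c) of the remove/request race
   handled above, with M = this node (the old master) and D = the dir node:

	M: send_remove(name)   ---->   D: (remove still in flight)
	D: request for name    ---->   M: receive_request()
	M: the rsb was already tossed, so find_rsb() with
	   R_RECEIVE_REQUEST recreates it rather than failing,
	   because the dir node is the authority on who the master is.

   The mirror image of the race is resolved in receive_remove() below: the
   dir node ignores a remove for an rsb that is back on its keep list. */
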
4154 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4155 {
4156  struct dlm_lkb *lkb;
4157  struct dlm_rsb *r;
4158  int error, reply = 1;
4159 
4160  error = find_lkb(ls, ms->m_remid, &lkb);
4161  if (error)
4162  goto fail;
4163 
4164  if (lkb->lkb_remid != ms->m_lkid) {
4165  log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4166  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4167  (unsigned long long)lkb->lkb_recover_seq,
4168  ms->m_header.h_nodeid, ms->m_lkid);
4169  error = -ENOENT;
4170  goto fail;
4171  }
4172 
4173  r = lkb->lkb_resource;
4174 
4175  hold_rsb(r);
4176  lock_rsb(r);
4177 
4178  error = validate_message(lkb, ms);
4179  if (error)
4180  goto out;
4181 
4182  receive_flags(lkb, ms);
4183 
4184  error = receive_convert_args(ls, lkb, ms);
4185  if (error) {
4186  send_convert_reply(r, lkb, error);
4187  goto out;
4188  }
4189 
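	/* A down-conversion never blocks, so the sending node completes it
	   locally with a faked stub reply (see send_convert() earlier in
	   this file); a real reply here would find no waiter, so it is
	   suppressed below. */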
4190  reply = !down_conversion(lkb);
4191 
4192  error = do_convert(r, lkb);
4193  if (reply)
4194  send_convert_reply(r, lkb, error);
4195  do_convert_effects(r, lkb, error);
4196  out:
4197  unlock_rsb(r);
4198  put_rsb(r);
4199  dlm_put_lkb(lkb);
4200  return 0;
4201 
4202  fail:
4203  setup_stub_lkb(ls, ms);
4204  send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4205  return error;
4206 }
4207 
4208 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4209 {
4210  struct dlm_lkb *lkb;
4211  struct dlm_rsb *r;
4212  int error;
4213 
4214  error = find_lkb(ls, ms->m_remid, &lkb);
4215  if (error)
4216  goto fail;
4217 
4218  if (lkb->lkb_remid != ms->m_lkid) {
4219  log_error(ls, "receive_unlock %x remid %x remote %d %x",
4220  lkb->lkb_id, lkb->lkb_remid,
4221  ms->m_header.h_nodeid, ms->m_lkid);
4222  error = -ENOENT;
4223  goto fail;
4224  }
4225 
4226  r = lkb->lkb_resource;
4227 
4228  hold_rsb(r);
4229  lock_rsb(r);
4230 
4231  error = validate_message(lkb, ms);
4232  if (error)
4233  goto out;
4234 
4235  receive_flags(lkb, ms);
4236 
4237  error = receive_unlock_args(ls, lkb, ms);
4238  if (error) {
4239  send_unlock_reply(r, lkb, error);
4240  goto out;
4241  }
4242 
4243  error = do_unlock(r, lkb);
4244  send_unlock_reply(r, lkb, error);
4245  do_unlock_effects(r, lkb, error);
4246  out:
4247  unlock_rsb(r);
4248  put_rsb(r);
4249  dlm_put_lkb(lkb);
4250  return 0;
4251 
4252  fail:
4253  setup_stub_lkb(ls, ms);
4254  send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4255  return error;
4256 }
4257 
4258 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4259 {
4260  struct dlm_lkb *lkb;
4261  struct dlm_rsb *r;
4262  int error;
4263 
4264  error = find_lkb(ls, ms->m_remid, &lkb);
4265  if (error)
4266  goto fail;
4267 
4268  receive_flags(lkb, ms);
4269 
4270  r = lkb->lkb_resource;
4271 
4272  hold_rsb(r);
4273  lock_rsb(r);
4274 
4275  error = validate_message(lkb, ms);
4276  if (error)
4277  goto out;
4278 
4279  error = do_cancel(r, lkb);
4280  send_cancel_reply(r, lkb, error);
4281  do_cancel_effects(r, lkb, error);
4282  out:
4283  unlock_rsb(r);
4284  put_rsb(r);
4285  dlm_put_lkb(lkb);
4286  return 0;
4287 
4288  fail:
4289  setup_stub_lkb(ls, ms);
4290  send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4291  return error;
4292 }
4293 
4294 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4295 {
4296  struct dlm_lkb *lkb;
4297  struct dlm_rsb *r;
4298  int error;
4299 
4300  error = find_lkb(ls, ms->m_remid, &lkb);
4301  if (error)
4302  return error;
4303 
4304  r = lkb->lkb_resource;
4305 
4306  hold_rsb(r);
4307  lock_rsb(r);
4308 
4309  error = validate_message(lkb, ms);
4310  if (error)
4311  goto out;
4312 
4313  receive_flags_reply(lkb, ms);
4314  if (is_altmode(lkb))
4315  munge_altmode(lkb, ms);
4316  grant_lock_pc(r, lkb, ms);
4317  queue_cast(r, lkb, 0);
4318  out:
4319  unlock_rsb(r);
4320  put_rsb(r);
4321  dlm_put_lkb(lkb);
4322  return 0;
4323 }
4324 
4325 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4326 {
4327  struct dlm_lkb *lkb;
4328  struct dlm_rsb *r;
4329  int error;
4330 
4331  error = find_lkb(ls, ms->m_remid, &lkb);
4332  if (error)
4333  return error;
4334 
4335  r = lkb->lkb_resource;
4336 
4337  hold_rsb(r);
4338  lock_rsb(r);
4339 
4340  error = validate_message(lkb, ms);
4341  if (error)
4342  goto out;
4343 
4344  queue_bast(r, lkb, ms->m_bastmode);
4345  lkb->lkb_highbast = ms->m_bastmode;
4346  out:
4347  unlock_rsb(r);
4348  put_rsb(r);
4349  dlm_put_lkb(lkb);
4350  return 0;
4351 }
4352 
4353 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4354 {
4355  int len, error, ret_nodeid, from_nodeid, our_nodeid;
4356 
4357  from_nodeid = ms->m_header.h_nodeid;
4358  our_nodeid = dlm_our_nodeid();
4359 
4360  len = receive_extralen(ms);
4361 
4362  error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4363  &ret_nodeid, NULL);
4364 
4365  /* Optimization: we're master so treat lookup as a request */
4366  if (!error && ret_nodeid == our_nodeid) {
4367  receive_request(ls, ms);
4368  return;
4369  }
4370  send_lookup_reply(ls, ms, ret_nodeid, error);
4371 }
4372 
4373 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4374 {
4375  char name[DLM_RESNAME_MAXLEN+1];
4376  struct dlm_rsb *r;
4377  uint32_t hash, b;
4378  int rv, len, dir_nodeid, from_nodeid;
4379 
4380  from_nodeid = ms->m_header.h_nodeid;
4381 
4382  len = receive_extralen(ms);
4383 
4384  if (len > DLM_RESNAME_MAXLEN) {
4385  log_error(ls, "receive_remove from %d bad len %d",
4386  from_nodeid, len);
4387  return;
4388  }
4389 
4390  dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
4391  if (dir_nodeid != dlm_our_nodeid()) {
4392  log_error(ls, "receive_remove from %d bad nodeid %d",
4393  from_nodeid, dir_nodeid);
4394  return;
4395  }
4396 
4397  /* Look for name on rsbtbl.toss, if it's there, kill it.
4398  If it's on rsbtbl.keep, it's being used, and we should ignore this
4399  message. This is an expected race between the dir node sending a
4400  request to the master node at the same time as the master node sends
4401  a remove to the dir node. The resolution to that race is for the
4402  dir node to ignore the remove message, and the master node to
4403  recreate the master rsb when it gets a request from the dir node for
4404  an rsb it doesn't have. */
4405 
4406  memset(name, 0, sizeof(name));
4407  memcpy(name, ms->m_extra, len);
4408 
4409  hash = jhash(name, len, 0);
4410  b = hash & (ls->ls_rsbtbl_size - 1);
4411 
4412  spin_lock(&ls->ls_rsbtbl[b].lock);
4413 
4414  rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4415  if (rv) {
4416  /* verify the rsb is on keep list per comment above */
4417  rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4418  if (rv) {
4419  /* should not happen */
4420  log_error(ls, "receive_remove from %d not found %s",
4421  from_nodeid, name);
4422  spin_unlock(&ls->ls_rsbtbl[b].lock);
4423  return;
4424  }
4425  if (r->res_master_nodeid != from_nodeid) {
4426  /* should not happen */
4427  log_error(ls, "receive_remove keep from %d master %d",
4428  from_nodeid, r->res_master_nodeid);
4429  dlm_print_rsb(r);
4430  spin_unlock(&ls->ls_rsbtbl[b].lock);
4431  return;
4432  }
4433 
4434  log_debug(ls, "receive_remove from %d master %d first %x %s",
4435  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4436  name);
4437  spin_unlock(&ls->ls_rsbtbl[b].lock);
4438  return;
4439  }
4440 
4441  if (r->res_master_nodeid != from_nodeid) {
4442  log_error(ls, "receive_remove toss from %d master %d",
4443  from_nodeid, r->res_master_nodeid);
4444  dlm_print_rsb(r);
4445  spin_unlock(&ls->ls_rsbtbl[b].lock);
4446  return;
4447  }
4448 
4449  if (kref_put(&r->res_ref, kill_rsb)) {
4450  rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4451  spin_unlock(&ls->ls_rsbtbl[b].lock);
4452  dlm_free_rsb(r);
4453  } else {
4454  log_error(ls, "receive_remove from %d rsb ref error",
4455  from_nodeid);
4456  dlm_print_rsb(r);
4457  spin_unlock(&ls->ls_rsbtbl[b].lock);
4458  }
4459 }
4460 
4461 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4462 {
4463  do_purge(ls, ms->m_nodeid, ms->m_pid);
4464 }
4465 
4466 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4467 {
4468  struct dlm_lkb *lkb;
4469  struct dlm_rsb *r;
4470  int error, mstype, result;
4471  int from_nodeid = ms->m_header.h_nodeid;
4472 
4473  error = find_lkb(ls, ms->m_remid, &lkb);
4474  if (error)
4475  return error;
4476 
4477  r = lkb->lkb_resource;
4478  hold_rsb(r);
4479  lock_rsb(r);
4480 
4481  error = validate_message(lkb, ms);
4482  if (error)
4483  goto out;
4484 
4485  mstype = lkb->lkb_wait_type;
4486  error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4487  if (error) {
4488  log_error(ls, "receive_request_reply %x remote %d %x result %d",
4489  lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
4490  dlm_dump_rsb(r);
4491  goto out;
4492  }
4493 
4494  /* Optimization: the dir node was also the master, so it took our
4495  lookup as a request and sent request reply instead of lookup reply */
4496  if (mstype == DLM_MSG_LOOKUP) {
4497  r->res_master_nodeid = from_nodeid;
4498  r->res_nodeid = from_nodeid;
4499  lkb->lkb_nodeid = from_nodeid;
4500  }
4501 
4502  /* this is the value returned from do_request() on the master */
4503  result = ms->m_result;
4504 
4505  switch (result) {
4506  case -EAGAIN:
4507  /* request would block (be queued) on remote master */
4508  queue_cast(r, lkb, -EAGAIN);
4509  confirm_master(r, -EAGAIN);
4510  unhold_lkb(lkb); /* undoes create_lkb() */
4511  break;
4512 
4513  case -EINPROGRESS:
4514  case 0:
4515  /* request was queued or granted on remote master */
4516  receive_flags_reply(lkb, ms);
4517  lkb->lkb_remid = ms->m_lkid;
4518  if (is_altmode(lkb))
4519  munge_altmode(lkb, ms);
4520  if (result) {
4521  add_lkb(r, lkb, DLM_LKSTS_WAITING);
4522  add_timeout(lkb);
4523  } else {
4524  grant_lock_pc(r, lkb, ms);
4525  queue_cast(r, lkb, 0);
4526  }
4527  confirm_master(r, result);
4528  break;
4529 
4530  case -EBADR:
4531  case -ENOTBLK:
4532  /* find_rsb failed to find rsb or rsb wasn't master */
4533  log_limit(ls, "receive_request_reply %x from %d %d "
4534  "master %d dir %d first %x %s", lkb->lkb_id,
4535  from_nodeid, result, r->res_master_nodeid,
4536  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4537 
4538  if (r->res_dir_nodeid != dlm_our_nodeid() &&
4539  r->res_master_nodeid != dlm_our_nodeid()) {
4540  /* cause _request_lock->set_master->send_lookup */
4541  r->res_master_nodeid = 0;
4542  r->res_nodeid = -1;
4543  lkb->lkb_nodeid = -1;
4544  }
4545 
4546  if (is_overlap(lkb)) {
4547  /* we'll ignore error in cancel/unlock reply */
4548  queue_cast_overlap(r, lkb);
4549  confirm_master(r, result);
4550  unhold_lkb(lkb); /* undoes create_lkb() */
4551  } else {
4552  _request_lock(r, lkb);
4553 
4554  if (r->res_master_nodeid == dlm_our_nodeid())
4555  confirm_master(r, 0);
4556  }
4557  break;
4558 
4559  default:
4560  log_error(ls, "receive_request_reply %x error %d",
4561  lkb->lkb_id, result);
4562  }
4563 
4564  if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4565  log_debug(ls, "receive_request_reply %x result %d unlock",
4566  lkb->lkb_id, result);
4567  lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4568  lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4569  send_unlock(r, lkb);
4570  } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4571  log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4572  lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4573  lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4574  send_cancel(r, lkb);
4575  } else {
4576  lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4577  lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4578  }
4579  out:
4580  unlock_rsb(r);
4581  put_rsb(r);
4582  dlm_put_lkb(lkb);
4583  return 0;
4584 }
4585 
4586 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4587  struct dlm_message *ms)
4588 {
4589  /* this is the value returned from do_convert() on the master */
4590  switch (ms->m_result) {
4591  case -EAGAIN:
4592  /* convert would block (be queued) on remote master */
4593  queue_cast(r, lkb, -EAGAIN);
4594  break;
4595 
4596  case -EDEADLK:
4597  receive_flags_reply(lkb, ms);
4598  revert_lock_pc(r, lkb);
4599  queue_cast(r, lkb, -EDEADLK);
4600  break;
4601 
4602  case -EINPROGRESS:
4603  /* convert was queued on remote master */
4604  receive_flags_reply(lkb, ms);
4605  if (is_demoted(lkb))
4606  munge_demoted(lkb);
4607  del_lkb(r, lkb);
4608  add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4609  add_timeout(lkb);
4610  break;
4611 
4612  case 0:
4613  /* convert was granted on remote master */
4614  receive_flags_reply(lkb, ms);
4615  if (is_demoted(lkb))
4616  munge_demoted(lkb);
4617  grant_lock_pc(r, lkb, ms);
4618  queue_cast(r, lkb, 0);
4619  break;
4620 
4621  default:
4622  log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4623  lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
4624  ms->m_result);
4625  dlm_print_rsb(r);
4626  dlm_print_lkb(lkb);
4627  }
4628 }
4629 
4630 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4631 {
4632  struct dlm_rsb *r = lkb->lkb_resource;
4633  int error;
4634 
4635  hold_rsb(r);
4636  lock_rsb(r);
4637 
4638  error = validate_message(lkb, ms);
4639  if (error)
4640  goto out;
4641 
4642  /* stub reply can happen with waiters_mutex held */
4643  error = remove_from_waiters_ms(lkb, ms);
4644  if (error)
4645  goto out;
4646 
4647  __receive_convert_reply(r, lkb, ms);
4648  out:
4649  unlock_rsb(r);
4650  put_rsb(r);
4651 }
4652 
4653 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4654 {
4655  struct dlm_lkb *lkb;
4656  int error;
4657 
4658  error = find_lkb(ls, ms->m_remid, &lkb);
4659  if (error)
4660  return error;
4661 
4662  _receive_convert_reply(lkb, ms);
4663  dlm_put_lkb(lkb);
4664  return 0;
4665 }
4666 
4667 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4668 {
4669  struct dlm_rsb *r = lkb->lkb_resource;
4670  int error;
4671 
4672  hold_rsb(r);
4673  lock_rsb(r);
4674 
4675  error = validate_message(lkb, ms);
4676  if (error)
4677  goto out;
4678 
4679  /* stub reply can happen with waiters_mutex held */
4680  error = remove_from_waiters_ms(lkb, ms);
4681  if (error)
4682  goto out;
4683 
4684  /* this is the value returned from do_unlock() on the master */
4685 
4686  switch (ms->m_result) {
4687  case -DLM_EUNLOCK:
4688  receive_flags_reply(lkb, ms);
4689  remove_lock_pc(r, lkb);
4690  queue_cast(r, lkb, -DLM_EUNLOCK);
4691  break;
4692  case -ENOENT:
4693  break;
4694  default:
4695  log_error(r->res_ls, "receive_unlock_reply %x error %d",
4696  lkb->lkb_id, ms->m_result);
4697  }
4698  out:
4699  unlock_rsb(r);
4700  put_rsb(r);
4701 }
4702 
4703 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4704 {
4705  struct dlm_lkb *lkb;
4706  int error;
4707 
4708  error = find_lkb(ls, ms->m_remid, &lkb);
4709  if (error)
4710  return error;
4711 
4712  _receive_unlock_reply(lkb, ms);
4713  dlm_put_lkb(lkb);
4714  return 0;
4715 }
4716 
4717 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4718 {
4719  struct dlm_rsb *r = lkb->lkb_resource;
4720  int error;
4721 
4722  hold_rsb(r);
4723  lock_rsb(r);
4724 
4725  error = validate_message(lkb, ms);
4726  if (error)
4727  goto out;
4728 
4729  /* stub reply can happen with waiters_mutex held */
4730  error = remove_from_waiters_ms(lkb, ms);
4731  if (error)
4732  goto out;
4733 
4734  /* this is the value returned from do_cancel() on the master */
4735 
4736  switch (ms->m_result) {
4737  case -DLM_ECANCEL:
4738  receive_flags_reply(lkb, ms);
4739  revert_lock_pc(r, lkb);
4740  queue_cast(r, lkb, -DLM_ECANCEL);
4741  break;
4742  case 0:
4743  break;
4744  default:
4745  log_error(r->res_ls, "receive_cancel_reply %x error %d",
4746  lkb->lkb_id, ms->m_result);
4747  }
4748  out:
4749  unlock_rsb(r);
4750  put_rsb(r);
4751 }
4752 
4753 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4754 {
4755  struct dlm_lkb *lkb;
4756  int error;
4757 
4758  error = find_lkb(ls, ms->m_remid, &lkb);
4759  if (error)
4760  return error;
4761 
4762  _receive_cancel_reply(lkb, ms);
4763  dlm_put_lkb(lkb);
4764  return 0;
4765 }
4766 
4767 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4768 {
4769  struct dlm_lkb *lkb;
4770  struct dlm_rsb *r;
4771  int error, ret_nodeid;
4772  int do_lookup_list = 0;
4773 
4774  error = find_lkb(ls, ms->m_lkid, &lkb);
4775  if (error) {
4776  log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4777  return;
4778  }
4779 
4780  /* ms->m_result is the value returned by dlm_master_lookup on dir node
4781  FIXME: will a non-zero error ever be returned? */
4782 
4783  r = lkb->lkb_resource;
4784  hold_rsb(r);
4785  lock_rsb(r);
4786 
4787  error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4788  if (error)
4789  goto out;
4790 
4791  ret_nodeid = ms->m_nodeid;
4792 
4793  /* We sometimes receive a request from the dir node for this
4794  rsb before we've received the dir node's lookup_reply for it.
4795  The request from the dir node implies we're the master, so we set
4796  ourselves as master in receive_request_reply, and verify here that
4797  we are indeed the master. */
4798 
4799  if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4800  /* This should never happen */
4801  log_error(ls, "receive_lookup_reply %x from %d ret %d "
4802  "master %d dir %d our %d first %x %s",
4803  lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4804  r->res_master_nodeid, r->res_dir_nodeid,
4805  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4806  }
4807 
4808  if (ret_nodeid == dlm_our_nodeid()) {
4809  r->res_master_nodeid = ret_nodeid;
4810  r->res_nodeid = 0;
4811  do_lookup_list = 1;
4812  r->res_first_lkid = 0;
4813  } else if (ret_nodeid == -1) {
4814  /* the remote node doesn't believe it's the dir node */
4815  log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4816  lkb->lkb_id, ms->m_header.h_nodeid);
4817  r->res_master_nodeid = 0;
4818  r->res_nodeid = -1;
4819  lkb->lkb_nodeid = -1;
4820  } else {
4821  /* set_master() will set lkb_nodeid from r */
4822  r->res_master_nodeid = ret_nodeid;
4823  r->res_nodeid = ret_nodeid;
4824  }
4825 
4826  if (is_overlap(lkb)) {
4827  log_debug(ls, "receive_lookup_reply %x unlock %x",
4828  lkb->lkb_id, lkb->lkb_flags);
4829  queue_cast_overlap(r, lkb);
4830  unhold_lkb(lkb); /* undoes create_lkb() */
4831  goto out_list;
4832  }
4833 
4834  _request_lock(r, lkb);
4835 
4836  out_list:
4837  if (do_lookup_list)
4838  process_lookup_list(r);
4839  out:
4840  unlock_rsb(r);
4841  put_rsb(r);
4842  dlm_put_lkb(lkb);
4843 }
4844 
4845 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4846  uint32_t saved_seq)
4847 {
4848  int error = 0, noent = 0;
4849 
4850  if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4851  log_limit(ls, "receive %d from non-member %d %x %x %d",
4852  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4853  ms->m_remid, ms->m_result);
4854  return;
4855  }
4856 
4857  switch (ms->m_type) {
4858 
4859  /* messages sent to a master node */
4860 
4861  case DLM_MSG_REQUEST:
4862  error = receive_request(ls, ms);
4863  break;
4864 
4865  case DLM_MSG_CONVERT:
4866  error = receive_convert(ls, ms);
4867  break;
4868 
4869  case DLM_MSG_UNLOCK:
4870  error = receive_unlock(ls, ms);
4871  break;
4872 
4873  case DLM_MSG_CANCEL:
4874  noent = 1;
4875  error = receive_cancel(ls, ms);
4876  break;
4877 
4878  /* messages sent from a master node (replies to above) */
4879 
4880  case DLM_MSG_REQUEST_REPLY:
4881  error = receive_request_reply(ls, ms);
4882  break;
4883 
4884  case DLM_MSG_CONVERT_REPLY:
4885  error = receive_convert_reply(ls, ms);
4886  break;
4887 
4888  case DLM_MSG_UNLOCK_REPLY:
4889  error = receive_unlock_reply(ls, ms);
4890  break;
4891 
4892  case DLM_MSG_CANCEL_REPLY:
4893  error = receive_cancel_reply(ls, ms);
4894  break;
4895 
4896  /* messages sent from a master node (only two types of async msg) */
4897 
4898  case DLM_MSG_GRANT:
4899  noent = 1;
4900  error = receive_grant(ls, ms);
4901  break;
4902 
4903  case DLM_MSG_BAST:
4904  noent = 1;
4905  error = receive_bast(ls, ms);
4906  break;
4907 
4908  /* messages sent to a dir node */
4909 
4910  case DLM_MSG_LOOKUP:
4911  receive_lookup(ls, ms);
4912  break;
4913 
4914  case DLM_MSG_REMOVE:
4915  receive_remove(ls, ms);
4916  break;
4917 
4918  /* messages sent from a dir node (remove has no reply) */
4919 
4920  case DLM_MSG_LOOKUP_REPLY:
4921  receive_lookup_reply(ls, ms);
4922  break;
4923 
4924  /* other messages */
4925 
4926  case DLM_MSG_PURGE:
4927  receive_purge(ls, ms);
4928  break;
4929 
4930  default:
4931  log_error(ls, "unknown message type %d", ms->m_type);
4932  }
4933 
4934  /*
4935  * When checking for ENOENT, we're checking the result of
4936  * find_lkb(m_remid):
4937  *
4938  * The lock id referenced in the message wasn't found. This may
4939  * happen in normal usage for the async messages and cancel, so
4940  * only use log_debug for them.
4941  *
4942  * Some errors are expected and normal.
4943  */
4944 
4945  if (error == -ENOENT && noent) {
4946  log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4947  ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4948  ms->m_lkid, saved_seq);
4949  } else if (error == -ENOENT) {
4950  log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4951  ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4952  ms->m_lkid, saved_seq);
4953 
4954  if (ms->m_type == DLM_MSG_CONVERT)
4955  dlm_dump_rsb_hash(ls, ms->m_hash);
4956  }
4957 
4958  if (error == -EINVAL) {
4959  log_error(ls, "receive %d inval from %d lkid %x remid %x "
4960  "saved_seq %u",
4961  ms->m_type, ms->m_header.h_nodeid,
4962  ms->m_lkid, ms->m_remid, saved_seq);
4963  }
4964 }
4965 
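/* A sketch (not part of lock.c) of the master-bound half of this dispatch
   expressed as a lookup table.  The in-tree switch is kept instead because
   the handlers have mixed return types (receive_lookup, receive_remove and
   receive_purge return void) and because the ENOENT-is-normal cases set
   `noent` per message:

	typedef int (*dlm_msg_fn)(struct dlm_ls *ls, struct dlm_message *ms);

	static const dlm_msg_fn msg_fn[] = {
		[DLM_MSG_REQUEST] = receive_request,
		[DLM_MSG_CONVERT] = receive_convert,
		[DLM_MSG_UNLOCK]  = receive_unlock,
		[DLM_MSG_CANCEL]  = receive_cancel,
	};

	if (ms->m_type < ARRAY_SIZE(msg_fn) && msg_fn[ms->m_type])
		error = msg_fn[ms->m_type](ls, ms);
*/
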
4966 /* If the lockspace is in recovery mode (locking stopped), then normal
4967  messages are saved on the requestqueue for processing after recovery is
4968  done. When not in recovery mode, we wait for dlm_recoverd to drain saved
4969  messages off the requestqueue before we process new ones. This occurs right
4970  after recovery completes when we transition from saving all messages on
4971  requestqueue, to processing all the saved messages, to processing new
4972  messages as they arrive. */
4973 
4974 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4975  int nodeid)
4976 {
4977  if (dlm_locking_stopped(ls)) {
4978  /* If we were a member of this lockspace, left, and rejoined,
4979  other nodes may still be sending us messages from the
4980  lockspace generation before we left. */
4981  if (!ls->ls_generation) {
4982  log_limit(ls, "receive %d from %d ignore old gen",
4983  ms->m_type, nodeid);
4984  return;
4985  }
4986 
4987  dlm_add_requestqueue(ls, nodeid, ms);
4988  } else {
4989  dlm_wait_requestqueue(ls);
4990  _receive_message(ls, ms, 0);
4991  }
4992 }
4993 
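/* Lifecycle summary (not part of lock.c) of an incoming message relative
   to recovery, per the comment above:

	locking stopped:  dlm_receive_message() -> dlm_add_requestqueue()
	recovery done:    dlm_recoverd drains the queue through
	                  dlm_receive_message_saved() below
	steady state:     dlm_receive_message() -> _receive_message()
*/
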
4994 /* This is called by dlm_recoverd to process messages that were saved on
4995  the requestqueue. */
4996 
4997 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
4998  uint32_t saved_seq)
4999 {
5000  _receive_message(ls, ms, saved_seq);
5001 }
5002 
5003 /* This is called by the midcomms layer when something is received for
5004  the lockspace. It could be either a MSG (normal message sent as part of
5005  standard locking activity) or an RCOM (recovery message sent as part of
5006  lockspace recovery). */
5007 
5008 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
5009 {
5010  struct dlm_header *hd = &p->header;
5011  struct dlm_ls *ls;
5012  int type = 0;
5013 
5014  switch (hd->h_cmd) {
5015  case DLM_MSG:
5016  dlm_message_in(&p->message);
5017  type = p->message.m_type;
5018  break;
5019  case DLM_RCOM:
5020  dlm_rcom_in(&p->rcom);
5021  type = p->rcom.rc_type;
5022  break;
5023  default:
5024  log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
5025  return;
5026  }
5027 
5028  if (hd->h_nodeid != nodeid) {
5029  log_print("invalid h_nodeid %d from %d lockspace %x",
5030  hd->h_nodeid, nodeid, hd->h_lockspace);
5031  return;
5032  }
5033 
5034  ls = dlm_find_lockspace_global(hd->h_lockspace);
5035  if (!ls) {
5036  if (dlm_config.ci_log_debug) {
5037  printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
5038  "%u from %d cmd %d type %d\n",
5039  hd->h_lockspace, nodeid, hd->h_cmd, type);
5040  }
5041 
5042  if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
5043  dlm_send_ls_not_ready(nodeid, &p->rcom);
5044  return;
5045  }
5046 
5047  /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
5048  be inactive (in this ls) before transitioning to recovery mode */
5049 
5050  down_read(&ls->ls_recv_active);
5051  if (hd->h_cmd == DLM_MSG)
5052  dlm_receive_message(ls, &p->message, nodeid);
5053  else
5054  dlm_receive_rcom(ls, &p->rcom, nodeid);
5055  up_read(&ls->ls_recv_active);
5056 
5057  dlm_put_lockspace(ls);
5058 }
5059 
5060 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5061  struct dlm_message *ms_stub)
5062 {
5063  if (middle_conversion(lkb)) {
5064  hold_lkb(lkb);
5065  memset(ms_stub, 0, sizeof(struct dlm_message));
5066  ms_stub->m_flags = DLM_IFL_STUB_MS;
5067  ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
5068  ms_stub->m_result = -EINPROGRESS;
5069  ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5070  _receive_convert_reply(lkb, ms_stub);
5071 
5072  /* Same special case as in receive_rcom_lock_args() */
5073  lkb->lkb_grmode = DLM_LOCK_IV;
5074  rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5075  unhold_lkb(lkb);
5076 
5077  } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5078  lkb->lkb_flags |= DLM_IFL_RESEND;
5079  }
5080 
5081  /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5082  conversions are async; there's no reply from the remote master */
5083 }
5084 
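/* Why PR<->CW conversions are special (a note, not part of lock.c): PR and
   CW are the two "middle" modes; each is compatible with itself (and with
   NL/CR) but not with the other.  For a PR->CW or CW->PR convert that was
   in flight when the master failed, this one lkb cannot show whether the
   convert had already been granted, so the granted mode is left invalid
   (DLM_LOCK_IV) and recover_conversion() decides after all locks on the
   rsb have been rebuilt. */
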
5085 /* A waiting lkb needs recovery if the master node has failed, or
5086  the master node is changing (only when no directory is used) */
5087 
5088 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5089  int dir_nodeid)
5090 {
5091  if (dlm_no_directory(ls))
5092  return 1;
5093 
5094  if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5095  return 1;
5096 
5097  return 0;
5098 }
5099 
5100 /* Recovery for locks that are waiting for replies from nodes that are now
5101  gone. We can just complete unlocks and cancels by faking a reply from the
5102  dead node. Requests and up-conversions we flag to be resent after
5103  recovery. Down-conversions can just be completed with a fake reply like
5104  unlocks. Conversions between PR and CW need special attention. */
5105 
5106 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5107 {
5108  struct dlm_lkb *lkb, *safe;
5109  struct dlm_message *ms_stub;
5110  int wait_type, stub_unlock_result, stub_cancel_result;
5111  int dir_nodeid;
5112 
5113  ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
5114  if (!ms_stub) {
5115  log_error(ls, "dlm_recover_waiters_pre no mem");
5116  return;
5117  }
5118 
5119  mutex_lock(&ls->ls_waiters_mutex);
5120 
5121  list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5122 
5123  dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5124 
5125  /* exclude debug messages about unlocks because there can be so
5126  many and they aren't very interesting */
5127 
5128  if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5129  log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5130  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5131  lkb->lkb_id,
5132  lkb->lkb_remid,
5133  lkb->lkb_wait_type,
5134  lkb->lkb_resource->res_nodeid,
5135  lkb->lkb_nodeid,
5136  lkb->lkb_wait_nodeid,
5137  dir_nodeid);
5138  }
5139 
5140  /* all outstanding lookups, regardless of destination will be
5141  resent after recovery is done */
5142 
5143  if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5144  lkb->lkb_flags |= DLM_IFL_RESEND;
5145  continue;
5146  }
5147 
5148  if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5149  continue;
5150 
5151  wait_type = lkb->lkb_wait_type;
5152  stub_unlock_result = -DLM_EUNLOCK;
5153  stub_cancel_result = -DLM_ECANCEL;
5154 
5155  /* Main reply may have been received leaving a zero wait_type,
5156  but a reply for the overlapping op may not have been
5157  received. In that case we need to fake the appropriate
5158  reply for the overlap op. */
5159 
5160  if (!wait_type) {
5161  if (is_overlap_cancel(lkb)) {
5162  wait_type = DLM_MSG_CANCEL;
5163  if (lkb->lkb_grmode == DLM_LOCK_IV)
5164  stub_cancel_result = 0;
5165  }
5166  if (is_overlap_unlock(lkb)) {
5167  wait_type = DLM_MSG_UNLOCK;
5168  if (lkb->lkb_grmode == DLM_LOCK_IV)
5169  stub_unlock_result = -ENOENT;
5170  }
5171 
5172  log_debug(ls, "rwpre overlap %x %x %d %d %d",
5173  lkb->lkb_id, lkb->lkb_flags, wait_type,
5174  stub_cancel_result, stub_unlock_result);
5175  }
5176 
5177  switch (wait_type) {
5178 
5179  case DLM_MSG_REQUEST:
5180  lkb->lkb_flags |= DLM_IFL_RESEND;
5181  break;
5182 
5183  case DLM_MSG_CONVERT:
5184  recover_convert_waiter(ls, lkb, ms_stub);
5185  break;
5186 
5187  case DLM_MSG_UNLOCK:
5188  hold_lkb(lkb);
5189  memset(ms_stub, 0, sizeof(struct dlm_message));
5190  ms_stub->m_flags = DLM_IFL_STUB_MS;
5191  ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
5192  ms_stub->m_result = stub_unlock_result;
5193  ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5194  _receive_unlock_reply(lkb, ms_stub);
5195  dlm_put_lkb(lkb);
5196  break;
5197 
5198  case DLM_MSG_CANCEL:
5199  hold_lkb(lkb);
5200  memset(ms_stub, 0, sizeof(struct dlm_message));
5201  ms_stub->m_flags = DLM_IFL_STUB_MS;
5202  ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
5203  ms_stub->m_result = stub_cancel_result;
5204  ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5205  _receive_cancel_reply(lkb, ms_stub);
5206  dlm_put_lkb(lkb);
5207  break;
5208 
5209  default:
5210  log_error(ls, "invalid lkb wait_type %d %d",
5211  lkb->lkb_wait_type, wait_type);
5212  }
5213  schedule();
5214  }
5215  mutex_unlock(&ls->ls_waiters_mutex);
5216  kfree(ms_stub);
5217 }
5218 
5219 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5220 {
5221  struct dlm_lkb *lkb;
5222  int found = 0;
5223 
5224  mutex_lock(&ls->ls_waiters_mutex);
5225  list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
5226  if (lkb->lkb_flags & DLM_IFL_RESEND) {
5227  hold_lkb(lkb);
5228  found = 1;
5229  break;
5230  }
5231  }
5232  mutex_unlock(&ls->ls_waiters_mutex);
5233 
5234  if (!found)
5235  lkb = NULL;
5236  return lkb;
5237 }
5238 
5239 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
5240  master or dir-node for r. Processing the lkb may result in it being placed
5241  back on waiters. */
5242 
5243 /* We do this after normal locking has been enabled and any saved messages
5244  (in requestqueue) have been processed. We should be confident that at
5245  this point we won't get or process a reply to any of these waiting
5246  operations. But, new ops may be coming in on the rsbs/locks here from
5247  userspace or remotely. */
5248 
5249 /* There may have been an overlap unlock/cancel prior to recovery or after
5250  recovery. If before, the lkb may still have a positive wait_count; if after,
5251  the overlap flag would just have been set and nothing new sent. We can be
5252  confident here that any replies to either the initial op or overlap ops
5253  prior to recovery have been received. */
5254 
5255 int dlm_recover_waiters_post(struct dlm_ls *ls)
5256 {
5257  struct dlm_lkb *lkb;
5258  struct dlm_rsb *r;
5259  int error = 0, mstype, err, oc, ou;
5260 
5261  while (1) {
5262  if (dlm_locking_stopped(ls)) {
5263  log_debug(ls, "recover_waiters_post aborted");
5264  error = -EINTR;
5265  break;
5266  }
5267 
5268  lkb = find_resend_waiter(ls);
5269  if (!lkb)
5270  break;
5271 
5272  r = lkb->lkb_resource;
5273  hold_rsb(r);
5274  lock_rsb(r);
5275 
5276  mstype = lkb->lkb_wait_type;
5277  oc = is_overlap_cancel(lkb);
5278  ou = is_overlap_unlock(lkb);
5279  err = 0;
5280 
5281  log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5282  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5283  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5284  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5285  dlm_dir_nodeid(r), oc, ou);
5286 
5287  /* At this point we assume that we won't get a reply to any
5288  previous op or overlap op on this lock. First, do a big
5289  remove_from_waiters() for all previous ops. */
5290 
5291  lkb->lkb_flags &= ~DLM_IFL_RESEND;
5292  lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5293  lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5294  lkb->lkb_wait_type = 0;
5295  lkb->lkb_wait_count = 0;
5296  mutex_lock(&ls->ls_waiters_mutex);
5297  list_del_init(&lkb->lkb_wait_reply);
5298  mutex_unlock(&ls->ls_waiters_mutex);
5299  unhold_lkb(lkb); /* for waiters list */
5300 
5301  if (oc || ou) {
5302  /* do an unlock or cancel instead of resending */
5303  switch (mstype) {
5304  case DLM_MSG_LOOKUP:
5305  case DLM_MSG_REQUEST:
5306  queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5307  -DLM_ECANCEL);
5308  unhold_lkb(lkb); /* undoes create_lkb() */
5309  break;
5310  case DLM_MSG_CONVERT:
5311  if (oc) {
5312  queue_cast(r, lkb, -DLM_ECANCEL);
5313  } else {
5314  lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5315  _unlock_lock(r, lkb);
5316  }
5317  break;
5318  default:
5319  err = 1;
5320  }
5321  } else {
5322  switch (mstype) {
5323  case DLM_MSG_LOOKUP:
5324  case DLM_MSG_REQUEST:
5325  _request_lock(r, lkb);
5326  if (is_master(r))
5327  confirm_master(r, 0);
5328  break;
5329  case DLM_MSG_CONVERT:
5330  _convert_lock(r, lkb);
5331  break;
5332  default:
5333  err = 1;
5334  }
5335  }
5336 
5337  if (err) {
5338  log_error(ls, "waiter %x msg %d r_nodeid %d "
5339  "dir_nodeid %d overlap %d %d",
5340  lkb->lkb_id, mstype, r->res_nodeid,
5341  dlm_dir_nodeid(r), oc, ou);
5342  }
5343  unlock_rsb(r);
5344  put_rsb(r);
5345  dlm_put_lkb(lkb);
5346  }
5347 
5348  return error;
5349 }
5350 
5351 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5352  struct list_head *list)
5353 {
5354  struct dlm_lkb *lkb, *safe;
5355 
5356  list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5357  if (!is_master_copy(lkb))
5358  continue;
5359 
5360  /* don't purge lkbs we've added in recover_master_copy for
5361  the current recovery seq */
5362 
5363  if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5364  continue;
5365 
5366  del_lkb(r, lkb);
5367 
5368  /* this put should free the lkb */
5369  if (!dlm_put_lkb(lkb))
5370  log_error(ls, "purged mstcpy lkb not released");
5371  }
5372 }
5373 
5374 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5375 {
5376  struct dlm_ls *ls = r->res_ls;
5377 
5378  purge_mstcpy_list(ls, r, &r->res_grantqueue);
5379  purge_mstcpy_list(ls, r, &r->res_convertqueue);
5380  purge_mstcpy_list(ls, r, &r->res_waitqueue);
5381 }
5382 
5383 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5384  struct list_head *list,
5385  int nodeid_gone, unsigned int *count)
5386 {
5387  struct dlm_lkb *lkb, *safe;
5388 
5389  list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5390  if (!is_master_copy(lkb))
5391  continue;
5392 
5393  if ((lkb->lkb_nodeid == nodeid_gone) ||
5394  dlm_is_removed(ls, lkb->lkb_nodeid)) {
5395 
5396  del_lkb(r, lkb);
5397 
5398  /* this put should free the lkb */
5399  if (!dlm_put_lkb(lkb))
5400  log_error(ls, "purged dead lkb not released");
5401 
5402  rsb_set_flag(r, RSB_RECOVER_GRANT);
5403 
5404  (*count)++;
5405  }
5406  }
5407 }
5408 
5409 /* Get rid of locks held by nodes that are gone. */
5410 
5411 void dlm_recover_purge(struct dlm_ls *ls)
5412 {
5413  struct dlm_rsb *r;
5414  struct dlm_member *memb;
5415  int nodes_count = 0;
5416  int nodeid_gone = 0;
5417  unsigned int lkb_count = 0;
5418 
5419  /* cache one removed nodeid to optimize the common
5420  case of a single node removed */
5421 
5422  list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5423  nodes_count++;
5424  nodeid_gone = memb->nodeid;
5425  }
5426 
5427  if (!nodes_count)
5428  return;
5429 
5430  down_write(&ls->ls_root_sem);
5431  list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5432  hold_rsb(r);
5433  lock_rsb(r);
5434  if (is_master(r)) {
5435  purge_dead_list(ls, r, &r->res_grantqueue,
5436  nodeid_gone, &lkb_count);
5437  purge_dead_list(ls, r, &r->res_convertqueue,
5438  nodeid_gone, &lkb_count);
5439  purge_dead_list(ls, r, &r->res_waitqueue,
5440  nodeid_gone, &lkb_count);
5441  }
5442  unlock_rsb(r);
5443  unhold_rsb(r);
5444  cond_resched();
5445  }
5446  up_write(&ls->ls_root_sem);
5447 
5448  if (lkb_count)
5449  log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
5450  lkb_count, nodes_count);
5451 }
5452 
5453 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5454 {
5455  struct rb_node *n;
5456  struct dlm_rsb *r;
5457 
5458  spin_lock(&ls->ls_rsbtbl[bucket].lock);
5459  for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5460  r = rb_entry(n, struct dlm_rsb, res_hashnode);
5461 
5462  if (!rsb_flag(r, RSB_RECOVER_GRANT))
5463  continue;
5464  if (!is_master(r)) {
5465  rsb_clear_flag(r, RSB_RECOVER_GRANT);
5466  continue;
5467  }
5468  hold_rsb(r);
5469  spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5470  return r;
5471  }
5472  spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5473  return NULL;
5474 }
5475 
5476 /*
5477  * Attempt to grant locks on resources that we are the master of.
5478  * Locks may have become grantable during recovery because locks
5479  * from departed nodes have been purged (or not rebuilt), allowing
5480  * previously blocked locks to now be granted. The subset of rsb's
5481  * we are interested in are those with lkb's on either the convert or
5482  * waiting queues.
5483  *
5484  * Simplest would be to go through each master rsb and check for non-empty
5485  * convert or waiting queues, and attempt to grant on those rsbs.
5486  * Checking the queues requires lock_rsb, though, for which we'd need
5487  * to release the rsbtbl lock. This would make iterating through all
5488  * rsb's very inefficient. So, we rely on earlier recovery routines
5489  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5490  * locks for.
5491  */
5492 
5493 void dlm_recover_grant(struct dlm_ls *ls)
5494 {
5495  struct dlm_rsb *r;
5496  int bucket = 0;
5497  unsigned int count = 0;
5498  unsigned int rsb_count = 0;
5499  unsigned int lkb_count = 0;
5500 
5501  while (1) {
5502  r = find_grant_rsb(ls, bucket);
5503  if (!r) {
5504  if (bucket == ls->ls_rsbtbl_size - 1)
5505  break;
5506  bucket++;
5507  continue;
5508  }
5509  rsb_count++;
5510  count = 0;
5511  lock_rsb(r);
5512  /* the RECOVER_GRANT flag is checked in the grant path */
5513  grant_pending_locks(r, &count);
5514  rsb_clear_flag(r, RSB_RECOVER_GRANT);
5515  lkb_count += count;
5516  confirm_master(r, 0);
5517  unlock_rsb(r);
5518  put_rsb(r);
5519  cond_resched();
5520  }
5521 
5522  if (lkb_count)
5523  log_debug(ls, "dlm_recover_grant %u locks on %u resources",
5524  lkb_count, rsb_count);
5525 }
5526 
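/* Sketch (not part of lock.c) of the scan pattern used by find_grant_rsb()
   and dlm_recover_grant() above: hold the bucket spinlock only long enough
   to find and pin the next flagged rsb, do the heavier grant work with
   only the rsb lock held, then rescan.  next_flagged_rsb() is a
   hypothetical helper standing in for the rb-tree walk:

	for (;;) {
		spin_lock(&ls->ls_rsbtbl[bucket].lock);
		r = next_flagged_rsb(ls, bucket);   // hypothetical helper
		if (r)
			hold_rsb(r);                // pin before dropping lock
		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
		if (!r)
			break;                      // bucket exhausted
		lock_rsb(r);
		grant_pending_locks(r, &count);     // per-rsb work
		unlock_rsb(r);
		put_rsb(r);
	}
*/
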
5527 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5528  uint32_t remid)
5529 {
5530  struct dlm_lkb *lkb;
5531 
5532  list_for_each_entry(lkb, head, lkb_statequeue) {
5533  if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5534  return lkb;
5535  }
5536  return NULL;
5537 }
5538 
5539 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5540  uint32_t remid)
5541 {
5542  struct dlm_lkb *lkb;
5543 
5544  lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5545  if (lkb)
5546  return lkb;
5547  lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5548  if (lkb)
5549  return lkb;
5550  lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5551  if (lkb)
5552  return lkb;
5553  return NULL;
5554 }
5555 
5556 /* needs at least dlm_rcom + rcom_lock */
5557 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5558  struct dlm_rsb *r, struct dlm_rcom *rc)
5559 {
5560  struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5561 
5562  lkb->lkb_nodeid = rc->rc_header.h_nodeid;
5563  lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5564  lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5565  lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5566  lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5567  lkb->lkb_flags |= DLM_IFL_MSTCPY;
5568  lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5569  lkb->lkb_rqmode = rl->rl_rqmode;
5570  lkb->lkb_grmode = rl->rl_grmode;
5571  /* don't set lkb_status; add_lkb wants to set it itself */
5572 
5573  lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5574  lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5575 
5576  if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5577  int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
5578  sizeof(struct rcom_lock);
5579  if (lvblen > ls->ls_lvblen)
5580  return -EINVAL;
5581  lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5582  if (!lkb->lkb_lvbptr)
5583  return -ENOMEM;
5584  memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5585  }
5586 
5587  /* Conversions between PR and CW (middle modes) need special handling.
5588  The real granted mode of these converting locks cannot be determined
5589  until all locks have been rebuilt on the rsb (recover_conversion) */
5590 
5591  if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5592  middle_conversion(lkb)) {
5593  rl->rl_status = DLM_LKSTS_CONVERT;
5594  lkb->lkb_grmode = DLM_LOCK_IV;
5595  rsb_set_flag(r, RSB_RECOVER_CONVERT);
5596  }
5597 
5598  return 0;
5599 }
5600 
5601 /* This lkb may have been recovered in a previous aborted recovery so we need
5602  to check if the rsb already has an lkb with the given remote nodeid/lkid.
5603  If so we just send back a standard reply. If not, we create a new lkb with
5604  the given values and send back our lkid. We send back our lkid by sending
5605  back the rcom_lock struct we got but with the remid field filled in. */
5606 
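/* Sequence (a sketch, not part of lock.c) of the remid exchange described
   above, with H = the old lock holder and M = the new master:

	H: dlm_send_rcom_lock(): rcom_lock { rl_lkid = H's lkid }  ->  M
	M: dlm_recover_master_copy(): find or create the MSTCPY lkb,
	   set rl_remid to M's lkid in the reply                   ->  H
	H: dlm_recover_process_copy(): lkb->lkb_remid = rl_remid
*/
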
5607 /* needs at least dlm_rcom + rcom_lock */
5608 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5609 {
5610  struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5611  struct dlm_rsb *r;
5612  struct dlm_lkb *lkb;
5613  uint32_t remid = 0;
5614  int from_nodeid = rc->rc_header.h_nodeid;
5615  int error;
5616 
5617  if (rl->rl_parent_lkid) {
5618  error = -EOPNOTSUPP;
5619  goto out;
5620  }
5621 
5622  remid = le32_to_cpu(rl->rl_lkid);
5623 
5624  /* In general we expect the rsb returned to be R_MASTER, but we don't
5625  have to require it. Recovery of masters on one node can overlap
5626  recovery of locks on another node, so one node can send us MSTCPY
5627  locks before we've made ourselves master of this rsb. We can still
5628  add new MSTCPY locks that we receive here without any harm; when
5629  we make ourselves master, dlm_recover_masters() won't touch the
5630  MSTCPY locks we've received early. */
5631 
5632  error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5633  from_nodeid, R_RECEIVE_RECOVER, &r);
5634  if (error)
5635  goto out;
5636 
5637  lock_rsb(r);
5638 
5639  if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5640  log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5641  from_nodeid, remid);
5642  error = -EBADR;
5643  goto out_unlock;
5644  }
5645 
5646  lkb = search_remid(r, from_nodeid, remid);
5647  if (lkb) {
5648  error = -EEXIST;
5649  goto out_remid;
5650  }
5651 
5652  error = create_lkb(ls, &lkb);
5653  if (error)
5654  goto out_unlock;
5655 
5656  error = receive_rcom_lock_args(ls, lkb, r, rc);
5657  if (error) {
5658  __put_lkb(ls, lkb);
5659  goto out_unlock;
5660  }
5661 
5662  attach_lkb(r, lkb);
5663  add_lkb(r, lkb, rl->rl_status);
5664  error = 0;
5665  ls->ls_recover_locks_in++;
5666 
5667  if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5668  rsb_set_flag(r, RSB_RECOVER_GRANT);
5669 
5670  out_remid:
5671  /* this is the new value returned to the lock holder for
5672  saving in its process-copy lkb */
5673  rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5674 
5675  lkb->lkb_recover_seq = ls->ls_recover_seq;
5676 
5677  out_unlock:
5678  unlock_rsb(r);
5679  put_rsb(r);
5680  out:
5681  if (error && error != -EEXIST)
5682  log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
5683  from_nodeid, remid, error);
5684  rl->rl_result = cpu_to_le32(error);
5685  return error;
5686 }
5687 
5688 /* needs at least dlm_rcom + rcom_lock */
5689 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5690 {
5691  struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5692  struct dlm_rsb *r;
5693  struct dlm_lkb *lkb;
5694  uint32_t lkid, remid;
5695  int error, result;
5696 
5697  lkid = le32_to_cpu(rl->rl_lkid);
5698  remid = le32_to_cpu(rl->rl_remid);
5699  result = le32_to_cpu(rl->rl_result);
5700 
5701  error = find_lkb(ls, lkid, &lkb);
5702  if (error) {
5703  log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5704  lkid, rc->rc_header.h_nodeid, remid, result);
5705  return error;
5706  }
5707 
5708  r = lkb->lkb_resource;
5709  hold_rsb(r);
5710  lock_rsb(r);
5711 
5712  if (!is_process_copy(lkb)) {
5713  log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5714  lkid, rc->rc_header.h_nodeid, remid, result);
5715  dlm_dump_rsb(r);
5716  unlock_rsb(r);
5717  put_rsb(r);
5718  dlm_put_lkb(lkb);
5719  return -EINVAL;
5720  }
5721 
5722  switch (result) {
5723  case -EBADR:
5724  /* There's a chance the new master received our lock before
5725  dlm_recover_master_reply(); this wouldn't happen if we did
5726  a barrier between recover_masters and recover_locks. */
5727 
5728  log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5729  lkid, rc->rc_header.h_nodeid, remid, result);
5730 
5731  dlm_send_rcom_lock(r, lkb);
5732  goto out;
5733  case -EEXIST:
5734  case 0:
5735  lkb->lkb_remid = remid;
5736  break;
5737  default:
5738  log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5739  lkid, rc->rc_header.h_nodeid, remid, result);
5740  }
5741 
5742  /* an ack for dlm_recover_locks() which waits for replies from
5743  all the locks it sends to new masters */
5744  dlm_recovered_lock(r);
5745  out:
5746  unlock_rsb(r);
5747  put_rsb(r);
5748  dlm_put_lkb(lkb);
5749 
5750  return 0;
5751 }
5752 
5753 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5754  int mode, uint32_t flags, void *name, unsigned int namelen,
5755  unsigned long timeout_cs)
5756 {
5757  struct dlm_lkb *lkb;
5758  struct dlm_args args;
5759  int error;
5760 
5761  dlm_lock_recovery(ls);
5762 
5763  error = create_lkb(ls, &lkb);
5764  if (error) {
5765  kfree(ua);
5766  goto out;
5767  }
5768 
5769  if (flags & DLM_LKF_VALBLK) {
5770  ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5771  if (!ua->lksb.sb_lvbptr) {
5772  kfree(ua);
5773  __put_lkb(ls, lkb);
5774  error = -ENOMEM;
5775  goto out;
5776  }
5777  }
5778 
5779  /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5780  When DLM_IFL_USER is set, the dlm knows that this is a userspace
5781  lock and that lkb_astparam is the dlm_user_args structure. */
5782 
5783  error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5784  fake_astfn, ua, fake_bastfn, &args);
5785  lkb->lkb_flags |= DLM_IFL_USER;
5786 
5787  if (error) {
5788  __put_lkb(ls, lkb);
5789  goto out;
5790  }
5791 
5792  error = request_lock(ls, lkb, name, namelen, &args);
5793 
5794  switch (error) {
5795  case 0:
5796  break;
5797  case -EINPROGRESS:
5798  error = 0;
5799  break;
5800  case -EAGAIN:
5801  error = 0;
5802  /* fall through */
5803  default:
5804  __put_lkb(ls, lkb);
5805  goto out;
5806  }
5807 
5808  /* add this new lkb to the per-process list of locks */
5809  spin_lock(&ua->proc->locks_spin);
5810  hold_lkb(lkb);
5811  list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5812  spin_unlock(&ua->proc->locks_spin);
5813  out:
5814  dlm_unlock_recovery(ls);
5815  return error;
5816 }
5817 
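/* For comparison (a sketch, not part of lock.c): an in-kernel caller
   reaches the same request path through dlm_lock() from <linux/dlm.h>,
   supplying real ast/bast callbacks where the userspace path above
   substitutes fake_astfn/fake_bastfn.  Assuming `ls` came from
   dlm_new_lockspace() and ignoring error handling:

	static struct dlm_lksb my_lksb;

	static void my_ast(void *astarg)
	{
		// request completed; status is in my_lksb.sb_status
	}

	static void my_bast(void *astarg, int mode)
	{
		// another node is blocked on our lock at `mode`
	}

	error = dlm_lock(ls, DLM_LOCK_EX, &my_lksb, DLM_LKF_NOQUEUE,
			 "my_resource", strlen("my_resource"), 0,
			 my_ast, NULL, my_bast);
*/
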
5818 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5819  int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5820  unsigned long timeout_cs)
5821 {
5822  struct dlm_lkb *lkb;
5823  struct dlm_args args;
5824  struct dlm_user_args *ua;
5825  int error;
5826 
5827  dlm_lock_recovery(ls);
5828 
5829  error = find_lkb(ls, lkid, &lkb);
5830  if (error)
5831  goto out;
5832 
5833  /* user can change the params on its lock when it converts it, or
5834  add an lvb that didn't exist before */
5835 
5836  ua = lkb->lkb_ua;
5837 
5838  if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5839  ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5840  if (!ua->lksb.sb_lvbptr) {
5841  error = -ENOMEM;
5842  goto out_put;
5843  }
5844  }
5845  if (lvb_in && ua->lksb.sb_lvbptr)
5846  memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5847 
5848  ua->xid = ua_tmp->xid;
5849  ua->castparam = ua_tmp->castparam;
5850  ua->castaddr = ua_tmp->castaddr;
5851  ua->bastparam = ua_tmp->bastparam;
5852  ua->bastaddr = ua_tmp->bastaddr;
5853  ua->user_lksb = ua_tmp->user_lksb;
5854 
5855  error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5856  fake_astfn, ua, fake_bastfn, &args);
5857  if (error)
5858  goto out_put;
5859 
5860  error = convert_lock(ls, lkb, &args);
5861 
5862  if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5863  error = 0;
5864  out_put:
5865  dlm_put_lkb(lkb);
5866  out:
5867  dlm_unlock_recovery(ls);
5868  kfree(ua_tmp);
5869  return error;
5870 }
5871 
5872 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5873  uint32_t flags, uint32_t lkid, char *lvb_in)
5874 {
5875  struct dlm_lkb *lkb;
5876  struct dlm_args args;
5877  struct dlm_user_args *ua;
5878  int error;
5879 
5880  dlm_lock_recovery(ls);
5881 
5882  error = find_lkb(ls, lkid, &lkb);
5883  if (error)
5884  goto out;
5885 
5886  ua = lkb->lkb_ua;
5887 
5888  if (lvb_in && ua->lksb.sb_lvbptr)
5889  memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5890  if (ua_tmp->castparam)
5891  ua->castparam = ua_tmp->castparam;
5892  ua->user_lksb = ua_tmp->user_lksb;
5893 
5894  error = set_unlock_args(flags, ua, &args);
5895  if (error)
5896  goto out_put;
5897 
5898  error = unlock_lock(ls, lkb, &args);
5899 
5900  if (error == -DLM_EUNLOCK)
5901  error = 0;
5902  /* from validate_unlock_args() */
5903  if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5904  error = 0;
5905  if (error)
5906  goto out_put;
5907 
5908  spin_lock(&ua->proc->locks_spin);
5909  /* dlm_user_add_cb() may have already taken lkb off the proc list */
5910  if (!list_empty(&lkb->lkb_ownqueue))
5911  list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5912  spin_unlock(&ua->proc->locks_spin);
5913  out_put:
5914  dlm_put_lkb(lkb);
5915  out:
5916  dlm_unlock_recovery(ls);
5917  kfree(ua_tmp);
5918  return error;
5919 }
5920 
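dlm_user_unlock() copies lvb_in into the lksb before calling unlock_lock(), so an exclusive holder can publish a final LVB value as it releases, and it folds -DLM_EUNLOCK into success. A sketch of the userspace counterpart, assuming libdlm's dlm_unlock_wait():

   #include <libdlm.h>

   /* Release the lock. On a clean unlock the kernel's -DLM_EUNLOCK is
      delivered to userspace through the completion AST status rather
      than as an error from this call. */
   int drop_lock(struct dlm_lksb *lksb)
   {
           return dlm_unlock_wait(lksb->sb_lkid, 0, lksb);
   }
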
5921 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5922  uint32_t flags, uint32_t lkid)
5923 {
5924  struct dlm_lkb *lkb;
5925  struct dlm_args args;
5926  struct dlm_user_args *ua;
5927  int error;
5928 
5929  dlm_lock_recovery(ls);
5930 
5931  error = find_lkb(ls, lkid, &lkb);
5932  if (error)
5933  goto out;
5934 
5935  ua = lkb->lkb_ua;
5936  if (ua_tmp->castparam)
5937  ua->castparam = ua_tmp->castparam;
5938  ua->user_lksb = ua_tmp->user_lksb;
5939 
5940  error = set_unlock_args(flags, ua, &args);
5941  if (error)
5942  goto out_put;
5943 
5944  error = cancel_lock(ls, lkb, &args);
5945 
5946  if (error == -DLM_ECANCEL)
5947  error = 0;
5948  /* from validate_unlock_args() */
5949  if (error == -EBUSY)
5950  error = 0;
5951  out_put:
5952  dlm_put_lkb(lkb);
5953  out:
5954  dlm_unlock_recovery(ls);
5955  kfree(ua_tmp);
5956  return error;
5957 }
5958 
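Cancel reuses the unlock plumbing (set_unlock_args() plus cancel_lock()), and both -DLM_ECANCEL and -EBUSY fold to success because the cancel either completed or lost a race with a grant. Userspace expresses it as an unlock carrying LKF_CANCEL; a sketch under the same libdlm assumptions:

   #include <libdlm.h>

   /* Abort a request or conversion still sitting on a wait or convert
      queue. If the lock was granted before the cancel arrived, the
      kernel returns -EBUSY (folded to 0 above) and the grant stands. */
   int cancel_pending(struct dlm_lksb *lksb)
   {
           return dlm_unlock_wait(lksb->sb_lkid, LKF_CANCEL, lksb);
   }
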
5959 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
5960 {
5961  struct dlm_lkb *lkb;
5962  struct dlm_args args;
5963  struct dlm_user_args *ua;
5964  struct dlm_rsb *r;
5965  int error;
5966 
5967  dlm_lock_recovery(ls);
5968 
5969  error = find_lkb(ls, lkid, &lkb);
5970  if (error)
5971  goto out;
5972 
5973  ua = lkb->lkb_ua;
5974 
5975  error = set_unlock_args(flags, ua, &args);
5976  if (error)
5977  goto out_put;
5978 
5979  /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
5980 
5981  r = lkb->lkb_resource;
5982  hold_rsb(r);
5983  lock_rsb(r);
5984 
5985  error = validate_unlock_args(lkb, &args);
5986  if (error)
5987  goto out_r;
5988  lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
5989 
5990  error = _cancel_lock(r, lkb);
5991  out_r:
5992  unlock_rsb(r);
5993  put_rsb(r);
5994 
5995  if (error == -DLM_ECANCEL)
5996  error = 0;
5997  /* from validate_unlock_args() */
5998  if (error == -EBUSY)
5999  error = 0;
6000  out_put:
6001  dlm_put_lkb(lkb);
6002  out:
6003  dlm_unlock_recovery(ls);
6004  return error;
6005 }
6006 
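dlm_user_deadlock() is the cancel variant used when userspace deadlock detection decides a conversion must be broken: DLM_IFL_DEADLOCK_CANCEL makes queue_cast() (earlier in this file) report -EDEADLK to the caller instead of -DLM_ECANCEL. From an application's point of view the difference only shows up in the completion status. A sketch of an asynchronous conversion that watches for it, assuming libdlm's dlm_lock() with AST delivery set up via dlm_pthread_init() or dlm_get_fd()/dlm_dispatch() (argument order per the dlm_lock(3) man page; verify against your libdlm.h):

   #include <errno.h>
   #include <stdio.h>
   #include <libdlm.h>

   static struct dlm_lksb my_lksb; /* sb_lkid set by the original request */

   /* Completion AST: a conversion cancelled by deadlock resolution
      completes with sb_status == EDEADLK; the caller should back off
      and retry rather than treat it as a hard failure. */
   static void conv_ast(void *arg)
   {
           struct dlm_lksb *l = arg;

           if (l->sb_status == EDEADLK)
                   fprintf(stderr, "conversion broke a deadlock cycle\n");
   }

   int convert_async(void)
   {
           return dlm_lock(LKM_EXMODE, &my_lksb, LKF_CONVERT, NULL, 0, 0,
                           conv_ast, &my_lksb, NULL, NULL);
   }
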
6007 /* lkb's that are removed from the waiters list by revert are just left on the
6008  orphans list with the granted orphan locks, to be freed by purge */
6009 
6010 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6011 {
6012  struct dlm_args args;
6013  int error;
6014 
6015  hold_lkb(lkb); /* reference for the ls_orphans list */
6016  mutex_lock(&ls->ls_orphans_mutex);
6017  list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6018  mutex_unlock(&ls->ls_orphans_mutex);
6019 
6020  set_unlock_args(0, lkb->lkb_ua, &args);
6021 
6022  error = cancel_lock(ls, lkb, &args);
6023  if (error == -DLM_ECANCEL)
6024  error = 0;
6025  return error;
6026 }
6027 
6028 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
6029  Regardless of what rsb queue the lock is on, it's removed and freed. */
6030 
6031 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6032 {
6033  struct dlm_args args;
6034  int error;
6035 
6036  set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
6037 
6038  error = unlock_lock(ls, lkb, &args);
6039  if (error == -DLM_EUNLOCK)
6040  error = 0;
6041  return error;
6042 }
6043 
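Which of these two cleanups a dying process's lock receives is decided at request time: DLM_LKF_PERSISTENT sends it to orphan_proc_lock() (kept as an orphan until purged), anything else to unlock_proc_lock() (force-unlocked). A userspace sketch of taking a persistent lock, assuming libdlm mirrors the flag as LKF_PERSISTENT:

   #include <string.h>
   #include <libdlm.h>

   /* A persistent lock is parked on ls_orphans if the owning process
      exits, instead of being force-unlocked; a later purge (see
      dlm_user_purge() below) is what finally frees it. */
   int take_persistent(struct dlm_lksb *lksb, const char *name)
   {
           return dlm_lock_wait(LKM_PWMODE, lksb, LKF_PERSISTENT,
                                name, strlen(name), 0, NULL, NULL, NULL);
   }
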
6044 /* We have to release the clear_proc_locks mutex before calling
6045  unlock_proc_lock() (which does lock_rsb) to avoid a deadlock with a
6046  received message that does lock_rsb followed by dlm_user_add_cb() */
6047 
6048 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6049  struct dlm_user_proc *proc)
6050 {
6051  struct dlm_lkb *lkb = NULL;
6052 
6053  mutex_lock(&ls->ls_clear_proc_locks);
6054  if (list_empty(&proc->locks))
6055  goto out;
6056 
6057  lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6058  list_del_init(&lkb->lkb_ownqueue);
6059 
6060  if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6061  lkb->lkb_flags |= DLM_IFL_ORPHAN;
6062  else
6063  lkb->lkb_flags |= DLM_IFL_DEAD;
6064  out:
6065  mutex_unlock(&ls->ls_clear_proc_locks);
6066  return lkb;
6067 }
6068 
6069 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6070  1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6071  which we clear here. */
6072 
6073 /* The proc CLOSING flag is set so no more device_reads should look at the
6074  proc->asts list, and no more device_writes should add lkb's to the
6075  proc->locks list; so we shouldn't need to take asts_spin or locks_spin here.
6076  This assumes that device reads/writes/closes are serialized -- FIXME: we may
6077  need to serialize them ourselves. */
6078 
6079 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6080 {
6081  struct dlm_lkb *lkb, *safe;
6082 
6083  dlm_lock_recovery(ls);
6084 
6085  while (1) {
6086  lkb = del_proc_lock(ls, proc);
6087  if (!lkb)
6088  break;
6089  del_timeout(lkb);
6090  if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6091  orphan_proc_lock(ls, lkb);
6092  else
6093  unlock_proc_lock(ls, lkb);
6094 
6095  /* this removes the reference for the proc->locks list
6096  added by dlm_user_request; it may result in the lkb
6097  being freed */
6098 
6099  dlm_put_lkb(lkb);
6100  }
6101 
6102  mutex_lock(&ls->ls_clear_proc_locks);
6103 
6104  /* in-progress unlocks */
6105  list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6106  list_del_init(&lkb->lkb_ownqueue);
6107  lkb->lkb_flags |= DLM_IFL_DEAD;
6108  dlm_put_lkb(lkb);
6109  }
6110 
6111  list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6112  memset(&lkb->lkb_callbacks, 0,
6113  sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6114  list_del_init(&lkb->lkb_cb_list);
6115  dlm_put_lkb(lkb);
6116  }
6117 
6118  mutex_unlock(&ls->ls_clear_proc_locks);
6119  dlm_unlock_recovery(ls);
6120 }
6121 
6122 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6123 {
6124  struct dlm_lkb *lkb, *safe;
6125 
6126  while (1) {
6127  lkb = NULL;
6128  spin_lock(&proc->locks_spin);
6129  if (!list_empty(&proc->locks)) {
6130  lkb = list_entry(proc->locks.next, struct dlm_lkb,
6131  lkb_ownqueue);
6132  list_del_init(&lkb->lkb_ownqueue);
6133  }
6134  spin_unlock(&proc->locks_spin);
6135 
6136  if (!lkb)
6137  break;
6138 
6139  lkb->lkb_flags |= DLM_IFL_DEAD;
6140  unlock_proc_lock(ls, lkb);
6141  dlm_put_lkb(lkb); /* ref from proc->locks list */
6142  }
6143 
6144  spin_lock(&proc->locks_spin);
6145  list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6146  list_del_init(&lkb->lkb_ownqueue);
6147  lkb->lkb_flags |= DLM_IFL_DEAD;
6148  dlm_put_lkb(lkb);
6149  }
6150  spin_unlock(&proc->locks_spin);
6151 
6152  spin_lock(&proc->asts_spin);
6153  list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6154  memset(&lkb->lkb_callbacks, 0,
6155  sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6156  list_del_init(&lkb->lkb_cb_list);
6157  dlm_put_lkb(lkb);
6158  }
6159  spin_unlock(&proc->asts_spin);
6160 }
6161 
6162 /* pid of 0 means purge all orphans */
6163 
6164 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6165 {
6166  struct dlm_lkb *lkb, *safe;
6167 
6168  mutex_lock(&ls->ls_orphans_mutex);
6169  list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6170  if (pid && lkb->lkb_ownpid != pid)
6171  continue;
6172  unlock_proc_lock(ls, lkb);
6173  list_del_init(&lkb->lkb_ownqueue);
6174  dlm_put_lkb(lkb);
6175  }
6176  mutex_unlock(&ls->ls_orphans_mutex);
6177 }
6178 
6179 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6180 {
6181  struct dlm_message *ms;
6182  struct dlm_mhandle *mh;
6183  int error;
6184 
6185  error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6186  DLM_MSG_PURGE, &ms, &mh);
6187  if (error)
6188  return error;
6189  ms->m_nodeid = nodeid;
6190  ms->m_pid = pid;
6191 
6192  return send_message(mh, ms);
6193 }
6194 
6195 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6196  int nodeid, int pid)
6197 {
6198  int error = 0;
6199 
6200  if (nodeid != dlm_our_nodeid()) {
6201  error = send_purge(ls, nodeid, pid);
6202  } else {
6203  dlm_lock_recovery(ls);
6204  if (pid == current->pid)
6205  purge_proc_locks(ls, proc);
6206  else
6207  do_purge(ls, nodeid, pid);
6208  dlm_unlock_recovery(ls);
6209  }
6210  return error;
6211 }
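dlm_user_purge() routes a purge either to a remote node as a DLM_MSG_PURGE message (send_purge() above) or directly into purge_proc_locks()/do_purge() locally. Userspace reaches it through libdlm; the sketch below assumes libdlm exposes a dlm_ls_purge() wrapper with this shape (check your libdlm.h before relying on it):

   #include <libdlm.h>

   /* Remove orphans left on nodeid by a dead process; a pid of 0 purges
      all orphans, per the comment above do_purge(). The lockspace
      handle comes from dlm_create_lockspace()/dlm_open_lockspace(). */
   int purge_orphans(dlm_lshandle_t ls, int nodeid, int pid)
   {
           return dlm_ls_purge(ls, nodeid, pid);
   }
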
6212