mds_client.c
1 #include <linux/ceph/ceph_debug.h>
2 
3 #include <linux/fs.h>
4 #include <linux/wait.h>
5 #include <linux/slab.h>
6 #include <linux/sched.h>
7 #include <linux/debugfs.h>
8 #include <linux/seq_file.h>
9 
10 #include "super.h"
11 #include "mds_client.h"
12 
13 #include <linux/ceph/ceph_features.h>
14 #include <linux/ceph/messenger.h>
15 #include <linux/ceph/decode.h>
16 #include <linux/ceph/pagelist.h>
17 #include <linux/ceph/auth.h>
18 #include <linux/ceph/debugfs.h>
19 
20 /*
21  * A cluster of MDS (metadata server) daemons is responsible for
22  * managing the file system namespace (the directory hierarchy and
23  * inodes) and for coordinating shared access to storage. Metadata is
24  * partitioned hierarchically across a number of servers, and that
25  * partition varies over time as the cluster adjusts the distribution
26  * in order to balance load.
27  *
28  * The MDS client is primarily responsible for managing synchronous
29  * metadata requests for operations like open, unlink, and so forth.
30  * If there is an MDS failure, we find out about it when we (possibly
31  * request and) receive a new MDS map, and can resubmit affected
32  * requests.
33  *
34  * For the most part, though, we take advantage of a lossless
35  * communications channel to the MDS, and do not need to worry about
36  * timing out or resubmitting requests.
37  *
38  * We maintain a stateful "session" with each MDS we interact with.
39  * Within each session, we send periodic heartbeat messages to ensure
40  * any capabilities or leases we have been issued remain valid. If
41  * the session times out and goes stale, our leases and capabilities
42  * are no longer valid.
43  */
44 
45 struct ceph_reconnect_state {
46  struct ceph_pagelist *pagelist;
47  bool flock;
48 };
49 
50 static void __wake_requests(struct ceph_mds_client *mdsc,
51  struct list_head *head);
52 
53 static const struct ceph_connection_operations mds_con_ops;
54 
55 
56 /*
57  * mds reply parsing
58  */
59 
60 /*
61  * parse individual inode info
62  */
63 static int parse_reply_info_in(void **p, void *end,
64  struct ceph_mds_reply_info_in *info,
65  int features)
66 {
67  int err = -EIO;
68 
69  info->in = *p;
70  *p += sizeof(struct ceph_mds_reply_inode) +
71  sizeof(*info->in->fragtree.splits) *
72  le32_to_cpu(info->in->fragtree.nsplits);
73 
74  ceph_decode_32_safe(p, end, info->symlink_len, bad);
75  ceph_decode_need(p, end, info->symlink_len, bad);
76  info->symlink = *p;
77  *p += info->symlink_len;
78 
79  if (features & CEPH_FEATURE_DIRLAYOUTHASH)
80  ceph_decode_copy_safe(p, end, &info->dir_layout,
81  sizeof(info->dir_layout), bad);
82  else
83  memset(&info->dir_layout, 0, sizeof(info->dir_layout));
84 
85  ceph_decode_32_safe(p, end, info->xattr_len, bad);
86  ceph_decode_need(p, end, info->xattr_len, bad);
87  info->xattr_data = *p;
88  *p += info->xattr_len;
89  return 0;
90 bad:
91  return err;
92 }
93 
94 /*
95  * parse a normal reply, which may contain a (dir+)dentry and/or a
96  * target inode.
97  */
98 static int parse_reply_info_trace(void **p, void *end,
99  struct ceph_mds_reply_info_parsed *info,
100  int features)
101 {
102  int err;
103 
104  if (info->head->is_dentry) {
105  err = parse_reply_info_in(p, end, &info->diri, features);
106  if (err < 0)
107  goto out_bad;
108 
109  if (unlikely(*p + sizeof(*info->dirfrag) > end))
110  goto bad;
111  info->dirfrag = *p;
112  *p += sizeof(*info->dirfrag) +
113  sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
114  if (unlikely(*p > end))
115  goto bad;
116 
117  ceph_decode_32_safe(p, end, info->dname_len, bad);
118  ceph_decode_need(p, end, info->dname_len, bad);
119  info->dname = *p;
120  *p += info->dname_len;
121  info->dlease = *p;
122  *p += sizeof(*info->dlease);
123  }
124 
125  if (info->head->is_target) {
126  err = parse_reply_info_in(p, end, &info->targeti, features);
127  if (err < 0)
128  goto out_bad;
129  }
130 
131  if (unlikely(*p != end))
132  goto bad;
133  return 0;
134 
135 bad:
136  err = -EIO;
137 out_bad:
138  pr_err("problem parsing mds trace %d\n", err);
139  return err;
140 }
141 
142 /*
143  * parse readdir results
144  */
145 static int parse_reply_info_dir(void **p, void *end,
146  struct ceph_mds_reply_info_parsed *info,
147  int features)
148 {
149  u32 num, i = 0;
150  int err;
151 
152  info->dir_dir = *p;
153  if (*p + sizeof(*info->dir_dir) > end)
154  goto bad;
155  *p += sizeof(*info->dir_dir) +
156  sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
157  if (*p > end)
158  goto bad;
159 
160  ceph_decode_need(p, end, sizeof(num) + 2, bad);
161  num = ceph_decode_32(p);
162  info->dir_end = ceph_decode_8(p);
163  info->dir_complete = ceph_decode_8(p);
164  if (num == 0)
165  goto done;
166 
167  /* alloc large array */
168  info->dir_nr = num;
169  info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
170  sizeof(*info->dir_dname) +
171  sizeof(*info->dir_dname_len) +
172  sizeof(*info->dir_dlease),
173  GFP_NOFS);
174  if (info->dir_in == NULL) {
175  err = -ENOMEM;
176  goto out_bad;
177  }
178  info->dir_dname = (void *)(info->dir_in + num);
179  info->dir_dname_len = (void *)(info->dir_dname + num);
180  info->dir_dlease = (void *)(info->dir_dname_len + num);
181 
182  while (num) {
183  /* dentry */
184  ceph_decode_need(p, end, sizeof(u32)*2, bad);
185  info->dir_dname_len[i] = ceph_decode_32(p);
186  ceph_decode_need(p, end, info->dir_dname_len[i], bad);
187  info->dir_dname[i] = *p;
188  *p += info->dir_dname_len[i];
189  dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
190  info->dir_dname[i]);
191  info->dir_dlease[i] = *p;
192  *p += sizeof(struct ceph_mds_reply_lease);
193 
194  /* inode */
195  err = parse_reply_info_in(p, end, &info->dir_in[i], features);
196  if (err < 0)
197  goto out_bad;
198  i++;
199  num--;
200  }
201 
202 done:
203  if (*p != end)
204  goto bad;
205  return 0;
206 
207 bad:
208  err = -EIO;
209 out_bad:
210  pr_err("problem parsing dir contents %d\n", err);
211  return err;
212 }
213 
214 /*
215  * parse fcntl F_GETLK results
216  */
217 static int parse_reply_info_filelock(void **p, void *end,
218  struct ceph_mds_reply_info_parsed *info,
219  int features)
220 {
221  if (*p + sizeof(*info->filelock_reply) > end)
222  goto bad;
223 
224  info->filelock_reply = *p;
225  *p += sizeof(*info->filelock_reply);
226 
227  if (unlikely(*p != end))
228  goto bad;
229  return 0;
230 
231 bad:
232  return -EIO;
233 }
234 
235 /*
236  * parse extra results
237  */
238 static int parse_reply_info_extra(void **p, void *end,
239  struct ceph_mds_reply_info_parsed *info,
240  int features)
241 {
242  if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
243  return parse_reply_info_filelock(p, end, info, features);
244  else
245  return parse_reply_info_dir(p, end, info, features);
246 }
247 
248 /*
249  * parse entire mds reply
250  */
251 static int parse_reply_info(struct ceph_msg *msg,
252  struct ceph_mds_reply_info_parsed *info,
253  int features)
254 {
255  void *p, *end;
256  u32 len;
257  int err;
258 
259  info->head = msg->front.iov_base;
260  p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
261  end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
262 
263  /* trace */
264  ceph_decode_32_safe(&p, end, len, bad);
265  if (len > 0) {
266  ceph_decode_need(&p, end, len, bad);
267  err = parse_reply_info_trace(&p, p+len, info, features);
268  if (err < 0)
269  goto out_bad;
270  }
271 
272  /* extra */
273  ceph_decode_32_safe(&p, end, len, bad);
274  if (len > 0) {
275  ceph_decode_need(&p, end, len, bad);
276  err = parse_reply_info_extra(&p, p+len, info, features);
277  if (err < 0)
278  goto out_bad;
279  }
280 
281  /* snap blob */
282  ceph_decode_32_safe(&p, end, len, bad);
283  info->snapblob_len = len;
284  info->snapblob = p;
285  p += len;
286 
287  if (p != end)
288  goto bad;
289  return 0;
290 
291 bad:
292  err = -EIO;
293 out_bad:
294  pr_err("mds parse_reply err %d\n", err);
295  return err;
296 }
297 
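/*
 * Free memory allocated while parsing an MDS reply; currently this is
 * just the readdir result array allocated in parse_reply_info_dir().
 */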
298 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
299 {
300  kfree(info->dir_in);
301 }
302 
303 
304 /*
305  * sessions
306  */
307 static const char *session_state_name(int s)
308 {
309  switch (s) {
310  case CEPH_MDS_SESSION_NEW: return "new";
311  case CEPH_MDS_SESSION_OPENING: return "opening";
312  case CEPH_MDS_SESSION_OPEN: return "open";
313  case CEPH_MDS_SESSION_HUNG: return "hung";
314  case CEPH_MDS_SESSION_CLOSING: return "closing";
315  case CEPH_MDS_SESSION_RESTARTING: return "restarting";
316  case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
317  default: return "???";
318  }
319 }
320 
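/*
 * Take a reference on a session, unless its refcount has already hit
 * zero (the session is being torn down), in which case NULL is
 * returned and the caller must not use the session.
 */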
321 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
322 {
323  if (atomic_inc_not_zero(&s->s_ref)) {
324  dout("mdsc get_session %p %d -> %d\n", s,
325  atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
326  return s;
327  } else {
328  dout("mdsc get_session %p 0 -- FAIL", s);
329  return NULL;
330  }
331 }
332 
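/*
 * Drop a session reference.  When the last reference goes away, free
 * the session and any authorizer attached to its connection.
 */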
333 void ceph_put_mds_session(struct ceph_mds_session *s)
334 {
335  dout("mdsc put_session %p %d -> %d\n", s,
336  atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
337  if (atomic_dec_and_test(&s->s_ref)) {
338  if (s->s_auth.authorizer)
339  s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
340  s->s_mdsc->fsc->client->monc.auth,
341  s->s_auth.authorizer);
342  kfree(s);
343  }
344 }
345 
346 /*
347  * called under mdsc->mutex
348  */
349 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
350  int mds)
351 {
352  struct ceph_mds_session *session;
353 
354  if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
355  return NULL;
356  session = mdsc->sessions[mds];
357  dout("lookup_mds_session %p %d\n", session,
358  atomic_read(&session->s_ref));
359  get_session(session);
360  return session;
361 }
362 
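/* true if we currently have a session registered for the given mds rank */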
363 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
364 {
365  if (mds >= mdsc->max_sessions)
366  return false;
367  return mdsc->sessions[mds];
368 }
369 
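/*
 * Verify that a session is still registered in mdsc->sessions; returns
 * -ENOENT if it has since been unregistered (e.g. the mds went away).
 */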
370 static int __verify_registered_session(struct ceph_mds_client *mdsc,
371  struct ceph_mds_session *s)
372 {
373  if (s->s_mds >= mdsc->max_sessions ||
374  mdsc->sessions[s->s_mds] != s)
375  return -ENOENT;
376  return 0;
377 }
378 
379 /*
380  * create+register a new session for given mds.
381  * called under mdsc->mutex.
382  */
383 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
384  int mds)
385 {
386  struct ceph_mds_session *s;
387 
388  s = kzalloc(sizeof(*s), GFP_NOFS);
389  if (!s)
390  return ERR_PTR(-ENOMEM);
391  s->s_mdsc = mdsc;
392  s->s_mds = mds;
393  s->s_state = CEPH_MDS_SESSION_NEW;
394  s->s_ttl = 0;
395  s->s_seq = 0;
396  mutex_init(&s->s_mutex);
397 
398  ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
399 
400  spin_lock_init(&s->s_gen_ttl_lock);
401  s->s_cap_gen = 0;
402  s->s_cap_ttl = jiffies - 1;
403 
404  spin_lock_init(&s->s_cap_lock);
405  s->s_renew_requested = 0;
406  s->s_renew_seq = 0;
407  INIT_LIST_HEAD(&s->s_caps);
408  s->s_nr_caps = 0;
409  s->s_trim_caps = 0;
410  atomic_set(&s->s_ref, 1);
411  INIT_LIST_HEAD(&s->s_waiting);
412  INIT_LIST_HEAD(&s->s_unsafe);
413  s->s_num_cap_releases = 0;
414  s->s_cap_iterator = NULL;
415  INIT_LIST_HEAD(&s->s_cap_releases);
416  INIT_LIST_HEAD(&s->s_cap_releases_done);
417  INIT_LIST_HEAD(&s->s_cap_flushing);
418  INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
419 
420  dout("register_session mds%d\n", mds);
421  if (mds >= mdsc->max_sessions) {
422  int newmax = 1 << get_count_order(mds+1);
423  struct ceph_mds_session **sa;
424 
425  dout("register_session realloc to %d\n", newmax);
426  sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
427  if (sa == NULL)
428  goto fail_realloc;
429  if (mdsc->sessions) {
430  memcpy(sa, mdsc->sessions,
431  mdsc->max_sessions * sizeof(void *));
432  kfree(mdsc->sessions);
433  }
434  mdsc->sessions = sa;
435  mdsc->max_sessions = newmax;
436  }
437  mdsc->sessions[mds] = s;
438  atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */
439 
440  ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
441  ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
442 
443  return s;
444 
445 fail_realloc:
446  kfree(s);
447  return ERR_PTR(-ENOMEM);
448 }
449 
450 /*
451  * called under mdsc->mutex
452  */
453 static void __unregister_session(struct ceph_mds_client *mdsc,
454  struct ceph_mds_session *s)
455 {
456  dout("__unregister_session mds%d %p\n", s->s_mds, s);
457  BUG_ON(mdsc->sessions[s->s_mds] != s);
458  mdsc->sessions[s->s_mds] = NULL;
459  ceph_con_close(&s->s_con);
460  ceph_put_mds_session(s);
461 }
462 
463 /*
464  * drop session refs in request.
465  *
466  * should be last request ref, or hold mdsc->mutex
467  */
468 static void put_request_session(struct ceph_mds_request *req)
469 {
470  if (req->r_session) {
471  ceph_put_mds_session(req->r_session);
472  req->r_session = NULL;
473  }
474 }
475 
476 void ceph_mdsc_release_request(struct kref *kref)
477 {
478  struct ceph_mds_request *req = container_of(kref,
479  struct ceph_mds_request,
480  r_kref);
481  if (req->r_request)
482  ceph_msg_put(req->r_request);
483  if (req->r_reply) {
484  ceph_msg_put(req->r_reply);
485  destroy_reply_info(&req->r_reply_info);
486  }
487  if (req->r_inode) {
488  ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
489  iput(req->r_inode);
490  }
491  if (req->r_locked_dir)
492  ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
493  if (req->r_target_inode)
494  iput(req->r_target_inode);
495  if (req->r_dentry)
496  dput(req->r_dentry);
497  if (req->r_old_dentry) {
498  /*
499  * track (and drop pins for) r_old_dentry_dir
500  * separately, since r_old_dentry's d_parent may have
501  * changed between the dir mutex being dropped and
502  * this request being freed.
503  */
504  ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
505  CEPH_CAP_PIN);
506  dput(req->r_old_dentry);
507  iput(req->r_old_dentry_dir);
508  }
509  kfree(req->r_path1);
510  kfree(req->r_path2);
511  put_request_session(req);
512  ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
513  kfree(req);
514 }
515 
516 /*
517  * lookup session, bump ref if found.
518  *
519  * called under mdsc->mutex.
520  */
521 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
522  u64 tid)
523 {
524  struct ceph_mds_request *req;
525  struct rb_node *n = mdsc->request_tree.rb_node;
526 
527  while (n) {
528  req = rb_entry(n, struct ceph_mds_request, r_node);
529  if (tid < req->r_tid)
530  n = n->rb_left;
531  else if (tid > req->r_tid)
532  n = n->rb_right;
533  else {
534  ceph_mdsc_get_request(req);
535  return req;
536  }
537  }
538  return NULL;
539 }
540 
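/*
 * Insert a request into the tid-ordered rbtree of in-flight requests.
 * Tids are unique, so colliding with an existing entry is a bug.
 */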
541 static void __insert_request(struct ceph_mds_client *mdsc,
542  struct ceph_mds_request *new)
543 {
544  struct rb_node **p = &mdsc->request_tree.rb_node;
545  struct rb_node *parent = NULL;
546  struct ceph_mds_request *req = NULL;
547 
548  while (*p) {
549  parent = *p;
550  req = rb_entry(parent, struct ceph_mds_request, r_node);
551  if (new->r_tid < req->r_tid)
552  p = &(*p)->rb_left;
553  else if (new->r_tid > req->r_tid)
554  p = &(*p)->rb_right;
555  else
556  BUG();
557  }
558 
559  rb_link_node(&new->r_node, parent, p);
560  rb_insert_color(&new->r_node, &mdsc->request_tree);
561 }
562 
563 /*
564  * Register an in-flight request, and assign a tid. Link to the directory
565  * we are modifying (if any).
566  *
567  * Called under mdsc->mutex.
568  */
569 static void __register_request(struct ceph_mds_client *mdsc,
570  struct ceph_mds_request *req,
571  struct inode *dir)
572 {
573  req->r_tid = ++mdsc->last_tid;
574  if (req->r_num_caps)
575  ceph_reserve_caps(mdsc, &req->r_caps_reservation,
576  req->r_num_caps);
577  dout("__register_request %p tid %lld\n", req, req->r_tid);
578  ceph_mdsc_get_request(req);
579  __insert_request(mdsc, req);
580 
581  req->r_uid = current_fsuid();
582  req->r_gid = current_fsgid();
583 
584  if (dir) {
585  struct ceph_inode_info *ci = ceph_inode(dir);
586 
587  ihold(dir);
588  spin_lock(&ci->i_unsafe_lock);
589  req->r_unsafe_dir = dir;
590  list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
591  spin_unlock(&ci->i_unsafe_lock);
592  }
593 }
594 
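/*
 * Remove a request from the tid tree and drop the references taken in
 * __register_request (including the ref on the unsafe dir, if any).
 *
 * called under mdsc->mutex
 */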
595 static void __unregister_request(struct ceph_mds_client *mdsc,
596  struct ceph_mds_request *req)
597 {
598  dout("__unregister_request %p tid %lld\n", req, req->r_tid);
599  rb_erase(&req->r_node, &mdsc->request_tree);
600  RB_CLEAR_NODE(&req->r_node);
601 
602  if (req->r_unsafe_dir) {
603  struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
604 
605  spin_lock(&ci->i_unsafe_lock);
606  list_del_init(&req->r_unsafe_dir_item);
607  spin_unlock(&ci->i_unsafe_lock);
608 
609  iput(req->r_unsafe_dir);
610  req->r_unsafe_dir = NULL;
611  }
612 
613  ceph_mdsc_put_request(req);
614 }
615 
616 /*
617  * Choose mds to send request to next. If there is a hint set in the
618  * request (e.g., due to a prior forward hint from the mds), use that.
619  * Otherwise, consult frag tree and/or caps to identify the
620  * appropriate mds. If all else fails, choose randomly.
621  *
622  * Called under mdsc->mutex.
623  */
624 static struct dentry *get_nonsnap_parent(struct dentry *dentry)
625 {
626  /*
627  * we don't need to worry about protecting the d_parent access
628  * here because we never rename inside the snapped namespace
629  * except to resplice to another snapdir, and either the old or new
630  * result is a valid result.
631  */
632  while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
633  dentry = dentry->d_parent;
634  return dentry;
635 }
636 
637 static int __choose_mds(struct ceph_mds_client *mdsc,
638  struct ceph_mds_request *req)
639 {
640  struct inode *inode;
641  struct ceph_inode_info *ci;
642  struct ceph_cap *cap;
643  int mode = req->r_direct_mode;
644  int mds = -1;
645  u32 hash = req->r_direct_hash;
646  bool is_hash = req->r_direct_is_hash;
647 
648  /*
649  * is there a specific mds we should try? ignore hint if we have
650  * no session and the mds is not up (active or recovering).
651  */
652  if (req->r_resend_mds >= 0 &&
653  (__have_session(mdsc, req->r_resend_mds) ||
654  ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
655  dout("choose_mds using resend_mds mds%d\n",
656  req->r_resend_mds);
657  return req->r_resend_mds;
658  }
659 
660  if (mode == USE_RANDOM_MDS)
661  goto random;
662 
663  inode = NULL;
664  if (req->r_inode) {
665  inode = req->r_inode;
666  } else if (req->r_dentry) {
667  /* ignore race with rename; old or new d_parent is okay */
668  struct dentry *parent = req->r_dentry->d_parent;
669  struct inode *dir = parent->d_inode;
670 
671  if (dir->i_sb != mdsc->fsc->sb) {
672  /* not this fs! */
673  inode = req->r_dentry->d_inode;
674  } else if (ceph_snap(dir) != CEPH_NOSNAP) {
675  /* direct snapped/virtual snapdir requests
676  * based on parent dir inode */
677  struct dentry *dn = get_nonsnap_parent(parent);
678  inode = dn->d_inode;
679  dout("__choose_mds using nonsnap parent %p\n", inode);
680  } else if (req->r_dentry->d_inode) {
681  /* dentry target */
682  inode = req->r_dentry->d_inode;
683  } else {
684  /* dir + name */
685  inode = dir;
686  hash = ceph_dentry_hash(dir, req->r_dentry);
687  is_hash = true;
688  }
689  }
690 
691  dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
692  (int)hash, mode);
693  if (!inode)
694  goto random;
695  ci = ceph_inode(inode);
696 
697  if (is_hash && S_ISDIR(inode->i_mode)) {
698  struct ceph_inode_frag frag;
699  int found;
700 
701  ceph_choose_frag(ci, hash, &frag, &found);
702  if (found) {
703  if (mode == USE_ANY_MDS && frag.ndist > 0) {
704  u8 r;
705 
706  /* choose a random replica */
707  get_random_bytes(&r, 1);
708  r %= frag.ndist;
709  mds = frag.dist[r];
710  dout("choose_mds %p %llx.%llx "
711  "frag %u mds%d (%d/%d)\n",
712  inode, ceph_vinop(inode),
713  frag.frag, mds,
714  (int)r, frag.ndist);
715  if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
716  CEPH_MDS_STATE_ACTIVE)
717  return mds;
718  }
719 
720  /* since this file/dir wasn't known to be
721  * replicated, then we want to look for the
722  * authoritative mds. */
723  mode = USE_AUTH_MDS;
724  if (frag.mds >= 0) {
725  /* choose auth mds */
726  mds = frag.mds;
727  dout("choose_mds %p %llx.%llx "
728  "frag %u mds%d (auth)\n",
729  inode, ceph_vinop(inode), frag.frag, mds);
730  if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
731  CEPH_MDS_STATE_ACTIVE)
732  return mds;
733  }
734  }
735  }
736 
737  spin_lock(&ci->i_ceph_lock);
738  cap = NULL;
739  if (mode == USE_AUTH_MDS)
740  cap = ci->i_auth_cap;
741  if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
742  cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
743  if (!cap) {
744  spin_unlock(&ci->i_ceph_lock);
745  goto random;
746  }
747  mds = cap->session->s_mds;
748  dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
749  inode, ceph_vinop(inode), mds,
750  cap == ci->i_auth_cap ? "auth " : "", cap);
751  spin_unlock(&ci->i_ceph_lock);
752  return mds;
753 
754 random:
755  mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
756  dout("choose_mds chose random mds%d\n", mds);
757  return mds;
758 }
759 
760 
761 /*
762  * session messages
763  */
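/*
 * Allocate and fill a bare session message (just an op code and a
 * sequence number); the caller is responsible for sending it.
 */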
764 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
765 {
766  struct ceph_msg *msg;
767  struct ceph_mds_session_head *h;
768 
769  msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
770  false);
771  if (!msg) {
772  pr_err("create_session_msg ENOMEM creating msg\n");
773  return NULL;
774  }
775  h = msg->front.iov_base;
776  h->op = cpu_to_le32(op);
777  h->seq = cpu_to_le64(seq);
778  return msg;
779 }
780 
781 /*
782  * send session open request.
783  *
784  * called under mdsc->mutex
785  */
786 static int __open_session(struct ceph_mds_client *mdsc,
787  struct ceph_mds_session *session)
788 {
789  struct ceph_msg *msg;
790  int mstate;
791  int mds = session->s_mds;
792 
793  /* wait for mds to go active? */
794  mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
795  dout("open_session to mds%d (%s)\n", mds,
796  ceph_mds_state_name(mstate));
797  session->s_state = CEPH_MDS_SESSION_OPENING;
798  session->s_renew_requested = jiffies;
799 
800  /* send connect message */
801  msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
802  if (!msg)
803  return -ENOMEM;
804  ceph_con_send(&session->s_con, msg);
805  return 0;
806 }
807 
808 /*
809  * open sessions for any export targets for the given mds
810  *
811  * called under mdsc->mutex
812  */
813 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
814  struct ceph_mds_session *session)
815 {
816  struct ceph_mds_info *mi;
817  struct ceph_mds_session *ts;
818  int i, mds = session->s_mds;
819  int target;
820 
821  if (mds >= mdsc->mdsmap->m_max_mds)
822  return;
823  mi = &mdsc->mdsmap->m_info[mds];
824  dout("open_export_target_sessions for mds%d (%d targets)\n",
825  session->s_mds, mi->num_export_targets);
826 
827  for (i = 0; i < mi->num_export_targets; i++) {
828  target = mi->export_targets[i];
829  ts = __ceph_lookup_mds_session(mdsc, target);
830  if (!ts) {
831  ts = register_session(mdsc, target);
832  if (IS_ERR(ts))
833  return;
834  }
835  if (ts->s_state == CEPH_MDS_SESSION_NEW ||
836  ts->s_state == CEPH_MDS_SESSION_CLOSING)
837  __open_session(mdsc, ts);
838  else
839  dout(" mds%d target mds%d %p is %s\n", session->s_mds,
840  i, ts, session_state_name(ts->s_state));
841  ceph_put_mds_session(ts);
842  }
843 }
844 
845 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
846  struct ceph_mds_session *session)
847 {
848  mutex_lock(&mdsc->mutex);
849  __open_export_target_sessions(mdsc, session);
850  mutex_unlock(&mdsc->mutex);
851 }
852 
853 /*
854  * session caps
855  */
856 
857 /*
858  * Free preallocated cap messages assigned to this session
859  */
860 static void cleanup_cap_releases(struct ceph_mds_session *session)
861 {
862  struct ceph_msg *msg;
863 
864  spin_lock(&session->s_cap_lock);
865  while (!list_empty(&session->s_cap_releases)) {
866  msg = list_first_entry(&session->s_cap_releases,
867  struct ceph_msg, list_head);
868  list_del_init(&msg->list_head);
869  ceph_msg_put(msg);
870  }
871  while (!list_empty(&session->s_cap_releases_done)) {
872  msg = list_first_entry(&session->s_cap_releases_done,
873  struct ceph_msg, list_head);
874  list_del_init(&msg->list_head);
875  ceph_msg_put(msg);
876  }
877  spin_unlock(&session->s_cap_lock);
878 }
879 
880 /*
881  * Helper to safely iterate over all caps associated with a session, with
882  * special care taken to handle a racing __ceph_remove_cap().
883  *
884  * Caller must hold session s_mutex.
885  */
886 static int iterate_session_caps(struct ceph_mds_session *session,
887  int (*cb)(struct inode *, struct ceph_cap *,
888  void *), void *arg)
889 {
890  struct list_head *p;
891  struct ceph_cap *cap;
892  struct inode *inode, *last_inode = NULL;
893  struct ceph_cap *old_cap = NULL;
894  int ret;
895 
896  dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
897  spin_lock(&session->s_cap_lock);
898  p = session->s_caps.next;
899  while (p != &session->s_caps) {
900  cap = list_entry(p, struct ceph_cap, session_caps);
901  inode = igrab(&cap->ci->vfs_inode);
902  if (!inode) {
903  p = p->next;
904  continue;
905  }
906  session->s_cap_iterator = cap;
907  spin_unlock(&session->s_cap_lock);
908 
909  if (last_inode) {
910  iput(last_inode);
911  last_inode = NULL;
912  }
913  if (old_cap) {
914  ceph_put_cap(session->s_mdsc, old_cap);
915  old_cap = NULL;
916  }
917 
918  ret = cb(inode, cap, arg);
919  last_inode = inode;
920 
921  spin_lock(&session->s_cap_lock);
922  p = p->next;
923  if (cap->ci == NULL) {
924  dout("iterate_session_caps finishing cap %p removal\n",
925  cap);
926  BUG_ON(cap->session != session);
927  list_del_init(&cap->session_caps);
928  session->s_nr_caps--;
929  cap->session = NULL;
930  old_cap = cap; /* put_cap it w/o locks held */
931  }
932  if (ret < 0)
933  goto out;
934  }
935  ret = 0;
936 out:
937  session->s_cap_iterator = NULL;
938  spin_unlock(&session->s_cap_lock);
939 
940  if (last_inode)
941  iput(last_inode);
942  if (old_cap)
943  ceph_put_cap(session->s_mdsc, old_cap);
944 
945  return ret;
946 }
947 
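/*
 * iterate_session_caps() callback: forcibly remove a cap, and throw
 * away any dirty/flushing state (and dirty page references) that can
 * no longer be written back through this session.
 */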
948 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
949  void *arg)
950 {
951  struct ceph_inode_info *ci = ceph_inode(inode);
952  int drop = 0;
953 
954  dout("removing cap %p, ci is %p, inode is %p\n",
955  cap, ci, &ci->vfs_inode);
956  spin_lock(&ci->i_ceph_lock);
957  __ceph_remove_cap(cap);
958  if (!__ceph_is_any_real_caps(ci)) {
959  struct ceph_mds_client *mdsc =
960  ceph_sb_to_client(inode->i_sb)->mdsc;
961 
962  spin_lock(&mdsc->cap_dirty_lock);
963  if (!list_empty(&ci->i_dirty_item)) {
964  pr_info(" dropping dirty %s state for %p %lld\n",
965  ceph_cap_string(ci->i_dirty_caps),
966  inode, ceph_ino(inode));
967  ci->i_dirty_caps = 0;
968  list_del_init(&ci->i_dirty_item);
969  drop = 1;
970  }
971  if (!list_empty(&ci->i_flushing_item)) {
972  pr_info(" dropping dirty+flushing %s state for %p %lld\n",
973  ceph_cap_string(ci->i_flushing_caps),
974  inode, ceph_ino(inode));
975  ci->i_flushing_caps = 0;
976  list_del_init(&ci->i_flushing_item);
977  mdsc->num_cap_flushing--;
978  drop = 1;
979  }
980  if (drop && ci->i_wrbuffer_ref) {
981  pr_info(" dropping dirty data for %p %lld\n",
982  inode, ceph_ino(inode));
983  ci->i_wrbuffer_ref = 0;
984  ci->i_wrbuffer_ref_head = 0;
985  drop++;
986  }
987  spin_unlock(&mdsc->cap_dirty_lock);
988  }
989  spin_unlock(&ci->i_ceph_lock);
990  while (drop--)
991  iput(inode);
992  return 0;
993 }
994 
995 /*
996  * caller must hold session s_mutex
997  */
998 static void remove_session_caps(struct ceph_mds_session *session)
999 {
1000  dout("remove_session_caps on %p\n", session);
1001  iterate_session_caps(session, remove_session_caps_cb, NULL);
1002  BUG_ON(session->s_nr_caps > 0);
1003  BUG_ON(!list_empty(&session->s_cap_flushing));
1004  cleanup_cap_releases(session);
1005 }
1006 
1007 /*
1008  * wake up any threads waiting on this session's caps. if the cap is
1009  * old (didn't get renewed on the client reconnect), remove it now.
1010  *
1011  * caller must hold s_mutex.
1012  */
1013 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1014  void *arg)
1015 {
1016  struct ceph_inode_info *ci = ceph_inode(inode);
1017 
1018  wake_up_all(&ci->i_cap_wq);
1019  if (arg) {
1020  spin_lock(&ci->i_ceph_lock);
1021  ci->i_wanted_max_size = 0;
1022  ci->i_requested_max_size = 0;
1023  spin_unlock(&ci->i_ceph_lock);
1024  }
1025  return 0;
1026 }
1027 
1028 static void wake_up_session_caps(struct ceph_mds_session *session,
1029  int reconnect)
1030 {
1031  dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1032  iterate_session_caps(session, wake_up_session_cb,
1033  (void *)(unsigned long)reconnect);
1034 }
1035 
1036 /*
1037  * Send periodic message to MDS renewing all currently held caps. The
1038  * ack will reset the expiration for all caps from this session.
1039  *
1040  * caller holds s_mutex
1041  */
1042 static int send_renew_caps(struct ceph_mds_client *mdsc,
1043  struct ceph_mds_session *session)
1044 {
1045  struct ceph_msg *msg;
1046  int state;
1047 
1048  if (time_after_eq(jiffies, session->s_cap_ttl) &&
1049  time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1050  pr_info("mds%d caps stale\n", session->s_mds);
1051  session->s_renew_requested = jiffies;
1052 
1053  /* do not try to renew caps until a recovering mds has reconnected
1054  * with its clients. */
1055  state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1056  if (state < CEPH_MDS_STATE_RECONNECT) {
1057  dout("send_renew_caps ignoring mds%d (%s)\n",
1058  session->s_mds, ceph_mds_state_name(state));
1059  return 0;
1060  }
1061 
1062  dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1063  ceph_mds_state_name(state));
1064  msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1065  ++session->s_renew_seq);
1066  if (!msg)
1067  return -ENOMEM;
1068  ceph_con_send(&session->s_con, msg);
1069  return 0;
1070 }
1071 
1072 /*
1073  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1074  *
1075  * Called under session->s_mutex
1076  */
1077 static void renewed_caps(struct ceph_mds_client *mdsc,
1078  struct ceph_mds_session *session, int is_renew)
1079 {
1080  int was_stale;
1081  int wake = 0;
1082 
1083  spin_lock(&session->s_cap_lock);
1084  was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1085 
1086  session->s_cap_ttl = session->s_renew_requested +
1087  mdsc->mdsmap->m_session_timeout*HZ;
1088 
1089  if (was_stale) {
1090  if (time_before(jiffies, session->s_cap_ttl)) {
1091  pr_info("mds%d caps renewed\n", session->s_mds);
1092  wake = 1;
1093  } else {
1094  pr_info("mds%d caps still stale\n", session->s_mds);
1095  }
1096  }
1097  dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1098  session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1099  time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
1100  spin_unlock(&session->s_cap_lock);
1101 
1102  if (wake)
1103  wake_up_session_caps(session, 0);
1104 }
1105 
1106 /*
1107  * send a session close request
1108  */
1109 static int request_close_session(struct ceph_mds_client *mdsc,
1110  struct ceph_mds_session *session)
1111 {
1112  struct ceph_msg *msg;
1113 
1114  dout("request_close_session mds%d state %s seq %lld\n",
1115  session->s_mds, session_state_name(session->s_state),
1116  session->s_seq);
1117  msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1118  if (!msg)
1119  return -ENOMEM;
1120  ceph_con_send(&session->s_con, msg);
1121  return 0;
1122 }
1123 
1124 /*
1125  * Called with s_mutex held.
1126  */
1127 static int __close_session(struct ceph_mds_client *mdsc,
1128  struct ceph_mds_session *session)
1129 {
1130  if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1131  return 0;
1132  session->s_state = CEPH_MDS_SESSION_CLOSING;
1133  return request_close_session(mdsc, session);
1134 }
1135 
1136 /*
1137  * Trim old(er) caps.
1138  *
1139  * Because we can't cache an inode without one or more caps, we do
1140  * this indirectly: if a cap is unused, we prune its aliases, at which
1141  * point the inode will hopefully get dropped too.
1142  *
1143  * Yes, this is a bit sloppy. Our only real goal here is to respond to
1144  * memory pressure from the MDS, though, so it needn't be perfect.
1145  */
1146 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1147 {
1148  struct ceph_mds_session *session = arg;
1149  struct ceph_inode_info *ci = ceph_inode(inode);
1150  int used, oissued, mine;
1151 
1152  if (session->s_trim_caps <= 0)
1153  return -1;
1154 
1155  spin_lock(&ci->i_ceph_lock);
1156  mine = cap->issued | cap->implemented;
1157  used = __ceph_caps_used(ci);
1158  oissued = __ceph_caps_issued_other(ci, cap);
1159 
1160  dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
1161  inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1162  ceph_cap_string(used));
1163  if (ci->i_dirty_caps)
1164  goto out; /* dirty caps */
1165  if ((used & ~oissued) & mine)
1166  goto out; /* we need these caps */
1167 
1168  session->s_trim_caps--;
1169  if (oissued) {
1170  /* we aren't the only cap.. just remove us */
1171  __ceph_remove_cap(cap);
1172  } else {
1173  /* try to drop referring dentries */
1174  spin_unlock(&ci->i_ceph_lock);
1175  d_prune_aliases(inode);
1176  dout("trim_caps_cb %p cap %p pruned, count now %d\n",
1177  inode, cap, atomic_read(&inode->i_count));
1178  return 0;
1179  }
1180 
1181 out:
1182  spin_unlock(&ci->i_ceph_lock);
1183  return 0;
1184 }
1185 
1186 /*
1187  * Trim session cap count down to some max number.
1188  */
1189 static int trim_caps(struct ceph_mds_client *mdsc,
1190  struct ceph_mds_session *session,
1191  int max_caps)
1192 {
1193  int trim_caps = session->s_nr_caps - max_caps;
1194 
1195  dout("trim_caps mds%d start: %d / %d, trim %d\n",
1196  session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1197  if (trim_caps > 0) {
1198  session->s_trim_caps = trim_caps;
1199  iterate_session_caps(session, trim_caps_cb, session);
1200  dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1201  session->s_mds, session->s_nr_caps, max_caps,
1202  trim_caps - session->s_trim_caps);
1203  session->s_trim_caps = 0;
1204  }
1205  return 0;
1206 }
1207 
1208 /*
1209  * Allocate cap_release messages. If there is a partially full message
1210  * in the queue, try to allocate enough to cover its remainder, so that
1211  * we can send it immediately.
1212  *
1213  * Called under s_mutex.
1214  */
1215 int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1216  struct ceph_mds_session *session)
1217 {
1218  struct ceph_msg *msg, *partial = NULL;
1219  struct ceph_mds_cap_release *head;
1220  int err = -ENOMEM;
1221  int extra = mdsc->fsc->mount_options->cap_release_safety;
1222  int num;
1223 
1224  dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
1225  extra);
1226 
1227  spin_lock(&session->s_cap_lock);
1228 
1229  if (!list_empty(&session->s_cap_releases)) {
1230  msg = list_first_entry(&session->s_cap_releases,
1231  struct ceph_msg,
1232  list_head);
1233  head = msg->front.iov_base;
1234  num = le32_to_cpu(head->num);
1235  if (num) {
1236  dout(" partial %p with (%d/%d)\n", msg, num,
1237  (int)CEPH_CAPS_PER_RELEASE);
1238  extra += CEPH_CAPS_PER_RELEASE - num;
1239  partial = msg;
1240  }
1241  }
1242  while (session->s_num_cap_releases < session->s_nr_caps + extra) {
1243  spin_unlock(&session->s_cap_lock);
1244  msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
1245  GFP_NOFS, false);
1246  if (!msg)
1247  goto out_unlocked;
1248  dout("add_cap_releases %p msg %p now %d\n", session, msg,
1249  (int)msg->front.iov_len);
1250  head = msg->front.iov_base;
1251  head->num = cpu_to_le32(0);
1252  msg->front.iov_len = sizeof(*head);
1253  spin_lock(&session->s_cap_lock);
1254  list_add(&msg->list_head, &session->s_cap_releases);
1255  session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
1256  }
1257 
1258  if (partial) {
1259  head = partial->front.iov_base;
1260  num = le32_to_cpu(head->num);
1261  dout(" queueing partial %p with %d/%d\n", partial, num,
1262  (int)CEPH_CAPS_PER_RELEASE);
1263  list_move_tail(&partial->list_head,
1264  &session->s_cap_releases_done);
1265  session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
1266  }
1267  err = 0;
1268  spin_unlock(&session->s_cap_lock);
1269 out_unlocked:
1270  return err;
1271 }
1272 
1273 /*
1274  * flush all dirty inode data to disk.
1275  *
1276  * returns true if we've flushed through want_flush_seq
1277  */
1278 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
1279 {
1280  int mds, ret = 1;
1281 
1282  dout("check_cap_flush want %lld\n", want_flush_seq);
1283  mutex_lock(&mdsc->mutex);
1284  for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
1285  struct ceph_mds_session *session = mdsc->sessions[mds];
1286 
1287  if (!session)
1288  continue;
1289  get_session(session);
1290  mutex_unlock(&mdsc->mutex);
1291 
1292  mutex_lock(&session->s_mutex);
1293  if (!list_empty(&session->s_cap_flushing)) {
1294  struct ceph_inode_info *ci =
1295  list_entry(session->s_cap_flushing.next,
1296  struct ceph_inode_info,
1297  i_flushing_item);
1298  struct inode *inode = &ci->vfs_inode;
1299 
1300  spin_lock(&ci->i_ceph_lock);
1301  if (ci->i_cap_flush_seq <= want_flush_seq) {
1302  dout("check_cap_flush still flushing %p "
1303  "seq %lld <= %lld to mds%d\n", inode,
1304  ci->i_cap_flush_seq, want_flush_seq,
1305  session->s_mds);
1306  ret = 0;
1307  }
1308  spin_unlock(&ci->i_ceph_lock);
1309  }
1310  mutex_unlock(&session->s_mutex);
1311  ceph_put_mds_session(session);
1312 
1313  if (!ret)
1314  return ret;
1315  mutex_lock(&mdsc->mutex);
1316  }
1317 
1318  mutex_unlock(&mdsc->mutex);
1319  dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
1320  return ret;
1321 }
1322 
1323 /*
1324  * called under s_mutex
1325  */
1326 void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1327  struct ceph_mds_session *session)
1328 {
1329  struct ceph_msg *msg;
1330 
1331  dout("send_cap_releases mds%d\n", session->s_mds);
1332  spin_lock(&session->s_cap_lock);
1333  while (!list_empty(&session->s_cap_releases_done)) {
1334  msg = list_first_entry(&session->s_cap_releases_done,
1335  struct ceph_msg, list_head);
1336  list_del_init(&msg->list_head);
1337  spin_unlock(&session->s_cap_lock);
1338  msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1339  dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1340  ceph_con_send(&session->s_con, msg);
1341  spin_lock(&session->s_cap_lock);
1342  }
1343  spin_unlock(&session->s_cap_lock);
1344 }
1345 
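/*
 * Throw away any queued cap releases for this session: zero out the
 * in-progress message and move completed messages back onto the free
 * list, crediting their entries back to s_num_cap_releases.
 */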
1346 static void discard_cap_releases(struct ceph_mds_client *mdsc,
1347  struct ceph_mds_session *session)
1348 {
1349  struct ceph_msg *msg;
1350  struct ceph_mds_cap_release *head;
1351  unsigned num;
1352 
1353  dout("discard_cap_releases mds%d\n", session->s_mds);
1354  spin_lock(&session->s_cap_lock);
1355 
1356  /* zero out the in-progress message */
1357  msg = list_first_entry(&session->s_cap_releases,
1358  struct ceph_msg, list_head);
1359  head = msg->front.iov_base;
1360  num = le32_to_cpu(head->num);
1361  dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
1362  head->num = cpu_to_le32(0);
1363  session->s_num_cap_releases += num;
1364 
1365  /* requeue completed messages */
1366  while (!list_empty(&session->s_cap_releases_done)) {
1367  msg = list_first_entry(&session->s_cap_releases_done,
1368  struct ceph_msg, list_head);
1369  list_del_init(&msg->list_head);
1370 
1371  head = msg->front.iov_base;
1372  num = le32_to_cpu(head->num);
1373  dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
1374  num);
1375  session->s_num_cap_releases += num;
1376  head->num = cpu_to_le32(0);
1377  msg->front.iov_len = sizeof(*head);
1378  list_add(&msg->list_head, &session->s_cap_releases);
1379  }
1380 
1381  spin_unlock(&session->s_cap_lock);
1382 }
1383 
1384 /*
1385  * requests
1386  */
1387 
1388 /*
1389  * Create an mds request.
1390  */
1391 struct ceph_mds_request *
1392 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1393 {
1394  struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1395 
1396  if (!req)
1397  return ERR_PTR(-ENOMEM);
1398 
1399  mutex_init(&req->r_fill_mutex);
1400  req->r_mdsc = mdsc;
1401  req->r_started = jiffies;
1402  req->r_resend_mds = -1;
1403  INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1404  req->r_fmode = -1;
1405  kref_init(&req->r_kref);
1406  INIT_LIST_HEAD(&req->r_wait);
1407  init_completion(&req->r_completion);
1408  init_completion(&req->r_safe_completion);
1409  INIT_LIST_HEAD(&req->r_unsafe_item);
1410 
1411  req->r_op = op;
1412  req->r_direct_mode = mode;
1413  return req;
1414 }
1415 
1416 /*
1417  * return the oldest (lowest-tid) request in the request tree; NULL (or tid 0) if none.
1418  *
1419  * called under mdsc->mutex.
1420  */
1421 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1422 {
1423  if (RB_EMPTY_ROOT(&mdsc->request_tree))
1424  return NULL;
1425  return rb_entry(rb_first(&mdsc->request_tree),
1426  struct ceph_mds_request, r_node);
1427 }
1428 
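/* tid of the oldest in-flight request, or 0 if there are none */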
1429 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1430 {
1431  struct ceph_mds_request *req = __get_oldest_req(mdsc);
1432 
1433  if (req)
1434  return req->r_tid;
1435  return 0;
1436 }
1437 
1438 /*
1439  * Build a dentry's path. Allocate on heap; caller must kfree. Based
1440  * on build_path_from_dentry in fs/cifs/dir.c.
1441  *
1442  * If @stop_on_nosnap, generate path relative to the first non-snapped
1443  * inode.
1444  *
1445  * Encode hidden .snap dirs as a double /, i.e.
1446  * foo/.snap/bar -> foo//bar
1447  */
1448 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1449  int stop_on_nosnap)
1450 {
1451  struct dentry *temp;
1452  char *path;
1453  int len, pos;
1454  unsigned seq;
1455 
1456  if (dentry == NULL)
1457  return ERR_PTR(-EINVAL);
1458 
1459 retry:
1460  len = 0;
1461  seq = read_seqbegin(&rename_lock);
1462  rcu_read_lock();
1463  for (temp = dentry; !IS_ROOT(temp);) {
1464  struct inode *inode = temp->d_inode;
1465  if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1466  len++; /* slash only */
1467  else if (stop_on_nosnap && inode &&
1468  ceph_snap(inode) == CEPH_NOSNAP)
1469  break;
1470  else
1471  len += 1 + temp->d_name.len;
1472  temp = temp->d_parent;
1473  }
1474  rcu_read_unlock();
1475  if (len)
1476  len--; /* no leading '/' */
1477 
1478  path = kmalloc(len+1, GFP_NOFS);
1479  if (path == NULL)
1480  return ERR_PTR(-ENOMEM);
1481  pos = len;
1482  path[pos] = 0; /* trailing null */
1483  rcu_read_lock();
1484  for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1485  struct inode *inode;
1486 
1487  spin_lock(&temp->d_lock);
1488  inode = temp->d_inode;
1489  if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1490  dout("build_path path+%d: %p SNAPDIR\n",
1491  pos, temp);
1492  } else if (stop_on_nosnap && inode &&
1493  ceph_snap(inode) == CEPH_NOSNAP) {
1494  spin_unlock(&temp->d_lock);
1495  break;
1496  } else {
1497  pos -= temp->d_name.len;
1498  if (pos < 0) {
1499  spin_unlock(&temp->d_lock);
1500  break;
1501  }
1502  strncpy(path + pos, temp->d_name.name,
1503  temp->d_name.len);
1504  }
1505  spin_unlock(&temp->d_lock);
1506  if (pos)
1507  path[--pos] = '/';
1508  temp = temp->d_parent;
1509  }
1510  rcu_read_unlock();
1511  if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1512  pr_err("build_path did not end path lookup where "
1513  "expected, namelen is %d, pos is %d\n", len, pos);
1514  /* presumably this is only possible if racing with a
1515  rename of one of the parent directories (we can not
1516  lock the dentries above us to prevent this, but
1517  retrying should be harmless) */
1518  kfree(path);
1519  goto retry;
1520  }
1521 
1522  *base = ceph_ino(temp->d_inode);
1523  *plen = len;
1524  dout("build_path on %p %d built %llx '%.*s'\n",
1525  dentry, dentry->d_count, *base, len, path);
1526  return path;
1527 }
1528 
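/*
 * Encode a dentry for an MDS request, either as a (parent ino, name)
 * pair for the common non-snapped case, or as a full path built by
 * ceph_mdsc_build_path().  *pfreepath tells the caller whether *ppath
 * must be kfree()d afterwards.
 */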
1529 static int build_dentry_path(struct dentry *dentry,
1530  const char **ppath, int *ppathlen, u64 *pino,
1531  int *pfreepath)
1532 {
1533  char *path;
1534 
1535  if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1536  *pino = ceph_ino(dentry->d_parent->d_inode);
1537  *ppath = dentry->d_name.name;
1538  *ppathlen = dentry->d_name.len;
1539  return 0;
1540  }
1541  path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1542  if (IS_ERR(path))
1543  return PTR_ERR(path);
1544  *ppath = path;
1545  *pfreepath = 1;
1546  return 0;
1547 }
1548 
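/*
 * Encode an inode for an MDS request: the bare ino if it is not part
 * of a snapshot, otherwise a path built from one of its aliases.
 */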
1549 static int build_inode_path(struct inode *inode,
1550  const char **ppath, int *ppathlen, u64 *pino,
1551  int *pfreepath)
1552 {
1553  struct dentry *dentry;
1554  char *path;
1555 
1556  if (ceph_snap(inode) == CEPH_NOSNAP) {
1557  *pino = ceph_ino(inode);
1558  *ppathlen = 0;
1559  return 0;
1560  }
1561  dentry = d_find_alias(inode);
1562  path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1563  dput(dentry);
1564  if (IS_ERR(path))
1565  return PTR_ERR(path);
1566  *ppath = path;
1567  *pfreepath = 1;
1568  return 0;
1569 }
1570 
1571 /*
1572  * request arguments may be specified via an inode *, a dentry *, or
1573  * an explicit ino+path.
1574  */
1575 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1576  const char *rpath, u64 rino,
1577  const char **ppath, int *pathlen,
1578  u64 *ino, int *freepath)
1579 {
1580  int r = 0;
1581 
1582  if (rinode) {
1583  r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1584  dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1585  ceph_snap(rinode));
1586  } else if (rdentry) {
1587  r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1588  dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1589  *ppath);
1590  } else if (rpath || rino) {
1591  *ino = rino;
1592  *ppath = rpath;
1593  *pathlen = strlen(rpath);
1594  dout(" path %.*s\n", *pathlen, rpath);
1595  }
1596 
1597  return r;
1598 }
1599 
1600 /*
1601  * called under mdsc->mutex
1602  */
1603 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1604  struct ceph_mds_request *req,
1605  int mds)
1606 {
1607  struct ceph_msg *msg;
1608  struct ceph_mds_request_head *head;
1609  const char *path1 = NULL;
1610  const char *path2 = NULL;
1611  u64 ino1 = 0, ino2 = 0;
1612  int pathlen1 = 0, pathlen2 = 0;
1613  int freepath1 = 0, freepath2 = 0;
1614  int len;
1615  u16 releases;
1616  void *p, *end;
1617  int ret;
1618 
1619  ret = set_request_path_attr(req->r_inode, req->r_dentry,
1620  req->r_path1, req->r_ino1.ino,
1621  &path1, &pathlen1, &ino1, &freepath1);
1622  if (ret < 0) {
1623  msg = ERR_PTR(ret);
1624  goto out;
1625  }
1626 
1627  ret = set_request_path_attr(NULL, req->r_old_dentry,
1628  req->r_path2, req->r_ino2.ino,
1629  &path2, &pathlen2, &ino2, &freepath2);
1630  if (ret < 0) {
1631  msg = ERR_PTR(ret);
1632  goto out_free1;
1633  }
1634 
1635  len = sizeof(*head) +
1636  pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
1637 
1638  /* calculate (max) length for cap releases */
1639  len += sizeof(struct ceph_mds_request_release) *
1640  (!!req->r_inode_drop + !!req->r_dentry_drop +
1641  !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1642  if (req->r_dentry_drop)
1643  len += req->r_dentry->d_name.len;
1644  if (req->r_old_dentry_drop)
1645  len += req->r_old_dentry->d_name.len;
1646 
1647  msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
1648  if (!msg) {
1649  msg = ERR_PTR(-ENOMEM);
1650  goto out_free2;
1651  }
1652 
1653  msg->hdr.tid = cpu_to_le64(req->r_tid);
1654 
1655  head = msg->front.iov_base;
1656  p = msg->front.iov_base + sizeof(*head);
1657  end = msg->front.iov_base + msg->front.iov_len;
1658 
1659  head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1660  head->op = cpu_to_le32(req->r_op);
1661  head->caller_uid = cpu_to_le32(req->r_uid);
1662  head->caller_gid = cpu_to_le32(req->r_gid);
1663  head->args = req->r_args;
1664 
1665  ceph_encode_filepath(&p, end, ino1, path1);
1666  ceph_encode_filepath(&p, end, ino2, path2);
1667 
1668  /* make note of release offset, in case we need to replay */
1669  req->r_request_release_offset = p - msg->front.iov_base;
1670 
1671  /* cap releases */
1672  releases = 0;
1673  if (req->r_inode_drop)
1674  releases += ceph_encode_inode_release(&p,
1675  req->r_inode ? req->r_inode : req->r_dentry->d_inode,
1676  mds, req->r_inode_drop, req->r_inode_unless, 0);
1677  if (req->r_dentry_drop)
1678  releases += ceph_encode_dentry_release(&p, req->r_dentry,
1679  mds, req->r_dentry_drop, req->r_dentry_unless);
1680  if (req->r_old_dentry_drop)
1681  releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1682  mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1683  if (req->r_old_inode_drop)
1684  releases += ceph_encode_inode_release(&p,
1685  req->r_old_dentry->d_inode,
1686  mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1687  head->num_releases = cpu_to_le16(releases);
1688 
1689  BUG_ON(p > end);
1690  msg->front.iov_len = p - msg->front.iov_base;
1691  msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1692 
1693  msg->pages = req->r_pages;
1694  msg->nr_pages = req->r_num_pages;
1695  msg->hdr.data_len = cpu_to_le32(req->r_data_len);
1696  msg->hdr.data_off = cpu_to_le16(0);
1697 
1698 out_free2:
1699  if (freepath2)
1700  kfree((char *)path2);
1701 out_free1:
1702  if (freepath1)
1703  kfree((char *)path1);
1704 out:
1705  return msg;
1706 }
1707 
1708 /*
1709  * called under mdsc->mutex if error, under no mutex if
1710  * success.
1711  */
1712 static void complete_request(struct ceph_mds_client *mdsc,
1713  struct ceph_mds_request *req)
1714 {
1715  if (req->r_callback)
1716  req->r_callback(mdsc, req);
1717  else
1718  complete_all(&req->r_completion);
1719 }
1720 
1721 /*
1722  * called under mdsc->mutex
1723  */
1724 static int __prepare_send_request(struct ceph_mds_client *mdsc,
1725  struct ceph_mds_request *req,
1726  int mds)
1727 {
1728  struct ceph_mds_request_head *rhead;
1729  struct ceph_msg *msg;
1730  int flags = 0;
1731 
1732  req->r_attempts++;
1733  if (req->r_inode) {
1734  struct ceph_cap *cap =
1735  ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
1736 
1737  if (cap)
1738  req->r_sent_on_mseq = cap->mseq;
1739  else
1740  req->r_sent_on_mseq = -1;
1741  }
1742  dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
1743  req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
1744 
1745  if (req->r_got_unsafe) {
1746  /*
1747  * Replay. Do not regenerate message (and rebuild
1748  * paths, etc.); just use the original message.
1749  * Rebuilding paths will break for renames because
1750  * d_move mangles the src name.
1751  */
1752  msg = req->r_request;
1753  rhead = msg->front.iov_base;
1754 
1755  flags = le32_to_cpu(rhead->flags);
1756  flags |= CEPH_MDS_FLAG_REPLAY;
1757  rhead->flags = cpu_to_le32(flags);
1758 
1759  if (req->r_target_inode)
1760  rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
1761 
1762  rhead->num_retry = req->r_attempts - 1;
1763 
1764  /* remove cap/dentry releases from message */
1765  rhead->num_releases = 0;
1766  msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
1767  msg->front.iov_len = req->r_request_release_offset;
1768  return 0;
1769  }
1770 
1771  if (req->r_request) {
1772  ceph_msg_put(req->r_request);
1773  req->r_request = NULL;
1774  }
1775  msg = create_request_message(mdsc, req, mds);
1776  if (IS_ERR(msg)) {
1777  req->r_err = PTR_ERR(msg);
1778  complete_request(mdsc, req);
1779  return PTR_ERR(msg);
1780  }
1781  req->r_request = msg;
1782 
1783  rhead = msg->front.iov_base;
1784  rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
1785  if (req->r_got_unsafe)
1786  flags |= CEPH_MDS_FLAG_REPLAY;
1787  if (req->r_locked_dir)
1788  flags |= CEPH_MDS_FLAG_WANT_DENTRY;
1789  rhead->flags = cpu_to_le32(flags);
1790  rhead->num_fwd = req->r_num_fwd;
1791  rhead->num_retry = req->r_attempts - 1;
1792  rhead->ino = 0;
1793 
1794  dout(" r_locked_dir = %p\n", req->r_locked_dir);
1795  return 0;
1796 }
1797 
1798 /*
1799  * send request, or put it on the appropriate wait list.
1800  */
1801 static int __do_request(struct ceph_mds_client *mdsc,
1802  struct ceph_mds_request *req)
1803 {
1804  struct ceph_mds_session *session = NULL;
1805  int mds = -1;
1806  int err = -EAGAIN;
1807 
1808  if (req->r_err || req->r_got_result)
1809  goto out;
1810 
1811  if (req->r_timeout &&
1812  time_after_eq(jiffies, req->r_started + req->r_timeout)) {
1813  dout("do_request timed out\n");
1814  err = -EIO;
1815  goto finish;
1816  }
1817 
1818  put_request_session(req);
1819 
1820  mds = __choose_mds(mdsc, req);
1821  if (mds < 0 ||
1822  ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
1823  dout("do_request no mds or not active, waiting for map\n");
1824  list_add(&req->r_wait, &mdsc->waiting_for_map);
1825  goto out;
1826  }
1827 
1828  /* get, open session */
1829  session = __ceph_lookup_mds_session(mdsc, mds);
1830  if (!session) {
1831  session = register_session(mdsc, mds);
1832  if (IS_ERR(session)) {
1833  err = PTR_ERR(session);
1834  goto finish;
1835  }
1836  }
1837  req->r_session = get_session(session);
1838 
1839  dout("do_request mds%d session %p state %s\n", mds, session,
1840  session_state_name(session->s_state));
1841  if (session->s_state != CEPH_MDS_SESSION_OPEN &&
1842  session->s_state != CEPH_MDS_SESSION_HUNG) {
1843  if (session->s_state == CEPH_MDS_SESSION_NEW ||
1844  session->s_state == CEPH_MDS_SESSION_CLOSING)
1845  __open_session(mdsc, session);
1846  list_add(&req->r_wait, &session->s_waiting);
1847  goto out_session;
1848  }
1849 
1850  /* send request */
1851  req->r_resend_mds = -1; /* forget any previous mds hint */
1852 
1853  if (req->r_request_started == 0) /* note request start time */
1854  req->r_request_started = jiffies;
1855 
1856  err = __prepare_send_request(mdsc, req, mds);
1857  if (!err) {
1858  ceph_msg_get(req->r_request);
1859  ceph_con_send(&session->s_con, req->r_request);
1860  }
1861 
1862 out_session:
1863  ceph_put_mds_session(session);
1864 out:
1865  return err;
1866 
1867 finish:
1868  req->r_err = err;
1869  complete_request(mdsc, req);
1870  goto out;
1871 }
1872 
1873 /*
1874  * called under mdsc->mutex
1875  */
1876 static void __wake_requests(struct ceph_mds_client *mdsc,
1877  struct list_head *head)
1878 {
1879  struct ceph_mds_request *req, *nreq;
1880 
1881  list_for_each_entry_safe(req, nreq, head, r_wait) {
1882  list_del_init(&req->r_wait);
1883  __do_request(mdsc, req);
1884  }
1885 }
1886 
1887 /*
1888  * Wake up threads with requests pending for @mds, so that they can
1889  * resubmit their requests to a possibly different mds.
1890  */
1891 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
1892 {
1893  struct ceph_mds_request *req;
1894  struct rb_node *p;
1895 
1896  dout("kick_requests mds%d\n", mds);
1897  for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
1898  req = rb_entry(p, struct ceph_mds_request, r_node);
1899  if (req->r_got_unsafe)
1900  continue;
1901  if (req->r_session &&
1902  req->r_session->s_mds == mds) {
1903  dout(" kicking tid %llu\n", req->r_tid);
1904  __do_request(mdsc, req);
1905  }
1906  }
1907 }
1908 
1909 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
1910  struct ceph_mds_request *req)
1911 {
1912  dout("submit_request on %p\n", req);
1913  mutex_lock(&mdsc->mutex);
1914  __register_request(mdsc, req, NULL);
1915  __do_request(mdsc, req);
1916  mutex_unlock(&mdsc->mutex);
1917 }
1918 
1919 /*
1920  * Synchronously perform an mds request. Take care of all of the
1921  * session setup, forwarding, retry details.
1922  */
1923 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1924  struct inode *dir,
1925  struct ceph_mds_request *req)
1926 {
1927  int err;
1928 
1929  dout("do_request on %p\n", req);
1930 
1931  /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
1932  if (req->r_inode)
1933  ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
1934  if (req->r_locked_dir)
1935  ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
1936  if (req->r_old_dentry)
1937  ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
1938  CEPH_CAP_PIN);
1939 
1940  /* issue */
1941  mutex_lock(&mdsc->mutex);
1942  __register_request(mdsc, req, dir);
1943  __do_request(mdsc, req);
1944 
1945  if (req->r_err) {
1946  err = req->r_err;
1947  __unregister_request(mdsc, req);
1948  dout("do_request early error %d\n", err);
1949  goto out;
1950  }
1951 
1952  /* wait */
1953  mutex_unlock(&mdsc->mutex);
1954  dout("do_request waiting\n");
1955  if (req->r_timeout) {
1956  err = (long)wait_for_completion_killable_timeout(
1957  &req->r_completion, req->r_timeout);
1958  if (err == 0)
1959  err = -EIO;
1960  } else {
1961  err = wait_for_completion_killable(&req->r_completion);
1962  }
1963  dout("do_request waited, got %d\n", err);
1964  mutex_lock(&mdsc->mutex);
1965 
1966  /* only abort if we didn't race with a real reply */
1967  if (req->r_got_result) {
1968  err = le32_to_cpu(req->r_reply_info.head->result);
1969  } else if (err < 0) {
1970  dout("aborted request %lld with %d\n", req->r_tid, err);
1971 
1972  /*
1973  * ensure we aren't running concurrently with
1974  * ceph_fill_trace or ceph_readdir_prepopulate, which
1975  * rely on locks (dir mutex) held by our caller.
1976  */
1977  mutex_lock(&req->r_fill_mutex);
1978  req->r_err = err;
1979  req->r_aborted = true;
1980  mutex_unlock(&req->r_fill_mutex);
1981 
1982  if (req->r_locked_dir &&
1983  (req->r_op & CEPH_MDS_OP_WRITE))
1984  ceph_invalidate_dir_request(req);
1985  } else {
1986  err = req->r_err;
1987  }
1988 
1989 out:
1990  mutex_unlock(&mdsc->mutex);
1991  dout("do_request %p done, result %d\n", req, err);
1992  return err;
1993 }
1994 
1995 /*
1996  * Invalidate dir D_COMPLETE and dentry lease state on an aborted MDS
1997  * namespace request.
1998  */
1999 static void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2000 {
2001  struct inode *inode = req->r_locked_dir;
2002  struct ceph_inode_info *ci = ceph_inode(inode);
2003 
2004  dout("invalidate_dir_request %p (D_COMPLETE, lease(s))\n", inode);
2005  spin_lock(&ci->i_ceph_lock);
2006  ceph_dir_clear_complete(inode);
2007  ci->i_release_count++;
2008  spin_unlock(&ci->i_ceph_lock);
2009 
2010  if (req->r_dentry)
2011  ceph_invalidate_dentry_lease(req->r_dentry);
2012  if (req->r_old_dentry)
2013  ceph_invalidate_dentry_lease(req->r_old_dentry);
2014 }
2015 
2016 /*
2017  * Handle mds reply.
2018  *
2019  * We take the session mutex and parse and process the reply immediately.
2020  * This preserves the logical ordering of replies, capabilities, etc., sent
2021  * by the MDS as they are applied to our local cache.
2022  */
2023 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2024 {
2025  struct ceph_mds_client *mdsc = session->s_mdsc;
2026  struct ceph_mds_request *req;
2027  struct ceph_mds_reply_head *head = msg->front.iov_base;
2028  struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
2029  u64 tid;
2030  int err, result;
2031  int mds = session->s_mds;
2032 
2033  if (msg->front.iov_len < sizeof(*head)) {
2034  pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2035  ceph_msg_dump(msg);
2036  return;
2037  }
2038 
2039  /* get request, session */
2040  tid = le64_to_cpu(msg->hdr.tid);
2041  mutex_lock(&mdsc->mutex);
2042  req = __lookup_request(mdsc, tid);
2043  if (!req) {
2044  dout("handle_reply on unknown tid %llu\n", tid);
2045  mutex_unlock(&mdsc->mutex);
2046  return;
2047  }
2048  dout("handle_reply %p\n", req);
2049 
2050  /* correct session? */
2051  if (req->r_session != session) {
2052  pr_err("mdsc_handle_reply got %llu on session mds%d"
2053  " not mds%d\n", tid, session->s_mds,
2054  req->r_session ? req->r_session->s_mds : -1);
2055  mutex_unlock(&mdsc->mutex);
2056  goto out;
2057  }
2058 
2059  /* dup? */
2060  if ((req->r_got_unsafe && !head->safe) ||
2061  (req->r_got_safe && head->safe)) {
2062  pr_warning("got a dup %s reply on %llu from mds%d\n",
2063  head->safe ? "safe" : "unsafe", tid, mds);
2064  mutex_unlock(&mdsc->mutex);
2065  goto out;
2066  }
2067  if (req->r_got_safe && !head->safe) {
2068  pr_warning("got unsafe after safe on %llu from mds%d\n",
2069  tid, mds);
2070  mutex_unlock(&mdsc->mutex);
2071  goto out;
2072  }
2073 
2074  result = le32_to_cpu(head->result);
2075 
2076  /*
2077  * Handle an ESTALE:
2078  * if we're not talking to the authority, send to them;
2079  * if the authority has changed while we weren't looking,
2080  * send to the new authority.
2081  * Otherwise we just have to return an ESTALE.
2082  */
2083  if (result == -ESTALE) {
2084  dout("got ESTALE on request %llu", req->r_tid);
2085  if (!req->r_inode) {
2086  /* do nothing; not an authority problem */
2087  } else if (req->r_direct_mode != USE_AUTH_MDS) {
2088  dout("not using auth, setting for that now");
2089  req->r_direct_mode = USE_AUTH_MDS;
2090  __do_request(mdsc, req);
2091  mutex_unlock(&mdsc->mutex);
2092  goto out;
2093  } else {
2094  struct ceph_inode_info *ci = ceph_inode(req->r_inode);
2095  struct ceph_cap *cap = NULL;
2096 
2097  if (req->r_session)
2098  cap = ceph_get_cap_for_mds(ci,
2099  req->r_session->s_mds);
2100 
2101  dout("already using auth");
2102  if ((!cap || cap != ci->i_auth_cap) ||
2103  (cap->mseq != req->r_sent_on_mseq)) {
2104  dout("but cap changed, so resending");
2105  __do_request(mdsc, req);
2106  mutex_unlock(&mdsc->mutex);
2107  goto out;
2108  }
2109  }
2110  dout("have to return ESTALE on request %llu", req->r_tid);
2111  }
2112 
2113 
2114  if (head->safe) {
2115  req->r_got_safe = true;
2116  __unregister_request(mdsc, req);
2117  complete_all(&req->r_safe_completion);
2118 
2119  if (req->r_got_unsafe) {
2120  /*
2121  * We already handled the unsafe response, now do the
2122  * cleanup. No need to examine the response; the MDS
2123  * doesn't include any result info in the safe
2124  * response. And even if it did, there is nothing
2125  * useful we could do with a revised return value.
2126  */
2127  dout("got safe reply %llu, mds%d\n", tid, mds);
2128  list_del_init(&req->r_unsafe_item);
2129 
2130  /* last unsafe request during umount? */
2131  if (mdsc->stopping && !__get_oldest_req(mdsc))
2132  complete_all(&mdsc->safe_umount_waiters);
2133  mutex_unlock(&mdsc->mutex);
2134  goto out;
2135  }
2136  } else {
2137  req->r_got_unsafe = true;
2138  list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2139  }
2140 
2141  dout("handle_reply tid %lld result %d\n", tid, result);
2142  rinfo = &req->r_reply_info;
2143  err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2144  mutex_unlock(&mdsc->mutex);
2145 
2146  mutex_lock(&session->s_mutex);
2147  if (err < 0) {
2148  pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2149  ceph_msg_dump(msg);
2150  goto out_err;
2151  }
2152 
2153  /* snap trace */
2154  if (rinfo->snapblob_len) {
2155  down_write(&mdsc->snap_rwsem);
2156  ceph_update_snap_trace(mdsc, rinfo->snapblob,
2157  rinfo->snapblob + rinfo->snapblob_len,
2158  le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
2159  downgrade_write(&mdsc->snap_rwsem);
2160  } else {
2161  down_read(&mdsc->snap_rwsem);
2162  }
2163 
2164  /* insert trace into our cache */
2165  mutex_lock(&req->r_fill_mutex);
2166  err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2167  if (err == 0) {
2168  if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK &&
2169  rinfo->dir_nr)
2170  ceph_readdir_prepopulate(req, req->r_session);
2171  ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2172  }
2173  mutex_unlock(&req->r_fill_mutex);
2174 
2175  up_read(&mdsc->snap_rwsem);
2176 out_err:
2177  mutex_lock(&mdsc->mutex);
2178  if (!req->r_aborted) {
2179  if (err) {
2180  req->r_err = err;
2181  } else {
2182  req->r_reply = msg;
2183  ceph_msg_get(msg);
2184  req->r_got_result = true;
2185  }
2186  } else {
2187  dout("reply arrived after request %lld was aborted\n", tid);
2188  }
2189  mutex_unlock(&mdsc->mutex);
2190 
2191  ceph_add_cap_releases(mdsc, req->r_session);
2192  mutex_unlock(&session->s_mutex);
2193 
2194  /* kick calling process */
2195  complete_request(mdsc, req);
2196 out:
2197  ceph_mdsc_put_request(req);
2198  return;
2199 }
2200 
2201 
2202 
2203 /*
2204  * handle mds notification that our request has been forwarded.
2205  */
2206 static void handle_forward(struct ceph_mds_client *mdsc,
2207  struct ceph_mds_session *session,
2208  struct ceph_msg *msg)
2209 {
2210  struct ceph_mds_request *req;
2211  u64 tid = le64_to_cpu(msg->hdr.tid);
2212  u32 next_mds;
2213  u32 fwd_seq;
2214  int err = -EINVAL;
2215  void *p = msg->front.iov_base;
2216  void *end = p + msg->front.iov_len;
2217 
2218  ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2219  next_mds = ceph_decode_32(&p);
2220  fwd_seq = ceph_decode_32(&p);
2221 
2222  mutex_lock(&mdsc->mutex);
2223  req = __lookup_request(mdsc, tid);
2224  if (!req) {
2225  dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2226  goto out; /* dup reply? */
2227  }
2228 
2229  if (req->r_aborted) {
2230  dout("forward tid %llu aborted, unregistering\n", tid);
2231  __unregister_request(mdsc, req);
2232  } else if (fwd_seq <= req->r_num_fwd) {
2233  dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2234  tid, next_mds, req->r_num_fwd, fwd_seq);
2235  } else {
2236  /* resend. forward race not possible; mds would drop */
2237  dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2238  BUG_ON(req->r_err);
2239  BUG_ON(req->r_got_result);
2240  req->r_num_fwd = fwd_seq;
2241  req->r_resend_mds = next_mds;
2242  put_request_session(req);
2243  __do_request(mdsc, req);
2244  }
2245  ceph_mdsc_put_request(req);
2246 out:
2247  mutex_unlock(&mdsc->mutex);
2248  return;
2249 
2250 bad:
2251  pr_err("mdsc_handle_forward decode error err=%d\n", err);
2252 }
2253 
2254 /*
2255  * handle a mds session control message
2256  */
2257 static void handle_session(struct ceph_mds_session *session,
2258  struct ceph_msg *msg)
2259 {
2260  struct ceph_mds_client *mdsc = session->s_mdsc;
2261  u32 op;
2262  u64 seq;
2263  int mds = session->s_mds;
2264  struct ceph_mds_session_head *h = msg->front.iov_base;
2265  int wake = 0;
2266 
2267  /* decode */
2268  if (msg->front.iov_len != sizeof(*h))
2269  goto bad;
2270  op = le32_to_cpu(h->op);
2271  seq = le64_to_cpu(h->seq);
2272 
2273  mutex_lock(&mdsc->mutex);
2274  if (op == CEPH_SESSION_CLOSE)
2275  __unregister_session(mdsc, session);
2276  /* FIXME: this ttl calculation is generous */
2277  session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2278  mutex_unlock(&mdsc->mutex);
2279 
2280  mutex_lock(&session->s_mutex);
2281 
2282  dout("handle_session mds%d %s %p state %s seq %llu\n",
2283  mds, ceph_session_op_name(op), session,
2284  session_state_name(session->s_state), seq);
2285 
2286  if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2287  session->s_state = CEPH_MDS_SESSION_OPEN;
2288  pr_info("mds%d came back\n", session->s_mds);
2289  }
2290 
2291  switch (op) {
2292  case CEPH_SESSION_OPEN:
2293  if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2294  pr_info("mds%d reconnect success\n", session->s_mds);
2295  session->s_state = CEPH_MDS_SESSION_OPEN;
2296  renewed_caps(mdsc, session, 0);
2297  wake = 1;
2298  if (mdsc->stopping)
2299  __close_session(mdsc, session);
2300  break;
2301 
2302  case CEPH_SESSION_RENEWCAPS:
2303  if (session->s_renew_seq == seq)
2304  renewed_caps(mdsc, session, 1);
2305  break;
2306 
2307  case CEPH_SESSION_CLOSE:
2308  if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2309  pr_info("mds%d reconnect denied\n", session->s_mds);
2310  remove_session_caps(session);
2311  wake = 1; /* for good measure */
2312  wake_up_all(&mdsc->session_close_wq);
2313  kick_requests(mdsc, mds);
2314  break;
2315 
2316  case CEPH_SESSION_STALE:
2317  pr_info("mds%d caps went stale, renewing\n",
2318  session->s_mds);
2319  spin_lock(&session->s_gen_ttl_lock);
2320  session->s_cap_gen++;
2321  session->s_cap_ttl = jiffies - 1;
2322  spin_unlock(&session->s_gen_ttl_lock);
2323  send_renew_caps(mdsc, session);
2324  break;
2325 
2326  case CEPH_SESSION_RECALL_STATE:
2327  trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2328  break;
2329 
2330  default:
2331  pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2332  WARN_ON(1);
2333  }
2334 
2335  mutex_unlock(&session->s_mutex);
2336  if (wake) {
2337  mutex_lock(&mdsc->mutex);
2338  __wake_requests(mdsc, &session->s_waiting);
2339  mutex_unlock(&mdsc->mutex);
2340  }
2341  return;
2342 
2343 bad:
2344  pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2345  (int)msg->front.iov_len);
2346  ceph_msg_dump(msg);
2347  return;
2348 }
2349 
2350 
2351 /*
2352  * called under session->mutex.
2353  */
2354 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2355  struct ceph_mds_session *session)
2356 {
2357  struct ceph_mds_request *req, *nreq;
2358  int err;
2359 
2360  dout("replay_unsafe_requests mds%d\n", session->s_mds);
2361 
2362  mutex_lock(&mdsc->mutex);
2363  list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2364  err = __prepare_send_request(mdsc, req, session->s_mds);
2365  if (!err) {
2366  ceph_msg_get(req->r_request);
2367  ceph_con_send(&session->s_con, req->r_request);
2368  }
2369  }
2370  mutex_unlock(&mdsc->mutex);
2371 }
2372 
2373 /*
2374  * Encode information about a cap for a reconnect with the MDS.
2375  */
2376 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2377  void *arg)
2378 {
2379  union {
2380  struct ceph_mds_cap_reconnect v2;
2381  struct ceph_mds_cap_reconnect_v1 v1;
2382  } rec;
2383  size_t reclen;
2384  struct ceph_inode_info *ci;
2385  struct ceph_reconnect_state *recon_state = arg;
2386  struct ceph_pagelist *pagelist = recon_state->pagelist;
2387  char *path;
2388  int pathlen, err;
2389  u64 pathbase;
2390  struct dentry *dentry;
2391 
2392  ci = cap->ci;
2393 
2394  dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2395  inode, ceph_vinop(inode), cap, cap->cap_id,
2396  ceph_cap_string(cap->issued));
2397  err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2398  if (err)
2399  return err;
2400 
2401  dentry = d_find_alias(inode);
2402  if (dentry) {
2403  path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2404  if (IS_ERR(path)) {
2405  err = PTR_ERR(path);
2406  goto out_dput;
2407  }
2408  } else {
2409  path = NULL;
2410  pathlen = 0;
2411  }
2412  err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2413  if (err)
2414  goto out_free;
2415 
2416  spin_lock(&ci->i_ceph_lock);
2417  cap->seq = 0; /* reset cap seq */
2418  cap->issue_seq = 0; /* and issue_seq */
2419 
2420  if (recon_state->flock) {
2421  rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2422  rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2423  rec.v2.issued = cpu_to_le32(cap->issued);
2424  rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2425  rec.v2.pathbase = cpu_to_le64(pathbase);
2426  rec.v2.flock_len = 0;
2427  reclen = sizeof(rec.v2);
2428  } else {
2429  rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2430  rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2431  rec.v1.issued = cpu_to_le32(cap->issued);
2432  rec.v1.size = cpu_to_le64(inode->i_size);
2433  ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2434  ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2435  rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2436  rec.v1.pathbase = cpu_to_le64(pathbase);
2437  reclen = sizeof(rec.v1);
2438  }
2439  spin_unlock(&ci->i_ceph_lock);
2440 
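 /*
  * When flock records are included, the lock counts are taken under
  * lock_flocks() but the pagelist space is reserved after it is
  * dropped; if ceph_encode_locks() then finds more locks than were
  * counted it returns -ENOSPC, and the loop below truncates the
  * pagelist back to trunc_point and retries with fresh counts.
  */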
2441  if (recon_state->flock) {
2442  int num_fcntl_locks, num_flock_locks;
2443  struct ceph_pagelist_cursor trunc_point;
2444 
2445  ceph_pagelist_set_cursor(pagelist, &trunc_point);
2446  do {
2447  lock_flocks();
2448  ceph_count_locks(inode, &num_fcntl_locks,
2449  &num_flock_locks);
2450  rec.v2.flock_len = (2*sizeof(u32) +
2451  (num_fcntl_locks+num_flock_locks) *
2452  sizeof(struct ceph_filelock));
2453  unlock_flocks();
2454 
2455  /* pre-alloc pagelist */
2456  ceph_pagelist_truncate(pagelist, &trunc_point);
2457  err = ceph_pagelist_append(pagelist, &rec, reclen);
2458  if (!err)
2459  err = ceph_pagelist_reserve(pagelist,
2460  rec.v2.flock_len);
2461 
2462  /* encode locks */
2463  if (!err) {
2464  lock_flocks();
2465  err = ceph_encode_locks(inode,
2466  pagelist,
2467  num_fcntl_locks,
2468  num_flock_locks);
2469  unlock_flocks();
2470  }
2471  } while (err == -ENOSPC);
2472  } else {
2473  err = ceph_pagelist_append(pagelist, &rec, reclen);
2474  }
2475 
2476 out_free:
2477  kfree(path);
2478 out_dput:
2479  dput(dentry);
2480  return err;
2481 }
2482 
2483 
2484 /*
2485  * If an MDS fails and recovers, clients need to reconnect in order to
2486  * reestablish shared state. This includes all caps issued through
2487  * this session _and_ the snap_realm hierarchy. Because it's not
2488  * clear which snap realms the mds cares about, we send everything we
2489  * know about; that ensures we'll then get any new info the
2490  * recovering MDS might have.
2491  *
2492  * This is a relatively heavyweight operation, but it's rare.
2493  *
2494  * called with mdsc->mutex held.
2495  */
2496 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2497  struct ceph_mds_session *session)
2498 {
2499  struct ceph_msg *reply;
2500  struct rb_node *p;
2501  int mds = session->s_mds;
2502  int err = -ENOMEM;
2503  struct ceph_pagelist *pagelist;
2504  struct ceph_reconnect_state recon_state;
2505 
2506  pr_info("mds%d reconnect start\n", mds);
2507 
2508  pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2509  if (!pagelist)
2510  goto fail_nopagelist;
2511  ceph_pagelist_init(pagelist);
2512 
2513  reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
2514  if (!reply)
2515  goto fail_nomsg;
2516 
2517  mutex_lock(&session->s_mutex);
2518  session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2519  session->s_seq = 0;
2520 
2521  ceph_con_close(&session->s_con);
2522  ceph_con_open(&session->s_con,
2523  CEPH_ENTITY_TYPE_MDS, mds,
2524  ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2525 
2526  /* replay unsafe requests */
2527  replay_unsafe_requests(mdsc, session);
2528 
2529  down_read(&mdsc->snap_rwsem);
2530 
2531  dout("session %p state %s\n", session,
2532  session_state_name(session->s_state));
2533 
2534  /* drop old cap expires; we're about to reestablish that state */
2535  discard_cap_releases(mdsc, session);
2536 
2537  /* traverse this session's caps */
2538  err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
2539  if (err)
2540  goto fail;
2541 
2542  recon_state.pagelist = pagelist;
2543  recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2544  err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2545  if (err < 0)
2546  goto fail;
2547 
2548  /*
2549  * snaprealms. we provide mds with the ino, seq (version), and
2550  * parent for all of our realms. If the mds has any newer info,
2551  * it will tell us.
2552  */
2553  for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2554  struct ceph_snap_realm *realm =
2555  rb_entry(p, struct ceph_snap_realm, node);
2556  struct ceph_mds_snaprealm_reconnect sr_rec;
2557 
2558  dout(" adding snap realm %llx seq %lld parent %llx\n",
2559  realm->ino, realm->seq, realm->parent_ino);
2560  sr_rec.ino = cpu_to_le64(realm->ino);
2561  sr_rec.seq = cpu_to_le64(realm->seq);
2562  sr_rec.parent = cpu_to_le64(realm->parent_ino);
2563  err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
2564  if (err)
2565  goto fail;
2566  }
2567 
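 /*
  * The reconnect message carries the cap and snaprealm records encoded
  * above as data pages attached via the pagelist; header version 2
  * tells the MDS that v2 cap records (with flock data) are included.
  */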
2568  reply->pagelist = pagelist;
2569  if (recon_state.flock)
2570  reply->hdr.version = cpu_to_le16(2);
2571  reply->hdr.data_len = cpu_to_le32(pagelist->length);
2572  reply->nr_pages = calc_pages_for(0, pagelist->length);
2573  ceph_con_send(&session->s_con, reply);
2574 
2575  mutex_unlock(&session->s_mutex);
2576 
2577  mutex_lock(&mdsc->mutex);
2578  __wake_requests(mdsc, &session->s_waiting);
2579  mutex_unlock(&mdsc->mutex);
2580 
2581  up_read(&mdsc->snap_rwsem);
2582  return;
2583 
2584 fail:
2585  ceph_msg_put(reply);
2586  up_read(&mdsc->snap_rwsem);
2587  mutex_unlock(&session->s_mutex);
2588 fail_nomsg:
2589  ceph_pagelist_release(pagelist);
2590  kfree(pagelist);
2591 fail_nopagelist:
2592  pr_err("error %d preparing reconnect for mds%d\n", err, mds);
2593  return;
2594 }
2595 
2596 
2597 /*
2598  * compare old and new mdsmaps, kicking requests
2599  * and closing out old connections as necessary
2600  *
2601  * called under mdsc->mutex.
2602  */
2603 static void check_new_map(struct ceph_mds_client *mdsc,
2604  struct ceph_mdsmap *newmap,
2605  struct ceph_mdsmap *oldmap)
2606 {
2607  int i;
2608  int oldstate, newstate;
2609  struct ceph_mds_session *s;
2610 
2611  dout("check_new_map new %u old %u\n",
2612  newmap->m_epoch, oldmap->m_epoch);
2613 
2614  for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2615  if (mdsc->sessions[i] == NULL)
2616  continue;
2617  s = mdsc->sessions[i];
2618  oldstate = ceph_mdsmap_get_state(oldmap, i);
2619  newstate = ceph_mdsmap_get_state(newmap, i);
2620 
2621  dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
2622  i, ceph_mds_state_name(oldstate),
2623  ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
2624  ceph_mds_state_name(newstate),
2625  ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
2626  session_state_name(s->s_state));
2627 
2628  if (i >= newmap->m_max_mds ||
2629  memcmp(ceph_mdsmap_get_addr(oldmap, i),
2630  ceph_mdsmap_get_addr(newmap, i),
2631  sizeof(struct ceph_entity_addr))) {
2632  if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2633  /* the session never opened, just close it
2634  * out now */
2635  __wake_requests(mdsc, &s->s_waiting);
2636  __unregister_session(mdsc, s);
2637  } else {
2638  /* just close it */
2639  mutex_unlock(&mdsc->mutex);
2640  mutex_lock(&s->s_mutex);
2641  mutex_lock(&mdsc->mutex);
2642  ceph_con_close(&s->s_con);
2643  mutex_unlock(&s->s_mutex);
2644  s->s_state = CEPH_MDS_SESSION_RESTARTING;
2645  }
2646 
2647  /* kick any requests waiting on the recovering mds */
2648  kick_requests(mdsc, i);
2649  } else if (oldstate == newstate) {
2650  continue; /* nothing new with this mds */
2651  }
2652 
2653  /*
2654  * send reconnect?
2655  */
2656  if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2657  newstate >= CEPH_MDS_STATE_RECONNECT) {
2658  mutex_unlock(&mdsc->mutex);
2659  send_mds_reconnect(mdsc, s);
2660  mutex_lock(&mdsc->mutex);
2661  }
2662 
2663  /*
2664  * kick request on any mds that has gone active.
2665  */
2666  if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2667  newstate >= CEPH_MDS_STATE_ACTIVE) {
2668  if (oldstate != CEPH_MDS_STATE_CREATING &&
2669  oldstate != CEPH_MDS_STATE_STARTING)
2670  pr_info("mds%d recovery completed\n", s->s_mds);
2671  kick_requests(mdsc, i);
2672  ceph_kick_flushing_caps(mdsc, s);
2673  wake_up_session_caps(s, 1);
2674  }
2675  }
2676 
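 /*
  * Second pass: for any mds the new map marks laggy, proactively open
  * sessions to its export targets, so metadata it may have handed off
  * remains reachable.
  */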
2677  for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
2678  s = mdsc->sessions[i];
2679  if (!s)
2680  continue;
2681  if (!ceph_mdsmap_is_laggy(newmap, i))
2682  continue;
2683  if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2684  s->s_state == CEPH_MDS_SESSION_HUNG ||
2685  s->s_state == CEPH_MDS_SESSION_CLOSING) {
2686  dout(" connecting to export targets of laggy mds%d\n",
2687  i);
2688  __open_export_target_sessions(mdsc, s);
2689  }
2690  }
2691 }
2692 
2693 
2694 
2695 /*
2696  * leases
2697  */
2698 
2699 /*
2700  * caller must hold session s_mutex, dentry->d_lock
2701  */
2702 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2703 {
2704  struct ceph_dentry_info *di = ceph_dentry(dentry);
2705 
2707  di->lease_session = NULL;
2708 }
2709 
2710 static void handle_lease(struct ceph_mds_client *mdsc,
2711  struct ceph_mds_session *session,
2712  struct ceph_msg *msg)
2713 {
2714  struct super_block *sb = mdsc->fsc->sb;
2715  struct inode *inode;
2716  struct dentry *parent, *dentry;
2717  struct ceph_dentry_info *di;
2718  int mds = session->s_mds;
2719  struct ceph_mds_lease *h = msg->front.iov_base;
2720  u32 seq;
2721  struct ceph_vino vino;
2722  struct qstr dname;
2723  int release = 0;
2724 
2725  dout("handle_lease from mds%d\n", mds);
2726 
2727  /* decode */
2728  if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
2729  goto bad;
2730  vino.ino = le64_to_cpu(h->ino);
2731  vino.snap = CEPH_NOSNAP;
2732  seq = le32_to_cpu(h->seq);
2733  dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2734  dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
2735  if (dname.len != get_unaligned_le32(h+1))
2736  goto bad;
2737 
2738  mutex_lock(&session->s_mutex);
2739  session->s_seq++;
2740 
2741  /* lookup inode */
2742  inode = ceph_find_inode(sb, vino);
2743  dout("handle_lease %s, ino %llx %p %.*s\n",
2744  ceph_lease_op_name(h->action), vino.ino, inode,
2745  dname.len, dname.name);
2746  if (inode == NULL) {
2747  dout("handle_lease no inode %llx\n", vino.ino);
2748  goto release;
2749  }
2750 
2751  /* dentry */
2752  parent = d_find_alias(inode);
2753  if (!parent) {
2754  dout("no parent dentry on inode %p\n", inode);
2755  WARN_ON(1);
2756  goto release; /* hrm... */
2757  }
2758  dname.hash = full_name_hash(dname.name, dname.len);
2759  dentry = d_lookup(parent, &dname);
2760  dput(parent);
2761  if (!dentry)
2762  goto release;
2763 
2764  spin_lock(&dentry->d_lock);
2765  di = ceph_dentry(dentry);
2766  switch (h->action) {
2767  case CEPH_MDS_LEASE_REVOKE:
2768  if (di->lease_session == session) {
2769  if (ceph_seq_cmp(di->lease_seq, seq) > 0)
2770  h->seq = cpu_to_le32(di->lease_seq);
2771  __ceph_mdsc_drop_dentry_lease(dentry);
2772  }
2773  release = 1;
2774  break;
2775 
2776  case CEPH_MDS_LEASE_RENEW:
2777  if (di->lease_session == session &&
2778  di->lease_gen == session->s_cap_gen &&
2779  di->lease_renew_from &&
2780  di->lease_renew_after == 0) {
2781  unsigned long duration =
2782  le32_to_cpu(h->duration_ms) * HZ / 1000;
2783 
2784  di->lease_seq = seq;
2785  dentry->d_time = di->lease_renew_from + duration;
2786  di->lease_renew_after = di->lease_renew_from +
2787  (duration >> 1);
2788  di->lease_renew_from = 0;
2789  }
2790  break;
2791  }
2792  spin_unlock(&dentry->d_lock);
2793  dput(dentry);
2794 
2795  if (!release)
2796  goto out;
2797 
2798 release:
2799  /* let's just reuse the same message */
2800  h->action = CEPH_MDS_LEASE_REVOKE_ACK;
2801  ceph_msg_get(msg);
2802  ceph_con_send(&session->s_con, msg);
2803 
2804 out:
2805  iput(inode);
2806  mutex_unlock(&session->s_mutex);
2807  return;
2808 
2809 bad:
2810  pr_err("corrupt lease message\n");
2811  ceph_msg_dump(msg);
2812 }
2813 
2814 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2815  struct inode *inode,
2816  struct dentry *dentry, char action,
2817  u32 seq)
2818 {
2819  struct ceph_msg *msg;
2820  struct ceph_mds_lease *lease;
2821  int len = sizeof(*lease) + sizeof(u32);
2822  int dnamelen = 0;
2823 
2824  dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2825  inode, dentry, ceph_lease_op_name(action), session->s_mds);
2826  dnamelen = dentry->d_name.len;
2827  len += dnamelen;
2828 
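 /*
  * Message body layout: struct ceph_mds_lease, then a little-endian
  * u32 dentry name length, then the name bytes; this matches what
  * handle_lease() expects when decoding.
  */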
2829  msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
2830  if (!msg)
2831  return;
2832  lease = msg->front.iov_base;
2833  lease->action = action;
2834  lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2835  lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2836  lease->seq = cpu_to_le32(seq);
2837  put_unaligned_le32(dnamelen, lease + 1);
2838  memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2839 
2840  /*
2841  * if this is a preemptive lease RELEASE, no need to
2842  * flush request stream, since the actual request will
2843  * soon follow.
2844  */
2845  msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2846 
2847  ceph_con_send(&session->s_con, msg);
2848 }
2849 
2850 /*
2851  * Preemptively release a lease we expect to invalidate anyway.
2852  * Pass @inode always, @dentry is optional.
2853  */
2854 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2855  struct dentry *dentry)
2856 {
2857  struct ceph_dentry_info *di;
2858  struct ceph_mds_session *session;
2859  u32 seq;
2860 
2861  BUG_ON(inode == NULL);
2862  BUG_ON(dentry == NULL);
2863 
2864  /* is dentry lease valid? */
2865  spin_lock(&dentry->d_lock);
2866  di = ceph_dentry(dentry);
2867  if (!di || !di->lease_session ||
2868  di->lease_session->s_mds < 0 ||
2869  di->lease_gen != di->lease_session->s_cap_gen ||
2870  !time_before(jiffies, dentry->d_time)) {
2871  dout("lease_release inode %p dentry %p -- "
2872  "no lease\n",
2873  inode, dentry);
2874  spin_unlock(&dentry->d_lock);
2875  return;
2876  }
2877 
2878  /* we do have a lease on this dentry; note mds and seq */
2879  session = ceph_get_mds_session(di->lease_session);
2880  seq = di->lease_seq;
2881  __ceph_mdsc_drop_dentry_lease(dentry);
2882  spin_unlock(&dentry->d_lock);
2883 
2884  dout("lease_release inode %p dentry %p to mds%d\n",
2885  inode, dentry, session->s_mds);
2886  ceph_mdsc_lease_send_msg(session, inode, dentry,
2887  CEPH_MDS_LEASE_RELEASE, seq);
2888  ceph_put_mds_session(session);
2889 }
2890 
2891 /*
2892  * drop all leases (and dentry refs) in preparation for umount
2893  */
2894 static void drop_leases(struct ceph_mds_client *mdsc)
2895 {
2896  int i;
2897 
2898  dout("drop_leases\n");
2899  mutex_lock(&mdsc->mutex);
2900  for (i = 0; i < mdsc->max_sessions; i++) {
2901  struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2902  if (!s)
2903  continue;
2904  mutex_unlock(&mdsc->mutex);
2905  mutex_lock(&s->s_mutex);
2906  mutex_unlock(&s->s_mutex);
2907  ceph_put_mds_session(s);
2908  mutex_lock(&mdsc->mutex);
2909  }
2910  mutex_unlock(&mdsc->mutex);
2911 }
2912 
2913 
2914 
2915 /*
2916  * delayed work -- periodically trim expired leases, renew caps with mds
2917  */
2918 static void schedule_delayed(struct ceph_mds_client *mdsc)
2919 {
2920  int delay = 5;
2921  unsigned hz = round_jiffies_relative(HZ * delay);
2922  schedule_delayed_work(&mdsc->delayed_work, hz);
2923 }
2924 
2925 static void delayed_work(struct work_struct *work)
2926 {
2927  int i;
2928  struct ceph_mds_client *mdsc =
2929  container_of(work, struct ceph_mds_client, delayed_work.work);
2930  int renew_interval;
2931  int renew_caps;
2932 
2933  dout("mdsc delayed_work\n");
2934  ceph_check_delayed_caps(mdsc);
2935 
2936  mutex_lock(&mdsc->mutex);
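 /* renew caps roughly every quarter of the mds session timeout */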
2937  renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
2938  renew_caps = time_after_eq(jiffies, HZ*renew_interval +
2939  mdsc->last_renew_caps);
2940  if (renew_caps)
2941  mdsc->last_renew_caps = jiffies;
2942 
2943  for (i = 0; i < mdsc->max_sessions; i++) {
2944  struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2945  if (s == NULL)
2946  continue;
2947  if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
2948  dout("resending session close request for mds%d\n",
2949  s->s_mds);
2950  request_close_session(mdsc, s);
2951  ceph_put_mds_session(s);
2952  continue;
2953  }
2954  if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
2955  if (s->s_state == CEPH_MDS_SESSION_OPEN) {
2956  s->s_state = CEPH_MDS_SESSION_HUNG;
2957  pr_info("mds%d hung\n", s->s_mds);
2958  }
2959  }
2960  if (s->s_state < CEPH_MDS_SESSION_OPEN) {
2961  /* this mds is failed or recovering, just wait */
2962  ceph_put_mds_session(s);
2963  continue;
2964  }
2965  mutex_unlock(&mdsc->mutex);
2966 
2967  mutex_lock(&s->s_mutex);
2968  if (renew_caps)
2969  send_renew_caps(mdsc, s);
2970  else
2971  ceph_con_keepalive(&s->s_con);
2972  ceph_add_cap_releases(mdsc, s);
2973  if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2974  s->s_state == CEPH_MDS_SESSION_HUNG)
2975  ceph_send_cap_releases(mdsc, s);
2976  mutex_unlock(&s->s_mutex);
2977  ceph_put_mds_session(s);
2978 
2979  mutex_lock(&mdsc->mutex);
2980  }
2981  mutex_unlock(&mdsc->mutex);
2982 
2983  schedule_delayed(mdsc);
2984 }
2985 
2986 int ceph_mdsc_init(struct ceph_fs_client *fsc)
2987 
2988 {
2989  struct ceph_mds_client *mdsc;
2990 
2991  mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
2992  if (!mdsc)
2993  return -ENOMEM;
2994  mdsc->fsc = fsc;
2995  fsc->mdsc = mdsc;
2996  mutex_init(&mdsc->mutex);
2997  mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
2998  if (mdsc->mdsmap == NULL)
2999  return -ENOMEM;
3000 
3001  init_completion(&mdsc->safe_umount_waiters);
3002  init_waitqueue_head(&mdsc->session_close_wq);
3003  INIT_LIST_HEAD(&mdsc->waiting_for_map);
3004  mdsc->sessions = NULL;
3005  mdsc->max_sessions = 0;
3006  mdsc->stopping = 0;
3007  init_rwsem(&mdsc->snap_rwsem);
3008  mdsc->snap_realms = RB_ROOT;
3009  INIT_LIST_HEAD(&mdsc->snap_empty);
3010  spin_lock_init(&mdsc->snap_empty_lock);
3011  mdsc->last_tid = 0;
3012  mdsc->request_tree = RB_ROOT;
3013  INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3014  mdsc->last_renew_caps = jiffies;
3015  INIT_LIST_HEAD(&mdsc->cap_delay_list);
3016  spin_lock_init(&mdsc->cap_delay_lock);
3017  INIT_LIST_HEAD(&mdsc->snap_flush_list);
3018  spin_lock_init(&mdsc->snap_flush_lock);
3019  mdsc->cap_flush_seq = 0;
3020  INIT_LIST_HEAD(&mdsc->cap_dirty);
3021  INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3022  mdsc->num_cap_flushing = 0;
3023  spin_lock_init(&mdsc->cap_dirty_lock);
3024  init_waitqueue_head(&mdsc->cap_flushing_wq);
3025  spin_lock_init(&mdsc->dentry_lru_lock);
3026  INIT_LIST_HEAD(&mdsc->dentry_lru);
3027 
3028  ceph_caps_init(mdsc);
3029  ceph_adjust_min_caps(mdsc, fsc->min_caps);
3030 
3031  return 0;
3032 }
3033 
3034 /*
3035  * Wait for safe replies on open mds requests. If we time out, drop
3036  * all requests from the tree to avoid dangling dentry refs.
3037  */
3038 static void wait_requests(struct ceph_mds_client *mdsc)
3039 {
3040  struct ceph_mds_request *req;
3041  struct ceph_fs_client *fsc = mdsc->fsc;
3042 
3043  mutex_lock(&mdsc->mutex);
3044  if (__get_oldest_req(mdsc)) {
3045  mutex_unlock(&mdsc->mutex);
3046 
3047  dout("wait_requests waiting for requests\n");
3048  wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3049  fsc->client->options->mount_timeout * HZ);
3050 
3051  /* tear down remaining requests */
3052  mutex_lock(&mdsc->mutex);
3053  while ((req = __get_oldest_req(mdsc))) {
3054  dout("wait_requests timed out on tid %llu\n",
3055  req->r_tid);
3056  __unregister_request(mdsc, req);
3057  }
3058  }
3059  mutex_unlock(&mdsc->mutex);
3060  dout("wait_requests done\n");
3061 }
3062 
3063 /*
3064  * called before mount is ro, and before dentries are torn down.
3065  * (hmm, does this still race with new lookups?)
3066  */
3067 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3068 {
3069  dout("pre_umount\n");
3070  mdsc->stopping = 1;
3071 
3072  drop_leases(mdsc);
3073  ceph_flush_dirty_caps(mdsc);
3074  wait_requests(mdsc);
3075 
3076  /*
3077  * wait for reply handlers to drop their request refs and
3078  * their inode/dcache refs
3079  */
3080  ceph_msgr_flush();
3081 }
3082 
3083 /*
3084  * wait for all write mds requests to flush.
3085  */
3086 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3087 {
3088  struct ceph_mds_request *req = NULL, *nextreq;
3089  struct rb_node *n;
3090 
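 /*
  * References are held on both the current and the next request while
  * mdsc->mutex is dropped for the wait, so the rbtree walk can safely
  * resume (or restart if the next request was unregistered meanwhile).
  */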
3091  mutex_lock(&mdsc->mutex);
3092  dout("wait_unsafe_requests want %lld\n", want_tid);
3093 restart:
3094  req = __get_oldest_req(mdsc);
3095  while (req && req->r_tid <= want_tid) {
3096  /* find next request */
3097  n = rb_next(&req->r_node);
3098  if (n)
3099  nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3100  else
3101  nextreq = NULL;
3102  if ((req->r_op & CEPH_MDS_OP_WRITE)) {
3103  /* write op */
3104  ceph_mdsc_get_request(req);
3105  if (nextreq)
3106  ceph_mdsc_get_request(nextreq);
3107  mutex_unlock(&mdsc->mutex);
3108  dout("wait_unsafe_requests wait on %llu (want %llu)\n",
3109  req->r_tid, want_tid);
3110  wait_for_completion(&req->r_safe_completion);
3111  mutex_lock(&mdsc->mutex);
3112  ceph_mdsc_put_request(req);
3113  if (!nextreq)
3114  break; /* next dne before, so we're done! */
3115  if (RB_EMPTY_NODE(&nextreq->r_node)) {
3116  /* next request was removed from tree */
3117  ceph_mdsc_put_request(nextreq);
3118  goto restart;
3119  }
3120  ceph_mdsc_put_request(nextreq); /* won't go away */
3121  }
3122  req = nextreq;
3123  }
3124  mutex_unlock(&mdsc->mutex);
3125  dout("wait_unsafe_requests done\n");
3126 }
3127 
3128 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3129 {
3130  u64 want_tid, want_flush;
3131 
3132  if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3133  return;
3134 
3135  dout("sync\n");
3136  mutex_lock(&mdsc->mutex);
3137  want_tid = mdsc->last_tid;
3138  want_flush = mdsc->cap_flush_seq;
3139  mutex_unlock(&mdsc->mutex);
3140  dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
3141 
3142  ceph_flush_dirty_caps(mdsc);
3143 
3144  wait_unsafe_requests(mdsc, want_tid);
3145  wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
3146 }
3147 
3148 /*
3149  * true if all sessions are closed, or we force unmount
3150  */
3151 static bool done_closing_sessions(struct ceph_mds_client *mdsc)
3152 {
3153  int i, n = 0;
3154 
3155  if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3156  return true;
3157 
3158  mutex_lock(&mdsc->mutex);
3159  for (i = 0; i < mdsc->max_sessions; i++)
3160  if (mdsc->sessions[i])
3161  n++;
3162  mutex_unlock(&mdsc->mutex);
3163  return n == 0;
3164 }
3165 
3166 /*
3167  * called after sb is ro.
3168  */
3169 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3170 {
3171  struct ceph_mds_session *session;
3172  int i;
3173  struct ceph_fs_client *fsc = mdsc->fsc;
3174  unsigned long timeout = fsc->client->options->mount_timeout * HZ;
3175 
3176  dout("close_sessions\n");
3177 
3178  /* close sessions */
3179  mutex_lock(&mdsc->mutex);
3180  for (i = 0; i < mdsc->max_sessions; i++) {
3181  session = __ceph_lookup_mds_session(mdsc, i);
3182  if (!session)
3183  continue;
3184  mutex_unlock(&mdsc->mutex);
3185  mutex_lock(&session->s_mutex);
3186  __close_session(mdsc, session);
3187  mutex_unlock(&session->s_mutex);
3188  ceph_put_mds_session(session);
3189  mutex_lock(&mdsc->mutex);
3190  }
3191  mutex_unlock(&mdsc->mutex);
3192 
3193  dout("waiting for sessions to close\n");
3194  wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3195  timeout);
3196 
3197  /* tear down remaining sessions */
3198  mutex_lock(&mdsc->mutex);
3199  for (i = 0; i < mdsc->max_sessions; i++) {
3200  if (mdsc->sessions[i]) {
3201  session = get_session(mdsc->sessions[i]);
3202  __unregister_session(mdsc, session);
3203  mutex_unlock(&mdsc->mutex);
3204  mutex_lock(&session->s_mutex);
3205  remove_session_caps(session);
3206  mutex_unlock(&session->s_mutex);
3207  ceph_put_mds_session(session);
3208  mutex_lock(&mdsc->mutex);
3209  }
3210  }
3211  WARN_ON(!list_empty(&mdsc->cap_delay_list));
3212  mutex_unlock(&mdsc->mutex);
3213 
3214  ceph_cleanup_empty_realms(mdsc);
3215 
3216  cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3217 
3218  dout("stopped\n");
3219 }
3220 
3221 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3222 {
3223  dout("stop\n");
3224  cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3225  if (mdsc->mdsmap)
3226  ceph_mdsmap_destroy(mdsc->mdsmap);
3227  kfree(mdsc->sessions);
3228  ceph_caps_finalize(mdsc);
3229 }
3230 
3231 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3232 {
3233  struct ceph_mds_client *mdsc = fsc->mdsc;
3234 
3235  dout("mdsc_destroy %p\n", mdsc);
3236  ceph_mdsc_stop(mdsc);
3237 
3238  /* flush out any connection work with references to us */
3239  ceph_msgr_flush();
3240 
3241  fsc->mdsc = NULL;
3242  kfree(mdsc);
3243  dout("mdsc_destroy %p done\n", mdsc);
3244 }
3245 
3246 
3247 /*
3248  * handle mds map update.
3249  */
3250 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3251 {
3252  u32 epoch;
3253  u32 maplen;
3254  void *p = msg->front.iov_base;
3255  void *end = p + msg->front.iov_len;
3256  struct ceph_mdsmap *newmap, *oldmap;
3257  struct ceph_fsid fsid;
3258  int err = -EINVAL;
3259 
3260  ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3261  ceph_decode_copy(&p, &fsid, sizeof(fsid));
3262  if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3263  return;
3264  epoch = ceph_decode_32(&p);
3265  maplen = ceph_decode_32(&p);
3266  dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3267 
3268  /* do we need it? */
3269  ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3270  mutex_lock(&mdsc->mutex);
3271  if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3272  dout("handle_map epoch %u <= our %u\n",
3273  epoch, mdsc->mdsmap->m_epoch);
3274  mutex_unlock(&mdsc->mutex);
3275  return;
3276  }
3277 
3278  newmap = ceph_mdsmap_decode(&p, end);
3279  if (IS_ERR(newmap)) {
3280  err = PTR_ERR(newmap);
3281  goto bad_unlock;
3282  }
3283 
3284  /* swap into place */
3285  if (mdsc->mdsmap) {
3286  oldmap = mdsc->mdsmap;
3287  mdsc->mdsmap = newmap;
3288  check_new_map(mdsc, newmap, oldmap);
3289  ceph_mdsmap_destroy(oldmap);
3290  } else {
3291  mdsc->mdsmap = newmap; /* first mds map */
3292  }
3293  mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3294 
3295  __wake_requests(mdsc, &mdsc->waiting_for_map);
3296 
3297  mutex_unlock(&mdsc->mutex);
3298  schedule_delayed(mdsc);
3299  return;
3300 
3301 bad_unlock:
3302  mutex_unlock(&mdsc->mutex);
3303 bad:
3304  pr_err("error decoding mdsmap %d\n", err);
3305  return;
3306 }
3307 
3308 static struct ceph_connection *con_get(struct ceph_connection *con)
3309 {
3310  struct ceph_mds_session *s = con->private;
3311 
3312  if (get_session(s)) {
3313  dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3314  return con;
3315  }
3316  dout("mdsc con_get %p FAIL\n", s);
3317  return NULL;
3318 }
3319 
3320 static void con_put(struct ceph_connection *con)
3321 {
3322  struct ceph_mds_session *s = con->private;
3323 
3324  dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3325  ceph_put_mds_session(s);
3326 }
3327 
3328 /*
3329  * if the client is unresponsive for long enough, the mds will kill
3330  * the session entirely.
3331  */
3332 static void peer_reset(struct ceph_connection *con)
3333 {
3334  struct ceph_mds_session *s = con->private;
3335  struct ceph_mds_client *mdsc = s->s_mdsc;
3336 
3337  pr_warning("mds%d closed our session\n", s->s_mds);
3338  send_mds_reconnect(mdsc, s);
3339 }
3340 
3341 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3342 {
3343  struct ceph_mds_session *s = con->private;
3344  struct ceph_mds_client *mdsc = s->s_mdsc;
3345  int type = le16_to_cpu(msg->hdr.type);
3346 
3347  mutex_lock(&mdsc->mutex);
3348  if (__verify_registered_session(mdsc, s) < 0) {
3349  mutex_unlock(&mdsc->mutex);
3350  goto out;
3351  }
3352  mutex_unlock(&mdsc->mutex);
3353 
3354  switch (type) {
3355  case CEPH_MSG_MDS_MAP:
3356  ceph_mdsc_handle_map(mdsc, msg);
3357  break;
3358  case CEPH_MSG_CLIENT_SESSION:
3359  handle_session(s, msg);
3360  break;
3361  case CEPH_MSG_CLIENT_REPLY:
3362  handle_reply(s, msg);
3363  break;
3364  case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3365  handle_forward(mdsc, s, msg);
3366  break;
3367  case CEPH_MSG_CLIENT_CAPS:
3368  ceph_handle_caps(s, msg);
3369  break;
3370  case CEPH_MSG_CLIENT_SNAP:
3371  ceph_handle_snap(mdsc, s, msg);
3372  break;
3373  case CEPH_MSG_CLIENT_LEASE:
3374  handle_lease(mdsc, s, msg);
3375  break;
3376 
3377  default:
3378  pr_err("received unknown message type %d %s\n", type,
3379  ceph_msg_type_name(type));
3380  }
3381 out:
3382  ceph_msg_put(msg);
3383 }
3384 
3385 /*
3386  * authentication
3387  */
3388 
3389 /*
3390  * Note: returned pointer is the address of a structure that's
3391  * managed separately. Caller must *not* attempt to free it.
3392  */
3393 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
3394  int *proto, int force_new)
3395 {
3396  struct ceph_mds_session *s = con->private;
3397  struct ceph_mds_client *mdsc = s->s_mdsc;
3398  struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3399  struct ceph_auth_handshake *auth = &s->s_auth;
3400 
3401  if (force_new && auth->authorizer) {
3402  if (ac->ops && ac->ops->destroy_authorizer)
3403  ac->ops->destroy_authorizer(ac, auth->authorizer);
3404  auth->authorizer = NULL;
3405  }
3406  if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
3407  int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3408  auth);
3409  if (ret)
3410  return ERR_PTR(ret);
3411  }
3412  *proto = ac->protocol;
3413 
3414  return auth;
3415 }
3416 
3417 
3418 static int verify_authorizer_reply(struct ceph_connection *con, int len)
3419 {
3420  struct ceph_mds_session *s = con->private;
3421  struct ceph_mds_client *mdsc = s->s_mdsc;
3422  struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3423 
3424  return ac->ops->verify_authorizer_reply(ac, s->s_auth.authorizer, len);
3425 }
3426 
3427 static int invalidate_authorizer(struct ceph_connection *con)
3428 {
3429  struct ceph_mds_session *s = con->private;
3430  struct ceph_mds_client *mdsc = s->s_mdsc;
3431  struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3432 
3433  if (ac->ops->invalidate_authorizer)
3434  ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3435 
3436  return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3437 }
3438 
3439 static const struct ceph_connection_operations mds_con_ops = {
3440  .get = con_get,
3441  .put = con_put,
3442  .dispatch = dispatch,
3443  .get_authorizer = get_authorizer,
3444  .verify_authorizer_reply = verify_authorizer_reply,
3445  .invalidate_authorizer = invalidate_authorizer,
3446  .peer_reset = peer_reset,
3447 };
3448 
3449 /* eof */