Linux Kernel 3.7.1
net/ceph/mon_client.c
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>

#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>

/*
 * Interact with Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and umount().
 *
 * A small cluster of Ceph "monitors" is responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of cmon daemons use a modified version of the Paxos part-time parliament
 * algorithm to manage the MDS map (mds cluster membership), OSD map, and
 * list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order to
 * receive timely MDSMap updates.  We periodically send a keepalive byte on the
 * TCP socket to ensure we detect a failure.  If the connection does break, we
 * randomly hunt for a new monitor.  Once the connection is reestablished, we
 * resend any outstanding requests.
 */

static const struct ceph_connection_operations mon_con_ops;

static int __validate_auth(struct ceph_mon_client *monc);

/*
 * Decode a monmap blob (e.g., during mount).
 */
struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
{
	struct ceph_monmap *m = NULL;
	int i, err = -EINVAL;
	struct ceph_fsid fsid;
	u32 epoch, num_mon;
	u16 version;
	u32 len;

	ceph_decode_32_safe(&p, end, len, bad);
	ceph_decode_need(&p, end, len, bad);

	dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));

	ceph_decode_16_safe(&p, end, version, bad);

	ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	epoch = ceph_decode_32(&p);

	num_mon = ceph_decode_32(&p);
	ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);

	if (num_mon >= CEPH_MAX_MON)
		goto bad;
	m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
	if (m == NULL)
		return ERR_PTR(-ENOMEM);
	m->fsid = fsid;
	m->epoch = epoch;
	m->num_mon = num_mon;
	ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
	for (i = 0; i < num_mon; i++)
		ceph_decode_addr(&m->mon_inst[i].addr);

	dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
	     m->num_mon);
	for (i = 0; i < m->num_mon; i++)
		dout("monmap_decode mon%d is %s\n", i,
		     ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
	return m;

bad:
	dout("monmap_decode failed with %d\n", err);
	kfree(m);
	return ERR_PTR(err);
}

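/*
 * For reference, the blob layout that ceph_monmap_decode() consumes,
 * read directly off the decode calls above (all integers little-endian):
 *
 *	__le32 len;                                  remaining payload length
 *	__le16 version;
 *	struct ceph_fsid fsid;
 *	__le32 epoch;
 *	__le32 num_mon;
 *	struct ceph_entity_inst mon_inst[num_mon];   addrs byte-swapped after copy
 */
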
/*
 * return true if *addr is included in the monmap.
 */
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
	int i;

	for (i = 0; i < m->num_mon; i++)
		if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
			return 1;
	return 0;
}

/*
 * Send an auth request.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
	monc->pending_auth = 1;
	monc->m_auth->front.iov_len = len;
	monc->m_auth->hdr.front_len = cpu_to_le32(len);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_get(monc->m_auth);  /* keep our ref */
	ceph_con_send(&monc->con, monc->m_auth);
}

/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
	dout("__close_session closing mon%d\n", monc->cur_mon);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_revoke_incoming(monc->m_auth_reply);
	ceph_msg_revoke(monc->m_subscribe);
	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
	ceph_con_close(&monc->con);
	monc->cur_mon = -1;
	monc->pending_auth = 0;
	ceph_auth_reset(monc->auth);
}

/*
 * Open a session with a (new) monitor.
 */
static int __open_session(struct ceph_mon_client *monc)
{
	u8 r;	/* unsigned: a plain char may be signed, giving a bogus mon index */
	int ret;

	if (monc->cur_mon < 0) {
		get_random_bytes(&r, 1);
		monc->cur_mon = r % monc->monmap->num_mon;
		dout("open_session num=%d r=%d -> mon%d\n",
		     monc->monmap->num_mon, r, monc->cur_mon);
		monc->sub_sent = 0;
		monc->sub_renew_after = jiffies;  /* i.e., expired */
		monc->want_next_osdmap = !!monc->want_next_osdmap;

		dout("open_session mon%d opening\n", monc->cur_mon);
		ceph_con_open(&monc->con,
			      CEPH_ENTITY_TYPE_MON, monc->cur_mon,
			      &monc->monmap->mon_inst[monc->cur_mon].addr);

		/* initiate authentication handshake */
		ret = ceph_auth_build_hello(monc->auth,
					    monc->m_auth->front.iov_base,
					    monc->m_auth->front_max);
		__send_prepared_auth_request(monc, ret);
	} else {
		dout("open_session mon%d already open\n", monc->cur_mon);
	}
	return 0;
}

static bool __sub_expired(struct ceph_mon_client *monc)
{
	return time_after_eq(jiffies, monc->sub_renew_after);
}

/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
	unsigned int delay;

	if (monc->cur_mon < 0 || __sub_expired(monc))
		delay = 10 * HZ;
	else
		delay = 20 * HZ;
	dout("__schedule_delayed after %u\n", delay);
	schedule_delayed_work(&monc->delayed_work, delay);
}

/*
 * Send subscribe request for mdsmap and/or osdmap.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
	dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
	     (unsigned int)monc->sub_sent, __sub_expired(monc),
	     monc->want_next_osdmap);
	if ((__sub_expired(monc) && !monc->sub_sent) ||
	    monc->want_next_osdmap == 1) {
		struct ceph_msg *msg = monc->m_subscribe;
		struct ceph_mon_subscribe_item *i;
		void *p, *end;
		int num;

		p = msg->front.iov_base;
		end = p + msg->front_max;

		num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
		ceph_encode_32(&p, num);

		if (monc->want_next_osdmap) {
			dout("__send_subscribe to 'osdmap' %u\n",
			     (unsigned int)monc->have_osdmap);
			ceph_encode_string(&p, end, "osdmap", 6);
			i = p;
			i->have = cpu_to_le64(monc->have_osdmap);
			i->onetime = 1;
			p += sizeof(*i);
			monc->want_next_osdmap = 2;  /* requested */
		}
		if (monc->want_mdsmap) {
			dout("__send_subscribe to 'mdsmap' %u+\n",
			     (unsigned int)monc->have_mdsmap);
			ceph_encode_string(&p, end, "mdsmap", 6);
			i = p;
			i->have = cpu_to_le64(monc->have_mdsmap);
			i->onetime = 0;
			p += sizeof(*i);
		}
		ceph_encode_string(&p, end, "monmap", 6);
		i = p;
		i->have = 0;
		i->onetime = 0;
		p += sizeof(*i);

		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		ceph_msg_revoke(msg);
		ceph_con_send(&monc->con, ceph_msg_get(msg));

		monc->sub_sent = jiffies | 1;  /* never 0 */
	}
}

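/*
 * For reference, the subscribe payload built above is:
 *
 *	__le32 num;                  number of entries that follow
 *	num * {
 *		string name;         e.g. "osdmap", "mdsmap", "monmap"
 *		struct ceph_mon_subscribe_item item;
 *	}
 *
 * item.have carries the newest epoch we already hold; item.onetime is 1
 * for the one-shot osdmap subscription and 0 for the standing mdsmap
 * and monmap subscriptions.
 */
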
static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned int seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		pr_info("mon%d %s session established\n",
			monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr.in_addr));
		monc->hunting = false;
	}
	dout("handle_subscribe_ack after %d seconds\n", seconds);
	monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
	monc->sub_sent = 0;
	mutex_unlock(&monc->mutex);
	return;

bad:
	pr_err("got corrupt subscribe-ack msg\n");
	ceph_msg_dump(msg);
}

/*
 * Keep track of which maps we have
 */
int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
{
	mutex_lock(&monc->mutex);
	monc->have_mdsmap = got;
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_got_mdsmap);

int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
{
	mutex_lock(&monc->mutex);
	monc->have_osdmap = got;
	monc->want_next_osdmap = 0;
	mutex_unlock(&monc->mutex);
	return 0;
}

/*
 * Register interest in the next osdmap
 */
void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
{
	dout("request_next_osdmap have %u\n", monc->have_osdmap);
	mutex_lock(&monc->mutex);
	if (!monc->want_next_osdmap)
		monc->want_next_osdmap = 1;
	if (monc->want_next_osdmap < 2)
		__send_subscribe(monc);
	mutex_unlock(&monc->mutex);
}

/*
 * Open a session (if necessary) and schedule the periodic keepalive work.
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__open_session(monc);
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

/*
 * We require the fsid and global_id in order to initialize our
 * debugfs dir.
 */
static bool have_debugfs_info(struct ceph_mon_client *monc)
{
	dout("have_debugfs_info fsid %d globalid %lld\n",
	     (int)monc->client->have_fsid, monc->auth->global_id);
	return monc->client->have_fsid && monc->auth->global_id > 0;
}

/*
 * The monitor responds with a mount ack to indicate mount success.  The
 * included client ticket allows the client to talk to MDSs and OSDs.
 */
static void ceph_monc_handle_map(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	struct ceph_client *client = monc->client;
	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
	void *p, *end;
	int had_debugfs_info, init_debugfs = 0;

	mutex_lock(&monc->mutex);

	had_debugfs_info = have_debugfs_info(monc);

	dout("handle_monmap\n");
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	monmap = ceph_monmap_decode(p, end);
	if (IS_ERR(monmap)) {
		pr_err("problem decoding monmap, %d\n",
		       (int)PTR_ERR(monmap));
		goto out;
	}

	if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
		kfree(monmap);
		goto out;
	}

	client->monc.monmap = monmap;
	kfree(old);

	if (!client->have_fsid) {
		client->have_fsid = true;
		if (!had_debugfs_info && have_debugfs_info(monc)) {
			pr_info("client%lld fsid %pU\n",
				ceph_client_id(monc->client),
				&monc->client->fsid);
			init_debugfs = 1;
		}
		mutex_unlock(&monc->mutex);

		if (init_debugfs) {
			/*
			 * do debugfs initialization without mutex to avoid
			 * creating a locking dependency
			 */
			ceph_debugfs_client_init(monc->client);
		}

		goto out_unlocked;
	}
out:
	mutex_unlock(&monc->mutex);
out_unlocked:
	wake_up_all(&client->auth_wq);
}

/*
 * generic requests (e.g., statfs, poolop)
 */
static struct ceph_mon_generic_request *__lookup_generic_req(
	struct ceph_mon_client *monc, u64 tid)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *n = monc->generic_request_tree.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_mon_generic_request, node);
		if (tid < req->tid)
			n = n->rb_left;
		else if (tid > req->tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

static void __insert_generic_request(struct ceph_mon_client *monc,
				     struct ceph_mon_generic_request *new)
{
	struct rb_node **p = &monc->generic_request_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_mon_generic_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_mon_generic_request, node);
		if (new->tid < req->tid)
			p = &(*p)->rb_left;
		else if (new->tid > req->tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, &monc->generic_request_tree);
}

static void release_generic_request(struct kref *kref)
{
	struct ceph_mon_generic_request *req =
		container_of(kref, struct ceph_mon_generic_request, kref);

	if (req->reply)
		ceph_msg_put(req->reply);
	if (req->request)
		ceph_msg_put(req->request);

	kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
	kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
	kref_get(&req->kref);
}

static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
					  struct ceph_msg_header *hdr,
					  int *skip)
{
	struct ceph_mon_client *monc = con->private;
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(hdr->tid);
	struct ceph_msg *m;

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (!req) {
		dout("get_generic_reply %lld dne\n", tid);
		*skip = 1;
		m = NULL;
	} else {
		dout("get_generic_reply %lld got %p\n", tid, req->reply);
		*skip = 0;
		m = ceph_msg_get(req->reply);
		/*
		 * we don't need to track the connection reading into
		 * this reply because we only have one open connection
		 * at a time, ever.
		 */
	}
	mutex_unlock(&monc->mutex);
	return m;
}

static int do_generic_request(struct ceph_mon_client *monc,
			      struct ceph_mon_generic_request *req)
{
	int err;

	/* register request */
	mutex_lock(&monc->mutex);
	req->tid = ++monc->last_tid;
	req->request->hdr.tid = cpu_to_le64(req->tid);
	__insert_generic_request(monc, req);
	monc->num_generic_requests++;
	ceph_con_send(&monc->con, ceph_msg_get(req->request));
	mutex_unlock(&monc->mutex);

	err = wait_for_completion_interruptible(&req->completion);

	mutex_lock(&monc->mutex);
	rb_erase(&req->node, &monc->generic_request_tree);
	monc->num_generic_requests--;
	mutex_unlock(&monc->mutex);

	if (!err)
		err = req->result;
	return err;
}

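/*
 * To summarize the flow above: a generic request is (1) assigned a
 * fresh tid and indexed by tid in generic_request_tree, (2) sent over
 * the mon connection (the messenger holds its own ref), (3) waited on
 * until the reply handler fires complete_all() or the caller is
 * signalled, and (4) unlinked, with req->result (filled in by the
 * reply handler) returned on success.
 */
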
/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	if (msg->front.iov_len != sizeof(*reply))
		goto bad;
	dout("handle_statfs_reply %p tid %llu\n", msg, tid);

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (req) {
		*(struct ceph_statfs *)req->buf = reply->st;
		req->result = 0;
		get_generic_request(req);
	}
	mutex_unlock(&monc->mutex);
	if (req) {
		complete_all(&req->completion);
		put_generic_request(req);
	}
	return;

bad:
	pr_err("corrupt generic reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs *h;
	int err;

	req = kzalloc(sizeof(*req), GFP_NOFS);
	if (!req)
		return -ENOMEM;

	kref_init(&req->kref);
	req->buf = buf;
	req->buf_len = sizeof(*buf);
	init_completion(&req->completion);

	err = -ENOMEM;
	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;
	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
				  true);
	if (!req->reply)
		goto out;

	/* fill out request */
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;

	err = do_generic_request(monc, req);

out:
	kref_put(&req->kref, release_generic_request);
	return err;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);

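/*
 * Illustrative caller, a sketch only ("client" and the error handling
 * are assumptions, not code from this file):
 *
 *	struct ceph_statfs st;
 *	int err;
 *
 *	err = ceph_monc_do_statfs(&client->monc, &st);
 *	if (!err)
 *		pr_info("kb %llu kb_used %llu\n",
 *			le64_to_cpu(st.kb), le64_to_cpu(st.kb_used));
 *
 * Note the ceph_statfs fields arrive little-endian; byte-swapping is
 * left to the caller.
 */
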
/*
 * pool ops
 */
static int get_poolop_reply_buf(const char *src, size_t src_len,
				char *dst, size_t dst_len)
{
	u32 buf_len;

	if (src_len != sizeof(u32) + dst_len)
		return -EINVAL;

	buf_len = le32_to_cpu(*(u32 *)src);
	if (buf_len != dst_len)
		return -EINVAL;

	memcpy(dst, src + sizeof(u32), dst_len);
	return 0;
}

static void handle_poolop_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	if (msg->front.iov_len < sizeof(*reply))
		goto bad;
	dout("handle_poolop_reply %p tid %llu\n", msg, tid);

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (req) {
		if (req->buf_len &&
		    get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
				     msg->front.iov_len - sizeof(*reply),
				     req->buf, req->buf_len) < 0) {
			mutex_unlock(&monc->mutex);
			goto bad;
		}
		req->result = le32_to_cpu(reply->reply_code);
		get_generic_request(req);
	}
	mutex_unlock(&monc->mutex);
	if (req) {
		complete(&req->completion);
		put_generic_request(req);
	}
	return;

bad:
	pr_err("corrupt generic reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

/*
 * Do a synchronous pool op.
 */
static int do_poolop(struct ceph_mon_client *monc, u32 op,
		     u32 pool, u64 snapid,
		     char *buf, int len)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_poolop *h;
	int err;

	req = kzalloc(sizeof(*req), GFP_NOFS);
	if (!req)
		return -ENOMEM;

	kref_init(&req->kref);
	req->buf = buf;
	req->buf_len = len;
	init_completion(&req->completion);

	err = -ENOMEM;
	req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;
	req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS,
				  true);
	if (!req->reply)
		goto out;

	/* fill out request */
	req->request->hdr.version = cpu_to_le16(2);
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->pool = cpu_to_le32(pool);
	h->op = cpu_to_le32(op);
	h->auid = 0;
	h->snapid = cpu_to_le64(snapid);
	h->name_len = 0;

	err = do_generic_request(monc, req);

out:
	kref_put(&req->kref, release_generic_request);
	return err;
}

int ceph_monc_create_snapid(struct ceph_mon_client *monc,
			    u32 pool, u64 *snapid)
{
	return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
			 pool, 0, (char *)snapid, sizeof(*snapid));
}
EXPORT_SYMBOL(ceph_monc_create_snapid);

int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
			    u32 pool, u64 snapid)
{
	return do_poolop(monc, POOL_OP_DELETE_UNMANAGED_SNAP,
			 pool, snapid, 0, 0);
}

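/*
 * Illustrative use, a sketch only ("pool_id" is an assumed variable):
 * create returns the new snapshot id through *snapid, delete takes the
 * id by value:
 *
 *	u64 snapid;
 *	int err;
 *
 *	err = ceph_monc_create_snapid(monc, pool_id, &snapid);
 *	if (!err)
 *		err = ceph_monc_delete_snapid(monc, pool_id, snapid);
 */
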
/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *p;

	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_mon_generic_request, node);
		ceph_msg_revoke(req->request);
		ceph_msg_revoke_incoming(req->reply);
		ceph_con_send(&monc->con, ceph_msg_get(req->request));
	}
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
	struct ceph_mon_client *monc =
		container_of(work, struct ceph_mon_client, delayed_work.work);

	dout("monc delayed_work\n");
	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		__close_session(monc);
		__open_session(monc);  /* continue hunting */
	} else {
		ceph_con_keepalive(&monc->con);

		__validate_auth(monc);

		if (monc->auth->ops->is_authenticated(monc->auth))
			__send_subscribe(monc);
	}
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
}

/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
	struct ceph_options *opt = monc->client->options;
	struct ceph_entity_addr *mon_addr = opt->mon_addr;
	int num_mon = opt->num_mon;
	int i;

	/* build initial monmap */
	monc->monmap = kzalloc(sizeof(*monc->monmap) +
			       num_mon*sizeof(monc->monmap->mon_inst[0]),
			       GFP_KERNEL);
	if (!monc->monmap)
		return -ENOMEM;
	for (i = 0; i < num_mon; i++) {
		monc->monmap->mon_inst[i].addr = mon_addr[i];
		monc->monmap->mon_inst[i].addr.nonce = 0;
		monc->monmap->mon_inst[i].name.type =
			CEPH_ENTITY_TYPE_MON;
		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
	}
	monc->monmap->num_mon = num_mon;
	return 0;
}

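/*
 * Note: the addresses passed in via mount(2) carry no entity nonce or
 * name, so the nonce is cleared and the entries are simply named
 * mon0..mon(N-1) by position; the authoritative monmap arrives later
 * from the cluster and replaces this one (see ceph_monc_handle_map()).
 */
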
int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
	int err = 0;

	dout("init\n");
	memset(monc, 0, sizeof(*monc));
	monc->client = cl;
	monc->monmap = NULL;
	mutex_init(&monc->mutex);

	err = build_initial_monmap(monc);
	if (err)
		goto out;

	/* connection */
	/* authentication */
	monc->auth = ceph_auth_init(cl->options->name,
				    cl->options->key);
	if (IS_ERR(monc->auth)) {
		err = PTR_ERR(monc->auth);
		goto out_monmap;
	}
	monc->auth->want_keys =
		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

	/* msgs */
	err = -ENOMEM;
	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
				     sizeof(struct ceph_mon_subscribe_ack),
				     GFP_NOFS, true);
	if (!monc->m_subscribe_ack)
		goto out_auth;

	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
					 true);
	if (!monc->m_subscribe)
		goto out_subscribe_ack;

	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
					  true);
	if (!monc->m_auth_reply)
		goto out_subscribe;

	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
	monc->pending_auth = 0;
	if (!monc->m_auth)
		goto out_auth_reply;

	ceph_con_init(&monc->con, monc, &mon_con_ops,
		      &monc->client->msgr);

	monc->cur_mon = -1;
	monc->hunting = true;
	monc->sub_renew_after = jiffies;
	monc->sub_sent = 0;

	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
	monc->generic_request_tree = RB_ROOT;
	monc->num_generic_requests = 0;
	monc->last_tid = 0;

	monc->have_mdsmap = 0;
	monc->have_osdmap = 0;
	monc->want_next_osdmap = 1;
	return 0;

out_auth_reply:
	ceph_msg_put(monc->m_auth_reply);
out_subscribe:
	ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
	ceph_msg_put(monc->m_subscribe_ack);
out_auth:
	ceph_auth_destroy(monc->auth);
out_monmap:
	kfree(monc->monmap);
out:
	return err;
}
EXPORT_SYMBOL(ceph_monc_init);

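/*
 * Typical embedding, a sketch only (in-tree, ceph_create_client() and
 * __ceph_open_session() in net/ceph/ceph_common.c drive this; "client"
 * and the labels are assumptions):
 *
 *	err = ceph_monc_init(&client->monc, client);
 *	if (err)
 *		goto out;
 *	err = ceph_monc_open_session(&client->monc);
 *	if (err)
 *		goto out_monc;
 *	...
 *    out_monc:
 *	ceph_monc_stop(&client->monc);
 */
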
void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);

	mutex_unlock(&monc->mutex);

	/*
	 * flush msgr queue before we destroy ourselves to ensure that:
	 *  - any work that references our embedded con is finished.
	 *  - any osd_client or other work that may reference an authorizer
	 *    finishes before we shut down the auth subsystem.
	 */
	ceph_msgr_flush();

	ceph_auth_destroy(monc->auth);

	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	int ret;
	int was_auth = 0;
	int had_debugfs_info, init_debugfs = 0;

	mutex_lock(&monc->mutex);
	had_debugfs_info = have_debugfs_info(monc);
	if (monc->auth->ops)
		was_auth = monc->auth->ops->is_authenticated(monc->auth);
	monc->pending_auth = 0;
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_max);
	if (ret < 0) {
		monc->client->auth_err = ret;
		wake_up_all(&monc->client->auth_wq);
	} else if (ret > 0) {
		__send_prepared_auth_request(monc, ret);
	} else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
		dout("authenticated, starting session\n");

		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr.inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);
	}

	if (!had_debugfs_info && have_debugfs_info(monc)) {
		pr_info("client%lld fsid %pU\n",
			ceph_client_id(monc->client),
			&monc->client->fsid);
		init_debugfs = 1;
	}
	mutex_unlock(&monc->mutex);

	if (init_debugfs) {
		/*
		 * do debugfs initialization without mutex to avoid
		 * creating a locking dependency
		 */
		ceph_debugfs_client_init(monc->client);
	}
}

static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_max);
	if (ret <= 0)
		return ret;  /* either an error, or no need to authenticate */
	__send_prepared_auth_request(monc, ret);
	return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	if (!monc)
		return;

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_POOLOP_REPLY:
		handle_poolop_reply(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it? */
		if (monc->client->extra_mon_dispatch &&
		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
			break;

		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	ceph_msg_put(msg);
}

1008 
1009 /*
1010  * Allocate memory for incoming message
1011  */
1012 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1013  struct ceph_msg_header *hdr,
1014  int *skip)
1015 {
1016  struct ceph_mon_client *monc = con->private;
1017  int type = le16_to_cpu(hdr->type);
1018  int front_len = le32_to_cpu(hdr->front_len);
1019  struct ceph_msg *m = NULL;
1020 
1021  *skip = 0;
1022 
1023  switch (type) {
1025  m = ceph_msg_get(monc->m_subscribe_ack);
1026  break;
1027  case CEPH_MSG_POOLOP_REPLY:
1028  case CEPH_MSG_STATFS_REPLY:
1029  return get_generic_reply(con, hdr, skip);
1030  case CEPH_MSG_AUTH_REPLY:
1031  m = ceph_msg_get(monc->m_auth_reply);
1032  break;
1033  case CEPH_MSG_MON_MAP:
1034  case CEPH_MSG_MDS_MAP:
1035  case CEPH_MSG_OSD_MAP:
1036  m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1037  if (!m)
1038  return NULL; /* ENOMEM--return skip == 0 */
1039  break;
1040  }
1041 
1042  if (!m) {
1043  pr_info("alloc_msg unknown type %d\n", type);
1044  *skip = 1;
1045  }
1046  return m;
1047 }
1048 
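/*
 * Note on the cases above: subscribe acks and auth replies are received
 * into messages preallocated in ceph_monc_init() (only a ref is taken
 * here), so those replies can be accepted even under memory pressure;
 * the map messages, whose size is not known in advance, are allocated
 * on demand.
 */
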
/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
	struct ceph_mon_client *monc = con->private;

	if (!monc)
		return;

	dout("mon_fault\n");
	mutex_lock(&monc->mutex);
	if (!con->private)
		goto out;

	if (!monc->hunting)
		pr_info("mon%d %s session lost, "
			"hunting for new mon\n", monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr.in_addr));

	__close_session(monc);
	if (!monc->hunting) {
		/* start hunting */
		monc->hunting = true;
		__open_session(monc);
	} else {
		/* already hunting, let's wait a bit */
		__schedule_delayed(monc);
	}
out:
	mutex_unlock(&monc->mutex);
}

/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
	return con;
}

static void con_put(struct ceph_connection *con)
{
}

static const struct ceph_connection_operations mon_con_ops = {
	.get = con_get,
	.put = con_put,
	.dispatch = dispatch,
	.fault = mon_fault,
	.alloc_msg = mon_alloc_msg,
};