Linux Kernel 3.7.1
osd_client.c
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

#define OSD_OP_FRONT_LEN	4096
#define OSD_OPREPLY_FRONT_LEN	512

static const struct ceph_connection_operations osd_con_ops;

static void send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req);
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req);

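/*
 * Ops that carry a variable-length payload in the request's trailing
 * pagelist (r_trail) rather than in the data pages.
 */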
static int op_needs_trail(int op)
{
	switch (op) {
	case CEPH_OSD_OP_GETXATTR:
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY:
		return 1;
	default:
		return 0;
	}
}

static int op_has_extent(int op)
{
	return (op == CEPH_OSD_OP_READ ||
		op == CEPH_OSD_OP_WRITE);
}
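
/*
 * Map a file extent onto an object: compute the object number and the
 * extent within that object, record the extent in the op, and note the
 * page count and alignment of the data.
 */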
54 
56  struct ceph_file_layout *layout,
57  u64 snapid,
58  u64 off, u64 *plen, u64 *bno,
59  struct ceph_osd_request *req,
60  struct ceph_osd_req_op *op)
61 {
62  struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
63  u64 orig_len = *plen;
64  u64 objoff, objlen; /* extent in object */
65  int r;
66 
67  reqhead->snapid = cpu_to_le64(snapid);
68 
69  /* object extent? */
70  r = ceph_calc_file_object_mapping(layout, off, plen, bno,
71  &objoff, &objlen);
72  if (r < 0)
73  return r;
74  if (*plen < orig_len)
75  dout(" skipping last %llu, final file extent %llu~%llu\n",
76  orig_len - *plen, off, *plen);
77 
78  if (op_has_extent(op->op)) {
79  op->extent.offset = objoff;
80  op->extent.length = objlen;
81  }
82  req->r_num_pages = calc_pages_for(off, *plen);
83  req->r_page_alignment = off & ~PAGE_MASK;
84  if (op->op == CEPH_OSD_OP_WRITE)
85  op->payload_len = *plen;
86 
87  dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
88  *bno, objoff, objlen, req->r_num_pages);
89  return 0;
90 }
92 
/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * changes, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static int calc_layout(struct ceph_osd_client *osdc,
		       struct ceph_vino vino,
		       struct ceph_file_layout *layout,
		       u64 off, u64 *plen,
		       struct ceph_osd_request *req,
		       struct ceph_osd_req_op *op)
{
	u64 bno;
	int r;

	r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
				 plen, &bno, req, op);
	if (r < 0)
		return r;

	snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
	req->r_oid_len = strlen(req->r_oid);

	return r;
}

/*
 * requests
 */
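/*
 * Free a request and the resources it holds (kref release callback).
 */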
void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
						    struct ceph_osd_request,
						    r_kref);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_con_filling_msg) {
		dout("%s revoking pages %p from con %p\n", __func__,
		     req->r_pages, req->r_con_filling_msg);
		ceph_msg_revoke_incoming(req->r_reply);
		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
	}
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_own_pages)
		ceph_release_page_vector(req->r_pages,
					 req->r_num_pages);
#ifdef CONFIG_BLOCK
	if (req->r_bio)
		bio_put(req->r_bio);
#endif
	ceph_put_snap_context(req->r_snapc);
	if (req->r_trail) {
		ceph_pagelist_release(req->r_trail);
		kfree(req->r_trail);
	}
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else
		kfree(req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);

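/*
 * Count the ops in a 0-terminated op vector, noting whether any of
 * them requires a trailing pagelist.
 */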
static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
{
	int i = 0;

	if (needs_trail)
		*needs_trail = 0;
	while (ops[i].op) {
		if (needs_trail && op_needs_trail(ops[i].op))
			*needs_trail = 1;
		i++;
	}

	return i;
}

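/*
 * Allocate a request and the request/reply messages it needs.  The ops
 * are not encoded here; that happens in ceph_osdc_build_request().
 */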
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       int flags,
					       struct ceph_snap_context *snapc,
					       struct ceph_osd_req_op *ops,
					       bool use_mempool,
					       gfp_t gfp_flags,
					       struct page **pages,
					       struct bio *bio)
{
	struct ceph_osd_request *req;
	struct ceph_msg *msg;
	int needs_trail;
	int num_op = get_num_ops(ops, &needs_trail);
	size_t msg_size = sizeof(struct ceph_osd_request_head);

	msg_size += num_op*sizeof(struct ceph_osd_op);

	if (use_mempool) {
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
		memset(req, 0, sizeof(*req));
	} else {
		req = kzalloc(sizeof(*req), gfp_flags);
	}
	if (req == NULL)
		return NULL;

	req->r_osdc = osdc;
	req->r_mempool = use_mempool;

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	INIT_LIST_HEAD(&req->r_unsafe_item);
	INIT_LIST_HEAD(&req->r_linger_item);
	INIT_LIST_HEAD(&req->r_linger_osd);
	INIT_LIST_HEAD(&req->r_req_lru_item);
	INIT_LIST_HEAD(&req->r_osd_item);

	req->r_flags = flags;

	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);

	/* create reply message */
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
				   OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}
	req->r_reply = msg;

	/* allocate space for the trailing data */
	if (needs_trail) {
		req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
		if (!req->r_trail) {
			ceph_osdc_put_request(req);
			return NULL;
		}
		ceph_pagelist_init(req->r_trail);
	}

	/* create request message; allow space for oid */
	msg_size += MAX_OBJ_NAME_SIZE;
	if (snapc)
		msg_size += sizeof(u64) * snapc->num_snaps;
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}

	memset(msg->front.iov_base, 0, msg->front.iov_len);

	req->r_request = msg;
	req->r_pages = pages;
#ifdef CONFIG_BLOCK
	if (bio) {
		req->r_bio = bio;
		bio_get(req->r_bio);
	}
#endif

	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

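/*
 * Encode a single op from its in-memory form into the little-endian
 * wire form, appending any variable-length payload to the trail.
 */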
static void osd_req_encode_op(struct ceph_osd_request *req,
			      struct ceph_osd_op *dst,
			      struct ceph_osd_req_op *src)
{
	dst->op = cpu_to_le16(src->op);

	switch (src->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		dst->extent.offset =
			cpu_to_le64(src->extent.offset);
		dst->extent.length =
			cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;

	case CEPH_OSD_OP_GETXATTR:
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		BUG_ON(!req->r_trail);

		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		ceph_pagelist_append(req->r_trail, src->xattr.name,
				     src->xattr.name_len);
		ceph_pagelist_append(req->r_trail, src->xattr.val,
				     src->xattr.value_len);
		break;
	case CEPH_OSD_OP_CALL:
		BUG_ON(!req->r_trail);

		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);

		ceph_pagelist_append(req->r_trail, src->cls.class_name,
				     src->cls.class_len);
		ceph_pagelist_append(req->r_trail, src->cls.method_name,
				     src->cls.method_len);
		ceph_pagelist_append(req->r_trail, src->cls.indata,
				     src->cls.indata_len);
		break;
	case CEPH_OSD_OP_ROLLBACK:
		dst->snap.snapid = cpu_to_le64(src->snap.snapid);
		break;
	case CEPH_OSD_OP_STARTSYNC:
		break;
	case CEPH_OSD_OP_NOTIFY:
	{
		__le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
		__le32 timeout = cpu_to_le32(src->watch.timeout);

		BUG_ON(!req->r_trail);

		ceph_pagelist_append(req->r_trail,
				     &prot_ver, sizeof(prot_ver));
		ceph_pagelist_append(req->r_trail,
				     &timeout, sizeof(timeout));
	}
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
		dst->watch.ver = cpu_to_le64(src->watch.ver);
		dst->watch.flag = src->watch.flag;
		break;
	default:
		pr_err("unrecognized osd opcode %d\n", dst->op);
		WARN_ON(1);
		break;
	}
	dst->payload_len = cpu_to_le32(src->payload_len);
}

/*
 * build new request AND message
 */
void ceph_osdc_build_request(struct ceph_osd_request *req,
			     u64 off, u64 *plen,
			     struct ceph_osd_req_op *src_ops,
			     struct ceph_snap_context *snapc,
			     struct timespec *mtime,
			     const char *oid,
			     int oid_len)
{
	struct ceph_msg *msg = req->r_request;
	struct ceph_osd_request_head *head;
	struct ceph_osd_req_op *src_op;
	struct ceph_osd_op *op;
	void *p;
	int num_op = get_num_ops(src_ops, NULL);
	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
	int flags = req->r_flags;
	u64 data_len = 0;
	int i;

	head = msg->front.iov_base;
	op = (void *)(head + 1);
	p = (void *)(op + num_op);

	req->r_snapc = ceph_get_snap_context(snapc);

	head->client_inc = cpu_to_le32(1); /* always, for now. */
	head->flags = cpu_to_le32(flags);
	if (flags & CEPH_OSD_FLAG_WRITE)
		ceph_encode_timespec(&head->mtime, mtime);
	head->num_ops = cpu_to_le16(num_op);

	/* fill in oid */
	head->object_len = cpu_to_le32(oid_len);
	memcpy(p, oid, oid_len);
	p += oid_len;

	src_op = src_ops;
	while (src_op->op) {
		osd_req_encode_op(req, op, src_op);
		src_op++;
		op++;
	}

	if (req->r_trail)
		data_len += req->r_trail->length;

	if (snapc) {
		head->snap_seq = cpu_to_le64(snapc->seq);
		head->num_snaps = cpu_to_le32(snapc->num_snaps);
		for (i = 0; i < snapc->num_snaps; i++) {
			put_unaligned_le64(snapc->snaps[i], p);
			p += sizeof(u64);
		}
	}

	if (flags & CEPH_OSD_FLAG_WRITE) {
		req->r_request->hdr.data_off = cpu_to_le16(off);
		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
	} else if (data_len) {
		req->r_request->hdr.data_off = 0;
		req->r_request->hdr.data_len = cpu_to_le32(data_len);
	}

	req->r_request->page_alignment = req->r_page_alignment;

	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
	msg_size = p - msg->front.iov_base;
	msg->front.iov_len = msg_size;
	msg->hdr.front_len = cpu_to_le32(msg_size);
	return;
}
EXPORT_SYMBOL(ceph_osdc_build_request);

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * if @do_sync, include a 'startsync' command so that the osd will flush
 * data quickly.
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       int do_sync,
					       u32 truncate_seq,
					       u64 truncate_size,
					       struct timespec *mtime,
					       bool use_mempool, int num_reply,
					       int page_align)
{
	struct ceph_osd_req_op ops[3];
	struct ceph_osd_request *req;
	int r;

	ops[0].op = opcode;
	ops[0].extent.truncate_seq = truncate_seq;
	ops[0].extent.truncate_size = truncate_size;
	ops[0].payload_len = 0;

	if (do_sync) {
		ops[1].op = CEPH_OSD_OP_STARTSYNC;
		ops[1].payload_len = 0;
		ops[2].op = 0;
	} else
		ops[1].op = 0;

	req = ceph_osdc_alloc_request(osdc, flags,
				      snapc, ops,
				      use_mempool,
				      GFP_NOFS, NULL, NULL);
	if (!req)
		return ERR_PTR(-ENOMEM);

	/* calculate max write size */
	r = calc_layout(osdc, vino, layout, off, plen, req, ops);
	if (r < 0)
		return ERR_PTR(r);
	req->r_file_layout = *layout;	/* keep a copy */

	/* in case it differs from natural (file) alignment that
	   calc_layout filled in for us */
	req->r_num_pages = calc_pages_for(page_align, *plen);
	req->r_page_alignment = page_align;

	ceph_osdc_build_request(req, off, plen, ops,
				snapc,
				mtime,
				req->r_oid, req->r_oid_len);

	return req;
}
EXPORT_SYMBOL(ceph_osdc_new_request);

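/*
 * Typical calling sequence (an illustrative sketch, not part of the
 * original file; osdc, layout, vino, off/len, pages, truncate_* are
 * assumed to come from a caller such as fs/ceph):
 *
 *	struct ceph_osd_request *req;
 *	int rc;
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
 *				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 *				    NULL, 0, truncate_seq, truncate_size,
 *				    NULL, false, 1, page_align);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_pages = pages;
 *	rc = ceph_osdc_start_request(osdc, req, false);
 *	if (!rc)
 *		rc = ceph_osdc_wait_request(osdc, req);
 *	ceph_osdc_put_request(req);
 *
 * ceph_osdc_readpages() below follows exactly this pattern.
 */
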
/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
static void __insert_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *new)
{
	struct rb_node **p = &osdc->requests.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_osd_request, r_node);
		if (new->r_tid < req->r_tid)
			p = &(*p)->rb_left;
		else if (new->r_tid > req->r_tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->r_node, parent, p);
	rb_insert_color(&new->r_node, &osdc->requests);
}

static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
						 u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid)
			n = n->rb_left;
		else if (tid > req->r_tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

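/*
 * Find the request with the smallest tid that is >= the given tid;
 * used by ceph_osdc_sync() to walk requests in tid order.
 */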
static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
		    u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid) {
			if (!n->rb_left)
				return req;
			n = n->rb_left;
		} else if (tid > req->r_tid) {
			n = n->rb_right;
		} else {
			return req;
		}
	}
	return NULL;
}

/*
 * Resubmit requests pending on the given osd.
 */
static void __kick_osd_requests(struct ceph_osd_client *osdc,
				struct ceph_osd *osd)
{
	struct ceph_osd_request *req, *nreq;
	int err;

	dout("__kick_osd_requests osd%d\n", osd->o_osd);
	err = __reset_osd(osdc, osd);
	if (err == -EAGAIN)
		return;

	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
		list_move(&req->r_req_lru_item, &osdc->req_unsent);
		dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
		     osd->o_osd);
		if (!req->r_linger)
			req->r_flags |= CEPH_OSD_FLAG_RETRY;
	}

	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
				 r_linger_osd) {
		/*
		 * reregister request prior to unregistering linger so
		 * that r_osd is preserved.
		 */
		BUG_ON(!list_empty(&req->r_req_lru_item));
		__register_request(osdc, req);
		list_add(&req->r_req_lru_item, &osdc->req_unsent);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
		__unregister_linger_request(osdc, req);
		dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
		     osd->o_osd);
	}
}

static void kick_osd_requests(struct ceph_osd_client *osdc,
			      struct ceph_osd *kickosd)
{
	mutex_lock(&osdc->request_mutex);
	__kick_osd_requests(osdc, kickosd);
	mutex_unlock(&osdc->request_mutex);
}

/*
 * If the osd connection drops, we need to resubmit all requests.
 */
static void osd_reset(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;

	if (!osd)
		return;
	dout("osd_reset osd%d\n", osd->o_osd);
	osdc = osd->o_osdc;
	down_read(&osdc->map_sem);
	kick_osd_requests(osdc, osd);
	send_queued(osdc);
	up_read(&osdc->map_sem);
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
	struct ceph_osd *osd;

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	if (!osd)
		return NULL;

	atomic_set(&osd->o_ref, 1);
	osd->o_osdc = osdc;
	osd->o_osd = onum;
	INIT_LIST_HEAD(&osd->o_requests);
	INIT_LIST_HEAD(&osd->o_linger_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	osd->o_incarnation = 1;

	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

	INIT_LIST_HEAD(&osd->o_keepalive_item);
	return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (atomic_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
		     atomic_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) {
		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;

		if (ac->ops && ac->ops->destroy_authorizer)
			ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
		kfree(osd);
	}
}

/*
 * remove an osd from our map
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("__remove_osd %p\n", osd);
	BUG_ON(!list_empty(&osd->o_requests));
	rb_erase(&osd->o_node, &osdc->osds);
	list_del_init(&osd->o_osd_lru);
	ceph_con_close(&osd->o_con);
	put_osd(osd);
}

static void remove_all_osds(struct ceph_osd_client *osdc)
{
	dout("%s %p\n", __func__, osdc);
	mutex_lock(&osdc->request_mutex);
	while (!RB_EMPTY_ROOT(&osdc->osds)) {
		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
						struct ceph_osd, o_node);
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("__move_osd_to_lru %p\n", osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	dout("__remove_osd_from_lru %p\n", osd);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
}

static void remove_old_osds(struct ceph_osd_client *osdc)
{
	struct ceph_osd *osd, *nosd;

	dout("__remove_old_osds %p\n", osdc);
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
		if (time_before(jiffies, osd->lru_ttl))
			break;
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * reset osd connect
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	struct ceph_osd_request *req;
	int ret = 0;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests) &&
	    list_empty(&osd->o_linger_requests)) {
		__remove_osd(osdc, osd);
	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
			  &osd->o_con.peer_addr,
			  sizeof(osd->o_con.peer_addr)) == 0 &&
		   !ceph_con_opened(&osd->o_con)) {
		dout(" osd addr hasn't changed and connection never opened,"
		     " letting msgr retry");
		/* touch each r_stamp for handle_timeout()'s benefit */
		list_for_each_entry(req, &osd->o_requests, r_osd_item)
			req->r_stamp = jiffies;
		ret = -EAGAIN;
	} else {
		ceph_con_close(&osd->o_con);
		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
			      &osdc->osdmap->osd_addr[osd->o_osd]);
		osd->o_incarnation++;
	}
	return ret;
}

static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
{
	struct rb_node **p = &osdc->osds.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd *osd = NULL;

	dout("__insert_osd %p osd%d\n", new, new->o_osd);
	while (*p) {
		parent = *p;
		osd = rb_entry(parent, struct ceph_osd, o_node);
		if (new->o_osd < osd->o_osd)
			p = &(*p)->rb_left;
		else if (new->o_osd > osd->o_osd)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->o_node, parent, p);
	rb_insert_color(&new->o_node, &osdc->osds);
}

static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
{
	struct ceph_osd *osd;
	struct rb_node *n = osdc->osds.rb_node;

	while (n) {
		osd = rb_entry(n, struct ceph_osd, o_node);
		if (o < osd->o_osd)
			n = n->rb_left;
		else if (o > osd->o_osd)
			n = n->rb_right;
		else
			return osd;
	}
	return NULL;
}

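/*
 * Arm/disarm the delayed work that drives handle_timeout().
 */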
static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
	schedule_delayed_work(&osdc->timeout_work,
			osdc->client->options->osd_keepalive_timeout * HZ);
}

static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
	cancel_delayed_work(&osdc->timeout_work);
}

/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 */
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req)
{
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	ceph_osdc_get_request(req);
	osdc->num_requests++;
	if (osdc->num_requests == 1) {
		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);
	}
}

static void register_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	__register_request(osdc, req);
	mutex_unlock(&osdc->request_mutex);
}

/*
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	if (RB_EMPTY_NODE(&req->r_node)) {
		dout("__unregister_request %p tid %lld not registered\n",
		     req, req->r_tid);
		return;
	}

	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &osdc->requests);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request isn't in flight. */
		ceph_msg_revoke(req->r_request);

		list_del_init(&req->r_osd_item);
		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd to %p lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);
		}
		if (list_empty(&req->r_linger_item))
			req->r_osd = NULL;
	}

	ceph_osdc_put_request(req);

	list_del_init(&req->r_req_lru_item);
	if (osdc->num_requests == 0) {
		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
	}
}

/*
 * Cancel a previously queued request message
 */
static void __cancel_request(struct ceph_osd_request *req)
{
	if (req->r_sent && req->r_osd) {
		ceph_msg_revoke(req->r_request);
		req->r_sent = 0;
	}
}

static void __register_linger_request(struct ceph_osd_client *osdc,
				      struct ceph_osd_request *req)
{
	dout("__register_linger_request %p\n", req);
	list_add_tail(&req->r_linger_item, &osdc->req_linger);
	if (req->r_osd)
		list_add_tail(&req->r_linger_osd,
			      &req->r_osd->o_linger_requests);
}

static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req)
{
	dout("__unregister_linger_request %p\n", req);
	if (req->r_osd) {
		list_del_init(&req->r_linger_item);
		list_del_init(&req->r_linger_osd);

		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd to %p lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);
		}
		if (list_empty(&req->r_osd_item))
			req->r_osd = NULL;
	}
}

void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
					 struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	if (req->r_linger) {
		__unregister_linger_request(osdc, req);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
}
EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);

void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
				  struct ceph_osd_request *req)
{
	if (!req->r_linger) {
		dout("set_request_linger %p\n", req);
		req->r_linger = 1;
		/*
		 * caller is now responsible for calling
		 * unregister_linger_request
		 */
		ceph_osdc_get_request(req);
	}
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);

/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.  Move the request to the appropriate
 * list (unsent, homeless) or leave it on the in-flight lru.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_request(struct ceph_osd_client *osdc,
			 struct ceph_osd_request *req, int force_resend)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	int o = -1, num = 0;
	int err;

	dout("map_request %p tid %lld\n", req, req->r_tid);
	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
				      &req->r_file_layout, osdc->osdmap);
	if (err) {
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
		return err;
	}
	pgid = reqhead->layout.ol_pgid;
	req->r_pgid = pgid;

	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
	if (err > 0) {
		o = acting[0];
		num = err;
	}

	if ((!force_resend &&
	     req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1))
		return 0;  /* no change */

	dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		err = -ENOMEM;
		req->r_osd = create_osd(osdc, o);
		if (!req->r_osd) {
			list_move(&req->r_req_lru_item, &osdc->req_notarget);
			goto out;
		}

		dout("map_request osd %p is osd%d\n", req->r_osd, o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con,
			      CEPH_ENTITY_TYPE_OSD, o,
			      &osdc->osdmap->osd_addr[o]);
	}

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
		list_move(&req->r_req_lru_item, &osdc->req_unsent);
	} else {
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
	}
	err = 1;  /* osd or pg changed */

out:
	return err;
}

/*
 * caller should hold map_sem (for read) and request_mutex
 */
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead;

	dout("send_request %p tid %llu to osd%d flags %d\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);

	reqhead = req->r_request->front.iov_base;
	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
	reqhead->reassert_version = req->r_reassert_version;

	req->r_stamp = jiffies;
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request); /* send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, req->r_request);
	req->r_sent = req->r_osd->o_incarnation;
}

/*
 * Send any requests in the queue (req_unsent).
 */
static void send_queued(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req, *tmp;

	dout("send_queued\n");
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
		__send_request(osdc, req);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests have been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests who have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req, *last_req = NULL;
	struct ceph_osd *osd;
	unsigned long timeout = osdc->client->options->osd_timeout * HZ;
	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	unsigned long last_stamp = 0;
	struct list_head slow_osds;

	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);

	/*
	 * reset osds that appear to be _really_ unresponsive.  this
	 * is a failsafe measure.. we really shouldn't be getting to
	 * this point if the system is working properly.  the monitors
	 * should mark the osd as failed and we should find out about
	 * it from an updated osd map.
	 */
	while (timeout && !list_empty(&osdc->req_lru)) {
		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
				 r_req_lru_item);

		/* hasn't been long enough since we sent it? */
		if (time_before(jiffies, req->r_stamp + timeout))
			break;

		/* hasn't been long enough since it was acked? */
		if (req->r_request->ack_stamp == 0 ||
		    time_before(jiffies, req->r_request->ack_stamp + timeout))
			break;

		BUG_ON(req == last_req && req->r_stamp == last_stamp);
		last_req = req;
		last_stamp = req->r_stamp;

		osd = req->r_osd;
		BUG_ON(!osd);
		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
			   req->r_tid, osd->o_osd);
		__kick_osd_requests(osdc, osd);
	}

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	mutex_unlock(&osdc->request_mutex);
	send_queued(osdc);
	up_read(&osdc->map_sem);
}

static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}

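/*
 * Run the 'safe' callback, if any, and wake fsync waiters once the osd
 * has committed the request to disk.
 */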
static void complete_request(struct ceph_osd_request *req)
{
	if (req->r_safe_callback)
		req->r_safe_callback(req, NULL);
	complete_all(&req->r_safe_completion);  /* fsync waiter */
}

/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
			 struct ceph_connection *con)
{
	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
	struct ceph_osd_request *req;
	u64 tid;
	int numops, object_len, flags;
	s32 result;

	tid = le64_to_cpu(msg->hdr.tid);
	if (msg->front.iov_len < sizeof(*rhead))
		goto bad;
	numops = le32_to_cpu(rhead->num_ops);
	object_len = le32_to_cpu(rhead->object_len);
	result = le32_to_cpu(rhead->result);
	if (msg->front.iov_len != sizeof(*rhead) + object_len +
	    numops * sizeof(struct ceph_osd_op))
		goto bad;
	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
	/* lookup */
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (req == NULL) {
		dout("handle_reply tid %llu dne\n", tid);
		mutex_unlock(&osdc->request_mutex);
		return;
	}
	ceph_osdc_get_request(req);
	flags = le32_to_cpu(rhead->flags);

	/*
	 * if this connection filled our message, drop our reference now, to
	 * avoid a (safe but slower) revoke later.
	 */
	if (req->r_con_filling_msg == con && req->r_reply == msg) {
		dout(" dropping con_filling_msg ref %p\n", con);
		req->r_con_filling_msg = NULL;
		con->ops->put(con);
	}

	if (!req->r_got_reply) {
		unsigned int bytes;

		req->r_result = le32_to_cpu(rhead->result);
		bytes = le32_to_cpu(msg->hdr.data_len);
		dout("handle_reply result %d bytes %d\n", req->r_result,
		     bytes);
		if (req->r_result == 0)
			req->r_result = bytes;

		/* in case this is a write and we need to replay, */
		req->r_reassert_version = rhead->reassert_version;

		req->r_got_reply = 1;
	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
		dout("handle_reply tid %llu dup ack\n", tid);
		mutex_unlock(&osdc->request_mutex);
		goto done;
	}

	dout("handle_reply tid %llu flags %d\n", tid, flags);

	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
		__register_linger_request(osdc, req);

	/* either this is a read, or we got the safe response */
	if (result < 0 ||
	    (flags & CEPH_OSD_FLAG_ONDISK) ||
	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
		__unregister_request(osdc, req);

	mutex_unlock(&osdc->request_mutex);

	if (req->r_callback)
		req->r_callback(req, msg);
	else
		complete_all(&req->r_completion);

	if (flags & CEPH_OSD_FLAG_ONDISK)
		complete_request(req);

done:
	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
	ceph_osdc_put_request(req);
	return;

bad:
	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
	       (int)sizeof(*rhead));
	ceph_msg_dump(msg);
}

static void reset_changed_osds(struct ceph_osd_client *osdc)
{
	struct rb_node *p, *n;

	for (p = rb_first(&osdc->osds); p; p = n) {
		struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);

		n = rb_next(p);
		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
		    memcmp(&osd->o_con.peer_addr,
			   ceph_osd_addr(osdc->osdmap,
					 osd->o_osd),
			   sizeof(struct ceph_entity_addr)) != 0)
			__reset_osd(osdc, osd);
	}
}

/*
 * Requeue requests whose mapping to an OSD has changed.  If requests map to
 * no osd, request a new map.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
{
	struct ceph_osd_request *req, *nreq;
	struct rb_node *p;
	int needmap = 0;
	int err;

	dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
	mutex_lock(&osdc->request_mutex);
	for (p = rb_first(&osdc->requests); p; ) {
		req = rb_entry(p, struct ceph_osd_request, r_node);
		p = rb_next(p);
		err = __map_request(osdc, req, force_resend);
		if (err < 0)
			continue;  /* error */
		if (req->r_osd == NULL) {
			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
			needmap++;  /* request a newer map */
		} else if (err > 0) {
			if (!req->r_linger) {
				dout("%p tid %llu requeued on osd%d\n", req,
				     req->r_tid,
				     req->r_osd ? req->r_osd->o_osd : -1);
				req->r_flags |= CEPH_OSD_FLAG_RETRY;
			}
		}
		if (req->r_linger && list_empty(&req->r_linger_item)) {
			/*
			 * register as a linger so that we will
			 * re-submit below and get a new tid
			 */
			dout("%p tid %llu restart on osd%d\n",
			     req, req->r_tid,
			     req->r_osd ? req->r_osd->o_osd : -1);
			__register_linger_request(osdc, req);
			__unregister_request(osdc, req);
		}
	}

	list_for_each_entry_safe(req, nreq, &osdc->req_linger,
				 r_linger_item) {
		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);

		err = __map_request(osdc, req, force_resend);
		if (err == 0)
			continue;  /* no change and no osd was specified */
		if (err < 0)
			continue;  /* hrm! */
		if (req->r_osd == NULL) {
			dout("tid %llu maps to no valid osd\n", req->r_tid);
			needmap++;  /* request a newer map */
			continue;
		}

		dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
		     req->r_osd ? req->r_osd->o_osd : -1);
		__unregister_linger_request(osdc, req);
		__register_request(osdc, req);
	}
	mutex_unlock(&osdc->request_mutex);

	if (needmap) {
		dout("%d requests for down osds, need new map\n", needmap);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}
}

/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end, *next;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_osdmap *newmap = NULL, *oldmap;
	int err;
	struct ceph_fsid fsid;

	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	/* verify fsid */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		return;

	down_write(&osdc->map_sem);

	/* incremental maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		next = p + maplen;
		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			newmap = osdmap_apply_incremental(&p, next,
							  osdc->osdmap,
							  &osdc->client->msgr);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			if (newmap != osdc->osdmap) {
				ceph_osdmap_destroy(osdc->osdmap);
				osdc->osdmap = newmap;
			}
			kick_requests(osdc, 0);
			reset_changed_osds(osdc);
		} else {
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p = next;
		nr_maps--;
	}
	if (newmap)
		goto done;

	/* full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			int skipped_map = 0;

			dout("taking full map %u len %d\n", epoch, maplen);
			newmap = osdmap_decode(&p, p+maplen);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			oldmap = osdc->osdmap;
			osdc->osdmap = newmap;
			if (oldmap) {
				if (oldmap->epoch + 1 < newmap->epoch)
					skipped_map = 1;
				ceph_osdmap_destroy(oldmap);
			}
			kick_requests(osdc, skipped_map);
		}
		p += maplen;
		nr_maps--;
	}

done:
	downgrade_write(&osdc->map_sem);
	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);

	/*
	 * subscribe to subsequent osdmap updates if full to ensure
	 * we find out when we are no longer full and stop returning
	 * ENOSPC.
	 */
	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
		ceph_monc_request_next_osdmap(&osdc->client->monc);

	send_queued(osdc);
	up_read(&osdc->map_sem);
	wake_up_all(&osdc->client->auth_wq);
	return;

bad:
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->map_sem);
	return;
}

/*
 * watch/notify callback event infrastructure
 *
 * These callbacks are used both for watch and notify operations.
 */
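
/*
 * Illustrative flow (a sketch, not part of the original file): a
 * notify/watch user creates an event, ties its cookie to the osd op it
 * submits (caller plumbing assumed), and then waits for the callback:
 *
 *	struct ceph_osd_event *event;
 *	int ret;
 *
 *	ret = ceph_osdc_create_event(osdc, my_cb, 1, my_data, &event);
 *	if (ret)
 *		return ret;
 *	... submit a CEPH_OSD_OP_NOTIFY op whose watch.cookie is
 *	... event->cookie
 *	ret = ceph_osdc_wait_event(event, 10);
 *	ceph_osdc_cancel_event(event);
 *
 * rbd (drivers/block/rbd.c) is the in-tree user of this interface.
 */
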
static void __release_event(struct kref *kref)
{
	struct ceph_osd_event *event =
		container_of(kref, struct ceph_osd_event, kref);

	dout("__release_event %p\n", event);
	kfree(event);
}

static void get_event(struct ceph_osd_event *event)
{
	kref_get(&event->kref);
}

void ceph_osdc_put_event(struct ceph_osd_event *event)
{
	kref_put(&event->kref, __release_event);
}
EXPORT_SYMBOL(ceph_osdc_put_event);

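/*
 * Events live in an rbtree keyed by cookie; cookies increase
 * monotonically and are never reused.
 */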
static void __insert_event(struct ceph_osd_client *osdc,
			   struct ceph_osd_event *new)
{
	struct rb_node **p = &osdc->event_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_event *event = NULL;

	while (*p) {
		parent = *p;
		event = rb_entry(parent, struct ceph_osd_event, node);
		if (new->cookie < event->cookie)
			p = &(*p)->rb_left;
		else if (new->cookie > event->cookie)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, &osdc->event_tree);
}

static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
					   u64 cookie)
{
	struct rb_node **p = &osdc->event_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_event *event = NULL;

	while (*p) {
		parent = *p;
		event = rb_entry(parent, struct ceph_osd_event, node);
		if (cookie < event->cookie)
			p = &(*p)->rb_left;
		else if (cookie > event->cookie)
			p = &(*p)->rb_right;
		else
			return event;
	}
	return NULL;
}

static void __remove_event(struct ceph_osd_event *event)
{
	struct ceph_osd_client *osdc = event->osdc;

	if (!RB_EMPTY_NODE(&event->node)) {
		dout("__remove_event removed %p\n", event);
		rb_erase(&event->node, &osdc->event_tree);
		ceph_osdc_put_event(event);
	} else {
		dout("__remove_event didn't remove %p\n", event);
	}
}

int ceph_osdc_create_event(struct ceph_osd_client *osdc,
			   void (*event_cb)(u64, u64, u8, void *),
			   int one_shot, void *data,
			   struct ceph_osd_event **pevent)
{
	struct ceph_osd_event *event;

	event = kmalloc(sizeof(*event), GFP_NOIO);
	if (!event)
		return -ENOMEM;

	dout("create_event %p\n", event);
	event->cb = event_cb;
	event->one_shot = one_shot;
	event->data = data;
	event->osdc = osdc;
	INIT_LIST_HEAD(&event->osd_node);
	kref_init(&event->kref);	/* one ref for us */
	kref_get(&event->kref);		/* one ref for the caller */
	init_completion(&event->completion);

	spin_lock(&osdc->event_lock);
	event->cookie = ++osdc->event_count;
	__insert_event(osdc, event);
	spin_unlock(&osdc->event_lock);

	*pevent = event;
	return 0;
}
EXPORT_SYMBOL(ceph_osdc_create_event);

void ceph_osdc_cancel_event(struct ceph_osd_event *event)
{
	struct ceph_osd_client *osdc = event->osdc;

	dout("cancel_event %p\n", event);
	spin_lock(&osdc->event_lock);
	__remove_event(event);
	spin_unlock(&osdc->event_lock);
	ceph_osdc_put_event(event);	/* caller's */
}
EXPORT_SYMBOL(ceph_osdc_cancel_event);

static void do_event_work(struct work_struct *work)
{
	struct ceph_osd_event_work *event_work =
		container_of(work, struct ceph_osd_event_work, work);
	struct ceph_osd_event *event = event_work->event;
	u64 ver = event_work->ver;
	u64 notify_id = event_work->notify_id;
	u8 opcode = event_work->opcode;

	dout("do_event_work completing %p\n", event);
	event->cb(ver, notify_id, opcode, event->data);
	complete(&event->completion);
	dout("do_event_work completed %p\n", event);
	ceph_osdc_put_event(event);
	kfree(event_work);
}

/*
 * Process osd watch notifications
 */
void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end;
	u8 proto_ver;
	u64 cookie, ver, notify_id;
	u8 opcode;
	struct ceph_osd_event *event;
	struct ceph_osd_event_work *event_work;

	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	ceph_decode_8_safe(&p, end, proto_ver, bad);
	ceph_decode_8_safe(&p, end, opcode, bad);
	ceph_decode_64_safe(&p, end, cookie, bad);
	ceph_decode_64_safe(&p, end, ver, bad);
	ceph_decode_64_safe(&p, end, notify_id, bad);

	spin_lock(&osdc->event_lock);
	event = __find_event(osdc, cookie);
	if (event) {
		get_event(event);
		if (event->one_shot)
			__remove_event(event);
	}
	spin_unlock(&osdc->event_lock);
	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
	     cookie, ver, event);
	if (event) {
		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
		if (!event_work) {
			dout("ERROR: could not allocate event_work\n");
			goto done_err;
		}
		INIT_WORK(&event_work->work, do_event_work);
		event_work->event = event;
		event_work->ver = ver;
		event_work->notify_id = notify_id;
		event_work->opcode = opcode;
		if (!queue_work(osdc->notify_wq, &event_work->work)) {
			dout("WARNING: failed to queue notify event work\n");
			goto done_err;
		}
	}

	return;

done_err:
	complete(&event->completion);
	ceph_osdc_put_event(event);
	return;

bad:
	pr_err("osdc handle_watch_notify corrupt msg\n");
	return;
}

int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
{
	int err;

	dout("wait_event %p\n", event);
	err = wait_for_completion_interruptible_timeout(&event->completion,
							timeout * HZ);
	ceph_osdc_put_event(event);
	if (err > 0)
		err = 0;
	dout("wait_event %p returns %d\n", event, err);
	return err;
}
EXPORT_SYMBOL(ceph_osdc_wait_event);

/*
 * Register request, send initial attempt.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	int rc = 0;

	req->r_request->pages = req->r_pages;
	req->r_request->nr_pages = req->r_num_pages;
#ifdef CONFIG_BLOCK
	req->r_request->bio = req->r_bio;
#endif
	req->r_request->trail = req->r_trail;

	register_request(osdc, req);

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	/*
	 * a racing kick_requests() may have sent the message for us
	 * while we dropped request_mutex above, so only send now if
	 * the request still hasn't been touched yet.
	 */
	if (req->r_sent == 0) {
		rc = __map_request(osdc, req, 0);
		if (rc < 0) {
			if (nofail) {
				dout("osdc_start_request failed map, "
				     " will retry %lld\n", req->r_tid);
				rc = 0;
			}
			goto out_unlock;
		}
		if (req->r_osd == NULL) {
			dout("send_request %p no up osds in pg\n", req);
			ceph_monc_request_next_osdmap(&osdc->client->monc);
		} else {
			__send_request(osdc, req);
		}
		rc = 0;
	}

out_unlock:
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_start_request);

/*
 * wait for a request to complete
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	int rc;

	rc = wait_for_completion_interruptible(&req->r_completion);
	if (rc < 0) {
		mutex_lock(&osdc->request_mutex);
		__cancel_request(req);
		__unregister_request(osdc, req);
		mutex_unlock(&osdc->request_mutex);
		complete_request(req);
		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
		return rc;
	}

	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
	return req->r_result;
}
EXPORT_SYMBOL(ceph_osdc_wait_request);

/*
 * sync - wait for all in-flight requests to flush.  avoid starvation.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req;
	u64 last_tid, next_tid = 0;

	mutex_lock(&osdc->request_mutex);
	last_tid = osdc->last_tid;
	while (1) {
		req = __lookup_request_ge(osdc, next_tid);
		if (!req)
			break;
		if (req->r_tid > last_tid)
			break;

		next_tid = req->r_tid + 1;
		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
			continue;

		ceph_osdc_get_request(req);
		mutex_unlock(&osdc->request_mutex);
		dout("sync waiting on tid %llu (last is %llu)\n",
		     req->r_tid, last_tid);
		wait_for_completion(&req->r_safe_completion);
		mutex_lock(&osdc->request_mutex);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
	dout("sync done (thru tid %llu)\n", last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);

/*
 * init, shutdown
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
	int err;

	dout("init\n");
	osdc->client = client;
	osdc->osdmap = NULL;
	init_rwsem(&osdc->map_sem);
	init_completion(&osdc->map_waiters);
	osdc->last_requested_map = 0;
	mutex_init(&osdc->request_mutex);
	osdc->last_tid = 0;
	osdc->osds = RB_ROOT;
	INIT_LIST_HEAD(&osdc->osd_lru);
	osdc->requests = RB_ROOT;
	INIT_LIST_HEAD(&osdc->req_lru);
	INIT_LIST_HEAD(&osdc->req_unsent);
	INIT_LIST_HEAD(&osdc->req_notarget);
	INIT_LIST_HEAD(&osdc->req_linger);
	osdc->num_requests = 0;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
	spin_lock_init(&osdc->event_lock);
	osdc->event_tree = RB_ROOT;
	osdc->event_count = 0;

	schedule_delayed_work(&osdc->osds_timeout_work,
	    round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));

	err = -ENOMEM;
	osdc->req_mempool = mempool_create_kmalloc_pool(10,
					sizeof(struct ceph_osd_request));
	if (!osdc->req_mempool)
		goto out;

	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
				OSD_OP_FRONT_LEN, 10, true,
				"osd_op");
	if (err < 0)
		goto out_mempool;
	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
				OSD_OPREPLY_FRONT_LEN, 10, true,
				"osd_op_reply");
	if (err < 0)
		goto out_msgpool;

	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
	if (IS_ERR(osdc->notify_wq)) {
		err = PTR_ERR(osdc->notify_wq);
		osdc->notify_wq = NULL;
		goto out_msgpool;
	}
	return 0;

out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out:
	return err;
}
EXPORT_SYMBOL(ceph_osdc_init);

void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	flush_workqueue(osdc->notify_wq);
	destroy_workqueue(osdc->notify_wq);
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);
	if (osdc->osdmap) {
		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = NULL;
	}
	remove_all_osds(osdc);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
EXPORT_SYMBOL(ceph_osdc_stop);

/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
			struct ceph_vino vino, struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			u32 truncate_seq, u64 truncate_size,
			struct page **pages, int num_pages, int page_align)
{
	struct ceph_osd_request *req;
	int rc = 0;

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, 0, truncate_seq, truncate_size, NULL,
				    false, 1, page_align);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short read due to an object boundary */
	req->r_pages = pages;

	dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
	     off, *plen, req->r_num_pages, page_align);

	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_readpages);

/*
 * do a synchronous write on N pages
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len,
			 u32 truncate_seq, u64 truncate_size,
			 struct timespec *mtime,
			 struct page **pages, int num_pages,
			 int flags, int do_sync, bool nofail)
{
	struct ceph_osd_request *req;
	int rc = 0;
	int page_align = off & ~PAGE_MASK;

	BUG_ON(vino.snap != CEPH_NOSNAP);
	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
				    CEPH_OSD_OP_WRITE,
				    flags | CEPH_OSD_FLAG_ONDISK |
					    CEPH_OSD_FLAG_WRITE,
				    snapc, do_sync,
				    truncate_seq, truncate_size, mtime,
				    nofail, 1, page_align);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short write due to an object boundary */
	req->r_pages = pages;
	dout("writepages %llu~%llu (%d pages)\n", off, len,
	     req->r_num_pages);

	rc = ceph_osdc_start_request(osdc, req, nofail);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	if (rc == 0)
		rc = len;
	dout("writepages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_writepages);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;
	int type = le16_to_cpu(msg->hdr.type);

	if (!osd)
		goto out;
	osdc = osd->o_osdc;

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(osdc, msg);
		break;
	case CEPH_MSG_OSD_OPREPLY:
		handle_reply(osdc, msg, con);
		break;
	case CEPH_MSG_WATCH_NOTIFY:
		handle_watch_notify(osdc, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}

/*
 * lookup and return message for incoming reply.  set up reply message
 * pages.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m;
	struct ceph_osd_request *req;
	int front = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid;

	tid = le64_to_cpu(hdr->tid);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (!req) {
		*skip = 1;
		m = NULL;
		dout("get_reply unknown tid %llu from osd%d\n", tid,
		     osd->o_osd);
		goto out;
	}

	if (req->r_con_filling_msg) {
		dout("%s revoking msg %p from old con %p\n", __func__,
		     req->r_reply, req->r_con_filling_msg);
		ceph_msg_revoke_incoming(req->r_reply);
		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
		req->r_con_filling_msg = NULL;
	}

	if (front > req->r_reply->front.iov_len) {
		pr_warning("get_reply front %d > preallocated %d\n",
			   front, (int)req->r_reply->front.iov_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
		if (!m)
			goto out;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}
	m = ceph_msg_get(req->r_reply);

	if (data_len > 0) {
		int want = calc_pages_for(req->r_page_alignment, data_len);

		if (unlikely(req->r_num_pages < want)) {
			pr_warning("tid %lld reply has %d bytes %d pages, we"
				   " had only %d pages ready\n", tid, data_len,
				   want, req->r_num_pages);
			*skip = 1;
			ceph_msg_put(m);
			m = NULL;
			goto out;
		}
		m->pages = req->r_pages;
		m->nr_pages = req->r_num_pages;
		m->page_alignment = req->r_page_alignment;
#ifdef CONFIG_BLOCK
		m->bio = req->r_bio;
#endif
	}
	*skip = 0;
	req->r_con_filling_msg = con->ops->get(con);
	dout("get_reply tid %lld %p\n", tid, m);

out:
	mutex_unlock(&osdc->request_mutex);
	return m;
}

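/*
 * Allocate a message to receive an incoming frame; op replies are
 * matched to their pending request via get_reply() above.
 */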
static struct ceph_msg *alloc_msg(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);
	int front = le32_to_cpu(hdr->front_len);

	*skip = 0;
	switch (type) {
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_WATCH_NOTIFY:
		return ceph_msg_new(type, front, GFP_NOFS, false);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
			osd->o_osd);
		*skip = 1;
		return NULL;
	}
}

/*
 * Wrappers to refcount containing ceph_osd struct
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}

/*
 * authentication
 */
/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
						  int *proto, int force_new)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	struct ceph_auth_handshake *auth = &o->o_auth;

	if (force_new && auth->authorizer) {
		if (ac->ops && ac->ops->destroy_authorizer)
			ac->ops->destroy_authorizer(ac, auth->authorizer);
		auth->authorizer = NULL;
	}
	if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
		int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
						     auth);
		if (ret)
			return ERR_PTR(ret);
	}
	*proto = ac->protocol;

	return auth;
}

static int verify_authorizer_reply(struct ceph_connection *con, int len)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	/*
	 * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
	 * XXX which do we do: succeed or fail?
	 */
	return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	if (ac->ops && ac->ops->invalidate_authorizer)
		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);

	return ceph_monc_validate_auth(&osdc->client->monc);
}

static const struct ceph_connection_operations osd_con_ops = {
	.get = get_osd_con,
	.put = put_osd_con,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.alloc_msg = alloc_msg,
	.fault = osd_reset,
};