/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

#define U64_MAX ((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN   32
#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN         1024

#define RBD_SNAP_HEAD_NAME      "-"

#define RBD_IMAGE_ID_LEN_MAX    64
#define RBD_OBJ_PREFIX_LEN_MAX  64

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
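
/*
 * A worked example of the bound above, assuming a 4-byte int: 32 bits
 * hold at most ceil(32 * log10(2)) = 10 decimal digits, and since
 * 5/2 >= 8 * log10(2), (5 * 4) / 2 + 1 = 11 characters cover those 10
 * digits plus a leading '-' sign.
 */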

#define RBD_READ_ONLY_DEFAULT   false

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These four fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 obj_version;
};

struct rbd_options {
        bool read_only;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client *client;
        struct kref kref;
        struct list_head node;
};

/*
 * a request completion status
 */
struct rbd_req_status {
        int done;
        int rc;
        u64 bytes;
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
        int total;
        int num_done;
        struct kref kref;
        struct rbd_req_status status[0];
};

/*
 * a single io request
 */
struct rbd_request {
        struct request *rq;             /* blk layer request */
        struct bio *bio;                /* cloned bio */
        struct page **pages;            /* list of used pages */
        u64 len;
        int coll_index;
        struct rbd_req_coll *coll;
};

struct rbd_snap {
        struct device dev;
        const char *name;
        u64 size;
        struct list_head node;
        u64 id;
        u64 features;
};

struct rbd_mapping {
        char *snap_name;
        u64 snap_id;
        u64 size;
        u64 features;
        bool snap_exists;
        bool read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int dev_id;             /* blkdev unique id */

        int major;              /* blkdev assigned major */
        struct gendisk *disk;   /* blkdev's gendisk and rq */

        u32 image_format;       /* Either 1 or 2 */
        struct rbd_options rbd_opts;
        struct rbd_client *rbd_client;

        char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t lock;        /* queue lock */

        struct rbd_image_header header;
        char *image_id;
        size_t image_id_len;
        char *image_name;
        size_t image_name_len;
        char *header_name;
        char *pool_name;
        int pool_id;

        struct ceph_osd_event *watch_event;
        struct ceph_osd_request *watch_request;

        /* protects updating the header */
        struct rw_semaphore header_rwsem;

        struct rbd_mapping mapping;

        struct list_head node;

        /* list of snapshots */
        struct list_head snaps;

        /* sysfs related */
        struct device dev;
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);      /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
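
/*
 * A minimal usage sketch for the add/remove attributes above; see
 * Documentation/ABI/testing/sysfs-bus-rbd for the authoritative
 * format.  The monitor address, pool, and image name here are
 * hypothetical:
 *
 *   echo "1.2.3.4:6789 name=admin rbd foo" > /sys/bus/rbd/add
 *   echo 0 > /sys/bus/rbd/remove           (unmap device id 0, rbd0)
 */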

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                        "at line %d:\n\n"               \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
        return get_device(&rbd_dev->dev);
}

static void rbd_put_dev(struct rbd_device *rbd_dev)
{
        put_device(&rbd_dev->dev);
}

static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        rbd_get_dev(rbd_dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;

        rbd_put_dev(rbd_dev);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner          = THIS_MODULE,
        .open           = rbd_open,
        .release        = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("rbd_client_create\n");
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL;       /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);

        dout("rbd_client_create created %p\n", rbdc);
        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        kref_get(&client_node->kref);
                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
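
/*
 * A worked example of the flow above: for the option string "ro",
 * match_token() maps the word to Opt_read_only via rbd_opts_tokens,
 * the token falls in the Boolean range, and the switch sets
 * rbd_opts->read_only = true.  An unrecognized word (say "bogus")
 * matches the {-1, NULL} sentinel and returns -EINVAL.
 */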

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
                          size_t mon_addr_len, char *options)
{
        struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
        struct ceph_options *ceph_opts;
        struct rbd_client *rbdc;

        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

        ceph_opts = ceph_parse_options(options, mon_addr,
                                       mon_addr + mon_addr_len,
                                       parse_rbd_opts_token, rbd_opts);
        if (IS_ERR(ceph_opts))
                return PTR_ERR(ceph_opts);

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc) {
                /* using an existing client */
                ceph_destroy_options(ceph_opts);
        } else {
                rbdc = rbd_client_create(ceph_opts);
                if (IS_ERR(rbdc))
                        return PTR_ERR(rbdc);
        }
        rbd_dev->rbd_client = rbdc;

        return 0;
}

/*
 * Destroy ceph client
 *
 * The client list lock is taken here, so callers must not hold
 * rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("rbd_release_client %p\n", rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
        kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
        rbd_dev->rbd_client = NULL;
}

/*
 * Destroy requests collection
 */
static void rbd_coll_release(struct kref *kref)
{
        struct rbd_req_coll *coll =
                container_of(kref, struct rbd_req_coll, kref);

        dout("rbd_coll_release %p\n", coll);
        kfree(coll);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}
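
/*
 * To make the bounds above concrete, assuming a hypothetical 32-bit
 * size_t (SIZE_MAX = 4294967295): ignoring the small ceph_snap_context
 * overhead, snap_count may not exceed roughly 4294967295 / 8, i.e.
 * about 536 million 8-byte snapshot ids, and whatever remains of
 * SIZE_MAX after the id array must still cover snap_names_len.
 */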

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX)
                        return -EIO;
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                       snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                WARN_ON(ondisk->snap_names_len);
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);
        size = sizeof (struct ceph_snap_context);
        size += snap_count * sizeof (header->snapc->snaps[0]);
        header->snapc = kzalloc(size, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;

        atomic_set(&header->snapc->nref, 1);
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        header->snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] =
                        le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node) {
                if (!strcmp(snap_name, snap->name)) {
                        rbd_dev->mapping.snap_id = snap->id;
                        rbd_dev->mapping.size = snap->size;
                        rbd_dev->mapping.features = snap->features;

                        return 0;
                }
        }

        return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
{
        int ret;

        if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->mapping.snap_id = CEPH_NOSNAP;
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
                rbd_dev->mapping.snap_exists = false;
                rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
                ret = 0;
        } else {
                ret = snap_by_name(rbd_dev, snap_name);
                if (ret < 0)
                        goto done;
                rbd_dev->mapping.snap_exists = true;
                rbd_dev->mapping.read_only = true;
        }
        rbd_dev->mapping.snap_name = snap_name;
done:
        return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
        kfree(header->object_prefix);
        header->object_prefix = NULL;
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        ceph_put_snap_context(header->snapc);
        header->snapc = NULL;
}

static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
                       rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                       segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                              u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
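
/*
 * For example, with the common default obj_order of 22 (4 MiB
 * objects): a request at image offset 0x3ff000 for 0x2000 bytes maps
 * to in-segment offset 0x3ff000, and rbd_segment_length() clips the
 * length to 0x1000 so the I/O does not cross into the next object.
 */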

static int rbd_get_num_segments(struct rbd_image_header *header,
                                u64 ofs, u64 len)
{
        u64 start_seg;
        u64 end_seg;

        if (!len)
                return 0;
        if (len - 1 > U64_MAX - ofs)
                return -ERANGE;

        start_seg = ofs >> header->obj_order;
        end_seg = (ofs + len - 1) >> header->obj_order;

        return end_seg - start_seg + 1;
}
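
/*
 * Continuing the obj_order = 22 example: ofs = 0x3ff000 and
 * len = 0x2000 give start_seg = 0 and end_seg = 0x400fff >> 22 = 1,
 * so the request spans end_seg - start_seg + 1 = 2 objects.
 */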

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
                                   struct bio_pair **bp,
                                   int len, gfp_t gfpmask)
{
        struct bio *old_chain = *old;
        struct bio *new_chain = NULL;
        struct bio *tail;
        int total = 0;

        if (*bp) {
                bio_pair_release(*bp);
                *bp = NULL;
        }

        while (old_chain && (total < len)) {
                struct bio *tmp;

                tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
                if (!tmp)
                        goto err_out;
                gfpmask &= ~__GFP_WAIT; /* can't wait after the first */

                if (total + old_chain->bi_size > len) {
                        struct bio_pair *bp;

                        /*
                         * this split can only happen with a single paged bio,
                         * split_bio will BUG_ON if this is not the case
                         */
                        dout("bio_chain_clone split! total=%d remaining=%d "
                             "bi_size=%u\n",
                             total, len - total, old_chain->bi_size);

                        /* split the bio. We'll release it either in the next
                           call, or it will have to be released outside */
                        bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
                        if (!bp)
                                goto err_out;

                        __bio_clone(tmp, &bp->bio1);

                        *next = &bp->bio2;
                } else {
                        __bio_clone(tmp, old_chain);
                        *next = old_chain->bi_next;
                }

                tmp->bi_bdev = NULL;
                tmp->bi_next = NULL;
                if (new_chain)
                        tail->bi_next = tmp;
                else
                        new_chain = tmp;
                tail = tmp;
                old_chain = old_chain->bi_next;

                total += tmp->bi_size;
        }

        rbd_assert(total == len);

        *old = old_chain;

        return new_chain;

err_out:
        dout("bio_chain_clone with err\n");
        bio_chain_put(new_chain);
        return NULL;
}
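
/*
 * A sketch of the split case above: cloning len = 6144 bytes from a
 * chain of two 4096-byte bios copies the first bio whole (total =
 * 4096), then splits the second at (6144 - 4096) / 512 = 4 sectors;
 * bio1 of the resulting bio_pair joins the clone and *next is left
 * pointing at bio2 so the caller can continue from the unconsumed
 * half.
 */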

/*
 * helpers for osd request op vectors.
 */
static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
                                                 int opcode, u32 payload_len)
{
        struct ceph_osd_req_op *ops;

        ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
        if (!ops)
                return NULL;

        ops[0].op = opcode;

        /*
         * op extent offset and length will be set later on
         * in calc_raw_layout()
         */
        ops[0].payload_len = payload_len;

        return ops;
}

static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}

static void rbd_coll_end_req_index(struct request *rq,
                                   struct rbd_req_coll *coll,
                                   int index,
                                   int ret, u64 len)
{
        struct request_queue *q;
        int min, max, i;

        dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
             coll, index, ret, (unsigned long long) len);

        if (!rq)
                return;

        if (!coll) {
                blk_end_request(rq, ret, len);
                return;
        }

        q = rq->q;

        spin_lock_irq(q->queue_lock);
        coll->status[index].done = 1;
        coll->status[index].rc = ret;
        coll->status[index].bytes = len;
        max = min = coll->num_done;
        while (max < coll->total && coll->status[max].done)
                max++;

        for (i = min; i < max; i++) {
                __blk_end_request(rq, coll->status[i].rc,
                                  coll->status[i].bytes);
                coll->num_done++;
                kref_put(&coll->kref, rbd_coll_release);
        }
        spin_unlock_irq(q->queue_lock);
}

static void rbd_coll_end_req(struct rbd_request *req,
                             int ret, u64 len)
{
        rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}

/*
 * Send ceph osd request
 */
static int rbd_do_request(struct request *rq,
                          struct rbd_device *rbd_dev,
                          struct ceph_snap_context *snapc,
                          u64 snapid,
                          const char *object_name, u64 ofs, u64 len,
                          struct bio *bio,
                          struct page **pages,
                          int num_pages,
                          int flags,
                          struct ceph_osd_req_op *ops,
                          struct rbd_req_coll *coll,
                          int coll_index,
                          void (*rbd_cb)(struct ceph_osd_request *req,
                                         struct ceph_msg *msg),
                          struct ceph_osd_request **linger_req,
                          u64 *ver)
{
        struct ceph_osd_request *req;
        struct ceph_file_layout *layout;
        int ret;
        u64 bno;
        struct timespec mtime = CURRENT_TIME;
        struct rbd_request *req_data;
        struct ceph_osd_request_head *reqhead;
        struct ceph_osd_client *osdc;

        req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
        if (!req_data) {
                if (coll)
                        rbd_coll_end_req_index(rq, coll, coll_index,
                                               -ENOMEM, len);
                return -ENOMEM;
        }

        if (coll) {
                req_data->coll = coll;
                req_data->coll_index = coll_index;
        }

        dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
             (unsigned long long) ofs, (unsigned long long) len);

        osdc = &rbd_dev->rbd_client->client->osdc;
        req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
                                      false, GFP_NOIO, pages, bio);
        if (!req) {
                ret = -ENOMEM;
                goto done_pages;
        }

        req->r_callback = rbd_cb;

        req_data->rq = rq;
        req_data->bio = bio;
        req_data->pages = pages;
        req_data->len = len;

        req->r_priv = req_data;

        reqhead = req->r_request->front.iov_base;
        reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

        strncpy(req->r_oid, object_name, sizeof(req->r_oid));
        req->r_oid_len = strlen(req->r_oid);

        layout = &req->r_file_layout;
        memset(layout, 0, sizeof(*layout));
        layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_stripe_count = cpu_to_le32(1);
        layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
        ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
                                   req, ops);
        rbd_assert(ret == 0);

        ceph_osdc_build_request(req, ofs, &len,
                                ops,
                                snapc,
                                &mtime,
                                req->r_oid, req->r_oid_len);

        if (linger_req) {
                ceph_osdc_set_request_linger(osdc, req);
                *linger_req = req;
        }

        ret = ceph_osdc_start_request(osdc, req, false);
        if (ret < 0)
                goto done_err;

        if (!rbd_cb) {
                ret = ceph_osdc_wait_request(osdc, req);
                if (ver)
                        *ver = le64_to_cpu(req->r_reassert_version.version);
                dout("reassert_ver=%llu\n",
                     (unsigned long long)
                     le64_to_cpu(req->r_reassert_version.version));
                ceph_osdc_put_request(req);
        }
        return ret;

done_err:
        bio_chain_put(req_data->bio);
        ceph_osdc_put_request(req);
done_pages:
        rbd_coll_end_req(req_data, ret, len);
        kfree(req_data);
        return ret;
}

/*
 * Ceph osd op callback
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        struct rbd_request *req_data = req->r_priv;
        struct ceph_osd_reply_head *replyhead;
        struct ceph_osd_op *op;
        __s32 rc;
        u64 bytes;
        int read_op;

        /* parse reply */
        replyhead = msg->front.iov_base;
        WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
        op = (void *)(replyhead + 1);
        rc = le32_to_cpu(replyhead->result);
        bytes = le64_to_cpu(op->extent.length);
        read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

        dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
             (unsigned long long) bytes, read_op, (int) rc);

        if (rc == -ENOENT && read_op) {
                zero_bio_chain(req_data->bio, 0);
                rc = 0;
        } else if (rc == 0 && read_op && bytes < req_data->len) {
                zero_bio_chain(req_data->bio, bytes);
                bytes = req_data->len;
        }

        rbd_coll_end_req(req_data, rc, bytes);

        if (req_data->bio)
                bio_chain_put(req_data->bio);

        ceph_osdc_put_request(req);
        kfree(req_data);
}

static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        ceph_osdc_put_request(req);
}

/*
 * Do a synchronous ceph osd operation
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
                           struct ceph_snap_context *snapc,
                           u64 snapid,
                           int flags,
                           struct ceph_osd_req_op *ops,
                           const char *object_name,
                           u64 ofs, u64 inbound_size,
                           char *inbound,
                           struct ceph_osd_request **linger_req,
                           u64 *ver)
{
        int ret;
        struct page **pages;
        int num_pages;

        rbd_assert(ops != NULL);

        num_pages = calc_pages_for(ofs, inbound_size);
        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);

        ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
                             object_name, ofs, inbound_size, NULL,
                             pages, num_pages,
                             flags,
                             ops,
                             NULL, 0,
                             NULL,
                             linger_req, ver);
        if (ret < 0)
                goto done;

        if ((flags & CEPH_OSD_FLAG_READ) && inbound)
                ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);

done:
        ceph_release_page_vector(pages, num_pages);
        return ret;
}

/*
 * Do an asynchronous ceph osd operation
 */
static int rbd_do_op(struct request *rq,
                     struct rbd_device *rbd_dev,
                     struct ceph_snap_context *snapc,
                     u64 snapid,
                     int opcode, int flags,
                     u64 ofs, u64 len,
                     struct bio *bio,
                     struct rbd_req_coll *coll,
                     int coll_index)
{
        char *seg_name;
        u64 seg_ofs;
        u64 seg_len;
        int ret;
        struct ceph_osd_req_op *ops;
        u32 payload_len;

        seg_name = rbd_segment_name(rbd_dev, ofs);
        if (!seg_name)
                return -ENOMEM;
        seg_len = rbd_segment_length(rbd_dev, ofs, len);
        seg_ofs = rbd_segment_offset(rbd_dev, ofs);

        payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

        ret = -ENOMEM;
        ops = rbd_create_rw_ops(1, opcode, payload_len);
        if (!ops)
                goto done;

        /* we've taken care of segment sizes earlier when we
           cloned the bios. We should never have a segment
           truncated at this point */
        rbd_assert(seg_len == len);

        ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
                             seg_name, seg_ofs, seg_len,
                             bio,
                             NULL, 0,
                             flags,
                             ops,
                             coll, coll_index,
                             rbd_req_cb, 0, NULL);

        rbd_destroy_ops(ops);
done:
        kfree(seg_name);
        return ret;
}

/*
 * Request async osd write
 */
static int rbd_req_write(struct request *rq,
                         struct rbd_device *rbd_dev,
                         struct ceph_snap_context *snapc,
                         u64 ofs, u64 len,
                         struct bio *bio,
                         struct rbd_req_coll *coll,
                         int coll_index)
{
        return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
                         CEPH_OSD_OP_WRITE,
                         CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                         ofs, len, bio, coll, coll_index);
}

/*
 * Request async osd read
 */
static int rbd_req_read(struct request *rq,
                        struct rbd_device *rbd_dev,
                        u64 snapid,
                        u64 ofs, u64 len,
                        struct bio *bio,
                        struct rbd_req_coll *coll,
                        int coll_index)
{
        return rbd_do_op(rq, rbd_dev, NULL,
                         snapid,
                         CEPH_OSD_OP_READ,
                         CEPH_OSD_FLAG_READ,
                         ofs, len, bio, coll, coll_index);
}

/*
 * Request sync osd read
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
                             u64 snapid,
                             const char *object_name,
                             u64 ofs, u64 len,
                             char *buf,
                             u64 *ver)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
        if (!ops)
                return -ENOMEM;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              snapid,
                              CEPH_OSD_FLAG_READ,
                              ops, object_name, ofs, len, buf, NULL, ver);
        rbd_destroy_ops(ops);

        return ret;
}

/*
 * Request sync osd notify ack
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
                                   u64 ver,
                                   u64 notify_id)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
        if (!ops)
                return -ENOMEM;

        ops[0].watch.ver = cpu_to_le64(ver);
        ops[0].watch.cookie = notify_id;
        ops[0].watch.flag = 0;

        ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
                             rbd_dev->header_name, 0, 0, NULL,
                             NULL, 0,
                             CEPH_OSD_FLAG_READ,
                             ops,
                             NULL, 0,
                             rbd_simple_req_cb, 0, NULL);

        rbd_destroy_ops(ops);
        return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
        struct rbd_device *rbd_dev = (struct rbd_device *)data;
        u64 hver;
        int rc;

        if (!rbd_dev)
                return;

        dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
             rbd_dev->header_name, (unsigned long long) notify_id,
             (unsigned int) opcode);
        rc = rbd_refresh_header(rbd_dev, &hver);
        if (rc)
                pr_warning(RBD_DRV_NAME "%d got notification but failed to "
                           "update snaps: %d\n", rbd_dev->major, rc);

        rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
                                     (void *)rbd_dev, &rbd_dev->watch_event);
        if (ret < 0)
                goto fail;

        ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 1;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL,
                              &rbd_dev->watch_request, NULL);

        if (ret < 0)
                goto fail_event;

        rbd_destroy_ops(ops);
        return 0;

fail_event:
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
fail:
        rbd_destroy_ops(ops);
        return ret;
}

/*
 * Request sync osd unwatch
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        ops[0].watch.ver = 0;
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 0;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL, NULL, NULL);

        rbd_destroy_ops(ops);
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
        return ret;
}

/*
 * Synchronous osd object method call
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
                             const char *object_name,
                             const char *class_name,
                             const char *method_name,
                             const char *outbound,
                             size_t outbound_size,
                             char *inbound,
                             size_t inbound_size,
                             int flags,
                             u64 *ver)
{
        struct ceph_osd_req_op *ops;
        int class_name_len = strlen(class_name);
        int method_name_len = strlen(method_name);
        int payload_size;
        int ret;

        /*
         * Any input parameters required by the method we're calling
         * will be sent along with the class and method names as
         * part of the message payload.  That data and its size are
         * supplied via the indata and indata_len fields (named from
         * the perspective of the server side) in the OSD request
         * operation.
         */
        payload_size = class_name_len + method_name_len + outbound_size;
        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
        if (!ops)
                return -ENOMEM;

        ops[0].cls.class_name = class_name;
        ops[0].cls.class_len = (__u8) class_name_len;
        ops[0].cls.method_name = method_name;
        ops[0].cls.method_len = (__u8) method_name_len;
        ops[0].cls.argc = 0;
        ops[0].cls.indata = outbound;
        ops[0].cls.indata_len = outbound_size;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              flags, ops,
                              object_name, 0, inbound_size, inbound,
                              NULL, ver);

        rbd_destroy_ops(ops);

        dout("cls_exec returned %d\n", ret);
        return ret;
}

static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
        struct rbd_req_coll *coll =
                        kzalloc(sizeof(struct rbd_req_coll) +
                                sizeof(struct rbd_req_status) * num_reqs,
                                GFP_ATOMIC);

        if (!coll)
                return NULL;
        coll->total = num_reqs;
        kref_init(&coll->kref);
        return coll;
}

/*
 * block device queue callback
 */
static void rbd_rq_fn(struct request_queue *q)
{
        struct rbd_device *rbd_dev = q->queuedata;
        struct request *rq;
        struct bio_pair *bp = NULL;

        while ((rq = blk_fetch_request(q))) {
                struct bio *bio;
                struct bio *rq_bio, *next_bio = NULL;
                bool do_write;
                unsigned int size;
                u64 op_size = 0;
                u64 ofs;
                int num_segs, cur_seg = 0;
                struct rbd_req_coll *coll;
                struct ceph_snap_context *snapc;

                dout("fetched request\n");

                /* filter out block requests we don't understand */
                if ((rq->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);

                size = blk_rq_bytes(rq);
                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
                rq_bio = rq->bio;
                if (do_write && rbd_dev->mapping.read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        continue;
                }

                spin_unlock_irq(q->queue_lock);

                down_read(&rbd_dev->header_rwsem);

                if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
                                !rbd_dev->mapping.snap_exists) {
                        up_read(&rbd_dev->header_rwsem);
                        dout("request for non-existent snapshot\n");
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -EIO);
                        continue;
                }

                snapc = ceph_get_snap_context(rbd_dev->header.snapc);

                up_read(&rbd_dev->header_rwsem);

                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
                     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
                if (num_segs <= 0) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, num_segs);
                        ceph_put_snap_context(snapc);
                        continue;
                }
                coll = rbd_alloc_coll(num_segs);
                if (!coll) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENOMEM);
                        ceph_put_snap_context(snapc);
                        continue;
                }

                do {
                        /* a bio clone to be passed down to OSD req */
                        dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
                        op_size = rbd_segment_length(rbd_dev, ofs, size);
                        kref_get(&coll->kref);
                        bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
                                              op_size, GFP_ATOMIC);
                        if (!bio) {
                                rbd_coll_end_req_index(rq, coll, cur_seg,
                                                       -ENOMEM, op_size);
                                goto next_seg;
                        }

                        /* init OSD command: write or read */
                        if (do_write)
                                rbd_req_write(rq, rbd_dev,
                                              snapc,
                                              ofs,
                                              op_size, bio,
                                              coll, cur_seg);
                        else
                                rbd_req_read(rq, rbd_dev,
                                             rbd_dev->mapping.snap_id,
                                             ofs,
                                             op_size, bio,
                                             coll, cur_seg);

next_seg:
                        size -= op_size;
                        ofs += op_size;

                        cur_seg++;
                        rq_bio = next_bio;
                } while (size > 0);
                kref_put(&coll->kref, rbd_coll_release);

                if (bp)
                        bio_pair_release(bp);
                spin_lock_irq(q->queue_lock);

                ceph_put_snap_context(snapc);
        }
}

/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with single-page bios,
 * which we handle later at bio_chain_clone
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
                          struct bio_vec *bvec)
{
        struct rbd_device *rbd_dev = q->queuedata;
        unsigned int chunk_sectors;
        sector_t sector;
        unsigned int bio_sectors;
        int max;

        chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
        sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
        bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

        max = (chunk_sectors - ((sector & (chunk_sectors - 1))
                                + bio_sectors)) << SECTOR_SHIFT;
        if (max < 0)
                max = 0; /* bio_add cannot handle a negative return */
        if (max <= bvec->bv_len && bio_sectors == 0)
                return bvec->bv_len;
        return max;
}
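
/*
 * A worked example, assuming 4 MiB objects (obj_order = 22, so
 * chunk_sectors = 8192): a bio that already holds 8190 sectors
 * starting at sector 0 of an object leaves max = (8192 - 8190) * 512
 * = 1024 bytes, so a 4096-byte bvec is refused rather than allowed
 * to straddle the object boundary.
 */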

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk = rbd_dev->disk;

        if (!disk)
                return;

        if (disk->flags & GENHD_FL_UP)
                del_gendisk(disk);
        if (disk->queue)
                blk_cleanup_queue(disk->queue);
        put_disk(disk);
}

/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
        struct rbd_image_header_ondisk *ondisk = NULL;
        u32 snap_count = 0;
        u64 names_size = 0;
        u32 want_count;
        int ret;

        /*
         * The complete header will include an array of its 64-bit
         * snapshot ids, followed by the names of those snapshots as
         * a contiguous block of NUL-terminated strings.  Note that
         * the number of snapshots could change by the time we read
         * it in, in which case we re-read it.
         */
        do {
                size_t size;

                kfree(ondisk);

                size = sizeof (*ondisk);
                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
                size += names_size;
                ondisk = kmalloc(size, GFP_KERNEL);
                if (!ondisk)
                        return ERR_PTR(-ENOMEM);

                ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
                                        rbd_dev->header_name,
                                        0, size,
                                        (char *) ondisk, version);

                if (ret < 0)
                        goto out_err;
                if (WARN_ON((size_t) ret < size)) {
                        ret = -ENXIO;
                        pr_warning("short header read for image %s"
                                   " (want %zd got %d)\n",
                                   rbd_dev->image_name, size, ret);
                        goto out_err;
                }
                if (!rbd_dev_ondisk_valid(ondisk)) {
                        ret = -ENXIO;
                        pr_warning("invalid header for image %s\n",
                                   rbd_dev->image_name);
                        goto out_err;
                }

                names_size = le64_to_cpu(ondisk->snap_names_len);
                want_count = snap_count;
                snap_count = le32_to_cpu(ondisk->snap_count);
        } while (snap_count != want_count);

        return ondisk;

out_err:
        kfree(ondisk);

        return ERR_PTR(ret);
}

/*
 * reload the on-disk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
                           struct rbd_image_header *header)
{
        struct rbd_image_header_ondisk *ondisk;
        u64 ver = 0;
        int ret;

        ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
        if (IS_ERR(ondisk))
                return PTR_ERR(ondisk);
        ret = rbd_header_from_disk(header, ondisk);
        if (ret >= 0)
                header->obj_version = ver;
        kfree(ondisk);

        return ret;
}

static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
        struct rbd_snap *snap;
        struct rbd_snap *next;

        list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
                __rbd_remove_snap_dev(snap);
}

/*
 * Re-read the on-disk header and bring the in-memory copy (mapping
 * size, snapshot context, and snapshot list) up to date with it.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
        int ret;
        struct rbd_image_header h;

        ret = rbd_read_header(rbd_dev, &h);
        if (ret < 0)
                return ret;

        down_write(&rbd_dev->header_rwsem);

        /* resized? */
        if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
                sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

                if (size != (sector_t) rbd_dev->mapping.size) {
                        dout("setting size to %llu sectors\n",
                             (unsigned long long) size);
                        rbd_dev->mapping.size = (u64) size;
                        set_capacity(rbd_dev->disk, size);
                }
        }

        /* rbd_dev->header.object_prefix shouldn't change */
        kfree(rbd_dev->header.snap_sizes);
        kfree(rbd_dev->header.snap_names);
        /* osd requests may still refer to snapc */
        ceph_put_snap_context(rbd_dev->header.snapc);

        if (hver)
                *hver = h.obj_version;
        rbd_dev->header.obj_version = h.obj_version;
        rbd_dev->header.image_size = h.image_size;
        rbd_dev->header.snapc = h.snapc;
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_sizes = h.snap_sizes;
        /* Free the extra copy of the object prefix */
        WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
        kfree(h.object_prefix);

        ret = rbd_dev_snaps_update(rbd_dev);
        if (!ret)
                ret = rbd_dev_snaps_register(rbd_dev);

        up_write(&rbd_dev->header_rwsem);

        return ret;
}

static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
        int ret;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        ret = __rbd_refresh_header(rbd_dev, hver);
        mutex_unlock(&ctl_mutex);

        return ret;
}

static int rbd_init_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk;
        struct request_queue *q;
        u64 segment_size;

        /* create gendisk info */
        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
        if (!disk)
                return -ENOMEM;

        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->dev_id);
        disk->major = rbd_dev->major;
        disk->first_minor = 0;
        disk->fops = &rbd_bd_ops;
        disk->private_data = rbd_dev;

        /* init rq */
        q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
        if (!q)
                goto out_disk;

        /* We use the default size, but let's be explicit about it. */
        blk_queue_physical_block_size(q, SECTOR_SIZE);

        /* set io sizes to object size */
        segment_size = rbd_obj_bytes(&rbd_dev->header);
        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
        blk_queue_max_segment_size(q, segment_size);
        blk_queue_io_min(q, segment_size);
        blk_queue_io_opt(q, segment_size);

        blk_queue_merge_bvec(q, rbd_merge_bvec);
        disk->queue = q;

        q->queuedata = rbd_dev;

        rbd_dev->disk = disk;

        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

        return 0;
out_disk:
        put_disk(disk);

        return -ENOMEM;
}

/*
  sysfs
*/

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
        return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        sector_t size;

        down_read(&rbd_dev->header_rwsem);
        size = get_capacity(rbd_dev->disk);
        up_read(&rbd_dev->header_rwsem);

        return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}

/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
                                 struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "0x%016llx\n",
                       (unsigned long long) rbd_dev->mapping.features);
}

static ssize_t rbd_major_show(struct device *dev,
                              struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%d\n", rbd_dev->major);
}

static ssize_t rbd_client_id_show(struct device *dev,
                                  struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "client%lld\n",
                       ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
                                struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%d\n", rbd_dev->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->image_name);
}

static ssize_t rbd_image_id_show(struct device *dev,
                                 struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->image_id);
}

/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
                             struct device_attribute *attr,
                             char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
}

static ssize_t rbd_image_refresh(struct device *dev,
                                 struct device_attribute *attr,
                                 const char *buf,
                                 size_t size)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int ret;

        ret = rbd_refresh_header(rbd_dev, NULL);

        return ret < 0 ? ret : size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);

static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
        NULL
};

static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
1985 
1986 /*
1987  sysfs - snapshots
1988 */
1989 
1990 static ssize_t rbd_snap_size_show(struct device *dev,
1991  struct device_attribute *attr,
1992  char *buf)
1993 {
1994  struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1995 
1996  return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1997 }
1998 
1999 static ssize_t rbd_snap_id_show(struct device *dev,
2000  struct device_attribute *attr,
2001  char *buf)
2002 {
2003  struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2004 
2005  return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2006 }
2007 
2008 static ssize_t rbd_snap_features_show(struct device *dev,
2009  struct device_attribute *attr,
2010  char *buf)
2011 {
2012  struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2013 
2014  return sprintf(buf, "0x%016llx\n",
2015  (unsigned long long) snap->features);
2016 }
2017 
2018 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2019 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2020 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2021 
2022 static struct attribute *rbd_snap_attrs[] = {
2023  &dev_attr_snap_size.attr,
2024  &dev_attr_snap_id.attr,
2025  &dev_attr_snap_features.attr,
2026  NULL,
2027 };
2028 
2029 static struct attribute_group rbd_snap_attr_group = {
2030  .attrs = rbd_snap_attrs,
2031 };
2032 
2033 static void rbd_snap_dev_release(struct device *dev)
2034 {
2035  struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2036  kfree(snap->name);
2037  kfree(snap);
2038 }
2039 
2040 static const struct attribute_group *rbd_snap_attr_groups[] = {
2041  &rbd_snap_attr_group,
2042  NULL
2043 };
2044 
2045 static struct device_type rbd_snap_device_type = {
2046  .groups = rbd_snap_attr_groups,
2047  .release = rbd_snap_dev_release,
2048 };
2049 
2050 static bool rbd_snap_registered(struct rbd_snap *snap)
2051 {
2052  bool ret = snap->dev.type == &rbd_snap_device_type;
2053  bool reg = device_is_registered(&snap->dev);
2054 
2055  rbd_assert(!ret ^ reg);
2056 
2057  return ret;
2058 }
2059 
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
        list_del(&snap->node);
        if (device_is_registered(&snap->dev))
                device_unregister(&snap->dev);
}

static int rbd_register_snap_dev(struct rbd_snap *snap,
                                 struct device *parent)
{
        struct device *dev = &snap->dev;
        int ret;

        dev->type = &rbd_snap_device_type;
        dev->parent = parent;
        dev->release = rbd_snap_dev_release;
        dev_set_name(dev, "snap_%s", snap->name);
        dout("%s: registering device for snapshot %s\n", __func__, snap->name);

        ret = device_register(dev);

        return ret;
}

static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
                                           const char *snap_name,
                                           u64 snap_id, u64 snap_size,
                                           u64 snap_features)
{
        struct rbd_snap *snap;
        int ret;

        snap = kzalloc(sizeof (*snap), GFP_KERNEL);
        if (!snap)
                return ERR_PTR(-ENOMEM);

        ret = -ENOMEM;
        snap->name = kstrdup(snap_name, GFP_KERNEL);
        if (!snap->name)
                goto err;

        snap->id = snap_id;
        snap->size = snap_size;
        snap->features = snap_features;

        return snap;

err:
        kfree(snap->name);
        kfree(snap);

        return ERR_PTR(ret);
}

static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
                                  u64 *snap_size, u64 *snap_features)
{
        char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        *snap_size = rbd_dev->header.snap_sizes[which];
        *snap_features = 0;     /* No features for v1 */

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return snap_name;
}
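
/*
 * For example, if the name block holds "snap1\0snapshot-two\0", the
 * walk above returns a pointer to "snap1" for which == 0, and for
 * which == 1 it skips strlen("snap1") + 1 = 6 bytes to land on
 * "snapshot-two".
 */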

/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                 u8 *order, u64 *snap_size)
{
        __le64 snapid = cpu_to_le64(snap_id);
        int ret;
        struct {
                u8 order;
                __le64 size;
        } __attribute__ ((packed)) size_buf = { 0 };

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_size",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &size_buf, sizeof (size_buf),
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;

        *order = size_buf.order;
        *snap_size = le64_to_cpu(size_buf.size);

        dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
             (unsigned long long) snap_id, (unsigned int) *order,
             (unsigned long long) *snap_size);

        return 0;
}
2166 
2167 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2168 {
2169  return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2170  &rbd_dev->header.obj_order,
2171  &rbd_dev->header.image_size);
2172 }
2173 
2174 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2175 {
2176  void *reply_buf;
2177  int ret;
2178  void *p;
2179 
2180  reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2181  if (!reply_buf)
2182  return -ENOMEM;
2183 
2184  ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2185  "rbd", "get_object_prefix",
2186  NULL, 0,
2187  reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2188  CEPH_OSD_FLAG_READ, NULL);
2189  dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2190  if (ret < 0)
2191  goto out;
2192 
2193  p = reply_buf;
2194  rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2195  p + RBD_OBJ_PREFIX_LEN_MAX,
2196  NULL, GFP_NOIO);
2197 
2198  if (IS_ERR(rbd_dev->header.object_prefix)) {
2199  ret = PTR_ERR(rbd_dev->header.object_prefix);
2200  rbd_dev->header.object_prefix = NULL;
2201  } else {
2202  dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
2203  }
2204 
2205 out:
2206  kfree(reply_buf);
2207 
2208  return ret;
2209 }
2210 
2211 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2212  u64 *snap_features)
2213 {
2214  __le64 snapid = cpu_to_le64(snap_id);
2215  struct {
2216  __le64 features;
2217  __le64 incompat;
2218  } features_buf = { 0 };
2219  int ret;
2220 
2221  ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2222  "rbd", "get_features",
2223  (char *) &snapid, sizeof (snapid),
2224  (char *) &features_buf, sizeof (features_buf),
2225  CEPH_OSD_FLAG_READ, NULL);
2226  dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2227  if (ret < 0)
2228  return ret;
2229  *snap_features = le64_to_cpu(features_buf.features);
2230 
2231  dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2232  (unsigned long long) snap_id,
2233  (unsigned long long) *snap_features,
2234  (unsigned long long) le64_to_cpu(features_buf.incompat));
2235 
2236  return 0;
2237 }
2238 
2239 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2240 {
2241  return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2242  &rbd_dev->header.features);
2243 }
2244 
2245 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2246 {
2247  size_t size;
2248  int ret;
2249  void *reply_buf;
2250  void *p;
2251  void *end;
2252  u64 seq;
2253  u32 snap_count;
2254  struct ceph_snap_context *snapc;
2255  u32 i;
2256 
2257  /*
2258  * We'll need room for the seq value (maximum snapshot id),
2259  * snapshot count, and array of that many snapshot ids.
2260  * For now we have a fixed upper limit on the number we're
2261  * prepared to receive.
2262  */
2263  size = sizeof (__le64) + sizeof (__le32) +
2264  RBD_MAX_SNAP_COUNT * sizeof (__le64);
2265  reply_buf = kzalloc(size, GFP_KERNEL);
2266  if (!reply_buf)
2267  return -ENOMEM;
2268 
2269  ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2270  "rbd", "get_snapcontext",
2271  NULL, 0,
2272  reply_buf, size,
2273  CEPH_OSD_FLAG_READ, ver);
2274  dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2275  if (ret < 0)
2276  goto out;
2277 
2278  ret = -ERANGE;
2279  p = reply_buf;
2280  end = (char *) reply_buf + size;
2281  ceph_decode_64_safe(&p, end, seq, out);
2282  ceph_decode_32_safe(&p, end, snap_count, out);
2283 
2284  /*
2285  * Make sure the reported number of snapshot ids wouldn't go
2286  * beyond the end of our buffer. But before checking that,
2287  * make sure the computed size of the snapshot context we
2288  * allocate is representable in a size_t.
2289  */
2290  if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2291  / sizeof (u64)) {
2292  ret = -EINVAL;
2293  goto out;
2294  }
2295  if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2296  goto out;
2297 
2298  size = sizeof (struct ceph_snap_context) +
2299  snap_count * sizeof (snapc->snaps[0]);
2300  snapc = kmalloc(size, GFP_KERNEL);
2301  if (!snapc) {
2302  ret = -ENOMEM;
2303  goto out;
2304  }
2305 
2306  atomic_set(&snapc->nref, 1);
2307  snapc->seq = seq;
2308  snapc->num_snaps = snap_count;
2309  for (i = 0; i < snap_count; i++)
2310  snapc->snaps[i] = ceph_decode_64(&p);
2311 
2312  rbd_dev->header.snapc = snapc;
2313 
2314  dout(" snap context seq = %llu, snap_count = %u\n",
2315  (unsigned long long) seq, (unsigned int) snap_count);
2316  ret = 0;
2317 out:
2318  kfree(reply_buf);
2319 
2320  return ret;
2321 }
2322 
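/*
 * Illustrative layout (not part of the driver): the encoded snapshot
 * context decoded above is, in little-endian order:
 *
 *	__le64	seq;			-- highest snapshot id
 *	__le32	snap_count;		-- number of ids that follow
 *	__le64	snaps[snap_count];	-- snapshot ids, highest first
 *
 * The SIZE_MAX test guards the multiplication sizing the in-memory
 * copy: with sizeof (u64) == 8, any snap_count above
 * (SIZE_MAX - sizeof (struct ceph_snap_context)) / 8 would make
 * "sizeof (struct ceph_snap_context) + snap_count * 8" wrap around,
 * so such a reply is rejected rather than under-allocated.
 */
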
2323 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2324 {
2325  size_t size;
2326  void *reply_buf;
2327  __le64 snap_id;
2328  int ret;
2329  void *p;
2330  void *end;
2331  size_t snap_name_len;
2332  char *snap_name;
2333 
2334  size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2335  reply_buf = kmalloc(size, GFP_KERNEL);
2336  if (!reply_buf)
2337  return ERR_PTR(-ENOMEM);
2338 
2339  snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2340  ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2341  "rbd", "get_snapshot_name",
2342  (char *) &snap_id, sizeof (snap_id),
2343  reply_buf, size,
2344  CEPH_OSD_FLAG_READ, NULL);
2345  dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2346  if (ret < 0)
2347  goto out;
2348 
2349  p = reply_buf;
2350  end = (char *) reply_buf + size;
2351  snap_name_len = 0;
2352  snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2353  GFP_KERNEL);
2354  if (IS_ERR(snap_name)) {
2355  ret = PTR_ERR(snap_name);
2356  goto out;
2357  } else {
2358  dout(" snap_id 0x%016llx snap_name = %s\n",
2359  (unsigned long long) le64_to_cpu(snap_id), snap_name);
2360  }
2361  kfree(reply_buf);
2362 
2363  return snap_name;
2364 out:
2365  kfree(reply_buf);
2366 
2367  return ERR_PTR(ret);
2368 }
2369 
2370 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2371  u64 *snap_size, u64 *snap_features)
2372 {
2373  __le64 snap_id;
2374  u8 order;
2375  int ret;
2376 
2377  snap_id = rbd_dev->header.snapc->snaps[which];
2378  ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2379  if (ret)
2380  return ERR_PTR(ret);
2381  ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2382  if (ret)
2383  return ERR_PTR(ret);
2384 
2385  return rbd_dev_v2_snap_name(rbd_dev, which);
2386 }
2387 
2388 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2389  u64 *snap_size, u64 *snap_features)
2390 {
2391  if (rbd_dev->image_format == 1)
2392  return rbd_dev_v1_snap_info(rbd_dev, which,
2393  snap_size, snap_features);
2394  if (rbd_dev->image_format == 2)
2395  return rbd_dev_v2_snap_info(rbd_dev, which,
2396  snap_size, snap_features);
2397  return ERR_PTR(-EINVAL);
2398 }
2399 
2400 /*
2401  * Scan the rbd device's current snapshot list and compare it to the
2402  * newly-received snapshot context. Remove any existing snapshots
2403  * not present in the new snapshot context. Add a new snapshot for
2404  * any snapshots in the snapshot context not in the current list.
2405  * And verify there are no changes to snapshots we already know
2406  * about.
2407  *
2408  * Assumes the snapshots in the snapshot context are sorted by
2409  * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2410  * are also maintained in that order.)
2411  */
2412 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2413 {
2414  struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2415  const u32 snap_count = snapc->num_snaps;
2416  struct list_head *head = &rbd_dev->snaps;
2417  struct list_head *links = head->next;
2418  u32 index = 0;
2419 
2420  dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2421  while (index < snap_count || links != head) {
2422  u64 snap_id;
2423  struct rbd_snap *snap;
2424  char *snap_name;
2425  u64 snap_size = 0;
2426  u64 snap_features = 0;
2427 
2428  snap_id = index < snap_count ? snapc->snaps[index]
2429  : CEPH_NOSNAP;
2430  snap = links != head ? list_entry(links, struct rbd_snap, node)
2431  : NULL;
2432  rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2433 
2434  if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2435  struct list_head *next = links->next;
2436 
2437  /* Existing snapshot not in the new snap context */
2438 
2439  if (rbd_dev->mapping.snap_id == snap->id)
2440  rbd_dev->mapping.snap_exists = false;
2441  __rbd_remove_snap_dev(snap);
2442  dout("%ssnap id %llu has been removed\n",
2443  rbd_dev->mapping.snap_id == snap->id ?
2444  "mapped " : "",
2445  (unsigned long long) snap->id);
2446 
2447  /* Done with this list entry; advance */
2448 
2449  links = next;
2450  continue;
2451  }
2452 
2453  snap_name = rbd_dev_snap_info(rbd_dev, index,
2454  &snap_size, &snap_features);
2455  if (IS_ERR(snap_name))
2456  return PTR_ERR(snap_name);
2457 
2458  dout("entry %u: snap_id = %llu\n", (unsigned int) index,
2459  (unsigned long long) snap_id);
2460  if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2461  struct rbd_snap *new_snap;
2462 
2463  /* We haven't seen this snapshot before */
2464 
2465  new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2466  snap_id, snap_size, snap_features);
2467  if (IS_ERR(new_snap)) {
2468  int err = PTR_ERR(new_snap);
2469 
2470  dout(" failed to add dev, error %d\n", err);
2471 
2472  return err;
2473  }
2474 
2475  /* New goes before existing, or at end of list */
2476 
2477  dout(" added dev%s\n", snap ? "" : " at end");
2478  if (snap)
2479  list_add_tail(&new_snap->node, &snap->node);
2480  else
2481  list_add_tail(&new_snap->node, head);
2482  } else {
2483  /* Already have this one */
2484 
2485  dout(" already present\n");
2486 
2487  rbd_assert(snap->size == snap_size);
2488  rbd_assert(!strcmp(snap->name, snap_name));
2489  rbd_assert(snap->features == snap_features);
2490 
2491  /* Done with this list entry; advance */
2492 
2493  links = links->next;
2494  }
2495 
2496  /* Advance to the next entry in the snapshot context */
2497 
2498  index++;
2499  }
2500  dout("%s: done\n", __func__);
2501 
2502  return 0;
2503 }
2504 
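/*
 * Illustrative walk-through (not part of the driver): the update
 * above is a merge of two sequences sorted by id, highest first.
 * With the device list holding snapshots {12, 10, 4} and a newly
 * received context {12, 8, 4}, the loop proceeds:
 *
 *	12 vs 12: already present, advance both
 *	 8 vs 10: 10 is gone from the context, unregister it
 *	 8 vs  4: 8 is new, add it before 4
 *	 4 vs  4: already present, advance both
 *
 * All ids here are made up for illustration.
 */
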
2505 /*
2506  * Scan the list of snapshots and register the devices for any that
2507  * have not already been registered.
2508  */
2509 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2510 {
2511  struct rbd_snap *snap;
2512  int ret = 0;
2513 
2514  dout("%s called\n", __func__);
2515  if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2516  return -EIO;
2517 
2518  list_for_each_entry(snap, &rbd_dev->snaps, node) {
2519  if (!rbd_snap_registered(snap)) {
2520  ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2521  if (ret < 0)
2522  break;
2523  }
2524  }
2525  dout("%s: returning %d\n", __func__, ret);
2526 
2527  return ret;
2528 }
2529 
2530 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2531 {
2532  struct device *dev;
2533  int ret;
2534 
2535  mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2536 
2537  dev = &rbd_dev->dev;
2538  dev->bus = &rbd_bus_type;
2539  dev->type = &rbd_device_type;
2540  dev->parent = &rbd_root_dev;
2541  dev->release = rbd_dev_release;
2542  dev_set_name(dev, "%d", rbd_dev->dev_id);
2543  ret = device_register(dev);
2544 
2545  mutex_unlock(&ctl_mutex);
2546 
2547  return ret;
2548 }
2549 
2550 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2551 {
2552  device_unregister(&rbd_dev->dev);
2553 }
2554 
2555 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2556 {
2557  int ret, rc;
2558 
2559  do {
2560  ret = rbd_req_sync_watch(rbd_dev);
2561  if (ret == -ERANGE) {
2562  rc = rbd_refresh_header(rbd_dev, NULL);
2563  if (rc < 0)
2564  return rc;
2565  }
2566  } while (ret == -ERANGE);
2567 
2568  return ret;
2569 }
2570 
2571 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2572 
2573 /*
2574  * Get a unique rbd identifier for the given new rbd_dev, and add
2575  * the rbd_dev to the global list. The minimum rbd id is 1.
2576  */
2577 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2578 {
2579  rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2580 
2581  spin_lock(&rbd_dev_list_lock);
2582  list_add_tail(&rbd_dev->node, &rbd_dev_list);
2583  spin_unlock(&rbd_dev_list_lock);
2584  dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2585  (unsigned long long) rbd_dev->dev_id);
2586 }
2587 
2588 /*
2589  * Remove an rbd_dev from the global list, and record that its
2590  * identifier is no longer in use.
2591  */
2592 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2593 {
2594  struct list_head *tmp;
2595  int rbd_id = rbd_dev->dev_id;
2596  int max_id;
2597 
2598  rbd_assert(rbd_id > 0);
2599 
2600  dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2601  (unsigned long long) rbd_dev->dev_id);
2602  spin_lock(&rbd_dev_list_lock);
2603  list_del_init(&rbd_dev->node);
2604 
2605  /*
2606  * If the id being "put" is not the current maximum, there
2607  * is nothing special we need to do.
2608  */
2609  if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2610  spin_unlock(&rbd_dev_list_lock);
2611  return;
2612  }
2613 
2614  /*
2615  * We need to update the current maximum id. Search the
2616  * list to find out what it is. We're more likely to find
2617  * the maximum at the end, so search the list backward.
2618  */
2619  max_id = 0;
2620  list_for_each_prev(tmp, &rbd_dev_list) {
2621  struct rbd_device *rbd_dev;
2622 
2623  rbd_dev = list_entry(tmp, struct rbd_device, node);
2624  if (rbd_dev->dev_id > max_id)
2625  max_id = rbd_dev->dev_id;
2626  }
2627  spin_unlock(&rbd_dev_list_lock);
2628 
2629  /*
2630  * The max id could have been updated by rbd_dev_id_get(), in
2631  * which case it now accurately reflects the new maximum.
2632  * Be careful not to overwrite the maximum value in that
2633  * case.
2634  */
2635  atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2636  dout(" max dev id has been reset\n");
2637 }
2638 
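/*
 * Illustrative example (not part of the driver): ids come from a
 * monotonically increasing counter, so mapping three images yields
 * rbd1, rbd2 and rbd3.  Removing rbd3 (the current maximum) rescans
 * the list and resets the counter to 2, so id 3 can be reused;
 * removing rbd1 or rbd2 instead leaves the counter at 3, and those
 * lower ids stay unused until the maximum itself is released.
 */
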
2639 /*
2640  * Skips over white space at *buf, and updates *buf to point to the
2641  * first found non-space character (if any). Returns the length of
2642  * the token (string of non-white space characters) found. Note
2643  * that *buf must be terminated with '\0'.
2644  */
2645 static inline size_t next_token(const char **buf)
2646 {
2647  /*
2648  * These are the characters that produce nonzero for
2649  * isspace() in the "C" and "POSIX" locales.
2650  */
2651  const char *spaces = " \f\n\r\t\v";
2652 
2653  *buf += strspn(*buf, spaces); /* Find start of token */
2654 
2655  return strcspn(*buf, spaces); /* Return token length */
2656 }
2657 
2658 /*
2659  * Finds the next token in *buf, and if the provided token buffer is
2660  * big enough, copies the found token into it. The result, if
2661  * copied, is guaranteed to be terminated with '\0'. Note that *buf
2662  * must be terminated with '\0' on entry.
2663  *
2664  * Returns the length of the token found (not including the '\0').
2665  * Return value will be 0 if no token is found, and it will be >=
2666  * token_size if the token would not fit.
2667  *
2668  * The *buf pointer will be updated to point beyond the end of the
2669  * found token. Note that this occurs even if the token buffer is
2670  * too small to hold it.
2671  */
2672 static inline size_t copy_token(const char **buf,
2673  char *token,
2674  size_t token_size)
2675 {
2676  size_t len;
2677 
2678  len = next_token(buf);
2679  if (len < token_size) {
2680  memcpy(token, *buf, len);
2681  *(token + len) = '\0';
2682  }
2683  *buf += len;
2684 
2685  return len;
2686 }
2687 
2688 /*
2689  * Finds the next token in *buf, dynamically allocates a buffer big
2690  * enough to hold a copy of it, and copies the token into the new
2691  * buffer. The copy is guaranteed to be terminated with '\0'. Note
2692  * that a duplicate buffer is created even for a zero-length token.
2693  *
2694  * Returns a pointer to the newly-allocated duplicate, or a null
2695  * pointer if memory for the duplicate was not available. If
2696  * the lenp argument is a non-null pointer, the length of the token
2697  * (not including the '\0') is returned in *lenp.
2698  *
2699  * If successful, the *buf pointer will be updated to point beyond
2700  * the end of the found token.
2701  *
2702  * Note: uses GFP_KERNEL for allocation.
2703  */
2704 static inline char *dup_token(const char **buf, size_t *lenp)
2705 {
2706  char *dup;
2707  size_t len;
2708 
2709  len = next_token(buf);
2710  dup = kmalloc(len + 1, GFP_KERNEL);
2711  if (!dup)
2712  return NULL;
2713 
2714  memcpy(dup, *buf, len);
2715  *(dup + len) = '\0';
2716  *buf += len;
2717 
2718  if (lenp)
2719  *lenp = len;
2720 
2721  return dup;
2722 }
2723 
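/*
 * Illustrative usage (not part of the driver) of the three token
 * helpers above, with a made-up input string:
 *
 *	const char *p = "  1.2.3.4:6789 name=admin";
 *	char tok[16];
 *	size_t len;
 *
 *	len = copy_token(&p, tok, sizeof (tok));
 *	-- len == 12, tok == "1.2.3.4:6789", p -> " name=admin"
 *
 *	char *dup = dup_token(&p, &len);
 *	-- dup == kmalloc()ed "name=admin", len == 10, p -> ""
 */
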
2724 /*
2725  * This fills in the pool_name, image_name, and image_name_len
2726  * fields of the given rbd_dev, based on the
2727  * list of monitor addresses and other options provided via
2728  * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated
2729  * copy of the snapshot name to map if successful, or a
2730  * pointer-coded error otherwise.
2731  *
2732  * Note: rbd_dev is assumed to have been initially zero-filled.
2733  */
2734 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2735  const char *buf,
2736  const char **mon_addrs,
2737  size_t *mon_addrs_size,
2738  char *options,
2739  size_t options_size)
2740 {
2741  size_t len;
2742  char *err_ptr = ERR_PTR(-EINVAL);
2743  char *snap_name;
2744 
2745  /* The first four tokens are required */
2746 
2747  len = next_token(&buf);
2748  if (!len)
2749  return err_ptr;
2750  *mon_addrs_size = len + 1;
2751  *mon_addrs = buf;
2752 
2753  buf += len;
2754 
2755  len = copy_token(&buf, options, options_size);
2756  if (!len || len >= options_size)
2757  return err_ptr;
2758 
2759  err_ptr = ERR_PTR(-ENOMEM);
2760  rbd_dev->pool_name = dup_token(&buf, NULL);
2761  if (!rbd_dev->pool_name)
2762  goto out_err;
2763 
2764  rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2765  if (!rbd_dev->image_name)
2766  goto out_err;
2767 
2768  /* Snapshot name is optional */
2769  len = next_token(&buf);
2770  if (!len) {
2771  buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2772  len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2773  }
2774  snap_name = kmalloc(len + 1, GFP_KERNEL);
2775  if (!snap_name)
2776  goto out_err;
2777  memcpy(snap_name, buf, len);
2778  *(snap_name + len) = '\0';
2779 
2780  dout(" SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
2781 
2782  return snap_name;
2783 
2784 out_err:
2785  kfree(rbd_dev->image_name);
2786  rbd_dev->image_name = NULL;
2787  rbd_dev->image_name_len = 0;
2788  kfree(rbd_dev->pool_name);
2789  rbd_dev->pool_name = NULL;
2790 
2791  return err_ptr;
2792 }
2793 
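/*
 * Illustrative example (not part of the driver): a write such as
 *
 *	1.2.3.4:6789 name=admin rbd myimage mysnap
 *
 * to /sys/bus/rbd/add is parsed by the function above as monitor
 * address(es), option string, pool name, image name, and an
 * optional snapshot name; omitting the final token maps the base
 * image (RBD_SNAP_HEAD_NAME, "-") instead of a snapshot.  All
 * values here are made up for illustration.
 */
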
2794 /*
2795  * An rbd format 2 image has a unique identifier, distinct from the
2796  * name given to it by the user. Internally, that identifier is
2797  * what's used to specify the names of objects related to the image.
2798  *
2799  * A special "rbd id" object is used to map an rbd image name to its
2800  * id. If that object doesn't exist, then there is no v2 rbd image
2801  * with the supplied name.
2802  *
2803  * This function will record the given rbd_dev's image_id field if
2804  * it can be determined, and in that case will return 0. If any
2805  * errors occur a negative errno will be returned and the rbd_dev's
2806  * image_id field will be unchanged (and should be NULL).
2807  */
2808 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2809 {
2810  int ret;
2811  size_t size;
2812  char *object_name;
2813  void *response;
2814  void *p;
2815 
2816  /*
2817  * First, see if the format 2 image id file exists, and if
2818  * so, get the image's persistent id from it.
2819  */
2820  size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2821  object_name = kmalloc(size, GFP_NOIO);
2822  if (!object_name)
2823  return -ENOMEM;
2824  sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2825  dout("rbd id object name is %s\n", object_name);
2826 
2827  /* Response will be an encoded string, which includes a length */
2828 
2829  size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2830  response = kzalloc(size, GFP_NOIO);
2831  if (!response) {
2832  ret = -ENOMEM;
2833  goto out;
2834  }
2835 
2836  ret = rbd_req_sync_exec(rbd_dev, object_name,
2837  "rbd", "get_id",
2838  NULL, 0,
2839  response, RBD_IMAGE_ID_LEN_MAX,
2840  CEPH_OSD_FLAG_READ, NULL);
2841  dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2842  if (ret < 0)
2843  goto out;
2844 
2845  p = response;
2846  rbd_dev->image_id = ceph_extract_encoded_string(&p,
2847  p + RBD_IMAGE_ID_LEN_MAX,
2848  &rbd_dev->image_id_len,
2849  GFP_NOIO);
2850  if (IS_ERR(rbd_dev->image_id)) {
2851  ret = PTR_ERR(rbd_dev->image_id);
2852  rbd_dev->image_id = NULL;
2853  } else {
2854  dout("image_id is %s\n", rbd_dev->image_id);
2855  }
2856 out:
2857  kfree(response);
2858  kfree(object_name);
2859 
2860  return ret;
2861 }
2862 
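/*
 * Illustrative example (not part of the driver): for an image the
 * user calls "myimage", the id object probed above is named
 * "rbd_id.myimage", assuming RBD_ID_PREFIX is "rbd_id.".  Its
 * "get_id" method returns a length-prefixed string such as
 * "37207e1f2915" that becomes rbd_dev->image_id; the id value
 * shown is made up.
 */
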
2863 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2864 {
2865  int ret;
2866  size_t size;
2867 
2868  /* Version 1 images have no id; empty string is used */
2869 
2870  rbd_dev->image_id = kstrdup("", GFP_KERNEL);
2871  if (!rbd_dev->image_id)
2872  return -ENOMEM;
2873  rbd_dev->image_id_len = 0;
2874 
2875  /* Record the header object name for this rbd image. */
2876 
2877  size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
2878  rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2879  if (!rbd_dev->header_name) {
2880  ret = -ENOMEM;
2881  goto out_err;
2882  }
2883  sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2884 
2885  /* Populate rbd image metadata */
2886 
2887  ret = rbd_read_header(rbd_dev, &rbd_dev->header);
2888  if (ret < 0)
2889  goto out_err;
2890  rbd_dev->image_format = 1;
2891 
2892  dout("discovered version 1 image, header name is %s\n",
2893  rbd_dev->header_name);
2894 
2895  return 0;
2896 
2897 out_err:
2898  kfree(rbd_dev->header_name);
2899  rbd_dev->header_name = NULL;
2900  kfree(rbd_dev->image_id);
2901  rbd_dev->image_id = NULL;
2902 
2903  return ret;
2904 }
2905 
2906 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2907 {
2908  size_t size;
2909  int ret;
2910  u64 ver = 0;
2911 
2912  /*
2913  * Image id was filled in by the caller. Record the header
2914  * object name for this rbd image.
2915  */
2916  size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
2917  rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2918  if (!rbd_dev->header_name)
2919  return -ENOMEM;
2920  sprintf(rbd_dev->header_name, "%s%s",
2921  RBD_HEADER_PREFIX, rbd_dev->image_id);
2922 
2923  /* Get the size and object order for the image */
2924 
2925  ret = rbd_dev_v2_image_size(rbd_dev);
2926  if (ret < 0)
2927  goto out_err;
2928 
2929  /* Get the object prefix (a.k.a. block_name) for the image */
2930 
2931  ret = rbd_dev_v2_object_prefix(rbd_dev);
2932  if (ret < 0)
2933  goto out_err;
2934 
2935  /* Get the features for the image */
2936 
2937  ret = rbd_dev_v2_features(rbd_dev);
2938  if (ret < 0)
2939  goto out_err;
2940 
2941  /* crypto and compression type aren't (yet) supported for v2 images */
2942 
2943  rbd_dev->header.crypt_type = 0;
2944  rbd_dev->header.comp_type = 0;
2945 
2946  /* Get the snapshot context, plus the header version */
2947 
2948  ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
2949  if (ret)
2950  goto out_err;
2951  rbd_dev->header.obj_version = ver;
2952 
2953  rbd_dev->image_format = 2;
2954 
2955  dout("discovered version 2 image, header name is %s\n",
2956  rbd_dev->header_name);
2957 
2958  return -ENOTSUPP; /* mapping format 2 images is not yet supported */
2959 out_err:
2960  kfree(rbd_dev->header_name);
2961  rbd_dev->header_name = NULL;
2962  kfree(rbd_dev->header.object_prefix);
2963  rbd_dev->header.object_prefix = NULL;
2964 
2965  return ret;
2966 }
2967 
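/*
 * Illustrative comparison (not part of the driver): the two probe
 * paths derive different header object names.  For an image named
 * "myimage" whose v2 id is "37207e1f2915":
 *
 *	format 1:  myimage.rbd			-- name + RBD_SUFFIX
 *	format 2:  rbd_header.37207e1f2915	-- RBD_HEADER_PREFIX + id
 *
 * assuming RBD_SUFFIX is ".rbd" and RBD_HEADER_PREFIX is
 * "rbd_header."; the id value is made up.
 */
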
2968 /*
2969  * Probe for the existence of the header object for the given rbd
2970  * device. For format 2 images this includes determining the image
2971  * id.
2972  */
2973 static int rbd_dev_probe(struct rbd_device *rbd_dev)
2974 {
2975  int ret;
2976 
2977  /*
2978  * Get the id from the image id object. If it's not a
2979  * format 2 image, we'll get ENOENT back, and we'll assume
2980  * it's a format 1 image.
2981  */
2982  ret = rbd_dev_image_id(rbd_dev);
2983  if (ret)
2984  ret = rbd_dev_v1_probe(rbd_dev);
2985  else
2986  ret = rbd_dev_v2_probe(rbd_dev);
2987  if (ret)
2988  dout("probe failed, returning %d\n", ret);
2989 
2990  return ret;
2991 }
2992 
2993 static ssize_t rbd_add(struct bus_type *bus,
2994  const char *buf,
2995  size_t count)
2996 {
2997  char *options;
2998  struct rbd_device *rbd_dev = NULL;
2999  const char *mon_addrs = NULL;
3000  size_t mon_addrs_size = 0;
3001  struct ceph_osd_client *osdc;
3002  int rc = -ENOMEM;
3003  char *snap_name;
3004 
3005  if (!try_module_get(THIS_MODULE))
3006  return -ENODEV;
3007 
3008  options = kmalloc(count, GFP_KERNEL);
3009  if (!options)
3010  goto err_out_mem;
3011  rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3012  if (!rbd_dev)
3013  goto err_out_mem;
3014 
3015  /* static rbd_device initialization */
3016  spin_lock_init(&rbd_dev->lock);
3017  INIT_LIST_HEAD(&rbd_dev->node);
3018  INIT_LIST_HEAD(&rbd_dev->snaps);
3019  init_rwsem(&rbd_dev->header_rwsem);
3020 
3021  /* parse add command */
3022  snap_name = rbd_add_parse_args(rbd_dev, buf,
3023  &mon_addrs, &mon_addrs_size, options, count);
3024  if (IS_ERR(snap_name)) {
3025  rc = PTR_ERR(snap_name);
3026  goto err_out_mem;
3027  }
3028 
3029  rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
3030  if (rc < 0)
3031  goto err_out_args;
3032 
3033  /* pick the pool */
3034  osdc = &rbd_dev->rbd_client->client->osdc;
3035  rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3036  if (rc < 0)
3037  goto err_out_client;
3038  rbd_dev->pool_id = rc;
3039 
3040  rc = rbd_dev_probe(rbd_dev);
3041  if (rc < 0)
3042  goto err_out_client;
3043  rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3044 
3045  /* no need to lock here, as rbd_dev is not registered yet */
3046  rc = rbd_dev_snaps_update(rbd_dev);
3047  if (rc)
3048  goto err_out_header;
3049 
3050  rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3051  if (rc)
3052  goto err_out_header;
3053 
3054  /* generate unique id: find highest unique id, add one */
3055  rbd_dev_id_get(rbd_dev);
3056 
3057  /* Fill in the device name, now that we have its id. */
3058  BUILD_BUG_ON(DEV_NAME_LEN
3059  < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3060  sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3061 
3062  /* Get our block major device number. */
3063 
3064  rc = register_blkdev(0, rbd_dev->name);
3065  if (rc < 0)
3066  goto err_out_id;
3067  rbd_dev->major = rc;
3068 
3069  /* Set up the blkdev mapping. */
3070 
3071  rc = rbd_init_disk(rbd_dev);
3072  if (rc)
3073  goto err_out_blkdev;
3074 
3075  rc = rbd_bus_add_dev(rbd_dev);
3076  if (rc)
3077  goto err_out_disk;
3078 
3079  /*
3080  * At this point cleanup in the event of an error is the job
3081  * of the sysfs code (initiated by rbd_bus_del_dev()).
3082  */
3083 
3084  down_write(&rbd_dev->header_rwsem);
3085  rc = rbd_dev_snaps_register(rbd_dev);
3086  up_write(&rbd_dev->header_rwsem);
3087  if (rc)
3088  goto err_out_bus;
3089 
3090  rc = rbd_init_watch_dev(rbd_dev);
3091  if (rc)
3092  goto err_out_bus;
3093 
3094  /* Everything's ready. Announce the disk to the world. */
3095 
3096  add_disk(rbd_dev->disk);
3097 
3098  pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3099  (unsigned long long) rbd_dev->mapping.size);
3100  kfree(options);
3101  return count;
3102 
3103 err_out_bus:
3104  /* this will also clean up the rest of the rbd_dev state */
3105 
3106  rbd_bus_del_dev(rbd_dev);
3107  kfree(options);
3108  return rc;
3109 
3110 err_out_disk:
3111  rbd_free_disk(rbd_dev);
3112 err_out_blkdev:
3113  unregister_blkdev(rbd_dev->major, rbd_dev->name);
3114 err_out_id:
3115  rbd_dev_id_put(rbd_dev);
3116 err_out_header:
3117  rbd_header_free(&rbd_dev->header);
3118 err_out_client:
3119  kfree(rbd_dev->header_name);
3120  rbd_put_client(rbd_dev);
3121  kfree(rbd_dev->image_id);
3122 err_out_args:
3123  kfree(rbd_dev->mapping.snap_name);
3124  kfree(rbd_dev->image_name);
3125  kfree(rbd_dev->pool_name);
3126 err_out_mem:
3127  kfree(rbd_dev);
3128  kfree(options);
3129 
3130  dout("Error adding device %s\n", buf);
3131  module_put(THIS_MODULE);
3132 
3133  return (ssize_t) rc;
3134 }
3135 
3136 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3137 {
3138  struct list_head *tmp;
3139  struct rbd_device *rbd_dev;
3140 
3141  spin_lock(&rbd_dev_list_lock);
3142  list_for_each(tmp, &rbd_dev_list) {
3143  rbd_dev = list_entry(tmp, struct rbd_device, node);
3144  if (rbd_dev->dev_id == dev_id) {
3145  spin_unlock(&rbd_dev_list_lock);
3146  return rbd_dev;
3147  }
3148  }
3149  spin_unlock(&rbd_dev_list_lock);
3150  return NULL;
3151 }
3152 
3153 static void rbd_dev_release(struct device *dev)
3154 {
3155  struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3156 
3157  if (rbd_dev->watch_request) {
3158  struct ceph_client *client = rbd_dev->rbd_client->client;
3159 
3160  ceph_osdc_unregister_linger_request(&client->osdc,
3161  rbd_dev->watch_request);
3162  }
3163  if (rbd_dev->watch_event)
3164  rbd_req_sync_unwatch(rbd_dev);
3165 
3166  rbd_put_client(rbd_dev);
3167 
3168  /* clean up and free blkdev */
3169  rbd_free_disk(rbd_dev);
3170  unregister_blkdev(rbd_dev->major, rbd_dev->name);
3171 
3172  /* release allocated disk header fields */
3173  rbd_header_free(&rbd_dev->header);
3174 
3175  /* done with the id, and with the rbd_dev */
3176  kfree(rbd_dev->mapping.snap_name);
3177  kfree(rbd_dev->image_id);
3178  kfree(rbd_dev->header_name);
3179  kfree(rbd_dev->pool_name);
3180  kfree(rbd_dev->image_name);
3181  rbd_dev_id_put(rbd_dev);
3182  kfree(rbd_dev);
3183 
3184  /* release module ref */
3185  module_put(THIS_MODULE);
3186 }
3187 
3188 static ssize_t rbd_remove(struct bus_type *bus,
3189  const char *buf,
3190  size_t count)
3191 {
3192  struct rbd_device *rbd_dev = NULL;
3193  int target_id, rc;
3194  unsigned long ul;
3195  int ret = count;
3196 
3197  rc = strict_strtoul(buf, 10, &ul);
3198  if (rc)
3199  return rc;
3200 
3201  /* convert to int; abort if we lost anything in the conversion */
3202  target_id = (int) ul;
3203  if (target_id != ul)
3204  return -EINVAL;
3205 
3206  mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3207 
3208  rbd_dev = __rbd_get_dev(target_id);
3209  if (!rbd_dev) {
3210  ret = -ENOENT;
3211  goto done;
3212  }
3213 
3214  __rbd_remove_all_snaps(rbd_dev);
3215  rbd_bus_del_dev(rbd_dev);
3216 
3217 done:
3218  mutex_unlock(&ctl_mutex);
3219 
3220  return ret;
3221 }
3222 
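/*
 * Illustrative usage (not part of the driver): the device created by
 * the rbd_add() example is torn down by writing its id back to the
 * bus, e.g.
 *
 *	echo 1 > /sys/bus/rbd/remove
 *
 * where "1" is the id assigned at add time (the # in "rbd#").
 */
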
3223 /*
3224  * create control files in sysfs
3225  * /sys/bus/rbd/...
3226  */
3227 static int rbd_sysfs_init(void)
3228 {
3229  int ret;
3230 
3231  ret = device_register(&rbd_root_dev);
3232  if (ret < 0)
3233  return ret;
3234 
3235  ret = bus_register(&rbd_bus_type);
3236  if (ret < 0)
3237  device_unregister(&rbd_root_dev);
3238 
3239  return ret;
3240 }
3241 
3242 static void rbd_sysfs_cleanup(void)
3243 {
3244  bus_unregister(&rbd_bus_type);
3245  device_unregister(&rbd_root_dev);
3246 }
3247 
3248 int __init rbd_init(void)
3249 {
3250  int rc;
3251 
3252  rc = rbd_sysfs_init();
3253  if (rc)
3254  return rc;
3255  pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3256  return 0;
3257 }
3258 
3259 void __exit rbd_exit(void)
3260 {
3261  rbd_sysfs_cleanup();
3262 }
3263 
3264 module_init(rbd_init);
3265 module_exit(rbd_exit);
3266 
3267 MODULE_AUTHOR("Sage Weil <[email protected]>");
3268 MODULE_AUTHOR("Yehuda Sadeh <[email protected]>");
3269 MODULE_DESCRIPTION("rados block device");
3270 
3271 /* following authorship retained from original osdblk.c */
3272 MODULE_AUTHOR("Jeff Garzik <[email protected]>");
3273 
3274 MODULE_LICENSE("GPL");