1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
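/*
 * Illustrative usage (see the ABI document above for the authoritative
 * format): an image is mapped by writing a monitor address, options,
 * pool and image name to the bus, and unmapped by writing its dev id,
 * e.g.
 *
 *	echo "192.168.0.1 name=admin rbd foo" > /sys/bus/rbd/add
 *	echo 0 > /sys/bus/rbd/remove
 */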
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/decode.h>
36 #include <linux/parser.h>
37 #include <linux/bsearch.h>
38
39 #include <linux/kernel.h>
40 #include <linux/device.h>
41 #include <linux/module.h>
42 #include <linux/blk-mq.h>
43 #include <linux/fs.h>
44 #include <linux/blkdev.h>
45 #include <linux/slab.h>
46 #include <linux/idr.h>
47 #include <linux/workqueue.h>
48
49 #include "rbd_types.h"
50
51 #define RBD_DEBUG       /* Activate rbd_assert() calls */
52
53 /*
54  * Increment the given counter and return its updated value.
55  * If the counter is already 0 it will not be incremented and
56  * 0 is returned.  If the counter is already at its maximum
57  * value, -EINVAL is returned without updating it.
58  */
59 static int atomic_inc_return_safe(atomic_t *v)
60 {
61         unsigned int counter;
62
63         counter = (unsigned int)__atomic_add_unless(v, 1, 0);
64         if (counter <= (unsigned int)INT_MAX)
65                 return (int)counter;
66
67         atomic_dec(v);
68
69         return -EINVAL;
70 }
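/*
 * A counter managed by this pair sticks at 0 (no further increments
 * are allowed) and refuses to grow past INT_MAX.  Later in this file
 * the pattern backs reference counts such as rbd_dev->parent_ref.
 * A minimal sketch of the intended use (hypothetical caller, for
 * illustration only):
 *
 *	if (atomic_inc_return_safe(&refcnt) > 0) {
 *		... use the protected object ...
 *		atomic_dec_return_safe(&refcnt);
 *	}
 */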
71
72 /* Decrement the counter.  Return the resulting value, or -EINVAL */
73 static int atomic_dec_return_safe(atomic_t *v)
74 {
75         int counter;
76
77         counter = atomic_dec_return(v);
78         if (counter >= 0)
79                 return counter;
80
81         atomic_inc(v);
82
83         return -EINVAL;
84 }
85
86 #define RBD_DRV_NAME "rbd"
87
88 #define RBD_MINORS_PER_MAJOR            256
89 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
90
91 #define RBD_MAX_PARENT_CHAIN_LEN        16
92
93 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
94 #define RBD_MAX_SNAP_NAME_LEN   \
95                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
96
97 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
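/*
 * Each snapshot id is a __le64, so the 510 ids of a maximal snapshot
 * context occupy 510 * 8 = 4080 bytes, leaving room for the
 * ceph_snap_context header within a single 4 KiB allocation.
 */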
98
99 #define RBD_SNAP_HEAD_NAME      "-"
100
101 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
102
103 /* This allows a single page to hold an image name sent by the OSD */
104 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
105 #define RBD_IMAGE_ID_LEN_MAX    64
106
107 #define RBD_OBJ_PREFIX_LEN_MAX  64
108
109 #define RBD_NOTIFY_TIMEOUT      5       /* seconds */
110 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
111
112 /* Feature bits */
113
114 #define RBD_FEATURE_LAYERING    (1<<0)
115 #define RBD_FEATURE_STRIPINGV2  (1<<1)
116 #define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2)
117 #define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
118                                  RBD_FEATURE_STRIPINGV2 |       \
119                                  RBD_FEATURE_EXCLUSIVE_LOCK)
120
121 /* Features supported by this (client software) implementation. */
122
123 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
124
125 /*
126  * An RBD device name will be "rbd#", where the "rbd" comes from
127  * RBD_DRV_NAME above, and # is a unique integer identifier.
128  */
129 #define DEV_NAME_LEN            32
130
131 /*
132  * block device image metadata (in-memory version)
133  */
134 struct rbd_image_header {
135         /* These six fields never change for a given rbd image */
136         char *object_prefix;
137         __u8 obj_order;
138         __u8 crypt_type;
139         __u8 comp_type;
140         u64 stripe_unit;
141         u64 stripe_count;
142         u64 features;           /* Might be changeable someday? */
143
144         /* The remaining fields need to be updated occasionally */
145         u64 image_size;
146         struct ceph_snap_context *snapc;
147         char *snap_names;       /* format 1 only */
148         u64 *snap_sizes;        /* format 1 only */
149 };
150
151 /*
152  * An rbd image specification.
153  *
154  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
155  * identify an image.  Each rbd_dev structure includes a pointer to
156  * an rbd_spec structure that encapsulates this identity.
157  *
158  * Each of the id's in an rbd_spec has an associated name.  For a
159  * user-mapped image, the names are supplied and the id's associated
160  * with them are looked up.  For a layered image, a parent image is
161  * defined by the tuple, and the names are looked up.
162  *
163  * An rbd_dev structure contains a parent_spec pointer which is
164  * non-null if the image it represents is a child in a layered
165  * image.  This pointer will refer to the rbd_spec structure used
166  * by the parent rbd_dev for its own identity (i.e., the structure
167  * is shared between the parent and child).
168  *
169  * Since these structures are populated once, during the discovery
170  * phase of image construction, they are effectively immutable so
171  * we make no effort to synchronize access to them.
172  *
173  * Note that code herein does not assume the image name is known (it
174  * could be a null pointer).
175  */
176 struct rbd_spec {
177         u64             pool_id;
178         const char      *pool_name;
179
180         const char      *image_id;
181         const char      *image_name;
182
183         u64             snap_id;
184         const char      *snap_name;
185
186         struct kref     kref;
187 };
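/*
 * Illustrative spec (all values made up): mapping image "foo" in pool
 * "rbd" at snapshot "snap1" might yield
 *
 *	pool_id = 2,			pool_name  = "rbd"
 *	image_id = "10074b8b4567",	image_name = "foo"
 *	snap_id = 4,			snap_name  = "snap1"
 *
 * whereas a mapping of the image head uses snap_id == CEPH_NOSNAP and
 * snap_name == RBD_SNAP_HEAD_NAME ("-").
 */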
188
189 /*
190  * An rbd client instance (wraps a ceph_client).  Multiple rbd devices may share an rbd client.
191  */
192 struct rbd_client {
193         struct ceph_client      *client;
194         struct kref             kref;
195         struct list_head        node;
196 };
197
198 struct rbd_img_request;
199 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
200
201 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
202
203 struct rbd_obj_request;
204 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
205
206 enum obj_request_type {
207         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
208 };
209
210 enum obj_operation_type {
211         OBJ_OP_WRITE,
212         OBJ_OP_READ,
213         OBJ_OP_DISCARD,
214 };
215
216 enum obj_req_flags {
217         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
218         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
219         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
220         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
221 };
222
223 struct rbd_obj_request {
224         const char              *object_name;
225         u64                     offset;         /* object start byte */
226         u64                     length;         /* bytes from offset */
227         unsigned long           flags;
228
229         /*
230          * An object request associated with an image will have its
231          * img_data flag set; a standalone object request will not.
232          *
233          * A standalone object request will have which == BAD_WHICH
234          * and a null obj_request pointer.
235          *
236          * An object request initiated in support of a layered image
237          * object (to check for its existence before a write) will
238          * have which == BAD_WHICH and a non-null obj_request pointer.
239          *
240          * Finally, an object request for rbd image data will have
241          * which != BAD_WHICH, and will have a non-null img_request
242          * pointer.  The value of which will be in the range
243          * 0..(img_request->obj_request_count-1).
244          */
245         union {
246                 struct rbd_obj_request  *obj_request;   /* STAT op */
247                 struct {
248                         struct rbd_img_request  *img_request;
249                         u64                     img_offset;
250                         /* links for img_request->obj_requests list */
251                         struct list_head        links;
252                 };
253         };
254         u32                     which;          /* position in image request list */
255
256         enum obj_request_type   type;
257         union {
258                 struct bio      *bio_list;
259                 struct {
260                         struct page     **pages;
261                         u32             page_count;
262                 };
263         };
264         struct page             **copyup_pages;
265         u32                     copyup_page_count;
266
267         struct ceph_osd_request *osd_req;
268
269         u64                     xferred;        /* bytes transferred */
270         int                     result;
271
272         rbd_obj_callback_t      callback;
273         struct completion       completion;
274
275         struct kref             kref;
276 };
277
278 enum img_req_flags {
279         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
280         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
281         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
282         IMG_REQ_DISCARD,        /* discard: normal = 0, discard request = 1 */
283 };
284
285 struct rbd_img_request {
286         struct rbd_device       *rbd_dev;
287         u64                     offset; /* starting image byte offset */
288         u64                     length; /* byte count from offset */
289         unsigned long           flags;
290         union {
291                 u64                     snap_id;        /* for reads */
292                 struct ceph_snap_context *snapc;        /* for writes */
293         };
294         union {
295                 struct request          *rq;            /* block request */
296                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
297         };
298         struct page             **copyup_pages;
299         u32                     copyup_page_count;
300         spinlock_t              completion_lock;/* protects next_completion */
301         u32                     next_completion;
302         rbd_img_callback_t      callback;
303         u64                     xferred;/* aggregate bytes transferred */
304         int                     result; /* first nonzero obj_request result */
305
306         u32                     obj_request_count;
307         struct list_head        obj_requests;   /* rbd_obj_request structs */
308
309         struct kref             kref;
310 };
311
312 #define for_each_obj_request(ireq, oreq) \
313         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
314 #define for_each_obj_request_from(ireq, oreq) \
315         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
316 #define for_each_obj_request_safe(ireq, oreq, n) \
317         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
318
319 enum rbd_watch_state {
320         RBD_WATCH_STATE_UNREGISTERED,
321         RBD_WATCH_STATE_REGISTERED,
322         RBD_WATCH_STATE_ERROR,
323 };
324
325 enum rbd_lock_state {
326         RBD_LOCK_STATE_UNLOCKED,
327         RBD_LOCK_STATE_LOCKED,
328         RBD_LOCK_STATE_RELEASING,
329 };
330
331 /* WatchNotify::ClientId */
332 struct rbd_client_id {
333         u64 gid;
334         u64 handle;
335 };
336
337 struct rbd_mapping {
338         u64                     size;
339         u64                     features;
340         bool                    read_only;
341 };
342
343 /*
344  * a single device
345  */
346 struct rbd_device {
347         int                     dev_id;         /* blkdev unique id */
348
349         int                     major;          /* blkdev assigned major */
350         int                     minor;
351         struct gendisk          *disk;          /* blkdev's gendisk and rq */
352
353         u32                     image_format;   /* Either 1 or 2 */
354         struct rbd_client       *rbd_client;
355
356         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
357
358         spinlock_t              lock;           /* queue, flags, open_count */
359
360         struct rbd_image_header header;
361         unsigned long           flags;          /* possibly lock protected */
362         struct rbd_spec         *spec;
363         struct rbd_options      *opts;
364         char                    *config_info;   /* add{,_single_major} string */
365
366         struct ceph_object_id   header_oid;
367         struct ceph_object_locator header_oloc;
368
369         struct ceph_file_layout layout;         /* used for all rbd requests */
370
371         struct mutex            watch_mutex;
372         enum rbd_watch_state    watch_state;
373         struct ceph_osd_linger_request *watch_handle;
374         u64                     watch_cookie;
375         struct delayed_work     watch_dwork;
376
377         struct rw_semaphore     lock_rwsem;
378         enum rbd_lock_state     lock_state;
379         struct rbd_client_id    owner_cid;
380         struct work_struct      acquired_lock_work;
381         struct work_struct      released_lock_work;
382         struct delayed_work     lock_dwork;
383         struct work_struct      unlock_work;
384         wait_queue_head_t       lock_waitq;
385
386         struct workqueue_struct *task_wq;
387
388         struct rbd_spec         *parent_spec;
389         u64                     parent_overlap;
390         atomic_t                parent_ref;
391         struct rbd_device       *parent;
392
393         /* Block layer tags. */
394         struct blk_mq_tag_set   tag_set;
395
396         /* protects updating the header */
397         struct rw_semaphore     header_rwsem;
398
399         struct rbd_mapping      mapping;
400
401         struct list_head        node;
402
403         /* sysfs related */
404         struct device           dev;
405         unsigned long           open_count;     /* protected by lock */
406 };
407
408 /*
409  * Flag bits for rbd_dev->flags:
410  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
411  *   by rbd_dev->lock
412  * - BLACKLISTED is protected by rbd_dev->lock_rwsem
413  */
414 enum rbd_dev_flags {
415         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
416         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
417         RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
418 };
419
420 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
421
422 static LIST_HEAD(rbd_dev_list);    /* devices */
423 static DEFINE_SPINLOCK(rbd_dev_list_lock);
424
425 static LIST_HEAD(rbd_client_list);              /* clients */
426 static DEFINE_SPINLOCK(rbd_client_list_lock);
427
428 /* Slab caches for frequently-allocated structures */
429
430 static struct kmem_cache        *rbd_img_request_cache;
431 static struct kmem_cache        *rbd_obj_request_cache;
432 static struct kmem_cache        *rbd_segment_name_cache;
433
434 static int rbd_major;
435 static DEFINE_IDA(rbd_dev_id_ida);
436
437 static struct workqueue_struct *rbd_wq;
438
439 /*
440  * Default to false for now, as single-major requires version >= 0.75 of
441  * the userspace rbd utility.
442  */
443 static bool single_major = false;
444 module_param(single_major, bool, S_IRUGO);
445 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
446
447 static int rbd_img_request_submit(struct rbd_img_request *img_request);
448
449 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
450                        size_t count);
451 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
452                           size_t count);
453 static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
454                                     size_t count);
455 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
456                                        size_t count);
457 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
458 static void rbd_spec_put(struct rbd_spec *spec);
459
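/*
 * In single-major mode each mapped device is allotted
 * 1 << RBD_SINGLE_MAJOR_PART_SHIFT (16) consecutive minors for its
 * partitions.  For example, dev_id 2 starts at minor 32, and minors
 * 32..47 all map back to dev_id 2.
 */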
460 static int rbd_dev_id_to_minor(int dev_id)
461 {
462         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
463 }
464
465 static int minor_to_rbd_dev_id(int minor)
466 {
467         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
468 }
469
470 static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
471 {
472         return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
473                rbd_dev->spec->snap_id == CEPH_NOSNAP &&
474                !rbd_dev->mapping.read_only;
475 }
476
477 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
478 {
479         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
480                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
481 }
482
483 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
484 {
485         bool is_lock_owner;
486
487         down_read(&rbd_dev->lock_rwsem);
488         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
489         up_read(&rbd_dev->lock_rwsem);
490         return is_lock_owner;
491 }
492
493 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
494 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
495 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
496 static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
497
498 static struct attribute *rbd_bus_attrs[] = {
499         &bus_attr_add.attr,
500         &bus_attr_remove.attr,
501         &bus_attr_add_single_major.attr,
502         &bus_attr_remove_single_major.attr,
503         NULL,
504 };
505
506 static umode_t rbd_bus_is_visible(struct kobject *kobj,
507                                   struct attribute *attr, int index)
508 {
509         if (!single_major &&
510             (attr == &bus_attr_add_single_major.attr ||
511              attr == &bus_attr_remove_single_major.attr))
512                 return 0;
513
514         return attr->mode;
515 }
516
517 static const struct attribute_group rbd_bus_group = {
518         .attrs = rbd_bus_attrs,
519         .is_visible = rbd_bus_is_visible,
520 };
521 __ATTRIBUTE_GROUPS(rbd_bus);
522
523 static struct bus_type rbd_bus_type = {
524         .name           = "rbd",
525         .bus_groups     = rbd_bus_groups,
526 };
527
528 static void rbd_root_dev_release(struct device *dev)
529 {
530 }
531
532 static struct device rbd_root_dev = {
533         .init_name =    "rbd",
534         .release =      rbd_root_dev_release,
535 };
536
537 static __printf(2, 3)
538 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
539 {
540         struct va_format vaf;
541         va_list args;
542
543         va_start(args, fmt);
544         vaf.fmt = fmt;
545         vaf.va = &args;
546
547         if (!rbd_dev)
548                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
549         else if (rbd_dev->disk)
550                 printk(KERN_WARNING "%s: %s: %pV\n",
551                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
552         else if (rbd_dev->spec && rbd_dev->spec->image_name)
553                 printk(KERN_WARNING "%s: image %s: %pV\n",
554                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
555         else if (rbd_dev->spec && rbd_dev->spec->image_id)
556                 printk(KERN_WARNING "%s: id %s: %pV\n",
557                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
558         else    /* punt */
559                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
560                         RBD_DRV_NAME, rbd_dev, &vaf);
561         va_end(args);
562 }
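/*
 * Typical call pattern (illustrative, not a quote from this file):
 *
 *	rbd_warn(rbd_dev, "failed to refresh header: %d", ret);
 *
 * The message is prefixed with the most specific identity available:
 * disk name, image name, image id, or the raw rbd_dev pointer.
 */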
563
564 #ifdef RBD_DEBUG
565 #define rbd_assert(expr)                                                \
566                 if (unlikely(!(expr))) {                                \
567                         printk(KERN_ERR "\nAssertion failure in %s() "  \
568                                                 "at line %d:\n\n"       \
569                                         "\trbd_assert(%s);\n\n",        \
570                                         __func__, __LINE__, #expr);     \
571                         BUG();                                          \
572                 }
573 #else /* !RBD_DEBUG */
574 #  define rbd_assert(expr)      ((void) 0)
575 #endif /* !RBD_DEBUG */
576
577 static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
578 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
579 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
580 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
581
582 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
583 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
584 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
585 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
586 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
587                                         u64 snap_id);
588 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
589                                 u8 *order, u64 *snap_size);
590 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
591                 u64 *snap_features);
592
593 static int rbd_open(struct block_device *bdev, fmode_t mode)
594 {
595         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
596         bool removing = false;
597
598         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
599                 return -EROFS;
600
601         spin_lock_irq(&rbd_dev->lock);
602         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
603                 removing = true;
604         else
605                 rbd_dev->open_count++;
606         spin_unlock_irq(&rbd_dev->lock);
607         if (removing)
608                 return -ENOENT;
609
610         (void) get_device(&rbd_dev->dev);
611
612         return 0;
613 }
614
615 static void rbd_release(struct gendisk *disk, fmode_t mode)
616 {
617         struct rbd_device *rbd_dev = disk->private_data;
618         unsigned long open_count_before;
619
620         spin_lock_irq(&rbd_dev->lock);
621         open_count_before = rbd_dev->open_count--;
622         spin_unlock_irq(&rbd_dev->lock);
623         rbd_assert(open_count_before > 0);
624
625         put_device(&rbd_dev->dev);
626 }
627
628 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
629 {
630         int ret = 0;
631         int val;
632         bool ro;
633         bool ro_changed = false;
634
635         /* get_user() may sleep, so call it before taking rbd_dev->lock */
636         if (get_user(val, (int __user *)(arg)))
637                 return -EFAULT;
638
639         ro = val ? true : false;
640         /* Snapshots don't allow writes */
641         if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
642                 return -EROFS;
643
644         spin_lock_irq(&rbd_dev->lock);
645         /* prevent others from opening this device */
646         if (rbd_dev->open_count > 1) {
647                 ret = -EBUSY;
648                 goto out;
649         }
650
651         if (rbd_dev->mapping.read_only != ro) {
652                 rbd_dev->mapping.read_only = ro;
653                 ro_changed = true;
654         }
655
656 out:
657         spin_unlock_irq(&rbd_dev->lock);
658         /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
659         if (ret == 0 && ro_changed)
660                 set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
661
662         return ret;
663 }
664
665 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
666                         unsigned int cmd, unsigned long arg)
667 {
668         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
669         int ret = 0;
670
671         switch (cmd) {
672         case BLKROSET:
673                 ret = rbd_ioctl_set_ro(rbd_dev, arg);
674                 break;
675         default:
676                 ret = -ENOTTY;
677         }
678
679         return ret;
680 }
681
682 #ifdef CONFIG_COMPAT
683 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
684                                 unsigned int cmd, unsigned long arg)
685 {
686         return rbd_ioctl(bdev, mode, cmd, arg);
687 }
688 #endif /* CONFIG_COMPAT */
689
690 static const struct block_device_operations rbd_bd_ops = {
691         .owner                  = THIS_MODULE,
692         .open                   = rbd_open,
693         .release                = rbd_release,
694         .ioctl                  = rbd_ioctl,
695 #ifdef CONFIG_COMPAT
696         .compat_ioctl           = rbd_compat_ioctl,
697 #endif
698 };
699
700 /*
701  * Initialize an rbd client instance.  Success or not, this function
702  * consumes ceph_opts.  Caller holds client_mutex.
703  */
704 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
705 {
706         struct rbd_client *rbdc;
707         int ret = -ENOMEM;
708
709         dout("%s:\n", __func__);
710         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
711         if (!rbdc)
712                 goto out_opt;
713
714         kref_init(&rbdc->kref);
715         INIT_LIST_HEAD(&rbdc->node);
716
717         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
718         if (IS_ERR(rbdc->client))
719                 goto out_rbdc;
720         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
721
722         ret = ceph_open_session(rbdc->client);
723         if (ret < 0)
724                 goto out_client;
725
726         spin_lock(&rbd_client_list_lock);
727         list_add_tail(&rbdc->node, &rbd_client_list);
728         spin_unlock(&rbd_client_list_lock);
729
730         dout("%s: rbdc %p\n", __func__, rbdc);
731
732         return rbdc;
733 out_client:
734         ceph_destroy_client(rbdc->client);
735 out_rbdc:
736         kfree(rbdc);
737 out_opt:
738         if (ceph_opts)
739                 ceph_destroy_options(ceph_opts);
740         dout("%s: error %d\n", __func__, ret);
741
742         return ERR_PTR(ret);
743 }
744
745 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
746 {
747         kref_get(&rbdc->kref);
748
749         return rbdc;
750 }
751
752 /*
753  * Find a ceph client with specific addr and configuration.  If
754  * found, bump its reference count.
755  */
756 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
757 {
758         struct rbd_client *client_node;
759         bool found = false;
760
761         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
762                 return NULL;
763
764         spin_lock(&rbd_client_list_lock);
765         list_for_each_entry(client_node, &rbd_client_list, node) {
766                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
767                         __rbd_get_client(client_node);
768
769                         found = true;
770                         break;
771                 }
772         }
773         spin_unlock(&rbd_client_list_lock);
774
775         return found ? client_node : NULL;
776 }
777
778 /*
779  * (Per device) rbd map options
780  */
781 enum {
782         Opt_queue_depth,
783         Opt_last_int,
784         /* int args above */
785         Opt_last_string,
786         /* string args above */
787         Opt_read_only,
788         Opt_read_write,
789         Opt_lock_on_read,
790         Opt_err
791 };
792
793 static match_table_t rbd_opts_tokens = {
794         {Opt_queue_depth, "queue_depth=%d"},
795         /* int args above */
796         /* string args above */
797         {Opt_read_only, "read_only"},
798         {Opt_read_only, "ro"},          /* Alternate spelling */
799         {Opt_read_write, "read_write"},
800         {Opt_read_write, "rw"},         /* Alternate spelling */
801         {Opt_lock_on_read, "lock_on_read"},
802         {Opt_err, NULL}
803 };
804
805 struct rbd_options {
806         int     queue_depth;
807         bool    read_only;
808         bool    lock_on_read;
809 };
810
811 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
812 #define RBD_READ_ONLY_DEFAULT   false
813 #define RBD_LOCK_ON_READ_DEFAULT false
814
815 static int parse_rbd_opts_token(char *c, void *private)
816 {
817         struct rbd_options *rbd_opts = private;
818         substring_t argstr[MAX_OPT_ARGS];
819         int token, intval, ret;
820
821         token = match_token(c, rbd_opts_tokens, argstr);
822         if (token < Opt_last_int) {
823                 ret = match_int(&argstr[0], &intval);
824                 if (ret < 0) {
825                         pr_err("bad mount option arg (not int) at '%s'\n", c);
826                         return ret;
827                 }
828                 dout("got int token %d val %d\n", token, intval);
829         } else if (token > Opt_last_int && token < Opt_last_string) {
830                 dout("got string token %d val %s\n", token, argstr[0].from);
831         } else {
832                 dout("got token %d\n", token);
833         }
834
835         switch (token) {
836         case Opt_queue_depth:
837                 if (intval < 1) {
838                         pr_err("queue_depth out of range\n");
839                         return -EINVAL;
840                 }
841                 rbd_opts->queue_depth = intval;
842                 break;
843         case Opt_read_only:
844                 rbd_opts->read_only = true;
845                 break;
846         case Opt_read_write:
847                 rbd_opts->read_only = false;
848                 break;
849         case Opt_lock_on_read:
850                 rbd_opts->lock_on_read = true;
851                 break;
852         default:
853                 /* libceph prints "bad option" msg */
854                 return -EINVAL;
855         }
856
857         return 0;
858 }
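/*
 * parse_rbd_opts_token() is called once per comma-separated token of
 * the per-device option string.  For example (illustrative string),
 * "queue_depth=128,lock_on_read,ro" sets queue_depth to 128 and both
 * lock_on_read and read_only to true.
 */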
859
860 static char *obj_op_name(enum obj_operation_type op_type)
861 {
862         switch (op_type) {
863         case OBJ_OP_READ:
864                 return "read";
865         case OBJ_OP_WRITE:
866                 return "write";
867         case OBJ_OP_DISCARD:
868                 return "discard";
869         default:
870                 return "???";
871         }
872 }
873
874 /*
875  * Get a ceph client with specific addr and configuration; if one does
876  * not exist, create it.  Either way, ceph_opts is consumed by this
877  * function.
878  */
879 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
880 {
881         struct rbd_client *rbdc;
882
883         mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
884         rbdc = rbd_client_find(ceph_opts);
885         if (rbdc)       /* using an existing client */
886                 ceph_destroy_options(ceph_opts);
887         else
888                 rbdc = rbd_client_create(ceph_opts);
889         mutex_unlock(&client_mutex);
890
891         return rbdc;
892 }
893
894 /*
895  * Destroy ceph client
896  *
897  * rbd_client_list_lock is taken here to unlink the client from the list.
898  */
899 static void rbd_client_release(struct kref *kref)
900 {
901         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
902
903         dout("%s: rbdc %p\n", __func__, rbdc);
904         spin_lock(&rbd_client_list_lock);
905         list_del(&rbdc->node);
906         spin_unlock(&rbd_client_list_lock);
907
908         ceph_destroy_client(rbdc->client);
909         kfree(rbdc);
910 }
911
912 /*
913  * Drop reference to ceph client node. If it's not referenced anymore, release
914  * it.
915  */
916 static void rbd_put_client(struct rbd_client *rbdc)
917 {
918         if (rbdc)
919                 kref_put(&rbdc->kref, rbd_client_release);
920 }
921
922 static bool rbd_image_format_valid(u32 image_format)
923 {
924         return image_format == 1 || image_format == 2;
925 }
926
927 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
928 {
929         size_t size;
930         u32 snap_count;
931
932         /* The header has to start with the magic rbd header text */
933         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
934                 return false;
935
936         /* The bio layer requires at least sector-sized I/O */
937
938         if (ondisk->options.order < SECTOR_SHIFT)
939                 return false;
940
941         /* If we use u64 in a few spots we may be able to loosen this */
942
943         if (ondisk->options.order > 8 * sizeof (int) - 1)
944                 return false;
945
946         /*
947          * The size of a snapshot header has to fit in a size_t, and
948          * that limits the number of snapshots.
949          */
950         snap_count = le32_to_cpu(ondisk->snap_count);
951         size = SIZE_MAX - sizeof (struct ceph_snap_context);
952         if (snap_count > size / sizeof (__le64))
953                 return false;
954
955         /*
956          * Not only that, but the size of the entire snapshot
957          * header must also be representable in a size_t.
958          */
959         size -= snap_count * sizeof (__le64);
960         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
961                 return false;
962
963         return true;
964 }
965
966 /*
967  * Fill an rbd image header with information from the given format 1
968  * on-disk header.
969  */
970 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
971                                  struct rbd_image_header_ondisk *ondisk)
972 {
973         struct rbd_image_header *header = &rbd_dev->header;
974         bool first_time = header->object_prefix == NULL;
975         struct ceph_snap_context *snapc;
976         char *object_prefix = NULL;
977         char *snap_names = NULL;
978         u64 *snap_sizes = NULL;
979         u32 snap_count;
980         int ret = -ENOMEM;
981         u32 i;
982
983         /* Allocate this now to avoid having to handle failure below */
984
985         if (first_time) {
986                 size_t len;
987
988                 len = strnlen(ondisk->object_prefix,
989                                 sizeof (ondisk->object_prefix));
990                 object_prefix = kmalloc(len + 1, GFP_KERNEL);
991                 if (!object_prefix)
992                         return -ENOMEM;
993                 memcpy(object_prefix, ondisk->object_prefix, len);
994                 object_prefix[len] = '\0';
995         }
996
997         /* Allocate the snapshot context and fill it in */
998
999         snap_count = le32_to_cpu(ondisk->snap_count);
1000         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1001         if (!snapc)
1002                 goto out_err;
1003         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1004         if (snap_count) {
1005                 struct rbd_image_snap_ondisk *snaps;
1006                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1007
1008                 /* We'll keep a copy of the snapshot names... */
1009
1010                 if (snap_names_len > (u64)SIZE_MAX)
1011                         goto out_2big;
1012                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1013                 if (!snap_names)
1014                         goto out_err;
1015
1016                 /* ...as well as the array of their sizes. */
1017                 snap_sizes = kmalloc_array(snap_count,
1018                                            sizeof(*header->snap_sizes),
1019                                            GFP_KERNEL);
1020                 if (!snap_sizes)
1021                         goto out_err;
1022
1023                 /*
1024                  * Copy the names, and fill in each snapshot's id
1025                  * and size.
1026                  *
1027                  * Note that rbd_dev_v1_header_info() guarantees the
1028                  * ondisk buffer we're working with has
1029                  * snap_names_len bytes beyond the end of the
1030                  * snapshot id array, so this memcpy() is safe.
1031                  */
1032                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1033                 snaps = ondisk->snaps;
1034                 for (i = 0; i < snap_count; i++) {
1035                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1036                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1037                 }
1038         }
1039
1040         /* We won't fail any more, fill in the header */
1041
1042         if (first_time) {
1043                 header->object_prefix = object_prefix;
1044                 header->obj_order = ondisk->options.order;
1045                 header->crypt_type = ondisk->options.crypt_type;
1046                 header->comp_type = ondisk->options.comp_type;
1047                 /* The rest aren't used for format 1 images */
1048                 header->stripe_unit = 0;
1049                 header->stripe_count = 0;
1050                 header->features = 0;
1051         } else {
1052                 ceph_put_snap_context(header->snapc);
1053                 kfree(header->snap_names);
1054                 kfree(header->snap_sizes);
1055         }
1056
1057         /* The remaining fields always get updated (when we refresh) */
1058
1059         header->image_size = le64_to_cpu(ondisk->image_size);
1060         header->snapc = snapc;
1061         header->snap_names = snap_names;
1062         header->snap_sizes = snap_sizes;
1063
1064         return 0;
1065 out_2big:
1066         ret = -EIO;
1067 out_err:
1068         kfree(snap_sizes);
1069         kfree(snap_names);
1070         ceph_put_snap_context(snapc);
1071         kfree(object_prefix);
1072
1073         return ret;
1074 }
1075
1076 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1077 {
1078         const char *snap_name;
1079
1080         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1081
1082         /* Skip over names until we find the one we are looking for */
1083
1084         snap_name = rbd_dev->header.snap_names;
1085         while (which--)
1086                 snap_name += strlen(snap_name) + 1;
1087
1088         return kstrdup(snap_name, GFP_KERNEL);
1089 }
1090
1091 /*
1092  * Snapshot id comparison function for use with qsort()/bsearch().
1093  * Note that result is for snapshots in *descending* order.
1094  */
1095 static int snapid_compare_reverse(const void *s1, const void *s2)
1096 {
1097         u64 snap_id1 = *(u64 *)s1;
1098         u64 snap_id2 = *(u64 *)s2;
1099
1100         if (snap_id1 < snap_id2)
1101                 return 1;
1102         return snap_id1 == snap_id2 ? 0 : -1;
1103 }
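/*
 * For example, comparing ids 10 and 5 returns -1, so 10 sorts before 5
 * and the resulting order is highest snapshot id first.
 */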
1104
1105 /*
1106  * Search a snapshot context to see if the given snapshot id is
1107  * present.
1108  *
1109  * Returns the position of the snapshot id in the array if it's found,
1110  * or BAD_SNAP_INDEX otherwise.
1111  *
1112  * Note: The snapshot array is kept sorted (by the osd) in
1113  * reverse order, highest snapshot id first.
1114  */
1115 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1116 {
1117         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1118         u64 *found;
1119
1120         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1121                                 sizeof (snap_id), snapid_compare_reverse);
1122
1123         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1124 }
1125
1126 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1127                                         u64 snap_id)
1128 {
1129         u32 which;
1130         const char *snap_name;
1131
1132         which = rbd_dev_snap_index(rbd_dev, snap_id);
1133         if (which == BAD_SNAP_INDEX)
1134                 return ERR_PTR(-ENOENT);
1135
1136         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1137         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1138 }
1139
1140 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1141 {
1142         if (snap_id == CEPH_NOSNAP)
1143                 return RBD_SNAP_HEAD_NAME;
1144
1145         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1146         if (rbd_dev->image_format == 1)
1147                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1148
1149         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1150 }
1151
1152 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1153                                 u64 *snap_size)
1154 {
1155         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1156         if (snap_id == CEPH_NOSNAP) {
1157                 *snap_size = rbd_dev->header.image_size;
1158         } else if (rbd_dev->image_format == 1) {
1159                 u32 which;
1160
1161                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1162                 if (which == BAD_SNAP_INDEX)
1163                         return -ENOENT;
1164
1165                 *snap_size = rbd_dev->header.snap_sizes[which];
1166         } else {
1167                 u64 size = 0;
1168                 int ret;
1169
1170                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1171                 if (ret)
1172                         return ret;
1173
1174                 *snap_size = size;
1175         }
1176         return 0;
1177 }
1178
1179 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1180                         u64 *snap_features)
1181 {
1182         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1183         if (snap_id == CEPH_NOSNAP) {
1184                 *snap_features = rbd_dev->header.features;
1185         } else if (rbd_dev->image_format == 1) {
1186                 *snap_features = 0;     /* No features for format 1 */
1187         } else {
1188                 u64 features = 0;
1189                 int ret;
1190
1191                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1192                 if (ret)
1193                         return ret;
1194
1195                 *snap_features = features;
1196         }
1197         return 0;
1198 }
1199
1200 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1201 {
1202         u64 snap_id = rbd_dev->spec->snap_id;
1203         u64 size = 0;
1204         u64 features = 0;
1205         int ret;
1206
1207         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1208         if (ret)
1209                 return ret;
1210         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1211         if (ret)
1212                 return ret;
1213
1214         rbd_dev->mapping.size = size;
1215         rbd_dev->mapping.features = features;
1216
1217         return 0;
1218 }
1219
1220 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1221 {
1222         rbd_dev->mapping.size = 0;
1223         rbd_dev->mapping.features = 0;
1224 }
1225
1226 static void rbd_segment_name_free(const char *name)
1227 {
1228         /* The explicit cast here is needed to drop the const qualifier */
1229
1230         kmem_cache_free(rbd_segment_name_cache, (void *)name);
1231 }
1232
1233 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
1234 {
1235         char *name;
1236         u64 segment;
1237         int ret;
1238         char *name_format;
1239
1240         name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
1241         if (!name)
1242                 return NULL;
1243         segment = offset >> rbd_dev->header.obj_order;
1244         name_format = "%s.%012llx";
1245         if (rbd_dev->image_format == 2)
1246                 name_format = "%s.%016llx";
1247         ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
1248                         rbd_dev->header.object_prefix, segment);
1249         if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
1250                 pr_err("error formatting segment name for #%llu (%d)\n",
1251                         segment, ret);
1252                 rbd_segment_name_free(name);
1253                 name = NULL;
1254         }
1255
1256         return name;
1257 }
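/*
 * Illustrative names (object prefix made up): with prefix
 * "rb.0.100a.74b0dc51" and segment number 5, a format 1 image yields
 * "rb.0.100a.74b0dc51.000000000005" (12 hex digits), while a format 2
 * image uses a 16-digit suffix, e.g.
 * "rbd_data.10074b8b4567.0000000000000005".
 */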
1258
1259 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1260 {
1261         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1262
1263         return offset & (segment_size - 1);
1264 }
1265
1266 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1267                                 u64 offset, u64 length)
1268 {
1269         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1270
1271         offset &= segment_size - 1;
1272
1273         rbd_assert(length <= U64_MAX - offset);
1274         if (offset + length > segment_size)
1275                 length = segment_size - offset;
1276
1277         return length;
1278 }
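/*
 * Worked example (assuming obj_order 22, i.e. 4 MiB objects): an image
 * offset of 9 MiB lies 1 MiB into segment 2, so rbd_segment_offset()
 * returns 1 MiB, and a 5 MiB request starting there is clamped by
 * rbd_segment_length() to the 3 MiB remaining in that segment.
 */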
1279
1280 /*
1281  * returns the size of an object in the image
1282  */
1283 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1284 {
1285         return 1 << header->obj_order;
1286 }
1287
1288 /*
1289  * bio helpers
1290  */
1291
1292 static void bio_chain_put(struct bio *chain)
1293 {
1294         struct bio *tmp;
1295
1296         while (chain) {
1297                 tmp = chain;
1298                 chain = chain->bi_next;
1299                 bio_put(tmp);
1300         }
1301 }
1302
1303 /*
1304  * zeros a bio chain, starting at a specific offset
1305  */
1306 static void zero_bio_chain(struct bio *chain, int start_ofs)
1307 {
1308         struct bio_vec bv;
1309         struct bvec_iter iter;
1310         unsigned long flags;
1311         void *buf;
1312         int pos = 0;
1313
1314         while (chain) {
1315                 bio_for_each_segment(bv, chain, iter) {
1316                         if (pos + bv.bv_len > start_ofs) {
1317                                 int remainder = max(start_ofs - pos, 0);
1318                                 buf = bvec_kmap_irq(&bv, &flags);
1319                                 memset(buf + remainder, 0,
1320                                        bv.bv_len - remainder);
1321                                 flush_dcache_page(bv.bv_page);
1322                                 bvec_kunmap_irq(buf, &flags);
1323                         }
1324                         pos += bv.bv_len;
1325                 }
1326
1327                 chain = chain->bi_next;
1328         }
1329 }
1330
1331 /*
1332  * similar to zero_bio_chain(), zeros data defined by a page array,
1333  * starting at the given byte offset from the start of the array and
1334  * continuing up to the given end offset.  The pages array is
1335  * assumed to be big enough to hold all bytes up to the end.
1336  */
1337 static void zero_pages(struct page **pages, u64 offset, u64 end)
1338 {
1339         struct page **page = &pages[offset >> PAGE_SHIFT];
1340
1341         rbd_assert(end > offset);
1342         rbd_assert(end - offset <= (u64)SIZE_MAX);
1343         while (offset < end) {
1344                 size_t page_offset;
1345                 size_t length;
1346                 unsigned long flags;
1347                 void *kaddr;
1348
1349                 page_offset = offset & ~PAGE_MASK;
1350                 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1351                 local_irq_save(flags);
1352                 kaddr = kmap_atomic(*page);
1353                 memset(kaddr + page_offset, 0, length);
1354                 flush_dcache_page(*page);
1355                 kunmap_atomic(kaddr);
1356                 local_irq_restore(flags);
1357
1358                 offset += length;
1359                 page++;
1360         }
1361 }
1362
1363 /*
1364  * Clone a portion of a bio, starting at the given byte offset
1365  * and continuing for the number of bytes indicated.
1366  */
1367 static struct bio *bio_clone_range(struct bio *bio_src,
1368                                         unsigned int offset,
1369                                         unsigned int len,
1370                                         gfp_t gfpmask)
1371 {
1372         struct bio *bio;
1373
1374         bio = bio_clone(bio_src, gfpmask);
1375         if (!bio)
1376                 return NULL;    /* ENOMEM */
1377
1378         bio_advance(bio, offset);
1379         bio->bi_iter.bi_size = len;
1380
1381         return bio;
1382 }
1383
1384 /*
1385  * Clone a portion of a bio chain, starting at the given byte offset
1386  * into the first bio in the source chain and continuing for the
1387  * number of bytes indicated.  The result is another bio chain of
1388  * exactly the given length, or a null pointer on error.
1389  *
1390  * The bio_src and offset parameters are both in-out.  On entry they
1391  * refer to the first source bio and the offset into that bio where
1392  * the start of data to be cloned is located.
1393  *
1394  * On return, bio_src is updated to refer to the bio in the source
1395  * chain that contains the first un-cloned byte, and *offset will
1396  * contain the offset of that byte within that bio.
1397  */
1398 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1399                                         unsigned int *offset,
1400                                         unsigned int len,
1401                                         gfp_t gfpmask)
1402 {
1403         struct bio *bi = *bio_src;
1404         unsigned int off = *offset;
1405         struct bio *chain = NULL;
1406         struct bio **end;
1407
1408         /* Build up a chain of clone bios up to the limit */
1409
1410         if (!bi || off >= bi->bi_iter.bi_size || !len)
1411                 return NULL;            /* Nothing to clone */
1412
1413         end = &chain;
1414         while (len) {
1415                 unsigned int bi_size;
1416                 struct bio *bio;
1417
1418                 if (!bi) {
1419                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1420                         goto out_err;   /* EINVAL; ran out of bio's */
1421                 }
1422                 bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1423                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1424                 if (!bio)
1425                         goto out_err;   /* ENOMEM */
1426
1427                 *end = bio;
1428                 end = &bio->bi_next;
1429
1430                 off += bi_size;
1431                 if (off == bi->bi_iter.bi_size) {
1432                         bi = bi->bi_next;
1433                         off = 0;
1434                 }
1435                 len -= bi_size;
1436         }
1437         *bio_src = bi;
1438         *offset = off;
1439
1440         return chain;
1441 out_err:
1442         bio_chain_put(chain);
1443
1444         return NULL;
1445 }
1446
1447 /*
1448  * The default/initial value for all object request flags is 0.  For
1449  * each flag, once its value is set to 1 it is never reset to 0
1450  * again.
1451  */
1452 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1453 {
1454         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1455                 struct rbd_device *rbd_dev;
1456
1457                 rbd_dev = obj_request->img_request->rbd_dev;
1458                 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1459                         obj_request);
1460         }
1461 }
1462
1463 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1464 {
1465         smp_mb();
1466         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1467 }
1468
1469 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1470 {
1471         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1472                 struct rbd_device *rbd_dev = NULL;
1473
1474                 if (obj_request_img_data_test(obj_request))
1475                         rbd_dev = obj_request->img_request->rbd_dev;
1476                 rbd_warn(rbd_dev, "obj_request %p already marked done",
1477                         obj_request);
1478         }
1479 }
1480
1481 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1482 {
1483         smp_mb();
1484         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1485 }
1486
1487 /*
1488  * This sets the KNOWN flag after (possibly) setting the EXISTS
1489  * flag.  The latter is set based on the "exists" value provided.
1490  *
1491  * Note that for our purposes once an object exists it never goes
1492  * away again.  It's possible that the responses from two existence
1493  * checks are separated by the creation of the target object, and
1494  * the first ("doesn't exist") response arrives *after* the second
1495  * ("does exist").  In that case we ignore the later, stale response.
1496  */
1497 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1498                                 bool exists)
1499 {
1500         if (exists)
1501                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1502         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1503         smp_mb();
1504 }
1505
1506 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1507 {
1508         smp_mb();
1509         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1510 }
1511
1512 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1513 {
1514         smp_mb();
1515         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1516 }
1517
1518 static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1519 {
1520         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1521
1522         return obj_request->img_offset <
1523             round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1524 }
1525
1526 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1527 {
1528         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1529                 atomic_read(&obj_request->kref.refcount));
1530         kref_get(&obj_request->kref);
1531 }
1532
1533 static void rbd_obj_request_destroy(struct kref *kref);
1534 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1535 {
1536         rbd_assert(obj_request != NULL);
1537         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1538                 atomic_read(&obj_request->kref.refcount));
1539         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1540 }
1541
1542 static void rbd_img_request_get(struct rbd_img_request *img_request)
1543 {
1544         dout("%s: img %p (was %d)\n", __func__, img_request,
1545              atomic_read(&img_request->kref.refcount));
1546         kref_get(&img_request->kref);
1547 }
1548
1549 static bool img_request_child_test(struct rbd_img_request *img_request);
1550 static void rbd_parent_request_destroy(struct kref *kref);
1551 static void rbd_img_request_destroy(struct kref *kref);
1552 static void rbd_img_request_put(struct rbd_img_request *img_request)
1553 {
1554         rbd_assert(img_request != NULL);
1555         dout("%s: img %p (was %d)\n", __func__, img_request,
1556                 atomic_read(&img_request->kref.refcount));
1557         if (img_request_child_test(img_request))
1558                 kref_put(&img_request->kref, rbd_parent_request_destroy);
1559         else
1560                 kref_put(&img_request->kref, rbd_img_request_destroy);
1561 }
1562
1563 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1564                                         struct rbd_obj_request *obj_request)
1565 {
1566         rbd_assert(obj_request->img_request == NULL);
1567
1568         /* Image request now owns object's original reference */
1569         obj_request->img_request = img_request;
1570         obj_request->which = img_request->obj_request_count;
1571         rbd_assert(!obj_request_img_data_test(obj_request));
1572         obj_request_img_data_set(obj_request);
1573         rbd_assert(obj_request->which != BAD_WHICH);
1574         img_request->obj_request_count++;
1575         list_add_tail(&obj_request->links, &img_request->obj_requests);
1576         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1577                 obj_request->which);
1578 }
1579
1580 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1581                                         struct rbd_obj_request *obj_request)
1582 {
1583         rbd_assert(obj_request->which != BAD_WHICH);
1584
1585         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1586                 obj_request->which);
1587         list_del(&obj_request->links);
1588         rbd_assert(img_request->obj_request_count > 0);
1589         img_request->obj_request_count--;
1590         rbd_assert(obj_request->which == img_request->obj_request_count);
1591         obj_request->which = BAD_WHICH;
1592         rbd_assert(obj_request_img_data_test(obj_request));
1593         rbd_assert(obj_request->img_request == img_request);
1594         obj_request->img_request = NULL;
1595         obj_request->callback = NULL;
1596         rbd_obj_request_put(obj_request);
1597 }
1598
1599 static bool obj_request_type_valid(enum obj_request_type type)
1600 {
1601         switch (type) {
1602         case OBJ_REQUEST_NODATA:
1603         case OBJ_REQUEST_BIO:
1604         case OBJ_REQUEST_PAGES:
1605                 return true;
1606         default:
1607                 return false;
1608         }
1609 }
1610
1611 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
1612
1613 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1614 {
1615         struct ceph_osd_request *osd_req = obj_request->osd_req;
1616
1617         dout("%s %p osd_req %p\n", __func__, obj_request, osd_req);
1618         if (obj_request_img_data_test(obj_request)) {
1619                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1620                 rbd_img_request_get(obj_request->img_request);
1621         }
1622         ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1623 }
1624
1625 static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
1626 {
1627         dout("%s %p\n", __func__, obj_request);
1628         ceph_osdc_cancel_request(obj_request->osd_req);
1629 }
1630
1631 /*
1632  * Wait for an object request to complete.  If interrupted, cancel the
1633  * underlying osd request.
1634  *
1635  * @timeout: in jiffies, 0 means "wait forever"
1636  */
1637 static int __rbd_obj_request_wait(struct rbd_obj_request *obj_request,
1638                                   unsigned long timeout)
1639 {
1640         long ret;
1641
1642         dout("%s %p\n", __func__, obj_request);
1643         ret = wait_for_completion_interruptible_timeout(
1644                                         &obj_request->completion,
1645                                         ceph_timeout_jiffies(timeout));
1646         if (ret <= 0) {
1647                 if (ret == 0)
1648                         ret = -ETIMEDOUT;
1649                 rbd_obj_request_end(obj_request);
1650         } else {
1651                 ret = 0;
1652         }
1653
1654         dout("%s %p ret %d\n", __func__, obj_request, (int)ret);
1655         return ret;
1656 }
1657
1658 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1659 {
1660         return __rbd_obj_request_wait(obj_request, 0);
1661 }
1662
1663 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1664 {
1665
1666         dout("%s: img %p\n", __func__, img_request);
1667
1668         /*
1669          * If no error occurred, compute the aggregate transfer
1670          * count for the image request.  We could instead use
1671          * atomic64_cmpxchg() to update it as each object request
1672  * completes; it is not clear offhand which way is better.
1673          */
1674         if (!img_request->result) {
1675                 struct rbd_obj_request *obj_request;
1676                 u64 xferred = 0;
1677
1678                 for_each_obj_request(img_request, obj_request)
1679                         xferred += obj_request->xferred;
1680                 img_request->xferred = xferred;
1681         }
1682
1683         if (img_request->callback)
1684                 img_request->callback(img_request);
1685         else
1686                 rbd_img_request_put(img_request);
1687 }
1688
1689 /*
1690  * The default/initial value for all image request flags is 0.  Each
1691  * is conditionally set to 1 at image request initialization time
1692  * and currently never changes thereafter.
1693  */
1694 static void img_request_write_set(struct rbd_img_request *img_request)
1695 {
1696         set_bit(IMG_REQ_WRITE, &img_request->flags);
1697         smp_mb();
1698 }
1699
1700 static bool img_request_write_test(struct rbd_img_request *img_request)
1701 {
1702         smp_mb();
1703         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1704 }
1705
1706 /*
1707  * Set the discard flag when the img_request is a discard request
1708  */
1709 static void img_request_discard_set(struct rbd_img_request *img_request)
1710 {
1711         set_bit(IMG_REQ_DISCARD, &img_request->flags);
1712         smp_mb();
1713 }
1714
1715 static bool img_request_discard_test(struct rbd_img_request *img_request)
1716 {
1717         smp_mb();
1718         return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1719 }
1720
1721 static void img_request_child_set(struct rbd_img_request *img_request)
1722 {
1723         set_bit(IMG_REQ_CHILD, &img_request->flags);
1724         smp_mb();
1725 }
1726
1727 static void img_request_child_clear(struct rbd_img_request *img_request)
1728 {
1729         clear_bit(IMG_REQ_CHILD, &img_request->flags);
1730         smp_mb();
1731 }
1732
1733 static bool img_request_child_test(struct rbd_img_request *img_request)
1734 {
1735         smp_mb();
1736         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1737 }
1738
1739 static void img_request_layered_set(struct rbd_img_request *img_request)
1740 {
1741         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1742         smp_mb();
1743 }
1744
1745 static void img_request_layered_clear(struct rbd_img_request *img_request)
1746 {
1747         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1748         smp_mb();
1749 }
1750
1751 static bool img_request_layered_test(struct rbd_img_request *img_request)
1752 {
1753         smp_mb();
1754         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1755 }
1756
1757 static enum obj_operation_type
1758 rbd_img_request_op_type(struct rbd_img_request *img_request)
1759 {
1760         if (img_request_write_test(img_request))
1761                 return OBJ_OP_WRITE;
1762         else if (img_request_discard_test(img_request))
1763                 return OBJ_OP_DISCARD;
1764         else
1765                 return OBJ_OP_READ;
1766 }
1767
1768 static void
1769 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1770 {
1771         u64 xferred = obj_request->xferred;
1772         u64 length = obj_request->length;
1773
1774         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1775                 obj_request, obj_request->img_request, obj_request->result,
1776                 xferred, length);
1777         /*
1778          * ENOENT means a hole in the image.  We zero-fill the entire
1779          * length of the request.  A short read also implies zero-fill
1780          * to the end of the request.  An error requires the whole
1781          * length of the request to be reported finished with an error
1782          * to the block layer.  In each case we update the xferred
1783          * count to indicate the whole request was satisfied.
1784          */
1785         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1786         if (obj_request->result == -ENOENT) {
1787                 if (obj_request->type == OBJ_REQUEST_BIO)
1788                         zero_bio_chain(obj_request->bio_list, 0);
1789                 else
1790                         zero_pages(obj_request->pages, 0, length);
1791                 obj_request->result = 0;
1792         } else if (xferred < length && !obj_request->result) {
1793                 if (obj_request->type == OBJ_REQUEST_BIO)
1794                         zero_bio_chain(obj_request->bio_list, xferred);
1795                 else
1796                         zero_pages(obj_request->pages, xferred, length);
1797         }
1798         obj_request->xferred = length;
1799         obj_request_done_set(obj_request);
1800 }
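/*
 * Example of the rules above for a 4 MiB object read: an -ENOENT
 * reply is zero-filled in full; a reply that returned only 1 MiB is
 * zero-filled from 1 MiB to the end.  Either way xferred is reported
 * as 4 MiB, so the block layer sees the whole request satisfied.
 */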
1801
1802 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1803 {
1804         dout("%s: obj %p cb %p\n", __func__, obj_request,
1805                 obj_request->callback);
1806         if (obj_request->callback)
1807                 obj_request->callback(obj_request);
1808         else
1809                 complete_all(&obj_request->completion);
1810 }
1811
1812 static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1813 {
1814         obj_request->result = err;
1815         obj_request->xferred = 0;
1816         /*
1817          * kludge - mirror rbd_obj_request_submit() to match a put in
1818          * rbd_img_obj_callback()
1819          */
1820         if (obj_request_img_data_test(obj_request)) {
1821                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1822                 rbd_img_request_get(obj_request->img_request);
1823         }
1824         obj_request_done_set(obj_request);
1825         rbd_obj_request_complete(obj_request);
1826 }
1827
1828 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1829 {
1830         struct rbd_img_request *img_request = NULL;
1831         struct rbd_device *rbd_dev = NULL;
1832         bool layered = false;
1833
1834         if (obj_request_img_data_test(obj_request)) {
1835                 img_request = obj_request->img_request;
1836                 layered = img_request && img_request_layered_test(img_request);
1837                 rbd_dev = img_request->rbd_dev;
1838         }
1839
1840         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1841                 obj_request, img_request, obj_request->result,
1842                 obj_request->xferred, obj_request->length);
1843         if (layered && obj_request->result == -ENOENT &&
1844                         obj_request->img_offset < rbd_dev->parent_overlap)
1845                 rbd_img_parent_read(obj_request);
1846         else if (img_request)
1847                 rbd_img_obj_request_read_callback(obj_request);
1848         else
1849                 obj_request_done_set(obj_request);
1850 }
1851
1852 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1853 {
1854         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1855                 obj_request->result, obj_request->length);
1856         /*
1857          * There is no such thing as a successful short write.  Set
1858          * it to our originally-requested length.
1859          */
1860         obj_request->xferred = obj_request->length;
1861         obj_request_done_set(obj_request);
1862 }
1863
1864 static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1865 {
1866         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1867                 obj_request->result, obj_request->length);
1868         /*
1869          * There is no such thing as a successful short discard.  Set
1870          * it to our originally-requested length.
1871          */
1872         obj_request->xferred = obj_request->length;
1873         /* discarding a non-existent object is not a problem */
1874         if (obj_request->result == -ENOENT)
1875                 obj_request->result = 0;
1876         obj_request_done_set(obj_request);
1877 }
1878
1879 /*
1880  * For a simple stat call there's nothing to do.  We'll do more if
1881  * this is part of a write sequence for a layered image.
1882  */
1883 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1884 {
1885         dout("%s: obj %p\n", __func__, obj_request);
1886         obj_request_done_set(obj_request);
1887 }
1888
1889 static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1890 {
1891         dout("%s: obj %p\n", __func__, obj_request);
1892
1893         if (obj_request_img_data_test(obj_request))
1894                 rbd_osd_copyup_callback(obj_request);
1895         else
1896                 obj_request_done_set(obj_request);
1897 }
1898
1899 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1900 {
1901         struct rbd_obj_request *obj_request = osd_req->r_priv;
1902         u16 opcode;
1903
1904         dout("%s: osd_req %p\n", __func__, osd_req);
1905         rbd_assert(osd_req == obj_request->osd_req);
1906         if (obj_request_img_data_test(obj_request)) {
1907                 rbd_assert(obj_request->img_request);
1908                 rbd_assert(obj_request->which != BAD_WHICH);
1909         } else {
1910                 rbd_assert(obj_request->which == BAD_WHICH);
1911         }
1912
1913         if (osd_req->r_result < 0)
1914                 obj_request->result = osd_req->r_result;
1915
1916         /*
1917          * We support a 64-bit length, but ultimately it has to be
1918          * passed to the block layer, which just supports a 32-bit
1919          * length field.
1920          */
1921         obj_request->xferred = osd_req->r_ops[0].outdata_len;
1922         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1923
1924         opcode = osd_req->r_ops[0].op;
1925         switch (opcode) {
1926         case CEPH_OSD_OP_READ:
1927                 rbd_osd_read_callback(obj_request);
1928                 break;
1929         case CEPH_OSD_OP_SETALLOCHINT:
1930                 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1931                            osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
1932                 /* fall through */
1933         case CEPH_OSD_OP_WRITE:
1934         case CEPH_OSD_OP_WRITEFULL:
1935                 rbd_osd_write_callback(obj_request);
1936                 break;
1937         case CEPH_OSD_OP_STAT:
1938                 rbd_osd_stat_callback(obj_request);
1939                 break;
1940         case CEPH_OSD_OP_DELETE:
1941         case CEPH_OSD_OP_TRUNCATE:
1942         case CEPH_OSD_OP_ZERO:
1943                 rbd_osd_discard_callback(obj_request);
1944                 break;
1945         case CEPH_OSD_OP_CALL:
1946                 rbd_osd_call_callback(obj_request);
1947                 break;
1948         default:
1949                 rbd_warn(NULL, "%s: unsupported op %hu",
1950                         obj_request->object_name, (unsigned short) opcode);
1951                 break;
1952         }
1953
1954         if (obj_request_done_test(obj_request))
1955                 rbd_obj_request_complete(obj_request);
1956 }
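/*
 * For reference, the r_ops[] layouts dispatched above: a plain read
 * is a single READ op; an image data write is SETALLOCHINT followed
 * by WRITE or WRITEFULL (hence the fall-through); a discard is a
 * single DELETE, TRUNCATE or ZERO op; a copyup starts with a CALL op
 * and is routed through rbd_osd_call_callback().
 */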
1957
1958 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1959 {
1960         struct ceph_osd_request *osd_req = obj_request->osd_req;
1961
1962         rbd_assert(obj_request_img_data_test(obj_request));
1963         osd_req->r_snapid = obj_request->img_request->snap_id;
1964 }
1965
1966 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1967 {
1968         struct ceph_osd_request *osd_req = obj_request->osd_req;
1969
1970         osd_req->r_mtime = CURRENT_TIME;
1971         osd_req->r_data_offset = obj_request->offset;
1972 }
1973
1974 /*
1975  * Create an osd request.  A read request has one osd op (read).
1976  * A write request has either one (watch) or two (hint+write) osd ops.
1977  * (All rbd data writes are prefixed with an allocation hint op, but
1978  * technically osd watch is a write request, hence this distinction.)
1979  */
1980 static struct ceph_osd_request *rbd_osd_req_create(
1981                                         struct rbd_device *rbd_dev,
1982                                         enum obj_operation_type op_type,
1983                                         unsigned int num_ops,
1984                                         struct rbd_obj_request *obj_request)
1985 {
1986         struct ceph_snap_context *snapc = NULL;
1987         struct ceph_osd_client *osdc;
1988         struct ceph_osd_request *osd_req;
1989
1990         if (obj_request_img_data_test(obj_request) &&
1991                 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1992                 struct rbd_img_request *img_request = obj_request->img_request;
1993                 if (op_type == OBJ_OP_WRITE) {
1994                         rbd_assert(img_request_write_test(img_request));
1995                 } else {
1996                         rbd_assert(img_request_discard_test(img_request));
1997                 }
1998                 snapc = img_request->snapc;
1999         }
2000
2001         rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
2002
2003         /* Allocate and initialize the request, for the num_ops ops */
2004
2005         osdc = &rbd_dev->rbd_client->client->osdc;
2006         osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
2007                                           GFP_NOIO);
2008         if (!osd_req)
2009                 goto fail;
2010
2011         if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2012                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
2013         else
2014                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
2015
2016         osd_req->r_callback = rbd_osd_req_callback;
2017         osd_req->r_priv = obj_request;
2018
2019         osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
2020         if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
2021                              obj_request->object_name))
2022                 goto fail;
2023
2024         if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
2025                 goto fail;
2026
2027         return osd_req;
2028
2029 fail:
2030         ceph_osdc_put_request(osd_req);
2031         return NULL;
2032 }
2033
2034 /*
2035  * Create a copyup osd request based on the information in the object
2036  * request supplied.  A copyup request has two or three osd ops, a
2037  * copyup method call, potentially a hint op, and a write or truncate
2038  * or zero op.
2039  */
2040 static struct ceph_osd_request *
2041 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
2042 {
2043         struct rbd_img_request *img_request;
2044         struct ceph_snap_context *snapc;
2045         struct rbd_device *rbd_dev;
2046         struct ceph_osd_client *osdc;
2047         struct ceph_osd_request *osd_req;
2048         int num_osd_ops = 3;
2049
2050         rbd_assert(obj_request_img_data_test(obj_request));
2051         img_request = obj_request->img_request;
2052         rbd_assert(img_request);
2053         rbd_assert(img_request_write_test(img_request) ||
2054                         img_request_discard_test(img_request));
2055
2056         if (img_request_discard_test(img_request))
2057                 num_osd_ops = 2;
2058
2059         /* Allocate and initialize the request, for all the ops */
2060
2061         snapc = img_request->snapc;
2062         rbd_dev = img_request->rbd_dev;
2063         osdc = &rbd_dev->rbd_client->client->osdc;
2064         osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
2065                                                 false, GFP_NOIO);
2066         if (!osd_req)
2067                 goto fail;
2068
2069         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
2070         osd_req->r_callback = rbd_osd_req_callback;
2071         osd_req->r_priv = obj_request;
2072
2073         osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
2074         if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
2075                              obj_request->object_name))
2076                 goto fail;
2077
2078         if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
2079                 goto fail;
2080
2081         return osd_req;
2082
2083 fail:
2084         ceph_osdc_put_request(osd_req);
2085         return NULL;
2086 }
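/*
 * The copyup request allocated here is filled in by
 * rbd_img_obj_parent_read_full_callback(): op 0 becomes the "rbd"
 * class "copyup" CALL carrying the data read from the parent, and
 * rbd_img_obj_request_fill() adds the hint+write ops (or the single
 * discard op) starting at index 1.
 */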
2087
2088
2089 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2090 {
2091         ceph_osdc_put_request(osd_req);
2092 }
2093
2094 /* object_name is assumed to be a non-null pointer and NUL-terminated */
2095
2096 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
2097                                                 u64 offset, u64 length,
2098                                                 enum obj_request_type type)
2099 {
2100         struct rbd_obj_request *obj_request;
2101         size_t size;
2102         char *name;
2103
2104         rbd_assert(obj_request_type_valid(type));
2105
2106         size = strlen(object_name) + 1;
2107         name = kmalloc(size, GFP_NOIO);
2108         if (!name)
2109                 return NULL;
2110
2111         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
2112         if (!obj_request) {
2113                 kfree(name);
2114                 return NULL;
2115         }
2116
2117         obj_request->object_name = memcpy(name, object_name, size);
2118         obj_request->offset = offset;
2119         obj_request->length = length;
2120         obj_request->flags = 0;
2121         obj_request->which = BAD_WHICH;
2122         obj_request->type = type;
2123         INIT_LIST_HEAD(&obj_request->links);
2124         init_completion(&obj_request->completion);
2125         kref_init(&obj_request->kref);
2126
2127         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
2128                 offset, length, (int)type, obj_request);
2129
2130         return obj_request;
2131 }
2132
2133 static void rbd_obj_request_destroy(struct kref *kref)
2134 {
2135         struct rbd_obj_request *obj_request;
2136
2137         obj_request = container_of(kref, struct rbd_obj_request, kref);
2138
2139         dout("%s: obj %p\n", __func__, obj_request);
2140
2141         rbd_assert(obj_request->img_request == NULL);
2142         rbd_assert(obj_request->which == BAD_WHICH);
2143
2144         if (obj_request->osd_req)
2145                 rbd_osd_req_destroy(obj_request->osd_req);
2146
2147         rbd_assert(obj_request_type_valid(obj_request->type));
2148         switch (obj_request->type) {
2149         case OBJ_REQUEST_NODATA:
2150                 break;          /* Nothing to do */
2151         case OBJ_REQUEST_BIO:
2152                 if (obj_request->bio_list)
2153                         bio_chain_put(obj_request->bio_list);
2154                 break;
2155         case OBJ_REQUEST_PAGES:
2156                 /* img_data requests don't own their page array */
2157                 if (obj_request->pages &&
2158                     !obj_request_img_data_test(obj_request))
2159                         ceph_release_page_vector(obj_request->pages,
2160                                                 obj_request->page_count);
2161                 break;
2162         }
2163
2164         kfree(obj_request->object_name);
2165         obj_request->object_name = NULL;
2166         kmem_cache_free(rbd_obj_request_cache, obj_request);
2167 }
2168
2169 /* It's OK to call this for a device with no parent */
2170
2171 static void rbd_spec_put(struct rbd_spec *spec);
2172 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2173 {
2174         rbd_dev_remove_parent(rbd_dev);
2175         rbd_spec_put(rbd_dev->parent_spec);
2176         rbd_dev->parent_spec = NULL;
2177         rbd_dev->parent_overlap = 0;
2178 }
2179
2180 /*
2181  * Parent image reference counting is used to determine when an
2182  * image's parent fields can be safely torn down--after there are no
2183  * more in-flight requests to the parent image.  When the last
2184  * reference is dropped, cleaning them up is safe.
2185  */
2186 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2187 {
2188         int counter;
2189
2190         if (!rbd_dev->parent_spec)
2191                 return;
2192
2193         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2194         if (counter > 0)
2195                 return;
2196
2197         /* Last reference; clean up parent data structures */
2198
2199         if (!counter)
2200                 rbd_dev_unparent(rbd_dev);
2201         else
2202                 rbd_warn(rbd_dev, "parent reference underflow");
2203 }
2204
2205 /*
2206  * If an image has a non-zero parent overlap, get a reference to its
2207  * parent.
2208  *
2209  * Returns true if the rbd device has a parent with a non-zero
2210  * overlap and a reference for it was successfully taken, or
2211  * false otherwise.
2212  */
2213 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2214 {
2215         int counter = 0;
2216
2217         if (!rbd_dev->parent_spec)
2218                 return false;
2219
2220         down_read(&rbd_dev->header_rwsem);
2221         if (rbd_dev->parent_overlap)
2222                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2223         up_read(&rbd_dev->header_rwsem);
2224
2225         if (counter < 0)
2226                 rbd_warn(rbd_dev, "parent reference overflow");
2227
2228         return counter > 0;
2229 }
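/*
 * The reference taken here pins the parent fields for the lifetime of
 * a layered image request: it is taken from rbd_img_request_create()
 * (which then sets IMG_REQ_LAYERED) and dropped again from
 * rbd_img_request_destroy() via rbd_dev_parent_put().
 */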
2230
2231 /*
2232  * Caller is responsible for filling in the list of object requests
2233  * that comprises the image request, and the Linux request pointer
2234  * (if there is one).
2235  */
2236 static struct rbd_img_request *rbd_img_request_create(
2237                                         struct rbd_device *rbd_dev,
2238                                         u64 offset, u64 length,
2239                                         enum obj_operation_type op_type,
2240                                         struct ceph_snap_context *snapc)
2241 {
2242         struct rbd_img_request *img_request;
2243
2244         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2245         if (!img_request)
2246                 return NULL;
2247
2248         img_request->rq = NULL;
2249         img_request->rbd_dev = rbd_dev;
2250         img_request->offset = offset;
2251         img_request->length = length;
2252         img_request->flags = 0;
2253         if (op_type == OBJ_OP_DISCARD) {
2254                 img_request_discard_set(img_request);
2255                 img_request->snapc = snapc;
2256         } else if (op_type == OBJ_OP_WRITE) {
2257                 img_request_write_set(img_request);
2258                 img_request->snapc = snapc;
2259         } else {
2260                 img_request->snap_id = rbd_dev->spec->snap_id;
2261         }
2262         if (rbd_dev_parent_get(rbd_dev))
2263                 img_request_layered_set(img_request);
2264         spin_lock_init(&img_request->completion_lock);
2265         img_request->next_completion = 0;
2266         img_request->callback = NULL;
2267         img_request->result = 0;
2268         img_request->obj_request_count = 0;
2269         INIT_LIST_HEAD(&img_request->obj_requests);
2270         kref_init(&img_request->kref);
2271
2272         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2273                 obj_op_name(op_type), offset, length, img_request);
2274
2275         return img_request;
2276 }
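/*
 * Typical caller sequence (sketch only; error handling abbreviated),
 * matching how image I/O is issued elsewhere in this file:
 *
 *	img_request = rbd_img_request_create(rbd_dev, offset, length,
 *					     op_type, snapc);
 *	ret = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, bio);
 *	if (!ret)
 *		ret = rbd_img_request_submit(img_request);
 *	if (ret)
 *		rbd_img_request_put(img_request);
 */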
2277
2278 static void rbd_img_request_destroy(struct kref *kref)
2279 {
2280         struct rbd_img_request *img_request;
2281         struct rbd_obj_request *obj_request;
2282         struct rbd_obj_request *next_obj_request;
2283
2284         img_request = container_of(kref, struct rbd_img_request, kref);
2285
2286         dout("%s: img %p\n", __func__, img_request);
2287
2288         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2289                 rbd_img_obj_request_del(img_request, obj_request);
2290         rbd_assert(img_request->obj_request_count == 0);
2291
2292         if (img_request_layered_test(img_request)) {
2293                 img_request_layered_clear(img_request);
2294                 rbd_dev_parent_put(img_request->rbd_dev);
2295         }
2296
2297         if (img_request_write_test(img_request) ||
2298                 img_request_discard_test(img_request))
2299                 ceph_put_snap_context(img_request->snapc);
2300
2301         kmem_cache_free(rbd_img_request_cache, img_request);
2302 }
2303
2304 static struct rbd_img_request *rbd_parent_request_create(
2305                                         struct rbd_obj_request *obj_request,
2306                                         u64 img_offset, u64 length)
2307 {
2308         struct rbd_img_request *parent_request;
2309         struct rbd_device *rbd_dev;
2310
2311         rbd_assert(obj_request->img_request);
2312         rbd_dev = obj_request->img_request->rbd_dev;
2313
2314         parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2315                                                 length, OBJ_OP_READ, NULL);
2316         if (!parent_request)
2317                 return NULL;
2318
2319         img_request_child_set(parent_request);
2320         rbd_obj_request_get(obj_request);
2321         parent_request->obj_request = obj_request;
2322
2323         return parent_request;
2324 }
2325
2326 static void rbd_parent_request_destroy(struct kref *kref)
2327 {
2328         struct rbd_img_request *parent_request;
2329         struct rbd_obj_request *orig_request;
2330
2331         parent_request = container_of(kref, struct rbd_img_request, kref);
2332         orig_request = parent_request->obj_request;
2333
2334         parent_request->obj_request = NULL;
2335         rbd_obj_request_put(orig_request);
2336         img_request_child_clear(parent_request);
2337
2338         rbd_img_request_destroy(kref);
2339 }
2340
2341 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2342 {
2343         struct rbd_img_request *img_request;
2344         unsigned int xferred;
2345         int result;
2346         bool more;
2347
2348         rbd_assert(obj_request_img_data_test(obj_request));
2349         img_request = obj_request->img_request;
2350
2351         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2352         xferred = (unsigned int)obj_request->xferred;
2353         result = obj_request->result;
2354         if (result) {
2355                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2356                 enum obj_operation_type op_type;
2357
2358                 if (img_request_discard_test(img_request))
2359                         op_type = OBJ_OP_DISCARD;
2360                 else if (img_request_write_test(img_request))
2361                         op_type = OBJ_OP_WRITE;
2362                 else
2363                         op_type = OBJ_OP_READ;
2364
2365                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2366                         obj_op_name(op_type), obj_request->length,
2367                         obj_request->img_offset, obj_request->offset);
2368                 rbd_warn(rbd_dev, "  result %d xferred %x",
2369                         result, xferred);
2370                 if (!img_request->result)
2371                         img_request->result = result;
2372                 /*
2373                  * Need to end I/O on the entire obj_request worth of
2374                  * bytes in case of error.
2375                  */
2376                 xferred = obj_request->length;
2377         }
2378
2379         if (img_request_child_test(img_request)) {
2380                 rbd_assert(img_request->obj_request != NULL);
2381                 more = obj_request->which < img_request->obj_request_count - 1;
2382         } else {
2383                 rbd_assert(img_request->rq != NULL);
2384
2385                 more = blk_update_request(img_request->rq, result, xferred);
2386                 if (!more)
2387                         __blk_mq_end_request(img_request->rq, result);
2388         }
2389
2390         return more;
2391 }
2392
2393 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2394 {
2395         struct rbd_img_request *img_request;
2396         u32 which = obj_request->which;
2397         bool more = true;
2398
2399         rbd_assert(obj_request_img_data_test(obj_request));
2400         img_request = obj_request->img_request;
2401
2402         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2403         rbd_assert(img_request != NULL);
2404         rbd_assert(img_request->obj_request_count > 0);
2405         rbd_assert(which != BAD_WHICH);
2406         rbd_assert(which < img_request->obj_request_count);
2407
2408         spin_lock_irq(&img_request->completion_lock);
2409         if (which != img_request->next_completion)
2410                 goto out;
2411
2412         for_each_obj_request_from(img_request, obj_request) {
2413                 rbd_assert(more);
2414                 rbd_assert(which < img_request->obj_request_count);
2415
2416                 if (!obj_request_done_test(obj_request))
2417                         break;
2418                 more = rbd_img_obj_end_request(obj_request);
2419                 which++;
2420         }
2421
2422         rbd_assert(more ^ (which == img_request->obj_request_count));
2423         img_request->next_completion = which;
2424 out:
2425         spin_unlock_irq(&img_request->completion_lock);
2426         rbd_img_request_put(img_request);
2427
2428         if (!more)
2429                 rbd_img_request_complete(img_request);
2430 }
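/*
 * Completion ordering example: with object requests 0..2 completing
 * in the order 2, 0, 1, the callback for 2 returns early (which !=
 * next_completion), the callback for 0 ends request 0 and stops at
 * the not-yet-done request 1, and the callback for 1 then ends both
 * 1 and 2 and advances next_completion to 3.
 */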
2431
2432 /*
2433  * Add individual osd ops to the given ceph_osd_request and prepare
2434  * them for submission. num_ops is the current number of
2435  * osd operations already assigned to the object request.
2436  */
2437 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2438                                 struct ceph_osd_request *osd_request,
2439                                 enum obj_operation_type op_type,
2440                                 unsigned int num_ops)
2441 {
2442         struct rbd_img_request *img_request = obj_request->img_request;
2443         struct rbd_device *rbd_dev = img_request->rbd_dev;
2444         u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2445         u64 offset = obj_request->offset;
2446         u64 length = obj_request->length;
2447         u64 img_end;
2448         u16 opcode;
2449
2450         if (op_type == OBJ_OP_DISCARD) {
2451                 if (!offset && length == object_size &&
2452                     (!img_request_layered_test(img_request) ||
2453                      !obj_request_overlaps_parent(obj_request))) {
2454                         opcode = CEPH_OSD_OP_DELETE;
2455                 } else if (offset + length == object_size) {
2456                         opcode = CEPH_OSD_OP_TRUNCATE;
2457                 } else {
2458                         down_read(&rbd_dev->header_rwsem);
2459                         img_end = rbd_dev->header.image_size;
2460                         up_read(&rbd_dev->header_rwsem);
2461
2462                         if (obj_request->img_offset + length == img_end)
2463                                 opcode = CEPH_OSD_OP_TRUNCATE;
2464                         else
2465                                 opcode = CEPH_OSD_OP_ZERO;
2466                 }
2467         } else if (op_type == OBJ_OP_WRITE) {
2468                 if (!offset && length == object_size)
2469                         opcode = CEPH_OSD_OP_WRITEFULL;
2470                 else
2471                         opcode = CEPH_OSD_OP_WRITE;
2472                 osd_req_op_alloc_hint_init(osd_request, num_ops,
2473                                         object_size, object_size);
2474                 num_ops++;
2475         } else {
2476                 opcode = CEPH_OSD_OP_READ;
2477         }
2478
2479         if (opcode == CEPH_OSD_OP_DELETE)
2480                 osd_req_op_init(osd_request, num_ops, opcode, 0);
2481         else
2482                 osd_req_op_extent_init(osd_request, num_ops, opcode,
2483                                        offset, length, 0, 0);
2484
2485         if (obj_request->type == OBJ_REQUEST_BIO)
2486                 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2487                                         obj_request->bio_list, length);
2488         else if (obj_request->type == OBJ_REQUEST_PAGES)
2489                 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2490                                         obj_request->pages, length,
2491                                         offset & ~PAGE_MASK, false, false);
2492
2493         /* Discards are also writes */
2494         if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2495                 rbd_osd_req_format_write(obj_request);
2496         else
2497                 rbd_osd_req_format_read(obj_request);
2498 }
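/*
 * Discard opcode selection example (4 MiB objects assumed): a
 * whole-object discard with no parent data underneath becomes DELETE;
 * one covering bytes 1 MiB..4 MiB of an object becomes TRUNCATE; one
 * covering bytes 1 MiB..3 MiB becomes ZERO, unless it runs up to the
 * end of the image, in which case TRUNCATE is used instead.
 */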
2499
2500 /*
2501  * Split up an image request into one or more object requests, each
2502  * to a different object.  The "type" parameter indicates whether
2503  * "data_desc" is the pointer to the head of a list of bio
2504  * structures, or the base of a page array.  In either case this
2505  * function assumes data_desc describes memory sufficient to hold
2506  * all data described by the image request.
2507  */
2508 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2509                                         enum obj_request_type type,
2510                                         void *data_desc)
2511 {
2512         struct rbd_device *rbd_dev = img_request->rbd_dev;
2513         struct rbd_obj_request *obj_request = NULL;
2514         struct rbd_obj_request *next_obj_request;
2515         struct bio *bio_list = NULL;
2516         unsigned int bio_offset = 0;
2517         struct page **pages = NULL;
2518         enum obj_operation_type op_type;
2519         u64 img_offset;
2520         u64 resid;
2521
2522         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2523                 (int)type, data_desc);
2524
2525         img_offset = img_request->offset;
2526         resid = img_request->length;
2527         rbd_assert(resid > 0);
2528         op_type = rbd_img_request_op_type(img_request);
2529
2530         if (type == OBJ_REQUEST_BIO) {
2531                 bio_list = data_desc;
2532                 rbd_assert(img_offset ==
2533                            bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2534         } else if (type == OBJ_REQUEST_PAGES) {
2535                 pages = data_desc;
2536         }
2537
2538         while (resid) {
2539                 struct ceph_osd_request *osd_req;
2540                 const char *object_name;
2541                 u64 offset;
2542                 u64 length;
2543
2544                 object_name = rbd_segment_name(rbd_dev, img_offset);
2545                 if (!object_name)
2546                         goto out_unwind;
2547                 offset = rbd_segment_offset(rbd_dev, img_offset);
2548                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2549                 obj_request = rbd_obj_request_create(object_name,
2550                                                 offset, length, type);
2551                 /* object request has its own copy of the object name */
2552                 rbd_segment_name_free(object_name);
2553                 if (!obj_request)
2554                         goto out_unwind;
2555
2556                 /*
2557                  * set obj_request->img_request before creating the
2558                  * osd_request so that it gets the right snapc
2559                  */
2560                 rbd_img_obj_request_add(img_request, obj_request);
2561
2562                 if (type == OBJ_REQUEST_BIO) {
2563                         unsigned int clone_size;
2564
2565                         rbd_assert(length <= (u64)UINT_MAX);
2566                         clone_size = (unsigned int)length;
2567                         obj_request->bio_list =
2568                                         bio_chain_clone_range(&bio_list,
2569                                                                 &bio_offset,
2570                                                                 clone_size,
2571                                                                 GFP_NOIO);
2572                         if (!obj_request->bio_list)
2573                                 goto out_unwind;
2574                 } else if (type == OBJ_REQUEST_PAGES) {
2575                         unsigned int page_count;
2576
2577                         obj_request->pages = pages;
2578                         page_count = (u32)calc_pages_for(offset, length);
2579                         obj_request->page_count = page_count;
2580                         if ((offset + length) & ~PAGE_MASK)
2581                                 page_count--;   /* more on last page */
2582                         pages += page_count;
2583                 }
2584
2585                 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2586                                         (op_type == OBJ_OP_WRITE) ? 2 : 1,
2587                                         obj_request);
2588                 if (!osd_req)
2589                         goto out_unwind;
2590
2591                 obj_request->osd_req = osd_req;
2592                 obj_request->callback = rbd_img_obj_callback;
2593                 obj_request->img_offset = img_offset;
2594
2595                 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2596
2597                 img_offset += length;
2598                 resid -= length;
2599         }
2600
2601         return 0;
2602
2603 out_unwind:
2604         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2605                 rbd_img_obj_request_del(img_request, obj_request);
2606
2607         return -ENOMEM;
2608 }
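/*
 * Splitting example (4 MiB objects assumed): a 6 MiB image request at
 * image offset 6 MiB becomes two object requests, one for bytes
 * 2 MiB..4 MiB of object 1 and one for all of object 2, each with its
 * own cloned bio chain (or slice of the supplied page array).
 */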
2609
2610 static void
2611 rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2612 {
2613         struct rbd_img_request *img_request;
2614         struct rbd_device *rbd_dev;
2615         struct page **pages;
2616         u32 page_count;
2617
2618         dout("%s: obj %p\n", __func__, obj_request);
2619
2620         rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2621                 obj_request->type == OBJ_REQUEST_NODATA);
2622         rbd_assert(obj_request_img_data_test(obj_request));
2623         img_request = obj_request->img_request;
2624         rbd_assert(img_request);
2625
2626         rbd_dev = img_request->rbd_dev;
2627         rbd_assert(rbd_dev);
2628
2629         pages = obj_request->copyup_pages;
2630         rbd_assert(pages != NULL);
2631         obj_request->copyup_pages = NULL;
2632         page_count = obj_request->copyup_page_count;
2633         rbd_assert(page_count);
2634         obj_request->copyup_page_count = 0;
2635         ceph_release_page_vector(pages, page_count);
2636
2637         /*
2638          * We want the transfer count to reflect the size of the
2639          * original write request.  There is no such thing as a
2640          * successful short write, so if the request was successful
2641          * we can just set it to the originally-requested length.
2642          */
2643         if (!obj_request->result)
2644                 obj_request->xferred = obj_request->length;
2645
2646         obj_request_done_set(obj_request);
2647 }
2648
2649 static void
2650 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2651 {
2652         struct rbd_obj_request *orig_request;
2653         struct ceph_osd_request *osd_req;
2654         struct rbd_device *rbd_dev;
2655         struct page **pages;
2656         enum obj_operation_type op_type;
2657         u32 page_count;
2658         int img_result;
2659         u64 parent_length;
2660
2661         rbd_assert(img_request_child_test(img_request));
2662
2663         /* First get what we need from the image request */
2664
2665         pages = img_request->copyup_pages;
2666         rbd_assert(pages != NULL);
2667         img_request->copyup_pages = NULL;
2668         page_count = img_request->copyup_page_count;
2669         rbd_assert(page_count);
2670         img_request->copyup_page_count = 0;
2671
2672         orig_request = img_request->obj_request;
2673         rbd_assert(orig_request != NULL);
2674         rbd_assert(obj_request_type_valid(orig_request->type));
2675         img_result = img_request->result;
2676         parent_length = img_request->length;
2677         rbd_assert(img_result || parent_length == img_request->xferred);
2678         rbd_img_request_put(img_request);
2679
2680         rbd_assert(orig_request->img_request);
2681         rbd_dev = orig_request->img_request->rbd_dev;
2682         rbd_assert(rbd_dev);
2683
2684         /*
2685          * If the overlap has become 0 (most likely because the
2686          * image has been flattened) we need to free the pages
2687          * and re-submit the original write request.
2688          */
2689         if (!rbd_dev->parent_overlap) {
2690                 ceph_release_page_vector(pages, page_count);
2691                 rbd_obj_request_submit(orig_request);
2692                 return;
2693         }
2694
2695         if (img_result)
2696                 goto out_err;
2697
2698         /*
2699          * The original osd request is of no use to us any more.
2700          * We need a new one that can hold the three ops in a copyup
2701          * request.  Allocate the new copyup osd request for the
2702          * original request, and release the old one.
2703          */
2704         img_result = -ENOMEM;
2705         osd_req = rbd_osd_req_create_copyup(orig_request);
2706         if (!osd_req)
2707                 goto out_err;
2708         rbd_osd_req_destroy(orig_request->osd_req);
2709         orig_request->osd_req = osd_req;
2710         orig_request->copyup_pages = pages;
2711         orig_request->copyup_page_count = page_count;
2712
2713         /* Initialize the copyup op */
2714
2715         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2716         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2717                                                 false, false);
2718
2719         /* Add the other op(s) */
2720
2721         op_type = rbd_img_request_op_type(orig_request->img_request);
2722         rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2723
2724         /* All set, send it off. */
2725
2726         rbd_obj_request_submit(orig_request);
2727         return;
2728
2729 out_err:
2730         ceph_release_page_vector(pages, page_count);
2731         rbd_obj_request_error(orig_request, img_result);
2732 }
2733
2734 /*
2735  * Read from the parent image the range of data that covers the
2736  * entire target of the given object request.  This is used for
2737  * satisfying a layered image write request when the target of an
2738  * object request from the image request does not exist.
2739  *
2740  * A page array big enough to hold the returned data is allocated
2741  * and supplied to rbd_img_request_fill() as the "data descriptor."
2742  * When the read completes, this page array will be transferred to
2743  * the original object request for the copyup operation.
2744  *
2745  * If an error occurs, it is recorded as the result of the original
2746  * object request in rbd_img_obj_exists_callback().
2747  */
2748 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2749 {
2750         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2751         struct rbd_img_request *parent_request = NULL;
2752         u64 img_offset;
2753         u64 length;
2754         struct page **pages = NULL;
2755         u32 page_count;
2756         int result;
2757
2758         rbd_assert(rbd_dev->parent != NULL);
2759
2760         /*
2761          * Determine the byte range covered by the object in the
2762          * child image to which the original request was to be sent.
2763          */
2764         img_offset = obj_request->img_offset - obj_request->offset;
2765         length = (u64)1 << rbd_dev->header.obj_order;
2766
2767         /*
2768          * There is no defined parent data beyond the parent
2769          * overlap, so limit what we read at that boundary if
2770          * necessary.
2771          */
2772         if (img_offset + length > rbd_dev->parent_overlap) {
2773                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2774                 length = rbd_dev->parent_overlap - img_offset;
2775         }
2776
2777         /*
2778          * Allocate a page array big enough to receive the data read
2779          * from the parent.
2780          */
2781         page_count = (u32)calc_pages_for(0, length);
2782         pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2783         if (IS_ERR(pages)) {
2784                 result = PTR_ERR(pages);
2785                 pages = NULL;
2786                 goto out_err;
2787         }
2788
2789         result = -ENOMEM;
2790         parent_request = rbd_parent_request_create(obj_request,
2791                                                 img_offset, length);
2792         if (!parent_request)
2793                 goto out_err;
2794
2795         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2796         if (result)
2797                 goto out_err;
2798
2799         parent_request->copyup_pages = pages;
2800         parent_request->copyup_page_count = page_count;
2801         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2802
2803         result = rbd_img_request_submit(parent_request);
2804         if (!result)
2805                 return 0;
2806
2807         parent_request->copyup_pages = NULL;
2808         parent_request->copyup_page_count = 0;
2809         parent_request->obj_request = NULL;
2810         rbd_obj_request_put(obj_request);
2811 out_err:
2812         if (pages)
2813                 ceph_release_page_vector(pages, page_count);
2814         if (parent_request)
2815                 rbd_img_request_put(parent_request);
2816         return result;
2817 }
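/*
 * Range example (4 MiB objects assumed): for an object request at
 * img_offset 9 MiB with offset 1 MiB, the parent read covers the
 * whole underlying object, image bytes 8 MiB..12 MiB.  If
 * parent_overlap is only 10 MiB the read is clipped to 8 MiB..10 MiB,
 * and only that much is later sent as copyup data.
 */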
2818
2819 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2820 {
2821         struct rbd_obj_request *orig_request;
2822         struct rbd_device *rbd_dev;
2823         int result;
2824
2825         rbd_assert(!obj_request_img_data_test(obj_request));
2826
2827         /*
2828          * All we need from the object request is the original
2829          * request and the result of the STAT op.  Grab those, then
2830          * we're done with the request.
2831          */
2832         orig_request = obj_request->obj_request;
2833         obj_request->obj_request = NULL;
2834         rbd_obj_request_put(orig_request);
2835         rbd_assert(orig_request);
2836         rbd_assert(orig_request->img_request);
2837
2838         result = obj_request->result;
2839         obj_request->result = 0;
2840
2841         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2842                 obj_request, orig_request, result,
2843                 obj_request->xferred, obj_request->length);
2844         rbd_obj_request_put(obj_request);
2845
2846         /*
2847          * If the overlap has become 0 (most likely because the
2848          * image has been flattened) we need to re-submit the
2849          * original request.
2850          */
2851         rbd_dev = orig_request->img_request->rbd_dev;
2852         if (!rbd_dev->parent_overlap) {
2853                 rbd_obj_request_submit(orig_request);
2854                 return;
2855         }
2856
2857         /*
2858          * Our only purpose here is to determine whether the object
2859          * exists, and we don't want to treat the non-existence as
2860          * an error.  If something else comes back, transfer the
2861          * error to the original request and complete it now.
2862          */
2863         if (!result) {
2864                 obj_request_existence_set(orig_request, true);
2865         } else if (result == -ENOENT) {
2866                 obj_request_existence_set(orig_request, false);
2867         } else {
2868                 goto fail_orig_request;
2869         }
2870
2871         /*
2872          * Resubmit the original request now that we have recorded
2873          * whether the target object exists.
2874          */
2875         result = rbd_img_obj_request_submit(orig_request);
2876         if (result)
2877                 goto fail_orig_request;
2878
2879         return;
2880
2881 fail_orig_request:
2882         rbd_obj_request_error(orig_request, result);
2883 }
2884
2885 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2886 {
2887         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2888         struct rbd_obj_request *stat_request;
2889         struct page **pages;
2890         u32 page_count;
2891         size_t size;
2892         int ret;
2893
2894         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2895                                               OBJ_REQUEST_PAGES);
2896         if (!stat_request)
2897                 return -ENOMEM;
2898
2899         stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2900                                                    stat_request);
2901         if (!stat_request->osd_req) {
2902                 ret = -ENOMEM;
2903                 goto fail_stat_request;
2904         }
2905
2906         /*
2907          * The response data for a STAT call consists of:
2908          *     le64 length;
2909          *     struct {
2910          *         le32 tv_sec;
2911          *         le32 tv_nsec;
2912          *     } mtime;
2913          */
2914         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2915         page_count = (u32)calc_pages_for(0, size);
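        /* size works out to 16 bytes, so page_count is always 1 */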
2916         pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2917         if (IS_ERR(pages)) {
2918                 ret = PTR_ERR(pages);
2919                 goto fail_stat_request;
2920         }
2921
2922         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2923         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2924                                      false, false);
2925
2926         rbd_obj_request_get(obj_request);
2927         stat_request->obj_request = obj_request;
2928         stat_request->pages = pages;
2929         stat_request->page_count = page_count;
2930         stat_request->callback = rbd_img_obj_exists_callback;
2931
2932         rbd_obj_request_submit(stat_request);
2933         return 0;
2934
2935 fail_stat_request:
2936         rbd_obj_request_put(stat_request);
2937         return ret;
2938 }
2939
2940 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2941 {
2942         struct rbd_img_request *img_request = obj_request->img_request;
2943         struct rbd_device *rbd_dev = img_request->rbd_dev;
2944
2945         /* Reads */
2946         if (!img_request_write_test(img_request) &&
2947             !img_request_discard_test(img_request))
2948                 return true;
2949
2950         /* Non-layered writes */
2951         if (!img_request_layered_test(img_request))
2952                 return true;
2953
2954         /*
2955          * Layered writes outside of the parent overlap range don't
2956          * share any data with the parent.
2957          */
2958         if (!obj_request_overlaps_parent(obj_request))
2959                 return true;
2960
2961         /*
2962          * Entire-object layered writes - we will overwrite whatever
2963          * parent data there is anyway.
2964          */
2965         if (!obj_request->offset &&
2966             obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2967                 return true;
2968
2969         /*
2970          * If the object is known to already exist, its parent data has
2971          * already been copied.
2972          */
2973         if (obj_request_known_test(obj_request) &&
2974             obj_request_exists_test(obj_request))
2975                 return true;
2976
2977         return false;
2978 }
2979
2980 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2981 {
2982         rbd_assert(obj_request_img_data_test(obj_request));
2983         rbd_assert(obj_request_type_valid(obj_request->type));
2984         rbd_assert(obj_request->img_request);
2985
2986         if (img_obj_request_simple(obj_request)) {
2987                 rbd_obj_request_submit(obj_request);
2988                 return 0;
2989         }
2990
2991         /*
2992          * It's a layered write.  The target object might exist but
2993          * we may not know that yet.  If we know it doesn't exist,
2994          * start by reading the data for the full target object from
2995          * the parent so we can use it for a copyup to the target.
2996          */
2997         if (obj_request_known_test(obj_request))
2998                 return rbd_img_obj_parent_read_full(obj_request);
2999
3000         /* We don't know whether the target exists.  Go find out. */
3001
3002         return rbd_img_obj_exists_submit(obj_request);
3003 }
3004
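/*
 * Submit every object request that makes up the image request.  The
 * extra image request reference taken here keeps it alive until all
 * object requests have been submitted (or one of them fails).
 */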
3005 static int rbd_img_request_submit(struct rbd_img_request *img_request)
3006 {
3007         struct rbd_obj_request *obj_request;
3008         struct rbd_obj_request *next_obj_request;
3009         int ret = 0;
3010
3011         dout("%s: img %p\n", __func__, img_request);
3012
3013         rbd_img_request_get(img_request);
3014         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
3015                 ret = rbd_img_obj_request_submit(obj_request);
3016                 if (ret)
3017                         goto out_put_ireq;
3018         }
3019
3020 out_put_ireq:
3021         rbd_img_request_put(img_request);
3022         return ret;
3023 }
3024
3025 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
3026 {
3027         struct rbd_obj_request *obj_request;
3028         struct rbd_device *rbd_dev;
3029         u64 obj_end;
3030         u64 img_xferred;
3031         int img_result;
3032
3033         rbd_assert(img_request_child_test(img_request));
3034
3035         /* First get what we need from the image request and release it */
3036
3037         obj_request = img_request->obj_request;
3038         img_xferred = img_request->xferred;
3039         img_result = img_request->result;
3040         rbd_img_request_put(img_request);
3041
3042         /*
3043          * If the overlap has become 0 (most likely because the
3044          * image has been flattened) we need to re-submit the
3045          * original request.
3046          */
3047         rbd_assert(obj_request);
3048         rbd_assert(obj_request->img_request);
3049         rbd_dev = obj_request->img_request->rbd_dev;
3050         if (!rbd_dev->parent_overlap) {
3051                 rbd_obj_request_submit(obj_request);
3052                 return;
3053         }
3054
3055         obj_request->result = img_result;
3056         if (obj_request->result)
3057                 goto out;
3058
3059         /*
3060          * We need to zero anything beyond the parent overlap
3061          * boundary.  Since rbd_img_obj_request_read_callback()
3062          * will zero anything beyond the end of a short read, an
3063          * easy way to do this is to pretend the data from the
3064          * parent came up short--ending at the overlap boundary.
3065          */
3066         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
3067         obj_end = obj_request->img_offset + obj_request->length;
3068         if (obj_end > rbd_dev->parent_overlap) {
3069                 u64 xferred = 0;
3070
3071                 if (obj_request->img_offset < rbd_dev->parent_overlap)
3072                         xferred = rbd_dev->parent_overlap -
3073                                         obj_request->img_offset;
3074
3075                 obj_request->xferred = min(img_xferred, xferred);
3076         } else {
3077                 obj_request->xferred = img_xferred;
3078         }
3079 out:
3080         rbd_img_obj_request_read_callback(obj_request);
3081         rbd_obj_request_complete(obj_request);
3082 }
3083
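/*
 * A layered read hit an object that doesn't exist (-ENOENT).  Build a
 * child image request against the parent image for the same range and
 * submit it; rbd_img_parent_read_callback() finishes the original
 * object request once the parent read completes.
 */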
3084 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
3085 {
3086         struct rbd_img_request *img_request;
3087         int result;
3088
3089         rbd_assert(obj_request_img_data_test(obj_request));
3090         rbd_assert(obj_request->img_request != NULL);
3091         rbd_assert(obj_request->result == (s32) -ENOENT);
3092         rbd_assert(obj_request_type_valid(obj_request->type));
3093
3094         /* rbd_read_finish(obj_request, obj_request->length); */
3095         img_request = rbd_parent_request_create(obj_request,
3096                                                 obj_request->img_offset,
3097                                                 obj_request->length);
3098         result = -ENOMEM;
3099         if (!img_request)
3100                 goto out_err;
3101
3102         if (obj_request->type == OBJ_REQUEST_BIO)
3103                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3104                                                 obj_request->bio_list);
3105         else
3106                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3107                                                 obj_request->pages);
3108         if (result)
3109                 goto out_err;
3110
3111         img_request->callback = rbd_img_parent_read_callback;
3112         result = rbd_img_request_submit(img_request);
3113         if (result)
3114                 goto out_err;
3115
3116         return;
3117 out_err:
3118         if (img_request)
3119                 rbd_img_request_put(img_request);
3120         obj_request->result = result;
3121         obj_request->xferred = 0;
3122         obj_request_done_set(obj_request);
3123 }
3124
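/*
 * Exclusive lock client id helpers.  A lock client is identified by the
 * global id of its ceph client plus the cookie of its watch on the
 * image header object.
 */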
3125 static const struct rbd_client_id rbd_empty_cid;
3126
3127 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3128                           const struct rbd_client_id *rhs)
3129 {
3130         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3131 }
3132
3133 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3134 {
3135         struct rbd_client_id cid;
3136
3137         mutex_lock(&rbd_dev->watch_mutex);
3138         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3139         cid.handle = rbd_dev->watch_cookie;
3140         mutex_unlock(&rbd_dev->watch_mutex);
3141         return cid;
3142 }
3143
3144 /*
3145  * lock_rwsem must be held for write
3146  */
3147 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3148                               const struct rbd_client_id *cid)
3149 {
3150         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3151              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3152              cid->gid, cid->handle);
3153         rbd_dev->owner_cid = *cid; /* struct */
3154 }
3155
3156 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3157 {
3158         mutex_lock(&rbd_dev->watch_mutex);
3159         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3160         mutex_unlock(&rbd_dev->watch_mutex);
3161 }
3162
3163 /*
3164  * lock_rwsem must be held for write
3165  */
3166 static int rbd_lock(struct rbd_device *rbd_dev)
3167 {
3168         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3169         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3170         char cookie[32];
3171         int ret;
3172
3173         WARN_ON(__rbd_is_lock_owner(rbd_dev));
3174
3175         format_lock_cookie(rbd_dev, cookie);
3176         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3177                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3178                             RBD_LOCK_TAG, "", 0);
3179         if (ret)
3180                 return ret;
3181
3182         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3183         rbd_set_owner_cid(rbd_dev, &cid);
3184         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3185         return 0;
3186 }
3187
3188 /*
3189  * lock_rwsem must be held for write
3190  */
3191 static int rbd_unlock(struct rbd_device *rbd_dev)
3192 {
3193         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3194         char cookie[32];
3195         int ret;
3196
3197         WARN_ON(!__rbd_is_lock_owner(rbd_dev));
3198
3199         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3200
3201         format_lock_cookie(rbd_dev, cookie);
3202         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3203                               RBD_LOCK_NAME, cookie);
3204         if (ret && ret != -ENOENT) {
3205                 rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
3206                 return ret;
3207         }
3208
3209         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3210         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3211         return 0;
3212 }
3213
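/*
 * Send a lock-related notify on the header object.  The payload is a
 * NotifyMessage carrying the notify op and our client id.  If the
 * caller passes preply_pages/preply_len, the raw acks are returned so
 * per-client responses can be decoded.
 */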
3214 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3215                                 enum rbd_notify_op notify_op,
3216                                 struct page ***preply_pages,
3217                                 size_t *preply_len)
3218 {
3219         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3220         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3221         int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3222         char buf[buf_size];
3223         void *p = buf;
3224
3225         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3226
3227         /* encode *LockPayload NotifyMessage (op + ClientId) */
3228         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3229         ceph_encode_32(&p, notify_op);
3230         ceph_encode_64(&p, cid.gid);
3231         ceph_encode_64(&p, cid.handle);
3232
3233         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3234                                 &rbd_dev->header_oloc, buf, buf_size,
3235                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3236 }
3237
3238 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3239                                enum rbd_notify_op notify_op)
3240 {
3241         struct page **reply_pages;
3242         size_t reply_len;
3243
3244         __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3245         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3246 }
3247
3248 static void rbd_notify_acquired_lock(struct work_struct *work)
3249 {
3250         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3251                                                   acquired_lock_work);
3252
3253         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3254 }
3255
3256 static void rbd_notify_released_lock(struct work_struct *work)
3257 {
3258         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3259                                                   released_lock_work);
3260
3261         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3262 }
3263
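/*
 * Ask the current lock owner to release the lock by broadcasting a
 * REQUEST_LOCK notify and decoding the acks.  Returns the owner's
 * ResponseMessage result (0 on success), -ETIMEDOUT if no owner
 * responded, or another negative error code.
 */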
3264 static int rbd_request_lock(struct rbd_device *rbd_dev)
3265 {
3266         struct page **reply_pages;
3267         size_t reply_len;
3268         bool lock_owner_responded = false;
3269         int ret;
3270
3271         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3272
3273         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3274                                    &reply_pages, &reply_len);
3275         if (ret && ret != -ETIMEDOUT) {
3276                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3277                 goto out;
3278         }
3279
3280         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3281                 void *p = page_address(reply_pages[0]);
3282                 void *const end = p + reply_len;
3283                 u32 n;
3284
3285                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3286                 while (n--) {
3287                         u8 struct_v;
3288                         u32 len;
3289
3290                         ceph_decode_need(&p, end, 8 + 8, e_inval);
3291                         p += 8 + 8; /* skip gid and cookie */
3292
3293                         ceph_decode_32_safe(&p, end, len, e_inval);
3294                         if (!len)
3295                                 continue;
3296
3297                         if (lock_owner_responded) {
3298                                 rbd_warn(rbd_dev,
3299                                          "duplicate lock owners detected");
3300                                 ret = -EIO;
3301                                 goto out;
3302                         }
3303
3304                         lock_owner_responded = true;
3305                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3306                                                   &struct_v, &len);
3307                         if (ret) {
3308                                 rbd_warn(rbd_dev,
3309                                          "failed to decode ResponseMessage: %d",
3310                                          ret);
3311                                 goto e_inval;
3312                         }
3313
3314                         ret = ceph_decode_32(&p);
3315                 }
3316         }
3317
3318         if (!lock_owner_responded) {
3319                 rbd_warn(rbd_dev, "no lock owners detected");
3320                 ret = -ETIMEDOUT;
3321         }
3322
3323 out:
3324         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3325         return ret;
3326
3327 e_inval:
3328         ret = -EINVAL;
3329         goto out;
3330 }
3331
3332 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3333 {
3334         dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3335
3336         cancel_delayed_work(&rbd_dev->lock_dwork);
3337         if (wake_all)
3338                 wake_up_all(&rbd_dev->lock_waitq);
3339         else
3340                 wake_up(&rbd_dev->lock_waitq);
3341 }
3342
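/*
 * Fetch the lock info for the header object and verify that the lock
 * is an rbd exclusive lock (correct tag, exclusive type, rbd cookie
 * prefix).  On success *lockers must be freed with ceph_free_lockers().
 */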
3343 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3344                                struct ceph_locker **lockers, u32 *num_lockers)
3345 {
3346         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3347         u8 lock_type;
3348         char *lock_tag;
3349         int ret;
3350
3351         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3352
3353         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3354                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3355                                  &lock_type, &lock_tag, lockers, num_lockers);
3356         if (ret)
3357                 return ret;
3358
3359         if (*num_lockers == 0) {
3360                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3361                 goto out;
3362         }
3363
3364         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3365                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3366                          lock_tag);
3367                 ret = -EBUSY;
3368                 goto out;
3369         }
3370
3371         if (lock_type == CEPH_CLS_LOCK_SHARED) {
3372                 rbd_warn(rbd_dev, "shared lock type detected");
3373                 ret = -EBUSY;
3374                 goto out;
3375         }
3376
3377         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3378                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
3379                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3380                          (*lockers)[0].id.cookie);
3381                 ret = -EBUSY;
3382                 goto out;
3383         }
3384
3385 out:
3386         kfree(lock_tag);
3387         return ret;
3388 }
3389
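/*
 * Check whether the locker still has a watch established on the header
 * object (i.e. is still alive).  Returns 1 and records the locker as
 * owner if a matching watcher is found, 0 if not, negative on error.
 */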
3390 static int find_watcher(struct rbd_device *rbd_dev,
3391                         const struct ceph_locker *locker)
3392 {
3393         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3394         struct ceph_watch_item *watchers;
3395         u32 num_watchers;
3396         u64 cookie;
3397         int i;
3398         int ret;
3399
3400         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3401                                       &rbd_dev->header_oloc, &watchers,
3402                                       &num_watchers);
3403         if (ret)
3404                 return ret;
3405
3406         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3407         for (i = 0; i < num_watchers; i++) {
3408                 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3409                             sizeof(locker->info.addr)) &&
3410                     watchers[i].cookie == cookie) {
3411                         struct rbd_client_id cid = {
3412                                 .gid = le64_to_cpu(watchers[i].name.num),
3413                                 .handle = cookie,
3414                         };
3415
3416                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3417                              rbd_dev, cid.gid, cid.handle);
3418                         rbd_set_owner_cid(rbd_dev, &cid);
3419                         ret = 1;
3420                         goto out;
3421                 }
3422         }
3423
3424         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3425         ret = 0;
3426 out:
3427         kfree(watchers);
3428         return ret;
3429 }
3430
3431 /*
3432  * lock_rwsem must be held for write
3433  */
3434 static int rbd_try_lock(struct rbd_device *rbd_dev)
3435 {
3436         struct ceph_client *client = rbd_dev->rbd_client->client;
3437         struct ceph_locker *lockers;
3438         u32 num_lockers;
3439         int ret;
3440
3441         for (;;) {
3442                 ret = rbd_lock(rbd_dev);
3443                 if (ret != -EBUSY)
3444                         return ret;
3445
3446                 /* determine if the current lock holder is still alive */
3447                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3448                 if (ret)
3449                         return ret;
3450
3451                 if (num_lockers == 0)
3452                         goto again;
3453
3454                 ret = find_watcher(rbd_dev, lockers);
3455                 if (ret) {
3456                         if (ret > 0)
3457                                 ret = 0; /* have to request lock */
3458                         goto out;
3459                 }
3460
3461                 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3462                          ENTITY_NAME(lockers[0].id.name));
3463
3464                 ret = ceph_monc_blacklist_add(&client->monc,
3465                                               &lockers[0].info.addr);
3466                 if (ret) {
3467                         rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3468                                  ENTITY_NAME(lockers[0].id.name), ret);
3469                         goto out;
3470                 }
3471
3472                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3473                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
3474                                           lockers[0].id.cookie,
3475                                           &lockers[0].id.name);
3476                 if (ret && ret != -ENOENT)
3477                         goto out;
3478
3479 again:
3480                 ceph_free_lockers(lockers, num_lockers);
3481         }
3482
3483 out:
3484         ceph_free_lockers(lockers, num_lockers);
3485         return ret;
3486 }
3487
3488 /*
3489  * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3490  */
3491 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3492                                                 int *pret)
3493 {
3494         enum rbd_lock_state lock_state;
3495
3496         down_read(&rbd_dev->lock_rwsem);
3497         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3498              rbd_dev->lock_state);
3499         if (__rbd_is_lock_owner(rbd_dev)) {
3500                 lock_state = rbd_dev->lock_state;
3501                 up_read(&rbd_dev->lock_rwsem);
3502                 return lock_state;
3503         }
3504
3505         up_read(&rbd_dev->lock_rwsem);
3506         down_write(&rbd_dev->lock_rwsem);
3507         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3508              rbd_dev->lock_state);
3509         if (!__rbd_is_lock_owner(rbd_dev)) {
3510                 *pret = rbd_try_lock(rbd_dev);
3511                 if (*pret)
3512                         rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3513         }
3514
3515         lock_state = rbd_dev->lock_state;
3516         up_write(&rbd_dev->lock_rwsem);
3517         return lock_state;
3518 }
3519
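/*
 * Delayed work for acquiring the exclusive lock.  Try to take the lock
 * directly; if another (live) client holds it, send a REQUEST_LOCK
 * notify and requeue until the lock is acquired or we are blacklisted.
 */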
3520 static void rbd_acquire_lock(struct work_struct *work)
3521 {
3522         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3523                                             struct rbd_device, lock_dwork);
3524         enum rbd_lock_state lock_state;
3525         int ret;
3526
3527         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3528 again:
3529         lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3530         if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3531                 if (lock_state == RBD_LOCK_STATE_LOCKED)
3532                         wake_requests(rbd_dev, true);
3533                 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3534                      rbd_dev, lock_state, ret);
3535                 return;
3536         }
3537
3538         ret = rbd_request_lock(rbd_dev);
3539         if (ret == -ETIMEDOUT) {
3540                 goto again; /* treat this as a dead client */
3541         } else if (ret < 0) {
3542                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3543                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3544                                  RBD_RETRY_DELAY);
3545         } else {
3546                 /*
3547                  * lock owner acked, but resend if we don't see them
3548                  * release the lock
3549                  */
3550                 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3551                      rbd_dev);
3552                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3553                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3554         }
3555 }
3556
3557 /*
3558  * lock_rwsem must be held for write
3559  */
3560 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3561 {
3562         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3563              rbd_dev->lock_state);
3564         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3565                 return false;
3566
3567         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3568         downgrade_write(&rbd_dev->lock_rwsem);
3569         /*
3570          * Ensure that all in-flight IO is flushed.
3571          *
3572          * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3573          * may be shared with other devices.
3574          */
3575         ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3576         up_read(&rbd_dev->lock_rwsem);
3577
3578         down_write(&rbd_dev->lock_rwsem);
3579         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3580              rbd_dev->lock_state);
3581         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3582                 return false;
3583
3584         if (!rbd_unlock(rbd_dev))
3585                 /*
3586                  * Give others a chance to grab the lock - we would re-acquire
3587                  * almost immediately if we got new IO during ceph_osdc_sync()
3588                  * otherwise.  We need to ack our own notifications, so this
3589                  * lock_dwork will be requeued from rbd_wait_state_locked()
3590                  * after wake_requests() in rbd_handle_released_lock().
3591                  */
3592                 cancel_delayed_work(&rbd_dev->lock_dwork);
3593
3594         return true;
3595 }
3596
3597 static void rbd_release_lock_work(struct work_struct *work)
3598 {
3599         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3600                                                   unlock_work);
3601
3602         down_write(&rbd_dev->lock_rwsem);
3603         rbd_release_lock(rbd_dev);
3604         up_write(&rbd_dev->lock_rwsem);
3605 }
3606
3607 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3608                                      void **p)
3609 {
3610         struct rbd_client_id cid = { 0 };
3611
3612         if (struct_v >= 2) {
3613                 cid.gid = ceph_decode_64(p);
3614                 cid.handle = ceph_decode_64(p);
3615         }
3616
3617         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3618              cid.handle);
3619         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3620                 down_write(&rbd_dev->lock_rwsem);
3621                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3622                         /*
3623                          * we already know that the remote client is
3624                          * the owner
3625                          */
3626                         up_write(&rbd_dev->lock_rwsem);
3627                         return;
3628                 }
3629
3630                 rbd_set_owner_cid(rbd_dev, &cid);
3631                 downgrade_write(&rbd_dev->lock_rwsem);
3632         } else {
3633                 down_read(&rbd_dev->lock_rwsem);
3634         }
3635
3636         if (!__rbd_is_lock_owner(rbd_dev))
3637                 wake_requests(rbd_dev, false);
3638         up_read(&rbd_dev->lock_rwsem);
3639 }
3640
3641 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3642                                      void **p)
3643 {
3644         struct rbd_client_id cid = { 0 };
3645
3646         if (struct_v >= 2) {
3647                 cid.gid = ceph_decode_64(p);
3648                 cid.handle = ceph_decode_64(p);
3649         }
3650
3651         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3652              cid.handle);
3653         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3654                 down_write(&rbd_dev->lock_rwsem);
3655                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3656                         dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3657                              __func__, rbd_dev, cid.gid, cid.handle,
3658                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3659                         up_write(&rbd_dev->lock_rwsem);
3660                         return;
3661                 }
3662
3663                 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3664                 downgrade_write(&rbd_dev->lock_rwsem);
3665         } else {
3666                 down_read(&rbd_dev->lock_rwsem);
3667         }
3668
3669         if (!__rbd_is_lock_owner(rbd_dev))
3670                 wake_requests(rbd_dev, false);
3671         up_read(&rbd_dev->lock_rwsem);
3672 }
3673
3674 static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3675                                     void **p)
3676 {
3677         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3678         struct rbd_client_id cid = { 0 };
3679         bool need_to_send;
3680
3681         if (struct_v >= 2) {
3682                 cid.gid = ceph_decode_64(p);
3683                 cid.handle = ceph_decode_64(p);
3684         }
3685
3686         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3687              cid.handle);
3688         if (rbd_cid_equal(&cid, &my_cid))
3689                 return false;
3690
3691         down_read(&rbd_dev->lock_rwsem);
3692         need_to_send = __rbd_is_lock_owner(rbd_dev);
3693         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3694                 if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
3695                         dout("%s rbd_dev %p queueing unlock_work\n", __func__,
3696                              rbd_dev);
3697                         queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
3698                 }
3699         }
3700         up_read(&rbd_dev->lock_rwsem);
3701         return need_to_send;
3702 }
3703
3704 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3705                                      u64 notify_id, u64 cookie, s32 *result)
3706 {
3707         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3708         int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3709         char buf[buf_size];
3710         int ret;
3711
3712         if (result) {
3713                 void *p = buf;
3714
3715                 /* encode ResponseMessage */
3716                 ceph_start_encoding(&p, 1, 1,
3717                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
3718                 ceph_encode_32(&p, *result);
3719         } else {
3720                 buf_size = 0;
3721         }
3722
3723         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3724                                    &rbd_dev->header_oloc, notify_id, cookie,
3725                                    buf, buf_size);
3726         if (ret)
3727                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3728 }
3729
3730 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3731                                    u64 cookie)
3732 {
3733         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3734         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3735 }
3736
3737 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3738                                           u64 notify_id, u64 cookie, s32 result)
3739 {
3740         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3741         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3742 }
3743
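/*
 * Watch callback for the header object.  Decode the NotifyMessage
 * header (an empty payload is a legacy header-update notification),
 * dispatch on the notify op and acknowledge the notify.
 */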
3744 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3745                          u64 notifier_id, void *data, size_t data_len)
3746 {
3747         struct rbd_device *rbd_dev = arg;
3748         void *p = data;
3749         void *const end = p + data_len;
3750         u8 struct_v = 0;
3751         u32 len;
3752         u32 notify_op;
3753         int ret;
3754
3755         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3756              __func__, rbd_dev, cookie, notify_id, data_len);
3757         if (data_len) {
3758                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3759                                           &struct_v, &len);
3760                 if (ret) {
3761                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3762                                  ret);
3763                         return;
3764                 }
3765
3766                 notify_op = ceph_decode_32(&p);
3767         } else {
3768                 /* legacy notification for header updates */
3769                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3770                 len = 0;
3771         }
3772
3773         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3774         switch (notify_op) {
3775         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3776                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3777                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3778                 break;
3779         case RBD_NOTIFY_OP_RELEASED_LOCK:
3780                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3781                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3782                 break;
3783         case RBD_NOTIFY_OP_REQUEST_LOCK:
3784                 if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
3785                         /*
3786                          * send ResponseMessage(0) back so the client
3787                          * can detect a missing owner
3788                          */
3789                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3790                                                       cookie, 0);
3791                 else
3792                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3793                 break;
3794         case RBD_NOTIFY_OP_HEADER_UPDATE:
3795                 ret = rbd_dev_refresh(rbd_dev);
3796                 if (ret)
3797                         rbd_warn(rbd_dev, "refresh failed: %d", ret);
3798
3799                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3800                 break;
3801         default:
3802                 if (rbd_is_lock_owner(rbd_dev))
3803                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3804                                                       cookie, -EOPNOTSUPP);
3805                 else
3806                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3807                 break;
3808         }
3809 }
3810
3811 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3812
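/*
 * Watch error callback: forget the lock owner and, if the watch was
 * registered, tear it down and schedule rbd_reregister_watch().
 */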
3813 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3814 {
3815         struct rbd_device *rbd_dev = arg;
3816
3817         rbd_warn(rbd_dev, "encountered watch error: %d", err);
3818
3819         down_write(&rbd_dev->lock_rwsem);
3820         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3821         up_write(&rbd_dev->lock_rwsem);
3822
3823         mutex_lock(&rbd_dev->watch_mutex);
3824         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3825                 __rbd_unregister_watch(rbd_dev);
3826                 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3827
3828                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3829         }
3830         mutex_unlock(&rbd_dev->watch_mutex);
3831 }
3832
3833 /*
3834  * watch_mutex must be locked
3835  */
3836 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3837 {
3838         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3839         struct ceph_osd_linger_request *handle;
3840
3841         rbd_assert(!rbd_dev->watch_handle);
3842         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3843
3844         handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3845                                  &rbd_dev->header_oloc, rbd_watch_cb,
3846                                  rbd_watch_errcb, rbd_dev);
3847         if (IS_ERR(handle))
3848                 return PTR_ERR(handle);
3849
3850         rbd_dev->watch_handle = handle;
3851         return 0;
3852 }
3853
3854 /*
3855  * watch_mutex must be locked
3856  */
3857 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3858 {
3859         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3860         int ret;
3861
3862         rbd_assert(rbd_dev->watch_handle);
3863         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3864
3865         ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3866         if (ret)
3867                 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3868
3869         rbd_dev->watch_handle = NULL;
3870 }
3871
3872 static int rbd_register_watch(struct rbd_device *rbd_dev)
3873 {
3874         int ret;
3875
3876         mutex_lock(&rbd_dev->watch_mutex);
3877         rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3878         ret = __rbd_register_watch(rbd_dev);
3879         if (ret)
3880                 goto out;
3881
3882         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3883         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3884
3885 out:
3886         mutex_unlock(&rbd_dev->watch_mutex);
3887         return ret;
3888 }
3889
3890 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3891 {
3892         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3893
3894         cancel_work_sync(&rbd_dev->acquired_lock_work);
3895         cancel_work_sync(&rbd_dev->released_lock_work);
3896         cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3897         cancel_work_sync(&rbd_dev->unlock_work);
3898 }
3899
3900 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3901 {
3902         WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3903         cancel_tasks_sync(rbd_dev);
3904
3905         mutex_lock(&rbd_dev->watch_mutex);
3906         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3907                 __rbd_unregister_watch(rbd_dev);
3908         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3909         mutex_unlock(&rbd_dev->watch_mutex);
3910
3911         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3912         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3913 }
3914
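/*
 * Delayed work for re-establishing the watch after an error.  Release
 * the lock if we held it, re-register the watch, refresh the header,
 * try to re-acquire the lock and wake any waiters.  If re-registration
 * fails with -EBLACKLISTED or -ENOENT the device is marked blacklisted.
 */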
3915 static void rbd_reregister_watch(struct work_struct *work)
3916 {
3917         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3918                                             struct rbd_device, watch_dwork);
3919         bool was_lock_owner = false;
3920         bool need_to_wake = false;
3921         int ret;
3922
3923         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3924
3925         down_write(&rbd_dev->lock_rwsem);
3926         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3927                 was_lock_owner = rbd_release_lock(rbd_dev);
3928
3929         mutex_lock(&rbd_dev->watch_mutex);
3930         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3931                 mutex_unlock(&rbd_dev->watch_mutex);
3932                 goto out;
3933         }
3934
3935         ret = __rbd_register_watch(rbd_dev);
3936         if (ret) {
3937                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3938                 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3939                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3940                         need_to_wake = true;
3941                 } else {
3942                         queue_delayed_work(rbd_dev->task_wq,
3943                                            &rbd_dev->watch_dwork,
3944                                            RBD_RETRY_DELAY);
3945                 }
3946                 mutex_unlock(&rbd_dev->watch_mutex);
3947                 goto out;
3948         }
3949
3950         need_to_wake = true;
3951         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3952         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3953         mutex_unlock(&rbd_dev->watch_mutex);
3954
3955         ret = rbd_dev_refresh(rbd_dev);
3956         if (ret)
3957                 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3958
3959         if (was_lock_owner) {
3960                 ret = rbd_try_lock(rbd_dev);
3961                 if (ret)
3962                         rbd_warn(rbd_dev, "reregistration lock failed: %d",
3963                                  ret);
3964         }
3965
3966 out:
3967         up_write(&rbd_dev->lock_rwsem);
3968         if (need_to_wake)
3969                 wake_requests(rbd_dev, true);
3970 }
3971
3972 /*
3973  * Synchronous osd object method call.  Returns the number of bytes
3974  * returned in the inbound buffer, or a negative error code.
3975  */
3976 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3977                              const char *object_name,
3978                              const char *class_name,
3979                              const char *method_name,
3980                              const void *outbound,
3981                              size_t outbound_size,
3982                              void *inbound,
3983                              size_t inbound_size)
3984 {
3985         struct rbd_obj_request *obj_request;
3986         struct page **pages;
3987         u32 page_count;
3988         int ret;
3989
3990         /*
3991          * Method calls are ultimately read operations.  The result
3992          * should be placed into the inbound buffer provided.  They
3993          * also supply outbound data--parameters for the object
3994          * method.  Currently if this is present it will be a
3995          * snapshot id.
3996          */
3997         page_count = (u32)calc_pages_for(0, inbound_size);
3998         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3999         if (IS_ERR(pages))
4000                 return PTR_ERR(pages);
4001
4002         ret = -ENOMEM;
4003         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
4004                                                         OBJ_REQUEST_PAGES);
4005         if (!obj_request)
4006                 goto out;
4007
4008         obj_request->pages = pages;
4009         obj_request->page_count = page_count;
4010
4011         obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
4012                                                   obj_request);
4013         if (!obj_request->osd_req)
4014                 goto out;
4015
4016         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
4017                                         class_name, method_name);
4018         if (outbound_size) {
4019                 struct ceph_pagelist *pagelist;
4020
4021                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
4022                 if (!pagelist)
4023                         goto out;
4024
4025                 ceph_pagelist_init(pagelist);
4026                 ceph_pagelist_append(pagelist, outbound, outbound_size);
4027                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
4028                                                 pagelist);
4029         }
4030         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
4031                                         obj_request->pages, inbound_size,
4032                                         0, false, false);
4033
4034         rbd_obj_request_submit(obj_request);
4035         ret = rbd_obj_request_wait(obj_request);
4036         if (ret)
4037                 goto out;
4038
4039         ret = obj_request->result;
4040         if (ret < 0)
4041                 goto out;
4042
4043         rbd_assert(obj_request->xferred < (u64)INT_MAX);
4044         ret = (int)obj_request->xferred;
4045         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
4046 out:
4047         if (obj_request)
4048                 rbd_obj_request_put(obj_request);
4049         else
4050                 ceph_release_page_vector(pages, page_count);
4051
4052         return ret;
4053 }
4054
4055 /*
4056  * lock_rwsem must be held for read
4057  */
4058 static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
4059 {
4060         DEFINE_WAIT(wait);
4061
4062         do {
4063                 /*
4064                  * Note the use of mod_delayed_work() in rbd_acquire_lock()
4065                  * and cancel_delayed_work() in wake_requests().
4066                  */
4067                 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4068                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4069                 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4070                                           TASK_UNINTERRUPTIBLE);
4071                 up_read(&rbd_dev->lock_rwsem);
4072                 schedule();
4073                 down_read(&rbd_dev->lock_rwsem);
4074         } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4075                  !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4076
4077         finish_wait(&rbd_dev->lock_waitq, &wait);
4078 }
4079
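/*
 * Per-request work function, run from the rbd workqueue.  Validate the
 * block layer request, wait until the exclusive lock is held if the
 * mapping requires it, then build and submit the corresponding image
 * request.
 */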
4080 static void rbd_queue_workfn(struct work_struct *work)
4081 {
4082         struct request *rq = blk_mq_rq_from_pdu(work);
4083         struct rbd_device *rbd_dev = rq->q->queuedata;
4084         struct rbd_img_request *img_request;
4085         struct ceph_snap_context *snapc = NULL;
4086         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4087         u64 length = blk_rq_bytes(rq);
4088         enum obj_operation_type op_type;
4089         u64 mapping_size;
4090         bool must_be_locked;
4091         int result;
4092
4093         if (rq->cmd_type != REQ_TYPE_FS) {
4094                 dout("%s: non-fs request type %d\n", __func__,
4095                         (int) rq->cmd_type);
4096                 result = -EIO;
4097                 goto err;
4098         }
4099
4100         if (req_op(rq) == REQ_OP_DISCARD)
4101                 op_type = OBJ_OP_DISCARD;
4102         else if (req_op(rq) == REQ_OP_WRITE)
4103                 op_type = OBJ_OP_WRITE;
4104         else
4105                 op_type = OBJ_OP_READ;
4106
4107         /* Ignore/skip any zero-length requests */
4108
4109         if (!length) {
4110                 dout("%s: zero-length request\n", __func__);
4111                 result = 0;
4112                 goto err_rq;
4113         }
4114
4115         /* Only reads are allowed to a read-only device */
4116
4117         if (op_type != OBJ_OP_READ) {
4118                 if (rbd_dev->mapping.read_only) {
4119                         result = -EROFS;
4120                         goto err_rq;
4121                 }
4122                 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4123         }
4124
4125         /*
4126          * Quit early if the mapped snapshot no longer exists.  It's
4127          * still possible the snapshot will have disappeared by the
4128          * time our request arrives at the osd, but there's no sense in
4129          * sending it if we already know.
4130          */
4131         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4132                 dout("request for non-existent snapshot\n");
4133                 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4134                 result = -ENXIO;
4135                 goto err_rq;
4136         }
4137
4138         if (offset && length > U64_MAX - offset + 1) {
4139                 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4140                          length);
4141                 result = -EINVAL;
4142                 goto err_rq;    /* Shouldn't happen */
4143         }
4144
4145         blk_mq_start_request(rq);
4146
4147         down_read(&rbd_dev->header_rwsem);
4148         mapping_size = rbd_dev->mapping.size;
4149         if (op_type != OBJ_OP_READ) {
4150                 snapc = rbd_dev->header.snapc;
4151                 ceph_get_snap_context(snapc);
4152                 must_be_locked = rbd_is_lock_supported(rbd_dev);
4153         } else {
4154                 must_be_locked = rbd_dev->opts->lock_on_read &&
4155                                         rbd_is_lock_supported(rbd_dev);
4156         }
4157         up_read(&rbd_dev->header_rwsem);
4158
4159         if (offset + length > mapping_size) {
4160                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4161                          length, mapping_size);
4162                 result = -EIO;
4163                 goto err_rq;
4164         }
4165
4166         if (must_be_locked) {
4167                 down_read(&rbd_dev->lock_rwsem);
4168                 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4169                     !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
4170                         rbd_wait_state_locked(rbd_dev);
4171
4172                 WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
4173                         !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4174                 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4175                         result = -EBLACKLISTED;
4176                         goto err_unlock;
4177                 }
4178         }
4179
4180         img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4181                                              snapc);
4182         if (!img_request) {
4183                 result = -ENOMEM;
4184                 goto err_unlock;
4185         }
4186         img_request->rq = rq;
4187         snapc = NULL; /* img_request consumes a ref */
4188
4189         if (op_type == OBJ_OP_DISCARD)
4190                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4191                                               NULL);
4192         else
4193                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4194                                               rq->bio);
4195         if (result)
4196                 goto err_img_request;
4197
4198         result = rbd_img_request_submit(img_request);
4199         if (result)
4200                 goto err_img_request;
4201
4202         if (must_be_locked)
4203                 up_read(&rbd_dev->lock_rwsem);
4204         return;
4205
4206 err_img_request:
4207         rbd_img_request_put(img_request);
4208 err_unlock:
4209         if (must_be_locked)
4210                 up_read(&rbd_dev->lock_rwsem);
4211 err_rq:
4212         if (result)
4213                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4214                          obj_op_name(op_type), length, offset, result);
4215         ceph_put_snap_context(snapc);
4216 err:
4217         blk_mq_end_request(rq, result);
4218 }
4219
4220 static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4221                 const struct blk_mq_queue_data *bd)
4222 {
4223         struct request *rq = bd->rq;
4224         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4225
4226         queue_work(rbd_wq, work);
4227         return BLK_MQ_RQ_QUEUE_OK;
4228 }
4229
4230 static void rbd_free_disk(struct rbd_device *rbd_dev)
4231 {
4232         struct gendisk *disk = rbd_dev->disk;
4233
4234         if (!disk)
4235                 return;
4236
4237         rbd_dev->disk = NULL;
4238         if (disk->flags & GENHD_FL_UP) {
4239                 del_gendisk(disk);
4240                 if (disk->queue)
4241                         blk_cleanup_queue(disk->queue);
4242                 blk_mq_free_tag_set(&rbd_dev->tag_set);
4243         }
4244         put_disk(disk);
4245 }
4246
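/*
 * Synchronously read up to @length bytes starting at @offset from the
 * named object into @buf.  Returns the number of bytes read or a
 * negative error code.
 */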
4247 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4248                                 const char *object_name,
4249                                 u64 offset, u64 length, void *buf)
4250
4251 {
4252         struct rbd_obj_request *obj_request;
4253         struct page **pages = NULL;
4254         u32 page_count;
4255         size_t size;
4256         int ret;
4257
4258         page_count = (u32) calc_pages_for(offset, length);
4259         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
4260         if (IS_ERR(pages))
4261                 return PTR_ERR(pages);
4262
4263         ret = -ENOMEM;
4264         obj_request = rbd_obj_request_create(object_name, offset, length,
4265                                                         OBJ_REQUEST_PAGES);
4266         if (!obj_request)
4267                 goto out;
4268
4269         obj_request->pages = pages;
4270         obj_request->page_count = page_count;
4271
4272         obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
4273                                                   obj_request);
4274         if (!obj_request->osd_req)
4275                 goto out;
4276
4277         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
4278                                         offset, length, 0, 0);
4279         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
4280                                         obj_request->pages,
4281                                         obj_request->length,
4282                                         obj_request->offset & ~PAGE_MASK,
4283                                         false, false);
4284
4285         rbd_obj_request_submit(obj_request);
4286         ret = rbd_obj_request_wait(obj_request);
4287         if (ret)
4288                 goto out;
4289
4290         ret = obj_request->result;
4291         if (ret < 0)
4292                 goto out;
4293
4294         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
4295         size = (size_t) obj_request->xferred;
4296         ceph_copy_from_page_vector(pages, buf, 0, size);
4297         rbd_assert(size <= (size_t)INT_MAX);
4298         ret = (int)size;
4299 out:
4300         if (obj_request)
4301                 rbd_obj_request_put(obj_request);
4302         else
4303                 ceph_release_page_vector(pages, page_count);
4304
4305         return ret;
4306 }
4307
4308 /*
4309  * Read the complete header for the given rbd device.  On successful
4310  * return, the rbd_dev->header field will contain up-to-date
4311  * information about the image.
4312  */
4313 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4314 {
4315         struct rbd_image_header_ondisk *ondisk = NULL;
4316         u32 snap_count = 0;
4317         u64 names_size = 0;
4318         u32 want_count;
4319         int ret;
4320
4321         /*
4322          * The complete header will include an array of its 64-bit
4323          * snapshot ids, followed by the names of those snapshots as
4324          * a contiguous block of NUL-terminated strings.  Note that
4325          * the number of snapshots could change by the time we read
4326          * it in, in which case we re-read it.
4327          */
4328         do {
4329                 size_t size;
4330
4331                 kfree(ondisk);
4332
4333                 size = sizeof (*ondisk);
4334                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4335                 size += names_size;
4336                 ondisk = kmalloc(size, GFP_KERNEL);
4337                 if (!ondisk)
4338                         return -ENOMEM;
4339
4340                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_oid.name,
4341                                        0, size, ondisk);
4342                 if (ret < 0)
4343                         goto out;
4344                 if ((size_t)ret < size) {
4345                         ret = -ENXIO;
4346                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
4347                                 size, ret);
4348                         goto out;
4349                 }
4350                 if (!rbd_dev_ondisk_valid(ondisk)) {
4351                         ret = -ENXIO;
4352                         rbd_warn(rbd_dev, "invalid header");
4353                         goto out;
4354                 }
4355
4356                 names_size = le64_to_cpu(ondisk->snap_names_len);
4357                 want_count = snap_count;
4358                 snap_count = le32_to_cpu(ondisk->snap_count);
4359         } while (snap_count != want_count);
4360
4361         ret = rbd_header_from_disk(rbd_dev, ondisk);
4362 out:
4363         kfree(ondisk);
4364
4365         return ret;
4366 }
4367
4368 /*
4369  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4370  * has disappeared from the (just updated) snapshot context.
4371  */
4372 static void rbd_exists_validate(struct rbd_device *rbd_dev)
4373 {
4374         u64 snap_id;
4375
4376         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4377                 return;
4378
4379         snap_id = rbd_dev->spec->snap_id;
4380         if (snap_id == CEPH_NOSNAP)
4381                 return;
4382
4383         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4384                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4385 }
4386
4387 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4388 {
4389         sector_t size;
4390
4391         /*
4392          * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4393          * try to update its size.  If REMOVING is set, updating size
4394          * is just useless work since the device can't be opened.
4395          */
4396         if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4397             !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4398                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4399                 dout("setting size to %llu sectors", (unsigned long long)size);
4400                 set_capacity(rbd_dev->disk, size);
4401                 revalidate_disk(rbd_dev->disk);
4402         }
4403 }
4404
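/*
 * Re-read the image header (and parent info, if layered), update the
 * mapping size and revalidate the EXISTS flag for a mapped snapshot.
 * Resizes the block device if the mapping size changed.
 */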
4405 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4406 {
4407         u64 mapping_size;
4408         int ret;
4409
4410         down_write(&rbd_dev->header_rwsem);
4411         mapping_size = rbd_dev->mapping.size;
4412
4413         ret = rbd_dev_header_info(rbd_dev);
4414         if (ret)
4415                 goto out;
4416
4417         /*
4418          * If there is a parent, see if it has disappeared due to the
4419          * mapped image getting flattened.
4420          */
4421         if (rbd_dev->parent) {
4422                 ret = rbd_dev_v2_parent_info(rbd_dev);
4423                 if (ret)
4424                         goto out;
4425         }
4426
4427         if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4428                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4429         } else {
4430                 /* validate mapped snapshot's EXISTS flag */
4431                 rbd_exists_validate(rbd_dev);
4432         }
4433
4434 out:
4435         up_write(&rbd_dev->header_rwsem);
4436         if (!ret && mapping_size != rbd_dev->mapping.size)
4437                 rbd_dev_update_size(rbd_dev);
4438
4439         return ret;
4440 }
4441
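/*
 * blk-mq init_request callback: the per-request payload (cmd_size in
 * the tag set below) is a work_struct, so every request can be handed
 * off to a workqueue and serviced by rbd_queue_workfn().
 */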
4442 static int rbd_init_request(void *data, struct request *rq,
4443                 unsigned int hctx_idx, unsigned int request_idx,
4444                 unsigned int numa_node)
4445 {
4446         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4447
4448         INIT_WORK(work, rbd_queue_workfn);
4449         return 0;
4450 }
4451
4452 static struct blk_mq_ops rbd_mq_ops = {
4453         .queue_rq       = rbd_queue_rq,
4454         .init_request   = rbd_init_request,
4455 };
4456
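/*
 * Allocate the gendisk and the blk-mq tag set/queue for this mapping,
 * and size the queue limits (max sectors, segment size, io_min/io_opt,
 * discard granularity) to the RBD object size so a single request
 * never exceeds one object.
 */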
4457 static int rbd_init_disk(struct rbd_device *rbd_dev)
4458 {
4459         struct gendisk *disk;
4460         struct request_queue *q;
4461         u64 segment_size;
4462         int err;
4463
4464         /* create gendisk info */
4465         disk = alloc_disk(single_major ?
4466                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4467                           RBD_MINORS_PER_MAJOR);
4468         if (!disk)
4469                 return -ENOMEM;
4470
4471         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4472                  rbd_dev->dev_id);
4473         disk->major = rbd_dev->major;
4474         disk->first_minor = rbd_dev->minor;
4475         if (single_major)
4476                 disk->flags |= GENHD_FL_EXT_DEVT;
4477         disk->fops = &rbd_bd_ops;
4478         disk->private_data = rbd_dev;
4479
4480         memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4481         rbd_dev->tag_set.ops = &rbd_mq_ops;
4482         rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4483         rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4484         rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4485         rbd_dev->tag_set.nr_hw_queues = 1;
4486         rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4487
4488         err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4489         if (err)
4490                 goto out_disk;
4491
4492         q = blk_mq_init_queue(&rbd_dev->tag_set);
4493         if (IS_ERR(q)) {
4494                 err = PTR_ERR(q);
4495                 goto out_tag_set;
4496         }
4497
4498         queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4499         /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4500
4501         /* set io sizes to object size */
4502         segment_size = rbd_obj_bytes(&rbd_dev->header);
4503         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4504         q->limits.max_sectors = queue_max_hw_sectors(q);
4505         blk_queue_max_segments(q, USHRT_MAX);
4506         blk_queue_max_segment_size(q, segment_size);
4507         blk_queue_io_min(q, segment_size);
4508         blk_queue_io_opt(q, segment_size);
4509
4510         /* enable the discard support */
4511         queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4512         q->limits.discard_granularity = segment_size;
4513         q->limits.discard_alignment = segment_size;
4514         blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
4515         q->limits.discard_zeroes_data = 1;
4516
4517         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4518                 q->backing_dev_info.capabilities |= BDI_CAP_STABLE_WRITES;
4519
4520         disk->queue = q;
4521
4522         q->queuedata = rbd_dev;
4523
4524         rbd_dev->disk = disk;
4525
4526         return 0;
4527 out_tag_set:
4528         blk_mq_free_tag_set(&rbd_dev->tag_set);
4529 out_disk:
4530         put_disk(disk);
4531         return err;
4532 }
4533
4534 /*
4535  * sysfs
4536  */
4537
4538 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4539 {
4540         return container_of(dev, struct rbd_device, dev);
4541 }
4542
4543 static ssize_t rbd_size_show(struct device *dev,
4544                              struct device_attribute *attr, char *buf)
4545 {
4546         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4547
4548         return sprintf(buf, "%llu\n",
4549                 (unsigned long long)rbd_dev->mapping.size);
4550 }
4551
4552 /*
4553  * Note this shows the features for whatever's mapped, which is not
4554  * necessarily the base image.
4555  */
4556 static ssize_t rbd_features_show(struct device *dev,
4557                              struct device_attribute *attr, char *buf)
4558 {
4559         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4560
4561         return sprintf(buf, "0x%016llx\n",
4562                         (unsigned long long)rbd_dev->mapping.features);
4563 }
4564
4565 static ssize_t rbd_major_show(struct device *dev,
4566                               struct device_attribute *attr, char *buf)
4567 {
4568         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4569
4570         if (rbd_dev->major)
4571                 return sprintf(buf, "%d\n", rbd_dev->major);
4572
4573         return sprintf(buf, "(none)\n");
4574 }
4575
4576 static ssize_t rbd_minor_show(struct device *dev,
4577                               struct device_attribute *attr, char *buf)
4578 {
4579         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4580
4581         return sprintf(buf, "%d\n", rbd_dev->minor);
4582 }
4583
4584 static ssize_t rbd_client_addr_show(struct device *dev,
4585                                     struct device_attribute *attr, char *buf)
4586 {
4587         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4588         struct ceph_entity_addr *client_addr =
4589             ceph_client_addr(rbd_dev->rbd_client->client);
4590
4591         return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4592                        le32_to_cpu(client_addr->nonce));
4593 }
4594
4595 static ssize_t rbd_client_id_show(struct device *dev,
4596                                   struct device_attribute *attr, char *buf)
4597 {
4598         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4599
4600         return sprintf(buf, "client%lld\n",
4601                        ceph_client_gid(rbd_dev->rbd_client->client));
4602 }
4603
4604 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4605                                      struct device_attribute *attr, char *buf)
4606 {
4607         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4608
4609         return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4610 }
4611
4612 static ssize_t rbd_config_info_show(struct device *dev,
4613                                     struct device_attribute *attr, char *buf)
4614 {
4615         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4616
4617         if (!capable(CAP_SYS_ADMIN))
4618                 return -EPERM;
4619
4620         return sprintf(buf, "%s\n", rbd_dev->config_info);
4621 }
4622
4623 static ssize_t rbd_pool_show(struct device *dev,
4624                              struct device_attribute *attr, char *buf)
4625 {
4626         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4627
4628         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4629 }
4630
4631 static ssize_t rbd_pool_id_show(struct device *dev,
4632                              struct device_attribute *attr, char *buf)
4633 {
4634         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4635
4636         return sprintf(buf, "%llu\n",
4637                         (unsigned long long) rbd_dev->spec->pool_id);
4638 }
4639
4640 static ssize_t rbd_name_show(struct device *dev,
4641                              struct device_attribute *attr, char *buf)
4642 {
4643         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4644
4645         if (rbd_dev->spec->image_name)
4646                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4647
4648         return sprintf(buf, "(unknown)\n");
4649 }
4650
4651 static ssize_t rbd_image_id_show(struct device *dev,
4652                              struct device_attribute *attr, char *buf)
4653 {
4654         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4655
4656         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4657 }
4658
4659 /*
4660  * Shows the name of the currently-mapped snapshot (or
4661  * RBD_SNAP_HEAD_NAME for the base image).
4662  */
4663 static ssize_t rbd_snap_show(struct device *dev,
4664                              struct device_attribute *attr,
4665                              char *buf)
4666 {
4667         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4668
4669         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4670 }
4671
4672 static ssize_t rbd_snap_id_show(struct device *dev,
4673                                 struct device_attribute *attr, char *buf)
4674 {
4675         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4676
4677         return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4678 }
4679
4680 /*
4681  * For a v2 image, shows the chain of parent images, separated by empty
4682  * lines.  For v1 images or if there is no parent, shows "(no parent
4683  * image)".
4684  */
4685 static ssize_t rbd_parent_show(struct device *dev,
4686                                struct device_attribute *attr,
4687                                char *buf)
4688 {
4689         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4690         ssize_t count = 0;
4691
4692         if (!rbd_dev->parent)
4693                 return sprintf(buf, "(no parent image)\n");
4694
4695         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4696                 struct rbd_spec *spec = rbd_dev->parent_spec;
4697
4698                 count += sprintf(&buf[count], "%s"
4699                             "pool_id %llu\npool_name %s\n"
4700                             "image_id %s\nimage_name %s\n"
4701                             "snap_id %llu\nsnap_name %s\n"
4702                             "overlap %llu\n",
4703                             !count ? "" : "\n", /* first? */
4704                             spec->pool_id, spec->pool_name,
4705                             spec->image_id, spec->image_name ?: "(unknown)",
4706                             spec->snap_id, spec->snap_name,
4707                             rbd_dev->parent_overlap);
4708         }
4709
4710         return count;
4711 }
4712
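/*
 * Store callback for the sysfs "refresh" attribute (see the
 * DEVICE_ATTR() definitions below).  Writing anything to it forces a
 * header re-read, e.g. (illustrative path, per
 * Documentation/ABI/testing/sysfs-bus-rbd):
 *
 *   echo 1 > /sys/bus/rbd/devices/<dev-id>/refresh
 */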
4713 static ssize_t rbd_image_refresh(struct device *dev,
4714                                  struct device_attribute *attr,
4715                                  const char *buf,
4716                                  size_t size)
4717 {
4718         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4719         int ret;
4720
4721         if (!capable(CAP_SYS_ADMIN))
4722                 return -EPERM;
4723
4724         ret = rbd_dev_refresh(rbd_dev);
4725         if (ret)
4726                 return ret;
4727
4728         return size;
4729 }
4730
4731 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4732 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4733 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4734 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4735 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4736 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4737 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4738 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4739 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4740 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4741 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4742 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4743 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4744 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4745 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4746 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4747
4748 static struct attribute *rbd_attrs[] = {
4749         &dev_attr_size.attr,
4750         &dev_attr_features.attr,
4751         &dev_attr_major.attr,
4752         &dev_attr_minor.attr,
4753         &dev_attr_client_addr.attr,
4754         &dev_attr_client_id.attr,
4755         &dev_attr_cluster_fsid.attr,
4756         &dev_attr_config_info.attr,
4757         &dev_attr_pool.attr,
4758         &dev_attr_pool_id.attr,
4759         &dev_attr_name.attr,
4760         &dev_attr_image_id.attr,
4761         &dev_attr_current_snap.attr,
4762         &dev_attr_snap_id.attr,
4763         &dev_attr_parent.attr,
4764         &dev_attr_refresh.attr,
4765         NULL
4766 };
4767
4768 static struct attribute_group rbd_attr_group = {
4769         .attrs = rbd_attrs,
4770 };
4771
4772 static const struct attribute_group *rbd_attr_groups[] = {
4773         &rbd_attr_group,
4774         NULL
4775 };
4776
4777 static void rbd_dev_release(struct device *dev);
4778
4779 static struct device_type rbd_device_type = {
4780         .name           = "rbd",
4781         .groups         = rbd_attr_groups,
4782         .release        = rbd_dev_release,
4783 };
4784
4785 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4786 {
4787         kref_get(&spec->kref);
4788
4789         return spec;
4790 }
4791
4792 static void rbd_spec_free(struct kref *kref);
4793 static void rbd_spec_put(struct rbd_spec *spec)
4794 {
4795         if (spec)
4796                 kref_put(&spec->kref, rbd_spec_free);
4797 }
4798
4799 static struct rbd_spec *rbd_spec_alloc(void)
4800 {
4801         struct rbd_spec *spec;
4802
4803         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4804         if (!spec)
4805                 return NULL;
4806
4807         spec->pool_id = CEPH_NOPOOL;
4808         spec->snap_id = CEPH_NOSNAP;
4809         kref_init(&spec->kref);
4810
4811         return spec;
4812 }
4813
4814 static void rbd_spec_free(struct kref *kref)
4815 {
4816         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4817
4818         kfree(spec->pool_name);
4819         kfree(spec->image_id);
4820         kfree(spec->image_name);
4821         kfree(spec->snap_name);
4822         kfree(spec);
4823 }
4824
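/*
 * Final teardown of an rbd_device whose watch has been unregistered
 * and whose lock has been released: drop the client and spec
 * references and free the remaining allocations.
 */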
4825 static void rbd_dev_free(struct rbd_device *rbd_dev)
4826 {
4827         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4828         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4829
4830         ceph_oid_destroy(&rbd_dev->header_oid);
4831         ceph_oloc_destroy(&rbd_dev->header_oloc);
4832         kfree(rbd_dev->config_info);
4833
4834         rbd_put_client(rbd_dev->rbd_client);
4835         rbd_spec_put(rbd_dev->spec);
4836         kfree(rbd_dev->opts);
4837         kfree(rbd_dev);
4838 }
4839
4840 static void rbd_dev_release(struct device *dev)
4841 {
4842         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4843         bool need_put = !!rbd_dev->opts;
4844
4845         if (need_put) {
4846                 destroy_workqueue(rbd_dev->task_wq);
4847                 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4848         }
4849
4850         rbd_dev_free(rbd_dev);
4851
4852         /*
4853          * This is racy, but way better than dropping the module reference
4854          * outside of the release callback.  The race window is pretty
4855          * small, so doing something similar to dm (dm-builtin.c) is overkill.
4856          */
4857         if (need_put)
4858                 module_put(THIS_MODULE);
4859 }
4860
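/*
 * Allocate and initialize a bare rbd_device (locks, work items,
 * default data layout).  The rbdc and spec references are simply
 * recorded here; callers are responsible for taking/transferring
 * those references (see rbd_dev_create() and rbd_dev_probe_parent()).
 */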
4861 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4862                                            struct rbd_spec *spec)
4863 {
4864         struct rbd_device *rbd_dev;
4865
4866         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4867         if (!rbd_dev)
4868                 return NULL;
4869
4870         spin_lock_init(&rbd_dev->lock);
4871         INIT_LIST_HEAD(&rbd_dev->node);
4872         init_rwsem(&rbd_dev->header_rwsem);
4873
4874         ceph_oid_init(&rbd_dev->header_oid);
4875         ceph_oloc_init(&rbd_dev->header_oloc);
4876
4877         mutex_init(&rbd_dev->watch_mutex);
4878         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4879         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4880
4881         init_rwsem(&rbd_dev->lock_rwsem);
4882         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4883         INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4884         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4885         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4886         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4887         init_waitqueue_head(&rbd_dev->lock_waitq);
4888
4889         rbd_dev->dev.bus = &rbd_bus_type;
4890         rbd_dev->dev.type = &rbd_device_type;
4891         rbd_dev->dev.parent = &rbd_root_dev;
4892         device_initialize(&rbd_dev->dev);
4893
4894         rbd_dev->rbd_client = rbdc;
4895         rbd_dev->spec = spec;
4896
4897         rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
4898         rbd_dev->layout.stripe_count = 1;
4899         rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER;
4900         rbd_dev->layout.pool_id = spec->pool_id;
4901         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
4902
4903         return rbd_dev;
4904 }
4905
4906 /*
4907  * Create a mapping rbd_dev.
4908  */
4909 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4910                                          struct rbd_spec *spec,
4911                                          struct rbd_options *opts)
4912 {
4913         struct rbd_device *rbd_dev;
4914
4915         rbd_dev = __rbd_dev_create(rbdc, spec);
4916         if (!rbd_dev)
4917                 return NULL;
4918
4919         rbd_dev->opts = opts;
4920
4921         /* get an id and fill in device name */
4922         rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4923                                          minor_to_rbd_dev_id(1 << MINORBITS),
4924                                          GFP_KERNEL);
4925         if (rbd_dev->dev_id < 0)
4926                 goto fail_rbd_dev;
4927
4928         sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4929         rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4930                                                    rbd_dev->name);
4931         if (!rbd_dev->task_wq)
4932                 goto fail_dev_id;
4933
4934         /* we have a ref from do_rbd_add() */
4935         __module_get(THIS_MODULE);
4936
4937         dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4938         return rbd_dev;
4939
4940 fail_dev_id:
4941         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4942 fail_rbd_dev:
4943         rbd_dev_free(rbd_dev);
4944         return NULL;
4945 }
4946
4947 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4948 {
4949         if (rbd_dev)
4950                 put_device(&rbd_dev->dev);
4951 }
4952
4953 /*
4954  * Get the size and object order for an image snapshot, or if
4955  * snap_id is CEPH_NOSNAP, gets this information for the base
4956  * image.
4957  */
4958 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4959                                 u8 *order, u64 *snap_size)
4960 {
4961         __le64 snapid = cpu_to_le64(snap_id);
4962         int ret;
4963         struct {
4964                 u8 order;
4965                 __le64 size;
4966         } __attribute__ ((packed)) size_buf = { 0 };
4967
4968         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
4969                                 "rbd", "get_size",
4970                                 &snapid, sizeof (snapid),
4971                                 &size_buf, sizeof (size_buf));
4972         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4973         if (ret < 0)
4974                 return ret;
4975         if (ret < sizeof (size_buf))
4976                 return -ERANGE;
4977
4978         if (order) {
4979                 *order = size_buf.order;
4980                 dout("  order %u", (unsigned int)*order);
4981         }
4982         *snap_size = le64_to_cpu(size_buf.size);
4983
4984         dout("  snap_id 0x%016llx snap_size = %llu\n",
4985                 (unsigned long long)snap_id,
4986                 (unsigned long long)*snap_size);
4987
4988         return 0;
4989 }
4990
4991 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4992 {
4993         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4994                                         &rbd_dev->header.obj_order,
4995                                         &rbd_dev->header.image_size);
4996 }
4997
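/*
 * Fetch the object name prefix of a format 2 image using the
 * "get_object_prefix" class method on the image header object.
 */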
4998 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4999 {
5000         void *reply_buf;
5001         int ret;
5002         void *p;
5003
5004         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
5005         if (!reply_buf)
5006                 return -ENOMEM;
5007
5008         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5009                                 "rbd", "get_object_prefix", NULL, 0,
5010                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
5011         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5012         if (ret < 0)
5013                 goto out;
5014
5015         p = reply_buf;
5016         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5017                                                 p + ret, NULL, GFP_NOIO);
5018         ret = 0;
5019
5020         if (IS_ERR(rbd_dev->header.object_prefix)) {
5021                 ret = PTR_ERR(rbd_dev->header.object_prefix);
5022                 rbd_dev->header.object_prefix = NULL;
5023         } else {
5024                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
5025         }
5026 out:
5027         kfree(reply_buf);
5028
5029         return ret;
5030 }
5031
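/*
 * Fetch the feature bits for an image snapshot, or for the base image
 * if snap_id is CEPH_NOSNAP.  Fails with -ENXIO if the image requires
 * ("incompat") features this driver does not support.
 */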
5032 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5033                 u64 *snap_features)
5034 {
5035         __le64 snapid = cpu_to_le64(snap_id);
5036         struct {
5037                 __le64 features;
5038                 __le64 incompat;
5039         } __attribute__ ((packed)) features_buf = { 0 };
5040         u64 unsup;
5041         int ret;
5042
5043         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5044                                 "rbd", "get_features",
5045                                 &snapid, sizeof (snapid),
5046                                 &features_buf, sizeof (features_buf));
5047         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5048         if (ret < 0)
5049                 return ret;
5050         if (ret < sizeof (features_buf))
5051                 return -ERANGE;
5052
5053         unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5054         if (unsup) {
5055                 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5056                          unsup);
5057                 return -ENXIO;
5058         }
5059
5060         *snap_features = le64_to_cpu(features_buf.features);
5061
5062         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5063                 (unsigned long long)snap_id,
5064                 (unsigned long long)*snap_features,
5065                 (unsigned long long)le64_to_cpu(features_buf.incompat));
5066
5067         return 0;
5068 }
5069
5070 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5071 {
5072         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5073                                                 &rbd_dev->header.features);
5074 }
5075
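/*
 * Query the parent of a format 2 clone (pool id, image id, snapshot id
 * and overlap) via the "get_parent" class method.  Also detects that a
 * clone has been flattened, and always refreshes the parent overlap.
 */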
5076 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5077 {
5078         struct rbd_spec *parent_spec;
5079         size_t size;
5080         void *reply_buf = NULL;
5081         __le64 snapid;
5082         void *p;
5083         void *end;
5084         u64 pool_id;
5085         char *image_id;
5086         u64 snap_id;
5087         u64 overlap;
5088         int ret;
5089
5090         parent_spec = rbd_spec_alloc();
5091         if (!parent_spec)
5092                 return -ENOMEM;
5093
5094         size = sizeof (__le64) +                                /* pool_id */
5095                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
5096                 sizeof (__le64) +                               /* snap_id */
5097                 sizeof (__le64);                                /* overlap */
5098         reply_buf = kmalloc(size, GFP_KERNEL);
5099         if (!reply_buf) {
5100                 ret = -ENOMEM;
5101                 goto out_err;
5102         }
5103
5104         snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5105         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5106                                 "rbd", "get_parent",
5107                                 &snapid, sizeof (snapid),
5108                                 reply_buf, size);
5109         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5110         if (ret < 0)
5111                 goto out_err;
5112
5113         p = reply_buf;
5114         end = reply_buf + ret;
5115         ret = -ERANGE;
5116         ceph_decode_64_safe(&p, end, pool_id, out_err);
5117         if (pool_id == CEPH_NOPOOL) {
5118                 /*
5119                  * Either the parent never existed, or we have a
5120                  * record of it but the image got flattened so it no
5121                  * longer has a parent.  When the parent of a
5122                  * layered image disappears we immediately set the
5123                  * overlap to 0.  The effect of this is that all new
5124                  * requests will be treated as if the image had no
5125                  * parent.
5126                  */
5127                 if (rbd_dev->parent_overlap) {
5128                         rbd_dev->parent_overlap = 0;
5129                         rbd_dev_parent_put(rbd_dev);
5130                         pr_info("%s: clone image has been flattened\n",
5131                                 rbd_dev->disk->disk_name);
5132                 }
5133
5134                 goto out;       /* No parent?  No problem. */
5135         }
5136
5137         /* The ceph file layout needs to fit pool id in 32 bits */
5138
5139         ret = -EIO;
5140         if (pool_id > (u64)U32_MAX) {
5141                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5142                         (unsigned long long)pool_id, U32_MAX);
5143                 goto out_err;
5144         }
5145
5146         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5147         if (IS_ERR(image_id)) {
5148                 ret = PTR_ERR(image_id);
5149                 goto out_err;
5150         }
5151         ceph_decode_64_safe(&p, end, snap_id, out_err);
5152         ceph_decode_64_safe(&p, end, overlap, out_err);
5153
5154         /*
5155          * The parent won't change (except when the clone is
5156          * flattened, which is handled above).  So we only need to
5157          * record the parent spec if we have not already done so.
5158          */
5159         if (!rbd_dev->parent_spec) {
5160                 parent_spec->pool_id = pool_id;
5161                 parent_spec->image_id = image_id;
5162                 parent_spec->snap_id = snap_id;
5163                 rbd_dev->parent_spec = parent_spec;
5164                 parent_spec = NULL;     /* rbd_dev now owns this */
5165         } else {
5166                 kfree(image_id);
5167         }
5168
5169         /*
5170          * We always update the parent overlap.  If it's zero we issue
5171          * a warning, as we will proceed as if there was no parent.
5172          */
5173         if (!overlap) {
5174                 if (parent_spec) {
5175                         /* refresh, careful to warn just once */
5176                         if (rbd_dev->parent_overlap)
5177                                 rbd_warn(rbd_dev,
5178                                     "clone now standalone (overlap became 0)");
5179                 } else {
5180                         /* initial probe */
5181                         rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5182                 }
5183         }
5184         rbd_dev->parent_overlap = overlap;
5185
5186 out:
5187         ret = 0;
5188 out_err:
5189         kfree(reply_buf);
5190         rbd_spec_put(parent_spec);
5191
5192         return ret;
5193 }
5194
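/*
 * Fetch the stripe unit and stripe count via "get_stripe_unit_count".
 * Only the default layout (stripe unit equal to the object size and a
 * stripe count of 1) is accepted; anything else returns -EINVAL.
 */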
5195 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5196 {
5197         struct {
5198                 __le64 stripe_unit;
5199                 __le64 stripe_count;
5200         } __attribute__ ((packed)) striping_info_buf = { 0 };
5201         size_t size = sizeof (striping_info_buf);
5202         void *p;
5203         u64 obj_size;
5204         u64 stripe_unit;
5205         u64 stripe_count;
5206         int ret;
5207
5208         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5209                                 "rbd", "get_stripe_unit_count", NULL, 0,
5210                                 (char *)&striping_info_buf, size);
5211         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5212         if (ret < 0)
5213                 return ret;
5214         if (ret < size)
5215                 return -ERANGE;
5216
5217         /*
5218          * We don't actually support the "fancy striping" feature
5219          * (STRIPINGV2) yet, but if the striping sizes are the
5220          * defaults the behavior is the same as before.  So find
5221          * out, and only fail if the image has non-default values.
5222          */
5223         ret = -EINVAL;
5224         obj_size = (u64)1 << rbd_dev->header.obj_order;
5225         p = &striping_info_buf;
5226         stripe_unit = ceph_decode_64(&p);
5227         if (stripe_unit != obj_size) {
5228                 rbd_warn(rbd_dev, "unsupported stripe unit "
5229                                 "(got %llu want %llu)",
5230                                 stripe_unit, obj_size);
5231                 return -EINVAL;
5232         }
5233         stripe_count = ceph_decode_64(&p);
5234         if (stripe_count != 1) {
5235                 rbd_warn(rbd_dev, "unsupported stripe count "
5236                                 "(got %llu want 1)", stripe_count);
5237                 return -EINVAL;
5238         }
5239         rbd_dev->header.stripe_unit = stripe_unit;
5240         rbd_dev->header.stripe_count = stripe_count;
5241
5242         return 0;
5243 }
5244
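/*
 * Map an image id back to its name by asking the rbd directory object
 * ("dir_get_name").  Returns a dynamically allocated name, or NULL on
 * any failure -- callers tolerate an unknown image name.
 */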
5245 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5246 {
5247         size_t image_id_size;
5248         char *image_id;
5249         void *p;
5250         void *end;
5251         size_t size;
5252         void *reply_buf = NULL;
5253         size_t len = 0;
5254         char *image_name = NULL;
5255         int ret;
5256
5257         rbd_assert(!rbd_dev->spec->image_name);
5258
5259         len = strlen(rbd_dev->spec->image_id);
5260         image_id_size = sizeof (__le32) + len;
5261         image_id = kmalloc(image_id_size, GFP_KERNEL);
5262         if (!image_id)
5263                 return NULL;
5264
5265         p = image_id;
5266         end = image_id + image_id_size;
5267         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5268
5269         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5270         reply_buf = kmalloc(size, GFP_KERNEL);
5271         if (!reply_buf)
5272                 goto out;
5273
5274         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
5275                                 "rbd", "dir_get_name",
5276                                 image_id, image_id_size,
5277                                 reply_buf, size);
5278         if (ret < 0)
5279                 goto out;
5280         p = reply_buf;
5281         end = reply_buf + ret;
5282
5283         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5284         if (IS_ERR(image_name))
5285                 image_name = NULL;
5286         else
5287                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5288 out:
5289         kfree(reply_buf);
5290         kfree(image_id);
5291
5292         return image_name;
5293 }
5294
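/*
 * Format 1 headers keep snapshot names as consecutive NUL-terminated
 * strings in header.snap_names, in the same order as snapc->snaps, so
 * a linear scan maps a snapshot name to its id.
 */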
5295 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5296 {
5297         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5298         const char *snap_name;
5299         u32 which = 0;
5300
5301         /* Skip over names until we find the one we are looking for */
5302
5303         snap_name = rbd_dev->header.snap_names;
5304         while (which < snapc->num_snaps) {
5305                 if (!strcmp(name, snap_name))
5306                         return snapc->snaps[which];
5307                 snap_name += strlen(snap_name) + 1;
5308                 which++;
5309         }
5310         return CEPH_NOSNAP;
5311 }
5312
5313 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5314 {
5315         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5316         u32 which;
5317         bool found = false;
5318         u64 snap_id;
5319
5320         for (which = 0; !found && which < snapc->num_snaps; which++) {
5321                 const char *snap_name;
5322
5323                 snap_id = snapc->snaps[which];
5324                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5325                 if (IS_ERR(snap_name)) {
5326                         /* ignore no-longer existing snapshots */
5327                         if (PTR_ERR(snap_name) == -ENOENT)
5328                                 continue;
5329                         else
5330                                 break;
5331                 }
5332                 found = !strcmp(name, snap_name);
5333                 kfree(snap_name);
5334         }
5335         return found ? snap_id : CEPH_NOSNAP;
5336 }
5337
5338 /*
5339  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5340  * no snapshot by that name is found, or if an error occurs.
5341  */
5342 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5343 {
5344         if (rbd_dev->image_format == 1)
5345                 return rbd_v1_snap_id_by_name(rbd_dev, name);
5346
5347         return rbd_v2_snap_id_by_name(rbd_dev, name);
5348 }
5349
5350 /*
5351  * An image being mapped will have everything but the snap id.
5352  */
5353 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5354 {
5355         struct rbd_spec *spec = rbd_dev->spec;
5356
5357         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5358         rbd_assert(spec->image_id && spec->image_name);
5359         rbd_assert(spec->snap_name);
5360
5361         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5362                 u64 snap_id;
5363
5364                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5365                 if (snap_id == CEPH_NOSNAP)
5366                         return -ENOENT;
5367
5368                 spec->snap_id = snap_id;
5369         } else {
5370                 spec->snap_id = CEPH_NOSNAP;
5371         }
5372
5373         return 0;
5374 }
5375
5376 /*
5377  * A parent image will have all ids but none of the names.
5378  *
5379  * All names in an rbd spec are dynamically allocated.  It's OK if we
5380  * can't figure out the name for an image id.
5381  */
5382 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5383 {
5384         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5385         struct rbd_spec *spec = rbd_dev->spec;
5386         const char *pool_name;
5387         const char *image_name;
5388         const char *snap_name;
5389         int ret;
5390
5391         rbd_assert(spec->pool_id != CEPH_NOPOOL);
5392         rbd_assert(spec->image_id);
5393         rbd_assert(spec->snap_id != CEPH_NOSNAP);
5394
5395         /* Get the pool name; we have to make our own copy of this */
5396
5397         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5398         if (!pool_name) {
5399                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5400                 return -EIO;
5401         }
5402         pool_name = kstrdup(pool_name, GFP_KERNEL);
5403         if (!pool_name)
5404                 return -ENOMEM;
5405
5406         /* Fetch the image name; tolerate failure here */
5407
5408         image_name = rbd_dev_image_name(rbd_dev);
5409         if (!image_name)
5410                 rbd_warn(rbd_dev, "unable to get image name");
5411
5412         /* Fetch the snapshot name */
5413
5414         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5415         if (IS_ERR(snap_name)) {
5416                 ret = PTR_ERR(snap_name);
5417                 goto out_err;
5418         }
5419
5420         spec->pool_name = pool_name;
5421         spec->image_name = image_name;
5422         spec->snap_name = snap_name;
5423
5424         return 0;
5425
5426 out_err:
5427         kfree(image_name);
5428         kfree(pool_name);
5429         return ret;
5430 }
5431
5432 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5433 {
5434         size_t size;
5435         int ret;
5436         void *reply_buf;
5437         void *p;
5438         void *end;
5439         u64 seq;
5440         u32 snap_count;
5441         struct ceph_snap_context *snapc;
5442         u32 i;
5443
5444         /*
5445          * We'll need room for the seq value (maximum snapshot id),
5446          * snapshot count, and array of that many snapshot ids.
5447          * For now we have a fixed upper limit on the number we're
5448          * prepared to receive.
5449          */
5450         size = sizeof (__le64) + sizeof (__le32) +
5451                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
5452         reply_buf = kzalloc(size, GFP_KERNEL);
5453         if (!reply_buf)
5454                 return -ENOMEM;
5455
5456         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5457                                 "rbd", "get_snapcontext", NULL, 0,
5458                                 reply_buf, size);
5459         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5460         if (ret < 0)
5461                 goto out;
5462
5463         p = reply_buf;
5464         end = reply_buf + ret;
5465         ret = -ERANGE;
5466         ceph_decode_64_safe(&p, end, seq, out);
5467         ceph_decode_32_safe(&p, end, snap_count, out);
5468
5469         /*
5470          * Make sure the reported number of snapshot ids wouldn't go
5471          * beyond the end of our buffer.  But before checking that,
5472          * make sure the computed size of the snapshot context we
5473          * allocate is representable in a size_t.
5474          */
5475         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5476                                  / sizeof (u64)) {
5477                 ret = -EINVAL;
5478                 goto out;
5479         }
5480         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5481                 goto out;
5482         ret = 0;
5483
5484         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5485         if (!snapc) {
5486                 ret = -ENOMEM;
5487                 goto out;
5488         }
5489         snapc->seq = seq;
5490         for (i = 0; i < snap_count; i++)
5491                 snapc->snaps[i] = ceph_decode_64(&p);
5492
5493         ceph_put_snap_context(rbd_dev->header.snapc);
5494         rbd_dev->header.snapc = snapc;
5495
5496         dout("  snap context seq = %llu, snap_count = %u\n",
5497                 (unsigned long long)seq, (unsigned int)snap_count);
5498 out:
5499         kfree(reply_buf);
5500
5501         return ret;
5502 }
5503
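/*
 * Look up the name of a single snapshot by id using the
 * "get_snapshot_name" class method.  Returns a dynamically allocated
 * string, or an ERR_PTR() on failure.
 */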
5504 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5505                                         u64 snap_id)
5506 {
5507         size_t size;
5508         void *reply_buf;
5509         __le64 snapid;
5510         int ret;
5511         void *p;
5512         void *end;
5513         char *snap_name;
5514
5515         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5516         reply_buf = kmalloc(size, GFP_KERNEL);
5517         if (!reply_buf)
5518                 return ERR_PTR(-ENOMEM);
5519
5520         snapid = cpu_to_le64(snap_id);
5521         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5522                                 "rbd", "get_snapshot_name",
5523                                 &snapid, sizeof (snapid),
5524                                 reply_buf, size);
5525         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5526         if (ret < 0) {
5527                 snap_name = ERR_PTR(ret);
5528                 goto out;
5529         }
5530
5531         p = reply_buf;
5532         end = reply_buf + ret;
5533         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5534         if (IS_ERR(snap_name))
5535                 goto out;
5536
5537         dout("  snap_id 0x%016llx snap_name = %s\n",
5538                 (unsigned long long)snap_id, snap_name);
5539 out:
5540         kfree(reply_buf);
5541
5542         return snap_name;
5543 }
5544
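/*
 * Refresh the mutable parts of a format 2 header (image size and
 * snapshot context).  On the first call also fetch the fields that
 * never change via rbd_dev_v2_header_onetime().
 */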
5545 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5546 {
5547         bool first_time = rbd_dev->header.object_prefix == NULL;
5548         int ret;
5549
5550         ret = rbd_dev_v2_image_size(rbd_dev);
5551         if (ret)
5552                 return ret;
5553
5554         if (first_time) {
5555                 ret = rbd_dev_v2_header_onetime(rbd_dev);
5556                 if (ret)
5557                         return ret;
5558         }
5559
5560         ret = rbd_dev_v2_snap_context(rbd_dev);
5561         if (ret && first_time) {
5562                 kfree(rbd_dev->header.object_prefix);
5563                 rbd_dev->header.object_prefix = NULL;
5564         }
5565
5566         return ret;
5567 }
5568
5569 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5570 {
5571         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5572
5573         if (rbd_dev->image_format == 1)
5574                 return rbd_dev_v1_header_info(rbd_dev);
5575
5576         return rbd_dev_v2_header_info(rbd_dev);
5577 }
5578
5579 /*
5580  * Skips over white space at *buf, and updates *buf to point to the
5581  * first found non-space character (if any). Returns the length of
5582  * the token (string of non-white space characters) found.  Note
5583  * that *buf must be terminated with '\0'.
5584  */
5585 static inline size_t next_token(const char **buf)
5586 {
5587         /*
5588          * These are the characters that produce nonzero for
5589          * isspace() in the "C" and "POSIX" locales.
5590          */
5591         const char *spaces = " \f\n\r\t\v";
5592
5593         *buf += strspn(*buf, spaces);   /* Find start of token */
5594
5595         return strcspn(*buf, spaces);   /* Return token length */
5596 }
5597
5598 /*
5599  * Finds the next token in *buf, dynamically allocates a buffer big
5600  * enough to hold a copy of it, and copies the token into the new
5601  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
5602  * that a duplicate buffer is created even for a zero-length token.
5603  *
5604  * Returns a pointer to the newly-allocated duplicate, or a null
5605  * pointer if memory for the duplicate was not available.  If
5606  * the lenp argument is a non-null pointer, the length of the token
5607  * (not including the '\0') is returned in *lenp.
5608  *
5609  * If successful, the *buf pointer will be updated to point beyond
5610  * the end of the found token.
5611  *
5612  * Note: uses GFP_KERNEL for allocation.
5613  */
5614 static inline char *dup_token(const char **buf, size_t *lenp)
5615 {
5616         char *dup;
5617         size_t len;
5618
5619         len = next_token(buf);
5620         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5621         if (!dup)
5622                 return NULL;
5623         *(dup + len) = '\0';
5624         *buf += len;
5625
5626         if (lenp)
5627                 *lenp = len;
5628
5629         return dup;
5630 }
5631
5632 /*
5633  * Parse the options provided for an "rbd add" (i.e., rbd image
5634  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
5635  * and the data written is passed here via a NUL-terminated buffer.
5636  * Returns 0 if successful or an error code otherwise.
5637  *
5638  * The information extracted from these options is recorded in
5639  * the other parameters which return dynamically-allocated
5640  * structures:
5641  *  ceph_opts
5642  *      The address of a pointer that will refer to a ceph options
5643  *      structure.  Caller must release the returned pointer using
5644  *      ceph_destroy_options() when it is no longer needed.
5645  *  rbd_opts
5646  *      Address of an rbd options pointer.  Fully initialized by
5647  *      this function; caller must release with kfree().
5648  *  spec
5649  *      Address of an rbd image specification pointer.  Fully
5650  *      initialized by this function based on parsed options.
5651  *      Caller must release with rbd_spec_put().
5652  *
5653  * The options passed take this form:
5654  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5655  * where:
5656  *  <mon_addrs>
5657  *      A comma-separated list of one or more monitor addresses.
5658  *      A monitor address is an ip address, optionally followed
5659  *      by a port number (separated by a colon).
5660  *        I.e.:  ip1[:port1][,ip2[:port2]...]
5661  *  <options>
5662  *      A comma-separated list of ceph and/or rbd options.
5663  *  <pool_name>
5664  *      The name of the rados pool containing the rbd image.
5665  *  <image_name>
5666  *      The name of the image in that pool to map.
5667  *  <snap_name>
5668  *      An optional snapshot name.  If provided, the mapping will
5669  *      present data from the image at the time that snapshot was
5670  *      created.  The image head is used if no snapshot name is
5671  *      provided.  Snapshot mappings are always read-only.
5672  */
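/*
 * For illustration only (the monitor address, credentials and names
 * below are made up), a mapping request written to /sys/bus/rbd/add
 * could look like:
 *
 *   echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" \
 *       > /sys/bus/rbd/add
 */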
5673 static int rbd_add_parse_args(const char *buf,
5674                                 struct ceph_options **ceph_opts,
5675                                 struct rbd_options **opts,
5676                                 struct rbd_spec **rbd_spec)
5677 {
5678         size_t len;
5679         char *options;
5680         const char *mon_addrs;
5681         char *snap_name;
5682         size_t mon_addrs_size;
5683         struct rbd_spec *spec = NULL;
5684         struct rbd_options *rbd_opts = NULL;
5685         struct ceph_options *copts;
5686         int ret;
5687
5688         /* The first four tokens are required */
5689
5690         len = next_token(&buf);
5691         if (!len) {
5692                 rbd_warn(NULL, "no monitor address(es) provided");
5693                 return -EINVAL;
5694         }
5695         mon_addrs = buf;
5696         mon_addrs_size = len + 1;
5697         buf += len;
5698
5699         ret = -EINVAL;
5700         options = dup_token(&buf, NULL);
5701         if (!options)
5702                 return -ENOMEM;
5703         if (!*options) {
5704                 rbd_warn(NULL, "no options provided");
5705                 goto out_err;
5706         }
5707
5708         spec = rbd_spec_alloc();
5709         if (!spec)
5710                 goto out_mem;
5711
5712         spec->pool_name = dup_token(&buf, NULL);
5713         if (!spec->pool_name)
5714                 goto out_mem;
5715         if (!*spec->pool_name) {
5716                 rbd_warn(NULL, "no pool name provided");
5717                 goto out_err;
5718         }
5719
5720         spec->image_name = dup_token(&buf, NULL);
5721         if (!spec->image_name)
5722                 goto out_mem;
5723         if (!*spec->image_name) {
5724                 rbd_warn(NULL, "no image name provided");
5725                 goto out_err;
5726         }
5727
5728         /*
5729          * Snapshot name is optional; default is to use "-"
5730          * (indicating the head/no snapshot).
5731          */
5732         len = next_token(&buf);
5733         if (!len) {
5734                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5735                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5736         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5737                 ret = -ENAMETOOLONG;
5738                 goto out_err;
5739         }
5740         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5741         if (!snap_name)
5742                 goto out_mem;
5743         *(snap_name + len) = '\0';
5744         spec->snap_name = snap_name;
5745
5746         /* Initialize all rbd options to the defaults */
5747
5748         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5749         if (!rbd_opts)
5750                 goto out_mem;
5751
5752         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5753         rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5754         rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5755
5756         copts = ceph_parse_options(options, mon_addrs,
5757                                         mon_addrs + mon_addrs_size - 1,
5758                                         parse_rbd_opts_token, rbd_opts);
5759         if (IS_ERR(copts)) {
5760                 ret = PTR_ERR(copts);
5761                 goto out_err;
5762         }
5763         kfree(options);
5764
5765         *ceph_opts = copts;
5766         *opts = rbd_opts;
5767         *rbd_spec = spec;
5768
5769         return 0;
5770 out_mem:
5771         ret = -ENOMEM;
5772 out_err:
5773         kfree(rbd_opts);
5774         rbd_spec_put(spec);
5775         kfree(options);
5776
5777         return ret;
5778 }
5779
5780 /*
5781  * Return pool id (>= 0) or a negative error code.
5782  */
5783 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5784 {
5785         struct ceph_options *opts = rbdc->client->options;
5786         u64 newest_epoch;
5787         int tries = 0;
5788         int ret;
5789
5790 again:
5791         ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5792         if (ret == -ENOENT && tries++ < 1) {
5793                 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5794                                             &newest_epoch);
5795                 if (ret < 0)
5796                         return ret;
5797
5798                 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5799                         ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5800                         (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5801                                                      newest_epoch,
5802                                                      opts->mount_timeout);
5803                         goto again;
5804                 } else {
5805                         /* the osdmap we have is new enough */
5806                         return -ENOENT;
5807                 }
5808         }
5809
5810         return ret;
5811 }
5812
5813 /*
5814  * An rbd format 2 image has a unique identifier, distinct from the
5815  * name given to it by the user.  Internally, that identifier is
5816  * what's used to specify the names of objects related to the image.
5817  *
5818  * A special "rbd id" object is used to map an rbd image name to its
5819  * id.  If that object doesn't exist, then there is no v2 rbd image
5820  * with the supplied name.
5821  *
5822  * This function will record the given rbd_dev's image_id field if
5823  * it can be determined, and in that case will return 0.  If any
5824  * errors occur a negative errno will be returned and the rbd_dev's
5825  * image_id field will be unchanged (and should be NULL).
5826  */
5827 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5828 {
5829         int ret;
5830         size_t size;
5831         char *object_name;
5832         void *response;
5833         char *image_id;
5834
5835         /*
5836          * When probing a parent image, the image id is already
5837          * known (and the image name likely is not).  There's no
5838          * need to fetch the image id again in this case.  We
5839          * do still need to set the image format though.
5840          */
5841         if (rbd_dev->spec->image_id) {
5842                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5843
5844                 return 0;
5845         }
5846
5847         /*
5848          * First, see if the format 2 image id object exists, and if
5849          * so, get the image's persistent id from it.
5850          */
5851         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
5852         object_name = kmalloc(size, GFP_NOIO);
5853         if (!object_name)
5854                 return -ENOMEM;
5855         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
5856         dout("rbd id object name is %s\n", object_name);
5857
5858         /* Response will be an encoded string, which includes a length */
5859
5860         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5861         response = kzalloc(size, GFP_NOIO);
5862         if (!response) {
5863                 ret = -ENOMEM;
5864                 goto out;
5865         }
5866
5867         /* If it doesn't exist we'll assume it's a format 1 image */
5868
5869         ret = rbd_obj_method_sync(rbd_dev, object_name,
5870                                 "rbd", "get_id", NULL, 0,
5871                                 response, RBD_IMAGE_ID_LEN_MAX);
5872         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5873         if (ret == -ENOENT) {
5874                 image_id = kstrdup("", GFP_KERNEL);
5875                 ret = image_id ? 0 : -ENOMEM;
5876                 if (!ret)
5877                         rbd_dev->image_format = 1;
5878         } else if (ret >= 0) {
5879                 void *p = response;
5880
5881                 image_id = ceph_extract_encoded_string(&p, p + ret,
5882                                                 NULL, GFP_NOIO);
5883                 ret = PTR_ERR_OR_ZERO(image_id);
5884                 if (!ret)
5885                         rbd_dev->image_format = 2;
5886         }
5887
5888         if (!ret) {
5889                 rbd_dev->spec->image_id = image_id;
5890                 dout("image_id is %s\n", image_id);
5891         }
5892 out:
5893         kfree(response);
5894         kfree(object_name);
5895
5896         return ret;
5897 }
5898
5899 /*
5900  * Undo whatever state changes were made by a v1 or v2 header info
5901  * call.
5902  */
5903 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5904 {
5905         struct rbd_image_header *header;
5906
5907         rbd_dev_parent_put(rbd_dev);
5908
5909         /* Free dynamic fields from the header, then zero it out */
5910
5911         header = &rbd_dev->header;
5912         ceph_put_snap_context(header->snapc);
5913         kfree(header->snap_sizes);
5914         kfree(header->snap_names);
5915         kfree(header->object_prefix);
5916         memset(header, 0, sizeof (*header));
5917 }
5918
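/*
 * Fetch the format 2 header fields that never change once the image
 * exists: the object prefix, the feature bits and, if STRIPINGV2 is
 * set, the striping parameters.
 */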
5919 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5920 {
5921         int ret;
5922
5923         ret = rbd_dev_v2_object_prefix(rbd_dev);
5924         if (ret)
5925                 goto out_err;
5926
5927         /*
5928          * Get the and check features for the image.  Currently the
5929          * Get and check the features for the image.  Currently the
5930          */
5931         ret = rbd_dev_v2_features(rbd_dev);
5932         if (ret)
5933                 goto out_err;
5934
5935         /* If the image supports fancy striping, get its parameters */
5936
5937         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5938                 ret = rbd_dev_v2_striping_info(rbd_dev);
5939                 if (ret < 0)
5940                         goto out_err;
5941         }
5942         /* No support for format 2 images using crypto or compression */
5943
5944         return 0;
5945 out_err:
5946         rbd_dev->header.features = 0;
5947         kfree(rbd_dev->header.object_prefix);
5948         rbd_dev->header.object_prefix = NULL;
5949
5950         return ret;
5951 }
5952
5953 /*
5954  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5955  * rbd_dev_image_probe() recursion depth, which means it's also the
5956  * length of the already discovered part of the parent chain.
5957  */
5958 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5959 {
5960         struct rbd_device *parent = NULL;
5961         int ret;
5962
5963         if (!rbd_dev->parent_spec)
5964                 return 0;
5965
5966         if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5967                 pr_info("parent chain is too long (%d)\n", depth);
5968                 ret = -EINVAL;
5969                 goto out_err;
5970         }
5971
5972         parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5973         if (!parent) {
5974                 ret = -ENOMEM;
5975                 goto out_err;
5976         }
5977
5978         /*
5979          * Images related by parent/child relationships always share
5980          * rbd_client and spec/parent_spec, so bump their refcounts.
5981          */
5982         __rbd_get_client(rbd_dev->rbd_client);
5983         rbd_spec_get(rbd_dev->parent_spec);
5984
5985         ret = rbd_dev_image_probe(parent, depth);
5986         if (ret < 0)
5987                 goto out_err;
5988
5989         rbd_dev->parent = parent;
5990         atomic_set(&rbd_dev->parent_ref, 1);
5991         return 0;
5992
5993 out_err:
5994         rbd_dev_unparent(rbd_dev);
5995         rbd_dev_destroy(parent);
5996         return ret;
5997 }
5998
5999 /*
6000  * rbd_dev->header_rwsem must be locked for write and will be unlocked
6001  * upon return.
6002  */
6003 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6004 {
6005         int ret;
6006
6007         /* Record our major and minor device numbers. */
6008
6009         if (!single_major) {
6010                 ret = register_blkdev(0, rbd_dev->name);
6011                 if (ret < 0)
6012                         goto err_out_unlock;
6013
6014                 rbd_dev->major = ret;
6015                 rbd_dev->minor = 0;
6016         } else {
6017                 rbd_dev->major = rbd_major;
6018                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6019         }
6020
6021         /* Set up the blkdev mapping. */
6022
6023         ret = rbd_init_disk(rbd_dev);
6024         if (ret)
6025                 goto err_out_blkdev;
6026
6027         ret = rbd_dev_mapping_set(rbd_dev);
6028         if (ret)
6029                 goto err_out_disk;
6030
6031         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6032         set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
6033
6034         dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6035         ret = device_add(&rbd_dev->dev);
6036         if (ret)
6037                 goto err_out_mapping;
6038
6039         /* Everything's ready.  Announce the disk to the world. */
6040
6041         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6042         up_write(&rbd_dev->header_rwsem);
6043
6044         spin_lock(&rbd_dev_list_lock);
6045         list_add_tail(&rbd_dev->node, &rbd_dev_list);
6046         spin_unlock(&rbd_dev_list_lock);
6047
6048         add_disk(rbd_dev->disk);
6049         pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6050                 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6051                 rbd_dev->header.features);
6052
6053         return ret;
6054
6055 err_out_mapping:
6056         rbd_dev_mapping_clear(rbd_dev);
6057 err_out_disk:
6058         rbd_free_disk(rbd_dev);
6059 err_out_blkdev:
6060         if (!single_major)
6061                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6062 err_out_unlock:
6063         up_write(&rbd_dev->header_rwsem);
6064         return ret;
6065 }
6066
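/*
 * Build the name of the image's header object.  As a rough sketch only
 * (the exact prefix/suffix strings come from rbd_types.h): a format 1
 * image named "foo" uses a header object like "foo.rbd", while a
 * format 2 image uses "rbd_header.<image id>".
 */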
6067 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6068 {
6069         struct rbd_spec *spec = rbd_dev->spec;
6070         int ret;
6071
6072         /* Record the header object name for this rbd image. */
6073
6074         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6075
6076         rbd_dev->header_oloc.pool = rbd_dev->layout.pool_id;
6077         if (rbd_dev->image_format == 1)
6078                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6079                                        spec->image_name, RBD_SUFFIX);
6080         else
6081                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6082                                        RBD_HEADER_PREFIX, spec->image_id);
6083
6084         return ret;
6085 }
6086
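/*
 * Counterpart to a successful rbd_dev_image_probe(): drop the probed
 * header state, forget the image format and id, and destroy the device.
 */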
6087 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6088 {
6089         rbd_dev_unprobe(rbd_dev);
6090         rbd_dev->image_format = 0;
6091         kfree(rbd_dev->spec->image_id);
6092         rbd_dev->spec->image_id = NULL;
6093
6094         rbd_dev_destroy(rbd_dev);
6095 }
6096
6097 /*
6098  * Probe for the existence of the header object for the given rbd
6099  * device.  If this image is the one being mapped (i.e., not a
6100  * parent), initiate a watch on its header object before using that
6101  * object to get detailed information about the rbd image.
6102  */
6103 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6104 {
6105         int ret;
6106
6107         /*
6108          * Get the id from the image id object.  Unless there's an
6109          * error, rbd_dev->spec->image_id will be filled in with
6110          * a dynamically-allocated string, and rbd_dev->image_format
6111          * will be set to either 1 or 2.
6112          */
6113         ret = rbd_dev_image_id(rbd_dev);
6114         if (ret)
6115                 return ret;
6116
6117         ret = rbd_dev_header_name(rbd_dev);
6118         if (ret)
6119                 goto err_out_format;
6120
6121         if (!depth) {
6122                 ret = rbd_register_watch(rbd_dev);
6123                 if (ret) {
6124                         if (ret == -ENOENT)
6125                                 pr_info("image %s/%s does not exist\n",
6126                                         rbd_dev->spec->pool_name,
6127                                         rbd_dev->spec->image_name);
6128                         goto err_out_format;
6129                 }
6130         }
6131
6132         ret = rbd_dev_header_info(rbd_dev);
6133         if (ret)
6134                 goto err_out_watch;
6135
6136         /*
6137          * If this image is the one being mapped, we have pool name and
6138          * id, image name and id, and snap name - need to fill snap id.
6139          * Otherwise this is a parent image, identified by pool, image
6140          * and snap ids - need to fill in names for those ids.
6141          */
6142         if (!depth)
6143                 ret = rbd_spec_fill_snap_id(rbd_dev);
6144         else
6145                 ret = rbd_spec_fill_names(rbd_dev);
6146         if (ret) {
6147                 if (ret == -ENOENT)
6148                         pr_info("snap %s/%s@%s does not exist\n",
6149                                 rbd_dev->spec->pool_name,
6150                                 rbd_dev->spec->image_name,
6151                                 rbd_dev->spec->snap_name);
6152                 goto err_out_probe;
6153         }
6154
6155         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6156                 ret = rbd_dev_v2_parent_info(rbd_dev);
6157                 if (ret)
6158                         goto err_out_probe;
6159
6160                 /*
6161                  * Need to warn users if this image is the one being
6162                  * mapped and has a parent.
6163                  */
6164                 if (!depth && rbd_dev->parent_spec)
6165                         rbd_warn(rbd_dev,
6166                                  "WARNING: kernel layering is EXPERIMENTAL!");
6167         }
6168
6169         ret = rbd_dev_probe_parent(rbd_dev, depth);
6170         if (ret)
6171                 goto err_out_probe;
6172
6173         dout("discovered format %u image, header name is %s\n",
6174                 rbd_dev->image_format, rbd_dev->header_oid.name);
6175         return 0;
6176
6177 err_out_probe:
6178         rbd_dev_unprobe(rbd_dev);
6179 err_out_watch:
6180         if (!depth)
6181                 rbd_unregister_watch(rbd_dev);
6182 err_out_format:
6183         rbd_dev->image_format = 0;
6184         kfree(rbd_dev->spec->image_id);
6185         rbd_dev->spec->image_id = NULL;
6186         return ret;
6187 }
6188
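/*
 * Handle a write to the sysfs "add" (or "add_single_major") attribute.
 * Illustrative sketch of the expected buffer only -- see
 * Documentation/ABI/testing/sysfs-bus-rbd for the authoritative format;
 * the addresses, key and names below are made up:
 *
 *   echo "1.2.3.4:6789 name=admin,secret=<key> mypool myimage mysnap" \
 *       > /sys/bus/rbd/add
 *
 * i.e. monitor address(es), libceph/rbd options, pool name, image name
 * and an optional snapshot name.
 */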
6189 static ssize_t do_rbd_add(struct bus_type *bus,
6190                           const char *buf,
6191                           size_t count)
6192 {
6193         struct rbd_device *rbd_dev = NULL;
6194         struct ceph_options *ceph_opts = NULL;
6195         struct rbd_options *rbd_opts = NULL;
6196         struct rbd_spec *spec = NULL;
6197         struct rbd_client *rbdc;
6198         bool read_only;
6199         int rc;
6200
6201         if (!capable(CAP_SYS_ADMIN))
6202                 return -EPERM;
6203
6204         if (!try_module_get(THIS_MODULE))
6205                 return -ENODEV;
6206
6207         /* parse add command */
6208         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6209         if (rc < 0)
6210                 goto out;
6211
6212         rbdc = rbd_get_client(ceph_opts);
6213         if (IS_ERR(rbdc)) {
6214                 rc = PTR_ERR(rbdc);
6215                 goto err_out_args;
6216         }
6217
6218         /* pick the pool */
6219         rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
6220         if (rc < 0) {
6221                 if (rc == -ENOENT)
6222                         pr_info("pool %s does not exist\n", spec->pool_name);
6223                 goto err_out_client;
6224         }
6225         spec->pool_id = (u64)rc;
6226
6227         rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
6228         if (!rbd_dev) {
6229                 rc = -ENOMEM;
6230                 goto err_out_client;
6231         }
6232         rbdc = NULL;            /* rbd_dev now owns this */
6233         spec = NULL;            /* rbd_dev now owns this */
6234         rbd_opts = NULL;        /* rbd_dev now owns this */
6235
6236         rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6237         if (!rbd_dev->config_info) {
6238                 rc = -ENOMEM;
6239                 goto err_out_rbd_dev;
6240         }
6241
6242         down_write(&rbd_dev->header_rwsem);
6243         rc = rbd_dev_image_probe(rbd_dev, 0);
6244         if (rc < 0) {
6245                 up_write(&rbd_dev->header_rwsem);
6246                 goto err_out_rbd_dev;
6247         }
6248
6249         /* If we are mapping a snapshot it must be marked read-only */
6250
6251         read_only = rbd_dev->opts->read_only;
6252         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6253                 read_only = true;
6254         rbd_dev->mapping.read_only = read_only;
6255
6256         rc = rbd_dev_device_setup(rbd_dev);
6257         if (rc) {
6258                 /*
6259                  * rbd_unregister_watch() can't be moved into
6260                  * rbd_dev_image_release() without refactoring, see
6261                  * commit 1f3ef78861ac.
6262                  */
6263                 rbd_unregister_watch(rbd_dev);
6264                 rbd_dev_image_release(rbd_dev);
6265                 goto out;
6266         }
6267
6268         rc = count;
6269 out:
6270         module_put(THIS_MODULE);
6271         return rc;
6272
6273 err_out_rbd_dev:
6274         rbd_dev_destroy(rbd_dev);
6275 err_out_client:
6276         rbd_put_client(rbdc);
6277 err_out_args:
6278         rbd_spec_put(spec);
6279         kfree(rbd_opts);
6280         goto out;
6281 }
6282
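/*
 * With single_major=Y every mapping shares one block device major, and
 * the plain "add"/"remove" attributes are disabled in favour of their
 * *_single_major counterparts.
 */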
6283 static ssize_t rbd_add(struct bus_type *bus,
6284                        const char *buf,
6285                        size_t count)
6286 {
6287         if (single_major)
6288                 return -EINVAL;
6289
6290         return do_rbd_add(bus, buf, count);
6291 }
6292
6293 static ssize_t rbd_add_single_major(struct bus_type *bus,
6294                                     const char *buf,
6295                                     size_t count)
6296 {
6297         return do_rbd_add(bus, buf, count);
6298 }
6299
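/*
 * Counterpart to rbd_dev_device_setup(): free the disk, drop the device
 * from the global list, remove it from sysfs and, unless a single major
 * is shared, give back the block device major.
 */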
6300 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6301 {
6302         rbd_free_disk(rbd_dev);
6303
6304         spin_lock(&rbd_dev_list_lock);
6305         list_del_init(&rbd_dev->node);
6306         spin_unlock(&rbd_dev_list_lock);
6307
6308         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6309         device_del(&rbd_dev->dev);
6310         rbd_dev_mapping_clear(rbd_dev);
6311         if (!single_major)
6312                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6313 }
6314
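/*
 * Tear the parent chain down bottom-up: each pass walks to the deepest
 * ancestor (the parent with no grandparent), releases its image and
 * detaches it from its child, until no parents remain.
 */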
6315 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6316 {
6317         while (rbd_dev->parent) {
6318                 struct rbd_device *first = rbd_dev;
6319                 struct rbd_device *second = first->parent;
6320                 struct rbd_device *third;
6321
6322                 /*
6323                  * Follow to the parent with no grandparent and
6324                  * remove it.
6325                  */
6326                 while (second && (third = second->parent)) {
6327                         first = second;
6328                         second = third;
6329                 }
6330                 rbd_assert(second);
6331                 rbd_dev_image_release(second);
6332                 first->parent = NULL;
6333                 first->parent_overlap = 0;
6334
6335                 rbd_assert(first->parent_spec);
6336                 rbd_spec_put(first->parent_spec);
6337                 first->parent_spec = NULL;
6338         }
6339 }
6340
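/*
 * Handle a write to the sysfs "remove" (or "remove_single_major")
 * attribute.  The buffer carries the device id and an optional "force"
 * flag, e.g. (illustrative values only):
 *
 *   echo "2" > /sys/bus/rbd/remove
 *   echo "2 force" > /sys/bus/rbd/remove
 *
 * With "force" the mapping is torn down even while the device is open,
 * after failing any outstanding I/O.
 */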
6341 static ssize_t do_rbd_remove(struct bus_type *bus,
6342                              const char *buf,
6343                              size_t count)
6344 {
6345         struct rbd_device *rbd_dev = NULL;
6346         struct list_head *tmp;
6347         int dev_id;
6348         char opt_buf[6];
6349         bool force = false;
6350         int ret;
6351
6352         if (!capable(CAP_SYS_ADMIN))
6353                 return -EPERM;
6354
6355         dev_id = -1;
6356         opt_buf[0] = '\0';
6357         sscanf(buf, "%d %5s", &dev_id, opt_buf);
6358         if (dev_id < 0) {
6359                 pr_err("dev_id out of range\n");
6360                 return -EINVAL;
6361         }
6362         if (opt_buf[0] != '\0') {
6363                 if (!strcmp(opt_buf, "force")) {
6364                         force = true;
6365                 } else {
6366                         pr_err("bad remove option at '%s'\n", opt_buf);
6367                         return -EINVAL;
6368                 }
6369         }
6370
6371         ret = -ENOENT;
6372         spin_lock(&rbd_dev_list_lock);
6373         list_for_each(tmp, &rbd_dev_list) {
6374                 rbd_dev = list_entry(tmp, struct rbd_device, node);
6375                 if (rbd_dev->dev_id == dev_id) {
6376                         ret = 0;
6377                         break;
6378                 }
6379         }
6380         if (!ret) {
6381                 spin_lock_irq(&rbd_dev->lock);
6382                 if (rbd_dev->open_count && !force)
6383                         ret = -EBUSY;
6384                 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6385                                           &rbd_dev->flags))
6386                         ret = -EINPROGRESS;
6387                 spin_unlock_irq(&rbd_dev->lock);
6388         }
6389         spin_unlock(&rbd_dev_list_lock);
6390         if (ret)
6391                 return ret;
6392
6393         if (force) {
6394                 /*
6395                  * Prevent new IO from being queued and wait for existing
6396                  * IO to complete/fail.
6397                  */
6398                 blk_mq_freeze_queue(rbd_dev->disk->queue);
6399                 blk_set_queue_dying(rbd_dev->disk->queue);
6400         }
6401
6402         down_write(&rbd_dev->lock_rwsem);
6403         if (__rbd_is_lock_owner(rbd_dev))
6404                 rbd_unlock(rbd_dev);
6405         up_write(&rbd_dev->lock_rwsem);
6406         rbd_unregister_watch(rbd_dev);
6407
6408         /*
6409          * Don't free anything from rbd_dev->disk until after all
6410          * notifies are completely processed.  Otherwise
6411          * rbd_dev_device_release() will race with rbd_watch_cb(), resulting
6412          * in a potential use after free of rbd_dev->disk or rbd_dev.
6413          */
6414         rbd_dev_device_release(rbd_dev);
6415         rbd_dev_image_release(rbd_dev);
6416
6417         return count;
6418 }
6419
6420 static ssize_t rbd_remove(struct bus_type *bus,
6421                           const char *buf,
6422                           size_t count)
6423 {
6424         if (single_major)
6425                 return -EINVAL;
6426
6427         return do_rbd_remove(bus, buf, count);
6428 }
6429
6430 static ssize_t rbd_remove_single_major(struct bus_type *bus,
6431                                        const char *buf,
6432                                        size_t count)
6433 {
6434         return do_rbd_remove(bus, buf, count);
6435 }
6436
6437 /*
6438  * create control files in sysfs
6439  * /sys/bus/rbd/...
6440  */
6441 static int rbd_sysfs_init(void)
6442 {
6443         int ret;
6444
6445         ret = device_register(&rbd_root_dev);
6446         if (ret < 0)
6447                 return ret;
6448
6449         ret = bus_register(&rbd_bus_type);
6450         if (ret < 0)
6451                 device_unregister(&rbd_root_dev);
6452
6453         return ret;
6454 }
6455
6456 static void rbd_sysfs_cleanup(void)
6457 {
6458         bus_unregister(&rbd_bus_type);
6459         device_unregister(&rbd_root_dev);
6460 }
6461
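/*
 * Create the slab caches used by the I/O path (image requests, object
 * requests and segment names); torn down again by rbd_slab_exit().
 */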
6462 static int rbd_slab_init(void)
6463 {
6464         rbd_assert(!rbd_img_request_cache);
6465         rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
6466         if (!rbd_img_request_cache)
6467                 return -ENOMEM;
6468
6469         rbd_assert(!rbd_obj_request_cache);
6470         rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
6471         if (!rbd_obj_request_cache)
6472                 goto out_err;
6473
6474         rbd_assert(!rbd_segment_name_cache);
6475         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
6476                                         CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
6477         if (rbd_segment_name_cache)
6478                 return 0;
6479 out_err:
6480         kmem_cache_destroy(rbd_obj_request_cache);
6481         rbd_obj_request_cache = NULL;
6482
6483         kmem_cache_destroy(rbd_img_request_cache);
6484         rbd_img_request_cache = NULL;
6485
6486         return -ENOMEM;
6487 }
6488
6489 static void rbd_slab_exit(void)
6490 {
6491         rbd_assert(rbd_segment_name_cache);
6492         kmem_cache_destroy(rbd_segment_name_cache);
6493         rbd_segment_name_cache = NULL;
6494
6495         rbd_assert(rbd_obj_request_cache);
6496         kmem_cache_destroy(rbd_obj_request_cache);
6497         rbd_obj_request_cache = NULL;
6498
6499         rbd_assert(rbd_img_request_cache);
6500         kmem_cache_destroy(rbd_img_request_cache);
6501         rbd_img_request_cache = NULL;
6502 }
6503
6504 static int __init rbd_init(void)
6505 {
6506         int rc;
6507
6508         if (!libceph_compatible(NULL)) {
6509                 rbd_warn(NULL, "libceph incompatibility (quitting)");
6510                 return -EINVAL;
6511         }
6512
6513         rc = rbd_slab_init();
6514         if (rc)
6515                 return rc;
6516
6517         /*
6518          * The number of active work items is limited by the number of
6519          * rbd devices * queue depth, so leave @max_active at default.
6520          */
6521         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6522         if (!rbd_wq) {
6523                 rc = -ENOMEM;
6524                 goto err_out_slab;
6525         }
6526
6527         if (single_major) {
6528                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6529                 if (rbd_major < 0) {
6530                         rc = rbd_major;
6531                         goto err_out_wq;
6532                 }
6533         }
6534
6535         rc = rbd_sysfs_init();
6536         if (rc)
6537                 goto err_out_blkdev;
6538
6539         if (single_major)
6540                 pr_info("loaded (major %d)\n", rbd_major);
6541         else
6542                 pr_info("loaded\n");
6543
6544         return 0;
6545
6546 err_out_blkdev:
6547         if (single_major)
6548                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6549 err_out_wq:
6550         destroy_workqueue(rbd_wq);
6551 err_out_slab:
6552         rbd_slab_exit();
6553         return rc;
6554 }
6555
6556 static void __exit rbd_exit(void)
6557 {
6558         ida_destroy(&rbd_dev_id_ida);
6559         rbd_sysfs_cleanup();
6560         if (single_major)
6561                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6562         destroy_workqueue(rbd_wq);
6563         rbd_slab_exit();
6564 }
6565
6566 module_init(rbd_init);
6567 module_exit(rbd_exit);
6568
6569 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
6570 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6571 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
6572 /* following authorship retained from original osdblk.c */
6573 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6574
6575 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
6576 MODULE_LICENSE("GPL");