GNU Linux-libre 4.14.251-gnu1
[releases.git] / drivers / block / rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/decode.h>
36 #include <linux/parser.h>
37 #include <linux/bsearch.h>
38
39 #include <linux/kernel.h>
40 #include <linux/device.h>
41 #include <linux/module.h>
42 #include <linux/blk-mq.h>
43 #include <linux/fs.h>
44 #include <linux/blkdev.h>
45 #include <linux/slab.h>
46 #include <linux/idr.h>
47 #include <linux/workqueue.h>
48
49 #include "rbd_types.h"
50
51 #define RBD_DEBUG       /* Activate rbd_assert() calls */
52
53 /*
54  * Increment the given counter and return its updated value.
55  * If the counter is already 0 it will not be incremented.
56  * If the counter is already at its maximum value returns
57  * -EINVAL without updating it.
58  */
59 static int atomic_inc_return_safe(atomic_t *v)
60 {
61         unsigned int counter;
62
63         counter = (unsigned int)__atomic_add_unless(v, 1, 0);
64         if (counter <= (unsigned int)INT_MAX)
65                 return (int)counter;
66
67         atomic_dec(v);
68
69         return -EINVAL;
70 }
71
72 /* Decrement the counter.  Return the resulting value, or -EINVAL */
73 static int atomic_dec_return_safe(atomic_t *v)
74 {
75         int counter;
76
77         counter = atomic_dec_return(v);
78         if (counter >= 0)
79                 return counter;
80
81         atomic_inc(v);
82
83         return -EINVAL;
84 }
85
86 #define RBD_DRV_NAME "rbd"
87
88 #define RBD_MINORS_PER_MAJOR            256
89 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
90
91 #define RBD_MAX_PARENT_CHAIN_LEN        16
92
93 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
94 #define RBD_MAX_SNAP_NAME_LEN   \
95                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
96
97 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
98
99 #define RBD_SNAP_HEAD_NAME      "-"
100
101 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
102
103 /* This allows a single page to hold an image name sent by OSD */
104 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
105 #define RBD_IMAGE_ID_LEN_MAX    64
106
107 #define RBD_OBJ_PREFIX_LEN_MAX  64
108
109 #define RBD_NOTIFY_TIMEOUT      5       /* seconds */
110 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
111
112 /* Feature bits */
113
114 #define RBD_FEATURE_LAYERING            (1ULL<<0)
115 #define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
116 #define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
117 #define RBD_FEATURE_DATA_POOL           (1ULL<<7)
118 #define RBD_FEATURE_OPERATIONS          (1ULL<<8)
119
120 #define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
121                                  RBD_FEATURE_STRIPINGV2 |       \
122                                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
123                                  RBD_FEATURE_DATA_POOL |        \
124                                  RBD_FEATURE_OPERATIONS)
125
126 /* Features supported by this (client software) implementation. */
127
128 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
129
130 /*
131  * An RBD device name will be "rbd#", where the "rbd" comes from
132  * RBD_DRV_NAME above, and # is a unique integer identifier.
133  */
134 #define DEV_NAME_LEN            32
135
136 /*
137  * block device image metadata (in-memory version)
138  */
139 struct rbd_image_header {
140         /* These six fields never change for a given rbd image */
141         char *object_prefix;
142         __u8 obj_order;
143         u64 stripe_unit;
144         u64 stripe_count;
145         s64 data_pool_id;
146         u64 features;           /* Might be changeable someday? */
147
148         /* The remaining fields need to be updated occasionally */
149         u64 image_size;
150         struct ceph_snap_context *snapc;
151         char *snap_names;       /* format 1 only */
152         u64 *snap_sizes;        /* format 1 only */
153 };
154
155 /*
156  * An rbd image specification.
157  *
158  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
159  * identify an image.  Each rbd_dev structure includes a pointer to
160  * an rbd_spec structure that encapsulates this identity.
161  *
162  * Each of the id's in an rbd_spec has an associated name.  For a
163  * user-mapped image, the names are supplied and the id's associated
164  * with them are looked up.  For a layered image, a parent image is
165  * defined by the tuple, and the names are looked up.
166  *
167  * An rbd_dev structure contains a parent_spec pointer which is
168  * non-null if the image it represents is a child in a layered
169  * image.  This pointer will refer to the rbd_spec structure used
170  * by the parent rbd_dev for its own identity (i.e., the structure
171  * is shared between the parent and child).
172  *
173  * Since these structures are populated once, during the discovery
174  * phase of image construction, they are effectively immutable so
175  * we make no effort to synchronize access to them.
176  *
177  * Note that code herein does not assume the image name is known (it
178  * could be a null pointer).
179  */
180 struct rbd_spec {
181         u64             pool_id;
182         const char      *pool_name;
183
184         const char      *image_id;
185         const char      *image_name;
186
187         u64             snap_id;
188         const char      *snap_name;
189
190         struct kref     kref;
191 };
192
193 /*
194  * an instance of the client.  multiple devices may share an rbd client.
195  */
196 struct rbd_client {
197         struct ceph_client      *client;
198         struct kref             kref;
199         struct list_head        node;
200 };
201
202 struct rbd_img_request;
203 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
204
205 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
206
207 struct rbd_obj_request;
208 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
209
210 enum obj_request_type {
211         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
212 };
213
214 enum obj_operation_type {
215         OBJ_OP_WRITE,
216         OBJ_OP_READ,
217         OBJ_OP_DISCARD,
218 };
219
220 enum obj_req_flags {
221         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
222         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
223         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
224         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
225 };
226
227 struct rbd_obj_request {
228         u64                     object_no;
229         u64                     offset;         /* object start byte */
230         u64                     length;         /* bytes from offset */
231         unsigned long           flags;
232
233         /*
234          * An object request associated with an image will have its
235          * img_data flag set; a standalone object request will not.
236          *
237          * A standalone object request will have which == BAD_WHICH
238          * and a null obj_request pointer.
239          *
240          * An object request initiated in support of a layered image
241          * object (to check for its existence before a write) will
242          * have which == BAD_WHICH and a non-null obj_request pointer.
243          *
244          * Finally, an object request for rbd image data will have
245          * which != BAD_WHICH, and will have a non-null img_request
246          * pointer.  The value of which will be in the range
247          * 0..(img_request->obj_request_count-1).
248          */
249         union {
250                 struct rbd_obj_request  *obj_request;   /* STAT op */
251                 struct {
252                         struct rbd_img_request  *img_request;
253                         u64                     img_offset;
254                         /* links for img_request->obj_requests list */
255                         struct list_head        links;
256                 };
257         };
258         u32                     which;          /* posn image request list */
259
260         enum obj_request_type   type;
261         union {
262                 struct bio      *bio_list;
263                 struct {
264                         struct page     **pages;
265                         u32             page_count;
266                 };
267         };
268         struct page             **copyup_pages;
269         u32                     copyup_page_count;
270
271         struct ceph_osd_request *osd_req;
272
273         u64                     xferred;        /* bytes transferred */
274         int                     result;
275
276         rbd_obj_callback_t      callback;
277         struct completion       completion;
278
279         struct kref             kref;
280 };
281
282 enum img_req_flags {
283         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
284         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
285         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
286         IMG_REQ_DISCARD,        /* discard: normal = 0, discard request = 1 */
287 };
288
289 struct rbd_img_request {
290         struct rbd_device       *rbd_dev;
291         u64                     offset; /* starting image byte offset */
292         u64                     length; /* byte count from offset */
293         unsigned long           flags;
294         union {
295                 u64                     snap_id;        /* for reads */
296                 struct ceph_snap_context *snapc;        /* for writes */
297         };
298         union {
299                 struct request          *rq;            /* block request */
300                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
301         };
302         struct page             **copyup_pages;
303         u32                     copyup_page_count;
304         spinlock_t              completion_lock;/* protects next_completion */
305         u32                     next_completion;
306         rbd_img_callback_t      callback;
307         u64                     xferred;/* aggregate bytes transferred */
308         int                     result; /* first nonzero obj_request result */
309
310         u32                     obj_request_count;
311         struct list_head        obj_requests;   /* rbd_obj_request structs */
312
313         struct kref             kref;
314 };
315
316 #define for_each_obj_request(ireq, oreq) \
317         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
318 #define for_each_obj_request_from(ireq, oreq) \
319         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
320 #define for_each_obj_request_safe(ireq, oreq, n) \
321         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
322
323 enum rbd_watch_state {
324         RBD_WATCH_STATE_UNREGISTERED,
325         RBD_WATCH_STATE_REGISTERED,
326         RBD_WATCH_STATE_ERROR,
327 };
328
329 enum rbd_lock_state {
330         RBD_LOCK_STATE_UNLOCKED,
331         RBD_LOCK_STATE_LOCKED,
332         RBD_LOCK_STATE_RELEASING,
333 };
334
335 /* WatchNotify::ClientId */
336 struct rbd_client_id {
337         u64 gid;
338         u64 handle;
339 };
340
341 struct rbd_mapping {
342         u64                     size;
343         u64                     features;
344         bool                    read_only;
345 };
346
347 /*
348  * a single device
349  */
350 struct rbd_device {
351         int                     dev_id;         /* blkdev unique id */
352
353         int                     major;          /* blkdev assigned major */
354         int                     minor;
355         struct gendisk          *disk;          /* blkdev's gendisk and rq */
356
357         u32                     image_format;   /* Either 1 or 2 */
358         struct rbd_client       *rbd_client;
359
360         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
361
362         spinlock_t              lock;           /* queue, flags, open_count */
363
364         struct rbd_image_header header;
365         unsigned long           flags;          /* possibly lock protected */
366         struct rbd_spec         *spec;
367         struct rbd_options      *opts;
368         char                    *config_info;   /* add{,_single_major} string */
369
370         struct ceph_object_id   header_oid;
371         struct ceph_object_locator header_oloc;
372
373         struct ceph_file_layout layout;         /* used for all rbd requests */
374
375         struct mutex            watch_mutex;
376         enum rbd_watch_state    watch_state;
377         struct ceph_osd_linger_request *watch_handle;
378         u64                     watch_cookie;
379         struct delayed_work     watch_dwork;
380
381         struct rw_semaphore     lock_rwsem;
382         enum rbd_lock_state     lock_state;
383         char                    lock_cookie[32];
384         struct rbd_client_id    owner_cid;
385         struct work_struct      acquired_lock_work;
386         struct work_struct      released_lock_work;
387         struct delayed_work     lock_dwork;
388         struct work_struct      unlock_work;
389         wait_queue_head_t       lock_waitq;
390
391         struct workqueue_struct *task_wq;
392
393         struct rbd_spec         *parent_spec;
394         u64                     parent_overlap;
395         atomic_t                parent_ref;
396         struct rbd_device       *parent;
397
398         /* Block layer tags. */
399         struct blk_mq_tag_set   tag_set;
400
401         /* protects updating the header */
402         struct rw_semaphore     header_rwsem;
403
404         struct rbd_mapping      mapping;
405
406         struct list_head        node;
407
408         /* sysfs related */
409         struct device           dev;
410         unsigned long           open_count;     /* protected by lock */
411 };
412
413 /*
414  * Flag bits for rbd_dev->flags:
415  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
416  *   by rbd_dev->lock
417  * - BLACKLISTED is protected by rbd_dev->lock_rwsem
418  */
419 enum rbd_dev_flags {
420         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
421         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
422         RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
423 };
424
425 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
426
427 static LIST_HEAD(rbd_dev_list);    /* devices */
428 static DEFINE_SPINLOCK(rbd_dev_list_lock);
429
430 static LIST_HEAD(rbd_client_list);              /* clients */
431 static DEFINE_SPINLOCK(rbd_client_list_lock);
432
433 /* Slab caches for frequently-allocated structures */
434
435 static struct kmem_cache        *rbd_img_request_cache;
436 static struct kmem_cache        *rbd_obj_request_cache;
437
438 static struct bio_set           *rbd_bio_clone;
439
440 static int rbd_major;
441 static DEFINE_IDA(rbd_dev_id_ida);
442
443 static struct workqueue_struct *rbd_wq;
444
445 /*
446  * Default to false for now, as single-major requires >= 0.75 version of
447  * userspace rbd utility.
448  */
449 static bool single_major = false;
450 module_param(single_major, bool, S_IRUGO);
451 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
452
453 static int rbd_img_request_submit(struct rbd_img_request *img_request);
454
455 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
456                        size_t count);
457 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
458                           size_t count);
459 static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
460                                     size_t count);
461 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
462                                        size_t count);
463 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
464 static void rbd_spec_put(struct rbd_spec *spec);
465
466 static int rbd_dev_id_to_minor(int dev_id)
467 {
468         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
469 }
470
471 static int minor_to_rbd_dev_id(int minor)
472 {
473         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
474 }
475
476 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
477 {
478         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
479                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
480 }
481
482 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
483 {
484         bool is_lock_owner;
485
486         down_read(&rbd_dev->lock_rwsem);
487         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
488         up_read(&rbd_dev->lock_rwsem);
489         return is_lock_owner;
490 }
491
492 static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
493 {
494         return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
495 }
496
497 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
498 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
499 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
500 static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
501 static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);
502
503 static struct attribute *rbd_bus_attrs[] = {
504         &bus_attr_add.attr,
505         &bus_attr_remove.attr,
506         &bus_attr_add_single_major.attr,
507         &bus_attr_remove_single_major.attr,
508         &bus_attr_supported_features.attr,
509         NULL,
510 };
511
512 static umode_t rbd_bus_is_visible(struct kobject *kobj,
513                                   struct attribute *attr, int index)
514 {
515         if (!single_major &&
516             (attr == &bus_attr_add_single_major.attr ||
517              attr == &bus_attr_remove_single_major.attr))
518                 return 0;
519
520         return attr->mode;
521 }
522
523 static const struct attribute_group rbd_bus_group = {
524         .attrs = rbd_bus_attrs,
525         .is_visible = rbd_bus_is_visible,
526 };
527 __ATTRIBUTE_GROUPS(rbd_bus);
528
529 static struct bus_type rbd_bus_type = {
530         .name           = "rbd",
531         .bus_groups     = rbd_bus_groups,
532 };
533
534 static void rbd_root_dev_release(struct device *dev)
535 {
536 }
537
538 static struct device rbd_root_dev = {
539         .init_name =    "rbd",
540         .release =      rbd_root_dev_release,
541 };
542
543 static __printf(2, 3)
544 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
545 {
546         struct va_format vaf;
547         va_list args;
548
549         va_start(args, fmt);
550         vaf.fmt = fmt;
551         vaf.va = &args;
552
553         if (!rbd_dev)
554                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
555         else if (rbd_dev->disk)
556                 printk(KERN_WARNING "%s: %s: %pV\n",
557                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
558         else if (rbd_dev->spec && rbd_dev->spec->image_name)
559                 printk(KERN_WARNING "%s: image %s: %pV\n",
560                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
561         else if (rbd_dev->spec && rbd_dev->spec->image_id)
562                 printk(KERN_WARNING "%s: id %s: %pV\n",
563                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
564         else    /* punt */
565                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
566                         RBD_DRV_NAME, rbd_dev, &vaf);
567         va_end(args);
568 }
569
570 #ifdef RBD_DEBUG
571 #define rbd_assert(expr)                                                \
572                 if (unlikely(!(expr))) {                                \
573                         printk(KERN_ERR "\nAssertion failure in %s() "  \
574                                                 "at line %d:\n\n"       \
575                                         "\trbd_assert(%s);\n\n",        \
576                                         __func__, __LINE__, #expr);     \
577                         BUG();                                          \
578                 }
579 #else /* !RBD_DEBUG */
580 #  define rbd_assert(expr)      ((void) 0)
581 #endif /* !RBD_DEBUG */
582
583 static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
584 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
585 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
586 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
587
588 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
589 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
590 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
591 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
592 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
593                                         u64 snap_id);
594 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
595                                 u8 *order, u64 *snap_size);
596 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
597                 u64 *snap_features);
598
599 static int rbd_open(struct block_device *bdev, fmode_t mode)
600 {
601         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602         bool removing = false;
603
604         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
605                 return -EROFS;
606
607         spin_lock_irq(&rbd_dev->lock);
608         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
609                 removing = true;
610         else
611                 rbd_dev->open_count++;
612         spin_unlock_irq(&rbd_dev->lock);
613         if (removing)
614                 return -ENOENT;
615
616         (void) get_device(&rbd_dev->dev);
617
618         return 0;
619 }
620
621 static void rbd_release(struct gendisk *disk, fmode_t mode)
622 {
623         struct rbd_device *rbd_dev = disk->private_data;
624         unsigned long open_count_before;
625
626         spin_lock_irq(&rbd_dev->lock);
627         open_count_before = rbd_dev->open_count--;
628         spin_unlock_irq(&rbd_dev->lock);
629         rbd_assert(open_count_before > 0);
630
631         put_device(&rbd_dev->dev);
632 }
633
634 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
635 {
636         int ret = 0;
637         int val;
638         bool ro;
639         bool ro_changed = false;
640
641         /* get_user() may sleep, so call it before taking rbd_dev->lock */
642         if (get_user(val, (int __user *)(arg)))
643                 return -EFAULT;
644
645         ro = val ? true : false;
646         /* Snapshot doesn't allow to write*/
647         if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
648                 return -EROFS;
649
650         spin_lock_irq(&rbd_dev->lock);
651         /* prevent others open this device */
652         if (rbd_dev->open_count > 1) {
653                 ret = -EBUSY;
654                 goto out;
655         }
656
657         if (rbd_dev->mapping.read_only != ro) {
658                 rbd_dev->mapping.read_only = ro;
659                 ro_changed = true;
660         }
661
662 out:
663         spin_unlock_irq(&rbd_dev->lock);
664         /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
665         if (ret == 0 && ro_changed)
666                 set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
667
668         return ret;
669 }
670
671 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
672                         unsigned int cmd, unsigned long arg)
673 {
674         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
675         int ret = 0;
676
677         switch (cmd) {
678         case BLKROSET:
679                 ret = rbd_ioctl_set_ro(rbd_dev, arg);
680                 break;
681         default:
682                 ret = -ENOTTY;
683         }
684
685         return ret;
686 }
687
688 #ifdef CONFIG_COMPAT
689 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
690                                 unsigned int cmd, unsigned long arg)
691 {
692         return rbd_ioctl(bdev, mode, cmd, arg);
693 }
694 #endif /* CONFIG_COMPAT */
695
696 static const struct block_device_operations rbd_bd_ops = {
697         .owner                  = THIS_MODULE,
698         .open                   = rbd_open,
699         .release                = rbd_release,
700         .ioctl                  = rbd_ioctl,
701 #ifdef CONFIG_COMPAT
702         .compat_ioctl           = rbd_compat_ioctl,
703 #endif
704 };
705
706 /*
707  * Initialize an rbd client instance.  Success or not, this function
708  * consumes ceph_opts.  Caller holds client_mutex.
709  */
710 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
711 {
712         struct rbd_client *rbdc;
713         int ret = -ENOMEM;
714
715         dout("%s:\n", __func__);
716         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
717         if (!rbdc)
718                 goto out_opt;
719
720         kref_init(&rbdc->kref);
721         INIT_LIST_HEAD(&rbdc->node);
722
723         rbdc->client = ceph_create_client(ceph_opts, rbdc);
724         if (IS_ERR(rbdc->client))
725                 goto out_rbdc;
726         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
727
728         ret = ceph_open_session(rbdc->client);
729         if (ret < 0)
730                 goto out_client;
731
732         spin_lock(&rbd_client_list_lock);
733         list_add_tail(&rbdc->node, &rbd_client_list);
734         spin_unlock(&rbd_client_list_lock);
735
736         dout("%s: rbdc %p\n", __func__, rbdc);
737
738         return rbdc;
739 out_client:
740         ceph_destroy_client(rbdc->client);
741 out_rbdc:
742         kfree(rbdc);
743 out_opt:
744         if (ceph_opts)
745                 ceph_destroy_options(ceph_opts);
746         dout("%s: error %d\n", __func__, ret);
747
748         return ERR_PTR(ret);
749 }
750
751 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
752 {
753         kref_get(&rbdc->kref);
754
755         return rbdc;
756 }
757
758 /*
759  * Find a ceph client with specific addr and configuration.  If
760  * found, bump its reference count.
761  */
762 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
763 {
764         struct rbd_client *client_node;
765         bool found = false;
766
767         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
768                 return NULL;
769
770         spin_lock(&rbd_client_list_lock);
771         list_for_each_entry(client_node, &rbd_client_list, node) {
772                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
773                         __rbd_get_client(client_node);
774
775                         found = true;
776                         break;
777                 }
778         }
779         spin_unlock(&rbd_client_list_lock);
780
781         return found ? client_node : NULL;
782 }
783
784 /*
785  * (Per device) rbd map options
786  */
787 enum {
788         Opt_queue_depth,
789         Opt_last_int,
790         /* int args above */
791         Opt_last_string,
792         /* string args above */
793         Opt_read_only,
794         Opt_read_write,
795         Opt_lock_on_read,
796         Opt_exclusive,
797         Opt_err
798 };
799
800 static match_table_t rbd_opts_tokens = {
801         {Opt_queue_depth, "queue_depth=%d"},
802         /* int args above */
803         /* string args above */
804         {Opt_read_only, "read_only"},
805         {Opt_read_only, "ro"},          /* Alternate spelling */
806         {Opt_read_write, "read_write"},
807         {Opt_read_write, "rw"},         /* Alternate spelling */
808         {Opt_lock_on_read, "lock_on_read"},
809         {Opt_exclusive, "exclusive"},
810         {Opt_err, NULL}
811 };
812
813 struct rbd_options {
814         int     queue_depth;
815         bool    read_only;
816         bool    lock_on_read;
817         bool    exclusive;
818 };
819
820 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
821 #define RBD_READ_ONLY_DEFAULT   false
822 #define RBD_LOCK_ON_READ_DEFAULT false
823 #define RBD_EXCLUSIVE_DEFAULT   false
824
825 static int parse_rbd_opts_token(char *c, void *private)
826 {
827         struct rbd_options *rbd_opts = private;
828         substring_t argstr[MAX_OPT_ARGS];
829         int token, intval, ret;
830
831         token = match_token(c, rbd_opts_tokens, argstr);
832         if (token < Opt_last_int) {
833                 ret = match_int(&argstr[0], &intval);
834                 if (ret < 0) {
835                         pr_err("bad mount option arg (not int) at '%s'\n", c);
836                         return ret;
837                 }
838                 dout("got int token %d val %d\n", token, intval);
839         } else if (token > Opt_last_int && token < Opt_last_string) {
840                 dout("got string token %d val %s\n", token, argstr[0].from);
841         } else {
842                 dout("got token %d\n", token);
843         }
844
845         switch (token) {
846         case Opt_queue_depth:
847                 if (intval < 1) {
848                         pr_err("queue_depth out of range\n");
849                         return -EINVAL;
850                 }
851                 rbd_opts->queue_depth = intval;
852                 break;
853         case Opt_read_only:
854                 rbd_opts->read_only = true;
855                 break;
856         case Opt_read_write:
857                 rbd_opts->read_only = false;
858                 break;
859         case Opt_lock_on_read:
860                 rbd_opts->lock_on_read = true;
861                 break;
862         case Opt_exclusive:
863                 rbd_opts->exclusive = true;
864                 break;
865         default:
866                 /* libceph prints "bad option" msg */
867                 return -EINVAL;
868         }
869
870         return 0;
871 }
872
873 static char* obj_op_name(enum obj_operation_type op_type)
874 {
875         switch (op_type) {
876         case OBJ_OP_READ:
877                 return "read";
878         case OBJ_OP_WRITE:
879                 return "write";
880         case OBJ_OP_DISCARD:
881                 return "discard";
882         default:
883                 return "???";
884         }
885 }
886
887 /*
888  * Get a ceph client with specific addr and configuration, if one does
889  * not exist create it.  Either way, ceph_opts is consumed by this
890  * function.
891  */
892 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
893 {
894         struct rbd_client *rbdc;
895
896         mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
897         rbdc = rbd_client_find(ceph_opts);
898         if (rbdc)       /* using an existing client */
899                 ceph_destroy_options(ceph_opts);
900         else
901                 rbdc = rbd_client_create(ceph_opts);
902         mutex_unlock(&client_mutex);
903
904         return rbdc;
905 }
906
907 /*
908  * Destroy ceph client
909  *
910  * Caller must hold rbd_client_list_lock.
911  */
912 static void rbd_client_release(struct kref *kref)
913 {
914         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
915
916         dout("%s: rbdc %p\n", __func__, rbdc);
917         spin_lock(&rbd_client_list_lock);
918         list_del(&rbdc->node);
919         spin_unlock(&rbd_client_list_lock);
920
921         ceph_destroy_client(rbdc->client);
922         kfree(rbdc);
923 }
924
925 /*
926  * Drop reference to ceph client node. If it's not referenced anymore, release
927  * it.
928  */
929 static void rbd_put_client(struct rbd_client *rbdc)
930 {
931         if (rbdc)
932                 kref_put(&rbdc->kref, rbd_client_release);
933 }
934
935 static bool rbd_image_format_valid(u32 image_format)
936 {
937         return image_format == 1 || image_format == 2;
938 }
939
940 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
941 {
942         size_t size;
943         u32 snap_count;
944
945         /* The header has to start with the magic rbd header text */
946         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
947                 return false;
948
949         /* The bio layer requires at least sector-sized I/O */
950
951         if (ondisk->options.order < SECTOR_SHIFT)
952                 return false;
953
954         /* If we use u64 in a few spots we may be able to loosen this */
955
956         if (ondisk->options.order > 8 * sizeof (int) - 1)
957                 return false;
958
959         /*
960          * The size of a snapshot header has to fit in a size_t, and
961          * that limits the number of snapshots.
962          */
963         snap_count = le32_to_cpu(ondisk->snap_count);
964         size = SIZE_MAX - sizeof (struct ceph_snap_context);
965         if (snap_count > size / sizeof (__le64))
966                 return false;
967
968         /*
969          * Not only that, but the size of the entire the snapshot
970          * header must also be representable in a size_t.
971          */
972         size -= snap_count * sizeof (__le64);
973         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
974                 return false;
975
976         return true;
977 }
978
979 /*
980  * returns the size of an object in the image
981  */
982 static u32 rbd_obj_bytes(struct rbd_image_header *header)
983 {
984         return 1U << header->obj_order;
985 }
986
987 static void rbd_init_layout(struct rbd_device *rbd_dev)
988 {
989         if (rbd_dev->header.stripe_unit == 0 ||
990             rbd_dev->header.stripe_count == 0) {
991                 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
992                 rbd_dev->header.stripe_count = 1;
993         }
994
995         rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
996         rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
997         rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
998         rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
999                           rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1000         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1001 }
1002
1003 /*
1004  * Fill an rbd image header with information from the given format 1
1005  * on-disk header.
1006  */
1007 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1008                                  struct rbd_image_header_ondisk *ondisk)
1009 {
1010         struct rbd_image_header *header = &rbd_dev->header;
1011         bool first_time = header->object_prefix == NULL;
1012         struct ceph_snap_context *snapc;
1013         char *object_prefix = NULL;
1014         char *snap_names = NULL;
1015         u64 *snap_sizes = NULL;
1016         u32 snap_count;
1017         int ret = -ENOMEM;
1018         u32 i;
1019
1020         /* Allocate this now to avoid having to handle failure below */
1021
1022         if (first_time) {
1023                 object_prefix = kstrndup(ondisk->object_prefix,
1024                                          sizeof(ondisk->object_prefix),
1025                                          GFP_KERNEL);
1026                 if (!object_prefix)
1027                         return -ENOMEM;
1028         }
1029
1030         /* Allocate the snapshot context and fill it in */
1031
1032         snap_count = le32_to_cpu(ondisk->snap_count);
1033         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1034         if (!snapc)
1035                 goto out_err;
1036         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1037         if (snap_count) {
1038                 struct rbd_image_snap_ondisk *snaps;
1039                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1040
1041                 /* We'll keep a copy of the snapshot names... */
1042
1043                 if (snap_names_len > (u64)SIZE_MAX)
1044                         goto out_2big;
1045                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1046                 if (!snap_names)
1047                         goto out_err;
1048
1049                 /* ...as well as the array of their sizes. */
1050                 snap_sizes = kmalloc_array(snap_count,
1051                                            sizeof(*header->snap_sizes),
1052                                            GFP_KERNEL);
1053                 if (!snap_sizes)
1054                         goto out_err;
1055
1056                 /*
1057                  * Copy the names, and fill in each snapshot's id
1058                  * and size.
1059                  *
1060                  * Note that rbd_dev_v1_header_info() guarantees the
1061                  * ondisk buffer we're working with has
1062                  * snap_names_len bytes beyond the end of the
1063                  * snapshot id array, this memcpy() is safe.
1064                  */
1065                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1066                 snaps = ondisk->snaps;
1067                 for (i = 0; i < snap_count; i++) {
1068                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1069                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1070                 }
1071         }
1072
1073         /* We won't fail any more, fill in the header */
1074
1075         if (first_time) {
1076                 header->object_prefix = object_prefix;
1077                 header->obj_order = ondisk->options.order;
1078                 rbd_init_layout(rbd_dev);
1079         } else {
1080                 ceph_put_snap_context(header->snapc);
1081                 kfree(header->snap_names);
1082                 kfree(header->snap_sizes);
1083         }
1084
1085         /* The remaining fields always get updated (when we refresh) */
1086
1087         header->image_size = le64_to_cpu(ondisk->image_size);
1088         header->snapc = snapc;
1089         header->snap_names = snap_names;
1090         header->snap_sizes = snap_sizes;
1091
1092         return 0;
1093 out_2big:
1094         ret = -EIO;
1095 out_err:
1096         kfree(snap_sizes);
1097         kfree(snap_names);
1098         ceph_put_snap_context(snapc);
1099         kfree(object_prefix);
1100
1101         return ret;
1102 }
1103
1104 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1105 {
1106         const char *snap_name;
1107
1108         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1109
1110         /* Skip over names until we find the one we are looking for */
1111
1112         snap_name = rbd_dev->header.snap_names;
1113         while (which--)
1114                 snap_name += strlen(snap_name) + 1;
1115
1116         return kstrdup(snap_name, GFP_KERNEL);
1117 }
1118
1119 /*
1120  * Snapshot id comparison function for use with qsort()/bsearch().
1121  * Note that result is for snapshots in *descending* order.
1122  */
1123 static int snapid_compare_reverse(const void *s1, const void *s2)
1124 {
1125         u64 snap_id1 = *(u64 *)s1;
1126         u64 snap_id2 = *(u64 *)s2;
1127
1128         if (snap_id1 < snap_id2)
1129                 return 1;
1130         return snap_id1 == snap_id2 ? 0 : -1;
1131 }
1132
1133 /*
1134  * Search a snapshot context to see if the given snapshot id is
1135  * present.
1136  *
1137  * Returns the position of the snapshot id in the array if it's found,
1138  * or BAD_SNAP_INDEX otherwise.
1139  *
1140  * Note: The snapshot array is in kept sorted (by the osd) in
1141  * reverse order, highest snapshot id first.
1142  */
1143 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1144 {
1145         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1146         u64 *found;
1147
1148         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1149                                 sizeof (snap_id), snapid_compare_reverse);
1150
1151         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1152 }
1153
1154 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1155                                         u64 snap_id)
1156 {
1157         u32 which;
1158         const char *snap_name;
1159
1160         which = rbd_dev_snap_index(rbd_dev, snap_id);
1161         if (which == BAD_SNAP_INDEX)
1162                 return ERR_PTR(-ENOENT);
1163
1164         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1165         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1166 }
1167
1168 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1169 {
1170         if (snap_id == CEPH_NOSNAP)
1171                 return RBD_SNAP_HEAD_NAME;
1172
1173         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1174         if (rbd_dev->image_format == 1)
1175                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1176
1177         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1178 }
1179
1180 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1181                                 u64 *snap_size)
1182 {
1183         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1184         if (snap_id == CEPH_NOSNAP) {
1185                 *snap_size = rbd_dev->header.image_size;
1186         } else if (rbd_dev->image_format == 1) {
1187                 u32 which;
1188
1189                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1190                 if (which == BAD_SNAP_INDEX)
1191                         return -ENOENT;
1192
1193                 *snap_size = rbd_dev->header.snap_sizes[which];
1194         } else {
1195                 u64 size = 0;
1196                 int ret;
1197
1198                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1199                 if (ret)
1200                         return ret;
1201
1202                 *snap_size = size;
1203         }
1204         return 0;
1205 }
1206
1207 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1208                         u64 *snap_features)
1209 {
1210         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1211         if (snap_id == CEPH_NOSNAP) {
1212                 *snap_features = rbd_dev->header.features;
1213         } else if (rbd_dev->image_format == 1) {
1214                 *snap_features = 0;     /* No features for format 1 */
1215         } else {
1216                 u64 features = 0;
1217                 int ret;
1218
1219                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1220                 if (ret)
1221                         return ret;
1222
1223                 *snap_features = features;
1224         }
1225         return 0;
1226 }
1227
1228 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1229 {
1230         u64 snap_id = rbd_dev->spec->snap_id;
1231         u64 size = 0;
1232         u64 features = 0;
1233         int ret;
1234
1235         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1236         if (ret)
1237                 return ret;
1238         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1239         if (ret)
1240                 return ret;
1241
1242         rbd_dev->mapping.size = size;
1243         rbd_dev->mapping.features = features;
1244
1245         return 0;
1246 }
1247
1248 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1249 {
1250         rbd_dev->mapping.size = 0;
1251         rbd_dev->mapping.features = 0;
1252 }
1253
1254 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1255 {
1256         u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1257
1258         return offset & (segment_size - 1);
1259 }
1260
1261 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1262                                 u64 offset, u64 length)
1263 {
1264         u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1265
1266         offset &= segment_size - 1;
1267
1268         rbd_assert(length <= U64_MAX - offset);
1269         if (offset + length > segment_size)
1270                 length = segment_size - offset;
1271
1272         return length;
1273 }
1274
1275 /*
1276  * bio helpers
1277  */
1278
1279 static void bio_chain_put(struct bio *chain)
1280 {
1281         struct bio *tmp;
1282
1283         while (chain) {
1284                 tmp = chain;
1285                 chain = chain->bi_next;
1286                 bio_put(tmp);
1287         }
1288 }
1289
1290 /*
1291  * zeros a bio chain, starting at specific offset
1292  */
1293 static void zero_bio_chain(struct bio *chain, int start_ofs)
1294 {
1295         struct bio_vec bv;
1296         struct bvec_iter iter;
1297         unsigned long flags;
1298         void *buf;
1299         int pos = 0;
1300
1301         while (chain) {
1302                 bio_for_each_segment(bv, chain, iter) {
1303                         if (pos + bv.bv_len > start_ofs) {
1304                                 int remainder = max(start_ofs - pos, 0);
1305                                 buf = bvec_kmap_irq(&bv, &flags);
1306                                 memset(buf + remainder, 0,
1307                                        bv.bv_len - remainder);
1308                                 flush_dcache_page(bv.bv_page);
1309                                 bvec_kunmap_irq(buf, &flags);
1310                         }
1311                         pos += bv.bv_len;
1312                 }
1313
1314                 chain = chain->bi_next;
1315         }
1316 }
1317
1318 /*
1319  * similar to zero_bio_chain(), zeros data defined by a page array,
1320  * starting at the given byte offset from the start of the array and
1321  * continuing up to the given end offset.  The pages array is
1322  * assumed to be big enough to hold all bytes up to the end.
1323  */
1324 static void zero_pages(struct page **pages, u64 offset, u64 end)
1325 {
1326         struct page **page = &pages[offset >> PAGE_SHIFT];
1327
1328         rbd_assert(end > offset);
1329         rbd_assert(end - offset <= (u64)SIZE_MAX);
1330         while (offset < end) {
1331                 size_t page_offset;
1332                 size_t length;
1333                 unsigned long flags;
1334                 void *kaddr;
1335
1336                 page_offset = offset & ~PAGE_MASK;
1337                 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1338                 local_irq_save(flags);
1339                 kaddr = kmap_atomic(*page);
1340                 memset(kaddr + page_offset, 0, length);
1341                 flush_dcache_page(*page);
1342                 kunmap_atomic(kaddr);
1343                 local_irq_restore(flags);
1344
1345                 offset += length;
1346                 page++;
1347         }
1348 }
1349
1350 /*
1351  * Clone a portion of a bio, starting at the given byte offset
1352  * and continuing for the number of bytes indicated.
1353  */
1354 static struct bio *bio_clone_range(struct bio *bio_src,
1355                                         unsigned int offset,
1356                                         unsigned int len,
1357                                         gfp_t gfpmask)
1358 {
1359         struct bio *bio;
1360
1361         bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone);
1362         if (!bio)
1363                 return NULL;    /* ENOMEM */
1364
1365         bio_advance(bio, offset);
1366         bio->bi_iter.bi_size = len;
1367
1368         return bio;
1369 }
1370
1371 /*
1372  * Clone a portion of a bio chain, starting at the given byte offset
1373  * into the first bio in the source chain and continuing for the
1374  * number of bytes indicated.  The result is another bio chain of
1375  * exactly the given length, or a null pointer on error.
1376  *
1377  * The bio_src and offset parameters are both in-out.  On entry they
1378  * refer to the first source bio and the offset into that bio where
1379  * the start of data to be cloned is located.
1380  *
1381  * On return, bio_src is updated to refer to the bio in the source
1382  * chain that contains first un-cloned byte, and *offset will
1383  * contain the offset of that byte within that bio.
1384  */
1385 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1386                                         unsigned int *offset,
1387                                         unsigned int len,
1388                                         gfp_t gfpmask)
1389 {
1390         struct bio *bi = *bio_src;
1391         unsigned int off = *offset;
1392         struct bio *chain = NULL;
1393         struct bio **end;
1394
1395         /* Build up a chain of clone bios up to the limit */
1396
1397         if (!bi || off >= bi->bi_iter.bi_size || !len)
1398                 return NULL;            /* Nothing to clone */
1399
1400         end = &chain;
1401         while (len) {
1402                 unsigned int bi_size;
1403                 struct bio *bio;
1404
1405                 if (!bi) {
1406                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1407                         goto out_err;   /* EINVAL; ran out of bio's */
1408                 }
1409                 bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1410                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1411                 if (!bio)
1412                         goto out_err;   /* ENOMEM */
1413
1414                 *end = bio;
1415                 end = &bio->bi_next;
1416
1417                 off += bi_size;
1418                 if (off == bi->bi_iter.bi_size) {
1419                         bi = bi->bi_next;
1420                         off = 0;
1421                 }
1422                 len -= bi_size;
1423         }
1424         *bio_src = bi;
1425         *offset = off;
1426
1427         return chain;
1428 out_err:
1429         bio_chain_put(chain);
1430
1431         return NULL;
1432 }
1433
1434 /*
1435  * The default/initial value for all object request flags is 0.  For
1436  * each flag, once its value is set to 1 it is never reset to 0
1437  * again.
1438  */
1439 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1440 {
1441         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1442                 struct rbd_device *rbd_dev;
1443
1444                 rbd_dev = obj_request->img_request->rbd_dev;
1445                 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1446                         obj_request);
1447         }
1448 }
1449
1450 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1451 {
1452         smp_mb();
1453         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1454 }
1455
1456 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1457 {
1458         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1459                 struct rbd_device *rbd_dev = NULL;
1460
1461                 if (obj_request_img_data_test(obj_request))
1462                         rbd_dev = obj_request->img_request->rbd_dev;
1463                 rbd_warn(rbd_dev, "obj_request %p already marked done",
1464                         obj_request);
1465         }
1466 }
1467
1468 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1469 {
1470         smp_mb();
1471         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1472 }
1473
1474 /*
1475  * This sets the KNOWN flag after (possibly) setting the EXISTS
1476  * flag.  The latter is set based on the "exists" value provided.
1477  *
1478  * Note that for our purposes once an object exists it never goes
1479  * away again.  It's possible that the response from two existence
1480  * checks are separated by the creation of the target object, and
1481  * the first ("doesn't exist") response arrives *after* the second
1482  * ("does exist").  In that case we ignore the second one.
1483  */
1484 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1485                                 bool exists)
1486 {
1487         if (exists)
1488                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1489         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1490         smp_mb();
1491 }
1492
1493 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1494 {
1495         smp_mb();
1496         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1497 }
1498
1499 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1500 {
1501         smp_mb();
1502         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1503 }
1504
1505 static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1506 {
1507         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1508
1509         return obj_request->img_offset <
1510             round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1511 }
1512
1513 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1514 {
1515         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1516                 kref_read(&obj_request->kref));
1517         kref_get(&obj_request->kref);
1518 }
1519
1520 static void rbd_obj_request_destroy(struct kref *kref);
1521 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1522 {
1523         rbd_assert(obj_request != NULL);
1524         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1525                 kref_read(&obj_request->kref));
1526         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1527 }
1528
1529 static void rbd_img_request_get(struct rbd_img_request *img_request)
1530 {
1531         dout("%s: img %p (was %d)\n", __func__, img_request,
1532              kref_read(&img_request->kref));
1533         kref_get(&img_request->kref);
1534 }
1535
1536 static bool img_request_child_test(struct rbd_img_request *img_request);
1537 static void rbd_parent_request_destroy(struct kref *kref);
1538 static void rbd_img_request_destroy(struct kref *kref);
1539 static void rbd_img_request_put(struct rbd_img_request *img_request)
1540 {
1541         rbd_assert(img_request != NULL);
1542         dout("%s: img %p (was %d)\n", __func__, img_request,
1543                 kref_read(&img_request->kref));
1544         if (img_request_child_test(img_request))
1545                 kref_put(&img_request->kref, rbd_parent_request_destroy);
1546         else
1547                 kref_put(&img_request->kref, rbd_img_request_destroy);
1548 }
1549
1550 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1551                                         struct rbd_obj_request *obj_request)
1552 {
1553         rbd_assert(obj_request->img_request == NULL);
1554
1555         /* Image request now owns object's original reference */
1556         obj_request->img_request = img_request;
1557         obj_request->which = img_request->obj_request_count;
1558         rbd_assert(!obj_request_img_data_test(obj_request));
1559         obj_request_img_data_set(obj_request);
1560         rbd_assert(obj_request->which != BAD_WHICH);
1561         img_request->obj_request_count++;
1562         list_add_tail(&obj_request->links, &img_request->obj_requests);
1563         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1564                 obj_request->which);
1565 }
1566
1567 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1568                                         struct rbd_obj_request *obj_request)
1569 {
1570         rbd_assert(obj_request->which != BAD_WHICH);
1571
1572         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1573                 obj_request->which);
1574         list_del(&obj_request->links);
1575         rbd_assert(img_request->obj_request_count > 0);
1576         img_request->obj_request_count--;
1577         rbd_assert(obj_request->which == img_request->obj_request_count);
1578         obj_request->which = BAD_WHICH;
1579         rbd_assert(obj_request_img_data_test(obj_request));
1580         rbd_assert(obj_request->img_request == img_request);
1581         obj_request->img_request = NULL;
1582         obj_request->callback = NULL;
1583         rbd_obj_request_put(obj_request);
1584 }
1585
1586 static bool obj_request_type_valid(enum obj_request_type type)
1587 {
1588         switch (type) {
1589         case OBJ_REQUEST_NODATA:
1590         case OBJ_REQUEST_BIO:
1591         case OBJ_REQUEST_PAGES:
1592                 return true;
1593         default:
1594                 return false;
1595         }
1596 }
1597
1598 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
1599
1600 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1601 {
1602         struct ceph_osd_request *osd_req = obj_request->osd_req;
1603
1604         dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1605              obj_request, obj_request->object_no, obj_request->offset,
1606              obj_request->length, osd_req);
1607         if (obj_request_img_data_test(obj_request)) {
1608                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1609                 rbd_img_request_get(obj_request->img_request);
1610         }
1611         ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1612 }
1613
1614 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1615 {
1616
1617         dout("%s: img %p\n", __func__, img_request);
1618
1619         /*
1620          * If no error occurred, compute the aggregate transfer
1621          * count for the image request.  We could instead use
1622          * atomic64_cmpxchg() to update it as each object request
1623          * completes; not clear which way is better off hand.
1624          */
1625         if (!img_request->result) {
1626                 struct rbd_obj_request *obj_request;
1627                 u64 xferred = 0;
1628
1629                 for_each_obj_request(img_request, obj_request)
1630                         xferred += obj_request->xferred;
1631                 img_request->xferred = xferred;
1632         }
1633
1634         if (img_request->callback)
1635                 img_request->callback(img_request);
1636         else
1637                 rbd_img_request_put(img_request);
1638 }
1639
1640 /*
1641  * The default/initial value for all image request flags is 0.  Each
1642  * is conditionally set to 1 at image request initialization time
1643  * and currently never change thereafter.
1644  */
1645 static void img_request_write_set(struct rbd_img_request *img_request)
1646 {
1647         set_bit(IMG_REQ_WRITE, &img_request->flags);
1648         smp_mb();
1649 }
1650
1651 static bool img_request_write_test(struct rbd_img_request *img_request)
1652 {
1653         smp_mb();
1654         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1655 }
1656
1657 /*
1658  * Set the discard flag when the img_request is an discard request
1659  */
1660 static void img_request_discard_set(struct rbd_img_request *img_request)
1661 {
1662         set_bit(IMG_REQ_DISCARD, &img_request->flags);
1663         smp_mb();
1664 }
1665
1666 static bool img_request_discard_test(struct rbd_img_request *img_request)
1667 {
1668         smp_mb();
1669         return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1670 }
1671
1672 static void img_request_child_set(struct rbd_img_request *img_request)
1673 {
1674         set_bit(IMG_REQ_CHILD, &img_request->flags);
1675         smp_mb();
1676 }
1677
1678 static void img_request_child_clear(struct rbd_img_request *img_request)
1679 {
1680         clear_bit(IMG_REQ_CHILD, &img_request->flags);
1681         smp_mb();
1682 }
1683
1684 static bool img_request_child_test(struct rbd_img_request *img_request)
1685 {
1686         smp_mb();
1687         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1688 }
1689
1690 static void img_request_layered_set(struct rbd_img_request *img_request)
1691 {
1692         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1693         smp_mb();
1694 }
1695
1696 static void img_request_layered_clear(struct rbd_img_request *img_request)
1697 {
1698         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1699         smp_mb();
1700 }
1701
1702 static bool img_request_layered_test(struct rbd_img_request *img_request)
1703 {
1704         smp_mb();
1705         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1706 }
1707
1708 static enum obj_operation_type
1709 rbd_img_request_op_type(struct rbd_img_request *img_request)
1710 {
1711         if (img_request_write_test(img_request))
1712                 return OBJ_OP_WRITE;
1713         else if (img_request_discard_test(img_request))
1714                 return OBJ_OP_DISCARD;
1715         else
1716                 return OBJ_OP_READ;
1717 }
1718
1719 static void
1720 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1721 {
1722         u64 xferred = obj_request->xferred;
1723         u64 length = obj_request->length;
1724
1725         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1726                 obj_request, obj_request->img_request, obj_request->result,
1727                 xferred, length);
1728         /*
1729          * ENOENT means a hole in the image.  We zero-fill the entire
1730          * length of the request.  A short read also implies zero-fill
1731          * to the end of the request.  An error requires the whole
1732          * length of the request to be reported finished with an error
1733          * to the block layer.  In each case we update the xferred
1734          * count to indicate the whole request was satisfied.
1735          */
1736         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1737         if (obj_request->result == -ENOENT) {
1738                 if (obj_request->type == OBJ_REQUEST_BIO)
1739                         zero_bio_chain(obj_request->bio_list, 0);
1740                 else
1741                         zero_pages(obj_request->pages, 0, length);
1742                 obj_request->result = 0;
1743         } else if (xferred < length && !obj_request->result) {
1744                 if (obj_request->type == OBJ_REQUEST_BIO)
1745                         zero_bio_chain(obj_request->bio_list, xferred);
1746                 else
1747                         zero_pages(obj_request->pages, xferred, length);
1748         }
1749         obj_request->xferred = length;
1750         obj_request_done_set(obj_request);
1751 }
1752
1753 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1754 {
1755         dout("%s: obj %p cb %p\n", __func__, obj_request,
1756                 obj_request->callback);
1757         if (obj_request->callback)
1758                 obj_request->callback(obj_request);
1759         else
1760                 complete_all(&obj_request->completion);
1761 }
1762
1763 static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1764 {
1765         obj_request->result = err;
1766         obj_request->xferred = 0;
1767         /*
1768          * kludge - mirror rbd_obj_request_submit() to match a put in
1769          * rbd_img_obj_callback()
1770          */
1771         if (obj_request_img_data_test(obj_request)) {
1772                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1773                 rbd_img_request_get(obj_request->img_request);
1774         }
1775         obj_request_done_set(obj_request);
1776         rbd_obj_request_complete(obj_request);
1777 }
1778
1779 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1780 {
1781         struct rbd_img_request *img_request = NULL;
1782         struct rbd_device *rbd_dev = NULL;
1783         bool layered = false;
1784
1785         if (obj_request_img_data_test(obj_request)) {
1786                 img_request = obj_request->img_request;
1787                 layered = img_request && img_request_layered_test(img_request);
1788                 rbd_dev = img_request->rbd_dev;
1789         }
1790
1791         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1792                 obj_request, img_request, obj_request->result,
1793                 obj_request->xferred, obj_request->length);
1794         if (layered && obj_request->result == -ENOENT &&
1795                         obj_request->img_offset < rbd_dev->parent_overlap)
1796                 rbd_img_parent_read(obj_request);
1797         else if (img_request)
1798                 rbd_img_obj_request_read_callback(obj_request);
1799         else
1800                 obj_request_done_set(obj_request);
1801 }
1802
1803 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1804 {
1805         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1806                 obj_request->result, obj_request->length);
1807         /*
1808          * There is no such thing as a successful short write.  Set
1809          * it to our originally-requested length.
1810          */
1811         obj_request->xferred = obj_request->length;
1812         obj_request_done_set(obj_request);
1813 }
1814
1815 static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1816 {
1817         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1818                 obj_request->result, obj_request->length);
1819         /*
1820          * There is no such thing as a successful short discard.  Set
1821          * it to our originally-requested length.
1822          */
1823         obj_request->xferred = obj_request->length;
1824         /* discarding a non-existent object is not a problem */
1825         if (obj_request->result == -ENOENT)
1826                 obj_request->result = 0;
1827         obj_request_done_set(obj_request);
1828 }
1829
1830 /*
1831  * For a simple stat call there's nothing to do.  We'll do more if
1832  * this is part of a write sequence for a layered image.
1833  */
1834 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1835 {
1836         dout("%s: obj %p\n", __func__, obj_request);
1837         obj_request_done_set(obj_request);
1838 }
1839
1840 static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1841 {
1842         dout("%s: obj %p\n", __func__, obj_request);
1843
1844         if (obj_request_img_data_test(obj_request))
1845                 rbd_osd_copyup_callback(obj_request);
1846         else
1847                 obj_request_done_set(obj_request);
1848 }
1849
1850 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1851 {
1852         struct rbd_obj_request *obj_request = osd_req->r_priv;
1853         u16 opcode;
1854
1855         dout("%s: osd_req %p\n", __func__, osd_req);
1856         rbd_assert(osd_req == obj_request->osd_req);
1857         if (obj_request_img_data_test(obj_request)) {
1858                 rbd_assert(obj_request->img_request);
1859                 rbd_assert(obj_request->which != BAD_WHICH);
1860         } else {
1861                 rbd_assert(obj_request->which == BAD_WHICH);
1862         }
1863
1864         if (osd_req->r_result < 0)
1865                 obj_request->result = osd_req->r_result;
1866
1867         /*
1868          * We support a 64-bit length, but ultimately it has to be
1869          * passed to the block layer, which just supports a 32-bit
1870          * length field.
1871          */
1872         obj_request->xferred = osd_req->r_ops[0].outdata_len;
1873         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1874
1875         opcode = osd_req->r_ops[0].op;
1876         switch (opcode) {
1877         case CEPH_OSD_OP_READ:
1878                 rbd_osd_read_callback(obj_request);
1879                 break;
1880         case CEPH_OSD_OP_SETALLOCHINT:
1881                 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1882                            osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
1883                 /* fall through */
1884         case CEPH_OSD_OP_WRITE:
1885         case CEPH_OSD_OP_WRITEFULL:
1886                 rbd_osd_write_callback(obj_request);
1887                 break;
1888         case CEPH_OSD_OP_STAT:
1889                 rbd_osd_stat_callback(obj_request);
1890                 break;
1891         case CEPH_OSD_OP_DELETE:
1892         case CEPH_OSD_OP_TRUNCATE:
1893         case CEPH_OSD_OP_ZERO:
1894                 rbd_osd_discard_callback(obj_request);
1895                 break;
1896         case CEPH_OSD_OP_CALL:
1897                 rbd_osd_call_callback(obj_request);
1898                 break;
1899         default:
1900                 rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
1901                          obj_request->object_no, opcode);
1902                 break;
1903         }
1904
1905         if (obj_request_done_test(obj_request))
1906                 rbd_obj_request_complete(obj_request);
1907 }
1908
1909 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1910 {
1911         struct ceph_osd_request *osd_req = obj_request->osd_req;
1912
1913         rbd_assert(obj_request_img_data_test(obj_request));
1914         osd_req->r_snapid = obj_request->img_request->snap_id;
1915 }
1916
1917 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1918 {
1919         struct ceph_osd_request *osd_req = obj_request->osd_req;
1920
1921         ktime_get_real_ts(&osd_req->r_mtime);
1922         osd_req->r_data_offset = obj_request->offset;
1923 }
1924
1925 static struct ceph_osd_request *
1926 __rbd_osd_req_create(struct rbd_device *rbd_dev,
1927                      struct ceph_snap_context *snapc,
1928                      int num_ops, unsigned int flags,
1929                      struct rbd_obj_request *obj_request)
1930 {
1931         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1932         struct ceph_osd_request *req;
1933         const char *name_format = rbd_dev->image_format == 1 ?
1934                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1935
1936         req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1937         if (!req)
1938                 return NULL;
1939
1940         req->r_flags = flags;
1941         req->r_callback = rbd_osd_req_callback;
1942         req->r_priv = obj_request;
1943
1944         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1945         if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1946                         rbd_dev->header.object_prefix, obj_request->object_no))
1947                 goto err_req;
1948
1949         if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1950                 goto err_req;
1951
1952         return req;
1953
1954 err_req:
1955         ceph_osdc_put_request(req);
1956         return NULL;
1957 }
1958
1959 /*
1960  * Create an osd request.  A read request has one osd op (read).
1961  * A write request has either one (watch) or two (hint+write) osd ops.
1962  * (All rbd data writes are prefixed with an allocation hint op, but
1963  * technically osd watch is a write request, hence this distinction.)
1964  */
1965 static struct ceph_osd_request *rbd_osd_req_create(
1966                                         struct rbd_device *rbd_dev,
1967                                         enum obj_operation_type op_type,
1968                                         unsigned int num_ops,
1969                                         struct rbd_obj_request *obj_request)
1970 {
1971         struct ceph_snap_context *snapc = NULL;
1972
1973         if (obj_request_img_data_test(obj_request) &&
1974                 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1975                 struct rbd_img_request *img_request = obj_request->img_request;
1976                 if (op_type == OBJ_OP_WRITE) {
1977                         rbd_assert(img_request_write_test(img_request));
1978                 } else {
1979                         rbd_assert(img_request_discard_test(img_request));
1980                 }
1981                 snapc = img_request->snapc;
1982         }
1983
1984         rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
1985
1986         return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
1987             (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
1988             CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
1989 }
1990
1991 /*
1992  * Create a copyup osd request based on the information in the object
1993  * request supplied.  A copyup request has two or three osd ops, a
1994  * copyup method call, potentially a hint op, and a write or truncate
1995  * or zero op.
1996  */
1997 static struct ceph_osd_request *
1998 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1999 {
2000         struct rbd_img_request *img_request;
2001         int num_osd_ops = 3;
2002
2003         rbd_assert(obj_request_img_data_test(obj_request));
2004         img_request = obj_request->img_request;
2005         rbd_assert(img_request);
2006         rbd_assert(img_request_write_test(img_request) ||
2007                         img_request_discard_test(img_request));
2008
2009         if (img_request_discard_test(img_request))
2010                 num_osd_ops = 2;
2011
2012         return __rbd_osd_req_create(img_request->rbd_dev,
2013                                     img_request->snapc, num_osd_ops,
2014                                     CEPH_OSD_FLAG_WRITE, obj_request);
2015 }
2016
2017 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2018 {
2019         ceph_osdc_put_request(osd_req);
2020 }
2021
2022 static struct rbd_obj_request *
2023 rbd_obj_request_create(enum obj_request_type type)
2024 {
2025         struct rbd_obj_request *obj_request;
2026
2027         rbd_assert(obj_request_type_valid(type));
2028
2029         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
2030         if (!obj_request)
2031                 return NULL;
2032
2033         obj_request->which = BAD_WHICH;
2034         obj_request->type = type;
2035         INIT_LIST_HEAD(&obj_request->links);
2036         init_completion(&obj_request->completion);
2037         kref_init(&obj_request->kref);
2038
2039         dout("%s %p\n", __func__, obj_request);
2040         return obj_request;
2041 }
2042
2043 static void rbd_obj_request_destroy(struct kref *kref)
2044 {
2045         struct rbd_obj_request *obj_request;
2046
2047         obj_request = container_of(kref, struct rbd_obj_request, kref);
2048
2049         dout("%s: obj %p\n", __func__, obj_request);
2050
2051         rbd_assert(obj_request->img_request == NULL);
2052         rbd_assert(obj_request->which == BAD_WHICH);
2053
2054         if (obj_request->osd_req)
2055                 rbd_osd_req_destroy(obj_request->osd_req);
2056
2057         rbd_assert(obj_request_type_valid(obj_request->type));
2058         switch (obj_request->type) {
2059         case OBJ_REQUEST_NODATA:
2060                 break;          /* Nothing to do */
2061         case OBJ_REQUEST_BIO:
2062                 if (obj_request->bio_list)
2063                         bio_chain_put(obj_request->bio_list);
2064                 break;
2065         case OBJ_REQUEST_PAGES:
2066                 /* img_data requests don't own their page array */
2067                 if (obj_request->pages &&
2068                     !obj_request_img_data_test(obj_request))
2069                         ceph_release_page_vector(obj_request->pages,
2070                                                 obj_request->page_count);
2071                 break;
2072         }
2073
2074         kmem_cache_free(rbd_obj_request_cache, obj_request);
2075 }
2076
2077 /* It's OK to call this for a device with no parent */
2078
2079 static void rbd_spec_put(struct rbd_spec *spec);
2080 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2081 {
2082         rbd_dev_remove_parent(rbd_dev);
2083         rbd_spec_put(rbd_dev->parent_spec);
2084         rbd_dev->parent_spec = NULL;
2085         rbd_dev->parent_overlap = 0;
2086 }
2087
2088 /*
2089  * Parent image reference counting is used to determine when an
2090  * image's parent fields can be safely torn down--after there are no
2091  * more in-flight requests to the parent image.  When the last
2092  * reference is dropped, cleaning them up is safe.
2093  */
2094 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2095 {
2096         int counter;
2097
2098         if (!rbd_dev->parent_spec)
2099                 return;
2100
2101         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2102         if (counter > 0)
2103                 return;
2104
2105         /* Last reference; clean up parent data structures */
2106
2107         if (!counter)
2108                 rbd_dev_unparent(rbd_dev);
2109         else
2110                 rbd_warn(rbd_dev, "parent reference underflow");
2111 }
2112
2113 /*
2114  * If an image has a non-zero parent overlap, get a reference to its
2115  * parent.
2116  *
2117  * Returns true if the rbd device has a parent with a non-zero
2118  * overlap and a reference for it was successfully taken, or
2119  * false otherwise.
2120  */
2121 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2122 {
2123         int counter = 0;
2124
2125         if (!rbd_dev->parent_spec)
2126                 return false;
2127
2128         down_read(&rbd_dev->header_rwsem);
2129         if (rbd_dev->parent_overlap)
2130                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2131         up_read(&rbd_dev->header_rwsem);
2132
2133         if (counter < 0)
2134                 rbd_warn(rbd_dev, "parent reference overflow");
2135
2136         return counter > 0;
2137 }
2138
2139 /*
2140  * Caller is responsible for filling in the list of object requests
2141  * that comprises the image request, and the Linux request pointer
2142  * (if there is one).
2143  */
2144 static struct rbd_img_request *rbd_img_request_create(
2145                                         struct rbd_device *rbd_dev,
2146                                         u64 offset, u64 length,
2147                                         enum obj_operation_type op_type,
2148                                         struct ceph_snap_context *snapc)
2149 {
2150         struct rbd_img_request *img_request;
2151
2152         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2153         if (!img_request)
2154                 return NULL;
2155
2156         img_request->rq = NULL;
2157         img_request->rbd_dev = rbd_dev;
2158         img_request->offset = offset;
2159         img_request->length = length;
2160         img_request->flags = 0;
2161         if (op_type == OBJ_OP_DISCARD) {
2162                 img_request_discard_set(img_request);
2163                 img_request->snapc = snapc;
2164         } else if (op_type == OBJ_OP_WRITE) {
2165                 img_request_write_set(img_request);
2166                 img_request->snapc = snapc;
2167         } else {
2168                 img_request->snap_id = rbd_dev->spec->snap_id;
2169         }
2170         if (rbd_dev_parent_get(rbd_dev))
2171                 img_request_layered_set(img_request);
2172         spin_lock_init(&img_request->completion_lock);
2173         img_request->next_completion = 0;
2174         img_request->callback = NULL;
2175         img_request->result = 0;
2176         img_request->obj_request_count = 0;
2177         INIT_LIST_HEAD(&img_request->obj_requests);
2178         kref_init(&img_request->kref);
2179
2180         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2181                 obj_op_name(op_type), offset, length, img_request);
2182
2183         return img_request;
2184 }
2185
2186 static void rbd_img_request_destroy(struct kref *kref)
2187 {
2188         struct rbd_img_request *img_request;
2189         struct rbd_obj_request *obj_request;
2190         struct rbd_obj_request *next_obj_request;
2191
2192         img_request = container_of(kref, struct rbd_img_request, kref);
2193
2194         dout("%s: img %p\n", __func__, img_request);
2195
2196         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2197                 rbd_img_obj_request_del(img_request, obj_request);
2198         rbd_assert(img_request->obj_request_count == 0);
2199
2200         if (img_request_layered_test(img_request)) {
2201                 img_request_layered_clear(img_request);
2202                 rbd_dev_parent_put(img_request->rbd_dev);
2203         }
2204
2205         if (img_request_write_test(img_request) ||
2206                 img_request_discard_test(img_request))
2207                 ceph_put_snap_context(img_request->snapc);
2208
2209         kmem_cache_free(rbd_img_request_cache, img_request);
2210 }
2211
2212 static struct rbd_img_request *rbd_parent_request_create(
2213                                         struct rbd_obj_request *obj_request,
2214                                         u64 img_offset, u64 length)
2215 {
2216         struct rbd_img_request *parent_request;
2217         struct rbd_device *rbd_dev;
2218
2219         rbd_assert(obj_request->img_request);
2220         rbd_dev = obj_request->img_request->rbd_dev;
2221
2222         parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2223                                                 length, OBJ_OP_READ, NULL);
2224         if (!parent_request)
2225                 return NULL;
2226
2227         img_request_child_set(parent_request);
2228         rbd_obj_request_get(obj_request);
2229         parent_request->obj_request = obj_request;
2230
2231         return parent_request;
2232 }
2233
2234 static void rbd_parent_request_destroy(struct kref *kref)
2235 {
2236         struct rbd_img_request *parent_request;
2237         struct rbd_obj_request *orig_request;
2238
2239         parent_request = container_of(kref, struct rbd_img_request, kref);
2240         orig_request = parent_request->obj_request;
2241
2242         parent_request->obj_request = NULL;
2243         rbd_obj_request_put(orig_request);
2244         img_request_child_clear(parent_request);
2245
2246         rbd_img_request_destroy(kref);
2247 }
2248
2249 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2250 {
2251         struct rbd_img_request *img_request;
2252         unsigned int xferred;
2253         int result;
2254         bool more;
2255
2256         rbd_assert(obj_request_img_data_test(obj_request));
2257         img_request = obj_request->img_request;
2258
2259         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2260         xferred = (unsigned int)obj_request->xferred;
2261         result = obj_request->result;
2262         if (result) {
2263                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2264                 enum obj_operation_type op_type;
2265
2266                 if (img_request_discard_test(img_request))
2267                         op_type = OBJ_OP_DISCARD;
2268                 else if (img_request_write_test(img_request))
2269                         op_type = OBJ_OP_WRITE;
2270                 else
2271                         op_type = OBJ_OP_READ;
2272
2273                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2274                         obj_op_name(op_type), obj_request->length,
2275                         obj_request->img_offset, obj_request->offset);
2276                 rbd_warn(rbd_dev, "  result %d xferred %x",
2277                         result, xferred);
2278                 if (!img_request->result)
2279                         img_request->result = result;
2280                 /*
2281                  * Need to end I/O on the entire obj_request worth of
2282                  * bytes in case of error.
2283                  */
2284                 xferred = obj_request->length;
2285         }
2286
2287         if (img_request_child_test(img_request)) {
2288                 rbd_assert(img_request->obj_request != NULL);
2289                 more = obj_request->which < img_request->obj_request_count - 1;
2290         } else {
2291                 blk_status_t status = errno_to_blk_status(result);
2292
2293                 rbd_assert(img_request->rq != NULL);
2294
2295                 more = blk_update_request(img_request->rq, status, xferred);
2296                 if (!more)
2297                         __blk_mq_end_request(img_request->rq, status);
2298         }
2299
2300         return more;
2301 }
2302
2303 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2304 {
2305         struct rbd_img_request *img_request;
2306         u32 which = obj_request->which;
2307         bool more = true;
2308
2309         rbd_assert(obj_request_img_data_test(obj_request));
2310         img_request = obj_request->img_request;
2311
2312         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2313         rbd_assert(img_request != NULL);
2314         rbd_assert(img_request->obj_request_count > 0);
2315         rbd_assert(which != BAD_WHICH);
2316         rbd_assert(which < img_request->obj_request_count);
2317
2318         spin_lock_irq(&img_request->completion_lock);
2319         if (which != img_request->next_completion)
2320                 goto out;
2321
2322         for_each_obj_request_from(img_request, obj_request) {
2323                 rbd_assert(more);
2324                 rbd_assert(which < img_request->obj_request_count);
2325
2326                 if (!obj_request_done_test(obj_request))
2327                         break;
2328                 more = rbd_img_obj_end_request(obj_request);
2329                 which++;
2330         }
2331
2332         rbd_assert(more ^ (which == img_request->obj_request_count));
2333         img_request->next_completion = which;
2334 out:
2335         spin_unlock_irq(&img_request->completion_lock);
2336         rbd_img_request_put(img_request);
2337
2338         if (!more)
2339                 rbd_img_request_complete(img_request);
2340 }
2341
2342 /*
2343  * Add individual osd ops to the given ceph_osd_request and prepare
2344  * them for submission. num_ops is the current number of
2345  * osd operations already to the object request.
2346  */
2347 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2348                                 struct ceph_osd_request *osd_request,
2349                                 enum obj_operation_type op_type,
2350                                 unsigned int num_ops)
2351 {
2352         struct rbd_img_request *img_request = obj_request->img_request;
2353         struct rbd_device *rbd_dev = img_request->rbd_dev;
2354         u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2355         u64 offset = obj_request->offset;
2356         u64 length = obj_request->length;
2357         u64 img_end;
2358         u16 opcode;
2359
2360         if (op_type == OBJ_OP_DISCARD) {
2361                 if (!offset && length == object_size &&
2362                     (!img_request_layered_test(img_request) ||
2363                      !obj_request_overlaps_parent(obj_request))) {
2364                         opcode = CEPH_OSD_OP_DELETE;
2365                 } else if ((offset + length == object_size)) {
2366                         opcode = CEPH_OSD_OP_TRUNCATE;
2367                 } else {
2368                         down_read(&rbd_dev->header_rwsem);
2369                         img_end = rbd_dev->header.image_size;
2370                         up_read(&rbd_dev->header_rwsem);
2371
2372                         if (obj_request->img_offset + length == img_end)
2373                                 opcode = CEPH_OSD_OP_TRUNCATE;
2374                         else
2375                                 opcode = CEPH_OSD_OP_ZERO;
2376                 }
2377         } else if (op_type == OBJ_OP_WRITE) {
2378                 if (!offset && length == object_size)
2379                         opcode = CEPH_OSD_OP_WRITEFULL;
2380                 else
2381                         opcode = CEPH_OSD_OP_WRITE;
2382                 osd_req_op_alloc_hint_init(osd_request, num_ops,
2383                                         object_size, object_size);
2384                 num_ops++;
2385         } else {
2386                 opcode = CEPH_OSD_OP_READ;
2387         }
2388
2389         if (opcode == CEPH_OSD_OP_DELETE)
2390                 osd_req_op_init(osd_request, num_ops, opcode, 0);
2391         else
2392                 osd_req_op_extent_init(osd_request, num_ops, opcode,
2393                                        offset, length, 0, 0);
2394
2395         if (obj_request->type == OBJ_REQUEST_BIO)
2396                 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2397                                         obj_request->bio_list, length);
2398         else if (obj_request->type == OBJ_REQUEST_PAGES)
2399                 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2400                                         obj_request->pages, length,
2401                                         offset & ~PAGE_MASK, false, false);
2402
2403         /* Discards are also writes */
2404         if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2405                 rbd_osd_req_format_write(obj_request);
2406         else
2407                 rbd_osd_req_format_read(obj_request);
2408 }
2409
2410 /*
2411  * Split up an image request into one or more object requests, each
2412  * to a different object.  The "type" parameter indicates whether
2413  * "data_desc" is the pointer to the head of a list of bio
2414  * structures, or the base of a page array.  In either case this
2415  * function assumes data_desc describes memory sufficient to hold
2416  * all data described by the image request.
2417  */
2418 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2419                                         enum obj_request_type type,
2420                                         void *data_desc)
2421 {
2422         struct rbd_device *rbd_dev = img_request->rbd_dev;
2423         struct rbd_obj_request *obj_request = NULL;
2424         struct rbd_obj_request *next_obj_request;
2425         struct bio *bio_list = NULL;
2426         unsigned int bio_offset = 0;
2427         struct page **pages = NULL;
2428         enum obj_operation_type op_type;
2429         u64 img_offset;
2430         u64 resid;
2431
2432         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2433                 (int)type, data_desc);
2434
2435         img_offset = img_request->offset;
2436         resid = img_request->length;
2437         rbd_assert(resid > 0);
2438         op_type = rbd_img_request_op_type(img_request);
2439
2440         if (type == OBJ_REQUEST_BIO) {
2441                 bio_list = data_desc;
2442                 rbd_assert(img_offset ==
2443                            bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2444         } else if (type == OBJ_REQUEST_PAGES) {
2445                 pages = data_desc;
2446         }
2447
2448         while (resid) {
2449                 struct ceph_osd_request *osd_req;
2450                 u64 object_no = img_offset >> rbd_dev->header.obj_order;
2451                 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2452                 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
2453
2454                 obj_request = rbd_obj_request_create(type);
2455                 if (!obj_request)
2456                         goto out_unwind;
2457
2458                 obj_request->object_no = object_no;
2459                 obj_request->offset = offset;
2460                 obj_request->length = length;
2461
2462                 /*
2463                  * set obj_request->img_request before creating the
2464                  * osd_request so that it gets the right snapc
2465                  */
2466                 rbd_img_obj_request_add(img_request, obj_request);
2467
2468                 if (type == OBJ_REQUEST_BIO) {
2469                         unsigned int clone_size;
2470
2471                         rbd_assert(length <= (u64)UINT_MAX);
2472                         clone_size = (unsigned int)length;
2473                         obj_request->bio_list =
2474                                         bio_chain_clone_range(&bio_list,
2475                                                                 &bio_offset,
2476                                                                 clone_size,
2477                                                                 GFP_NOIO);
2478                         if (!obj_request->bio_list)
2479                                 goto out_unwind;
2480                 } else if (type == OBJ_REQUEST_PAGES) {
2481                         unsigned int page_count;
2482
2483                         obj_request->pages = pages;
2484                         page_count = (u32)calc_pages_for(offset, length);
2485                         obj_request->page_count = page_count;
2486                         if ((offset + length) & ~PAGE_MASK)
2487                                 page_count--;   /* more on last page */
2488                         pages += page_count;
2489                 }
2490
2491                 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2492                                         (op_type == OBJ_OP_WRITE) ? 2 : 1,
2493                                         obj_request);
2494                 if (!osd_req)
2495                         goto out_unwind;
2496
2497                 obj_request->osd_req = osd_req;
2498                 obj_request->callback = rbd_img_obj_callback;
2499                 obj_request->img_offset = img_offset;
2500
2501                 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2502
2503                 img_offset += length;
2504                 resid -= length;
2505         }
2506
2507         return 0;
2508
2509 out_unwind:
2510         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2511                 rbd_img_obj_request_del(img_request, obj_request);
2512
2513         return -ENOMEM;
2514 }
2515
2516 static void
2517 rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2518 {
2519         struct rbd_img_request *img_request;
2520         struct rbd_device *rbd_dev;
2521         struct page **pages;
2522         u32 page_count;
2523
2524         dout("%s: obj %p\n", __func__, obj_request);
2525
2526         rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2527                 obj_request->type == OBJ_REQUEST_NODATA);
2528         rbd_assert(obj_request_img_data_test(obj_request));
2529         img_request = obj_request->img_request;
2530         rbd_assert(img_request);
2531
2532         rbd_dev = img_request->rbd_dev;
2533         rbd_assert(rbd_dev);
2534
2535         pages = obj_request->copyup_pages;
2536         rbd_assert(pages != NULL);
2537         obj_request->copyup_pages = NULL;
2538         page_count = obj_request->copyup_page_count;
2539         rbd_assert(page_count);
2540         obj_request->copyup_page_count = 0;
2541         ceph_release_page_vector(pages, page_count);
2542
2543         /*
2544          * We want the transfer count to reflect the size of the
2545          * original write request.  There is no such thing as a
2546          * successful short write, so if the request was successful
2547          * we can just set it to the originally-requested length.
2548          */
2549         if (!obj_request->result)
2550                 obj_request->xferred = obj_request->length;
2551
2552         obj_request_done_set(obj_request);
2553 }
2554
2555 static void
2556 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2557 {
2558         struct rbd_obj_request *orig_request;
2559         struct ceph_osd_request *osd_req;
2560         struct rbd_device *rbd_dev;
2561         struct page **pages;
2562         enum obj_operation_type op_type;
2563         u32 page_count;
2564         int img_result;
2565         u64 parent_length;
2566
2567         rbd_assert(img_request_child_test(img_request));
2568
2569         /* First get what we need from the image request */
2570
2571         pages = img_request->copyup_pages;
2572         rbd_assert(pages != NULL);
2573         img_request->copyup_pages = NULL;
2574         page_count = img_request->copyup_page_count;
2575         rbd_assert(page_count);
2576         img_request->copyup_page_count = 0;
2577
2578         orig_request = img_request->obj_request;
2579         rbd_assert(orig_request != NULL);
2580         rbd_assert(obj_request_type_valid(orig_request->type));
2581         img_result = img_request->result;
2582         parent_length = img_request->length;
2583         rbd_assert(img_result || parent_length == img_request->xferred);
2584         rbd_img_request_put(img_request);
2585
2586         rbd_assert(orig_request->img_request);
2587         rbd_dev = orig_request->img_request->rbd_dev;
2588         rbd_assert(rbd_dev);
2589
2590         /*
2591          * If the overlap has become 0 (most likely because the
2592          * image has been flattened) we need to free the pages
2593          * and re-submit the original write request.
2594          */
2595         if (!rbd_dev->parent_overlap) {
2596                 ceph_release_page_vector(pages, page_count);
2597                 rbd_obj_request_submit(orig_request);
2598                 return;
2599         }
2600
2601         if (img_result)
2602                 goto out_err;
2603
2604         /*
2605          * The original osd request is of no use to use any more.
2606          * We need a new one that can hold the three ops in a copyup
2607          * request.  Allocate the new copyup osd request for the
2608          * original request, and release the old one.
2609          */
2610         img_result = -ENOMEM;
2611         osd_req = rbd_osd_req_create_copyup(orig_request);
2612         if (!osd_req)
2613                 goto out_err;
2614         rbd_osd_req_destroy(orig_request->osd_req);
2615         orig_request->osd_req = osd_req;
2616         orig_request->copyup_pages = pages;
2617         orig_request->copyup_page_count = page_count;
2618
2619         /* Initialize the copyup op */
2620
2621         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2622         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2623                                                 false, false);
2624
2625         /* Add the other op(s) */
2626
2627         op_type = rbd_img_request_op_type(orig_request->img_request);
2628         rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2629
2630         /* All set, send it off. */
2631
2632         rbd_obj_request_submit(orig_request);
2633         return;
2634
2635 out_err:
2636         ceph_release_page_vector(pages, page_count);
2637         rbd_obj_request_error(orig_request, img_result);
2638 }
2639
2640 /*
2641  * Read from the parent image the range of data that covers the
2642  * entire target of the given object request.  This is used for
2643  * satisfying a layered image write request when the target of an
2644  * object request from the image request does not exist.
2645  *
2646  * A page array big enough to hold the returned data is allocated
2647  * and supplied to rbd_img_request_fill() as the "data descriptor."
2648  * When the read completes, this page array will be transferred to
2649  * the original object request for the copyup operation.
2650  *
2651  * If an error occurs, it is recorded as the result of the original
2652  * object request in rbd_img_obj_exists_callback().
2653  */
2654 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2655 {
2656         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2657         struct rbd_img_request *parent_request = NULL;
2658         u64 img_offset;
2659         u64 length;
2660         struct page **pages = NULL;
2661         u32 page_count;
2662         int result;
2663
2664         rbd_assert(rbd_dev->parent != NULL);
2665
2666         /*
2667          * Determine the byte range covered by the object in the
2668          * child image to which the original request was to be sent.
2669          */
2670         img_offset = obj_request->img_offset - obj_request->offset;
2671         length = rbd_obj_bytes(&rbd_dev->header);
2672
2673         /*
2674          * There is no defined parent data beyond the parent
2675          * overlap, so limit what we read at that boundary if
2676          * necessary.
2677          */
2678         if (img_offset + length > rbd_dev->parent_overlap) {
2679                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2680                 length = rbd_dev->parent_overlap - img_offset;
2681         }
2682
2683         /*
2684          * Allocate a page array big enough to receive the data read
2685          * from the parent.
2686          */
2687         page_count = (u32)calc_pages_for(0, length);
2688         pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2689         if (IS_ERR(pages)) {
2690                 result = PTR_ERR(pages);
2691                 pages = NULL;
2692                 goto out_err;
2693         }
2694
2695         result = -ENOMEM;
2696         parent_request = rbd_parent_request_create(obj_request,
2697                                                 img_offset, length);
2698         if (!parent_request)
2699                 goto out_err;
2700
2701         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2702         if (result)
2703                 goto out_err;
2704
2705         parent_request->copyup_pages = pages;
2706         parent_request->copyup_page_count = page_count;
2707         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2708
2709         result = rbd_img_request_submit(parent_request);
2710         if (!result)
2711                 return 0;
2712
2713         parent_request->copyup_pages = NULL;
2714         parent_request->copyup_page_count = 0;
2715         parent_request->obj_request = NULL;
2716         rbd_obj_request_put(obj_request);
2717 out_err:
2718         if (pages)
2719                 ceph_release_page_vector(pages, page_count);
2720         if (parent_request)
2721                 rbd_img_request_put(parent_request);
2722         return result;
2723 }
2724
2725 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2726 {
2727         struct rbd_obj_request *orig_request;
2728         struct rbd_device *rbd_dev;
2729         int result;
2730
2731         rbd_assert(!obj_request_img_data_test(obj_request));
2732
2733         /*
2734          * All we need from the object request is the original
2735          * request and the result of the STAT op.  Grab those, then
2736          * we're done with the request.
2737          */
2738         orig_request = obj_request->obj_request;
2739         obj_request->obj_request = NULL;
2740         rbd_obj_request_put(orig_request);
2741         rbd_assert(orig_request);
2742         rbd_assert(orig_request->img_request);
2743
2744         result = obj_request->result;
2745         obj_request->result = 0;
2746
2747         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2748                 obj_request, orig_request, result,
2749                 obj_request->xferred, obj_request->length);
2750         rbd_obj_request_put(obj_request);
2751
2752         /*
2753          * If the overlap has become 0 (most likely because the
2754          * image has been flattened) we need to re-submit the
2755          * original request.
2756          */
2757         rbd_dev = orig_request->img_request->rbd_dev;
2758         if (!rbd_dev->parent_overlap) {
2759                 rbd_obj_request_submit(orig_request);
2760                 return;
2761         }
2762
2763         /*
2764          * Our only purpose here is to determine whether the object
2765          * exists, and we don't want to treat the non-existence as
2766          * an error.  If something else comes back, transfer the
2767          * error to the original request and complete it now.
2768          */
2769         if (!result) {
2770                 obj_request_existence_set(orig_request, true);
2771         } else if (result == -ENOENT) {
2772                 obj_request_existence_set(orig_request, false);
2773         } else {
2774                 goto fail_orig_request;
2775         }
2776
2777         /*
2778          * Resubmit the original request now that we have recorded
2779          * whether the target object exists.
2780          */
2781         result = rbd_img_obj_request_submit(orig_request);
2782         if (result)
2783                 goto fail_orig_request;
2784
2785         return;
2786
2787 fail_orig_request:
2788         rbd_obj_request_error(orig_request, result);
2789 }
2790
2791 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2792 {
2793         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2794         struct rbd_obj_request *stat_request;
2795         struct page **pages;
2796         u32 page_count;
2797         size_t size;
2798         int ret;
2799
2800         stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
2801         if (!stat_request)
2802                 return -ENOMEM;
2803
2804         stat_request->object_no = obj_request->object_no;
2805
2806         stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2807                                                    stat_request);
2808         if (!stat_request->osd_req) {
2809                 ret = -ENOMEM;
2810                 goto fail_stat_request;
2811         }
2812
2813         /*
2814          * The response data for a STAT call consists of:
2815          *     le64 length;
2816          *     struct {
2817          *         le32 tv_sec;
2818          *         le32 tv_nsec;
2819          *     } mtime;
2820          */
2821         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2822         page_count = (u32)calc_pages_for(0, size);
2823         pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2824         if (IS_ERR(pages)) {
2825                 ret = PTR_ERR(pages);
2826                 goto fail_stat_request;
2827         }
2828
2829         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2830         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2831                                      false, false);
2832
2833         rbd_obj_request_get(obj_request);
2834         stat_request->obj_request = obj_request;
2835         stat_request->pages = pages;
2836         stat_request->page_count = page_count;
2837         stat_request->callback = rbd_img_obj_exists_callback;
2838
2839         rbd_obj_request_submit(stat_request);
2840         return 0;
2841
2842 fail_stat_request:
2843         rbd_obj_request_put(stat_request);
2844         return ret;
2845 }
2846
2847 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2848 {
2849         struct rbd_img_request *img_request = obj_request->img_request;
2850         struct rbd_device *rbd_dev = img_request->rbd_dev;
2851
2852         /* Reads */
2853         if (!img_request_write_test(img_request) &&
2854             !img_request_discard_test(img_request))
2855                 return true;
2856
2857         /* Non-layered writes */
2858         if (!img_request_layered_test(img_request))
2859                 return true;
2860
2861         /*
2862          * Layered writes outside of the parent overlap range don't
2863          * share any data with the parent.
2864          */
2865         if (!obj_request_overlaps_parent(obj_request))
2866                 return true;
2867
2868         /*
2869          * Entire-object layered writes - we will overwrite whatever
2870          * parent data there is anyway.
2871          */
2872         if (!obj_request->offset &&
2873             obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2874                 return true;
2875
2876         /*
2877          * If the object is known to already exist, its parent data has
2878          * already been copied.
2879          */
2880         if (obj_request_known_test(obj_request) &&
2881             obj_request_exists_test(obj_request))
2882                 return true;
2883
2884         return false;
2885 }
2886
2887 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2888 {
2889         rbd_assert(obj_request_img_data_test(obj_request));
2890         rbd_assert(obj_request_type_valid(obj_request->type));
2891         rbd_assert(obj_request->img_request);
2892
2893         if (img_obj_request_simple(obj_request)) {
2894                 rbd_obj_request_submit(obj_request);
2895                 return 0;
2896         }
2897
2898         /*
2899          * It's a layered write.  The target object might exist but
2900          * we may not know that yet.  If we know it doesn't exist,
2901          * start by reading the data for the full target object from
2902          * the parent so we can use it for a copyup to the target.
2903          */
2904         if (obj_request_known_test(obj_request))
2905                 return rbd_img_obj_parent_read_full(obj_request);
2906
2907         /* We don't know whether the target exists.  Go find out. */
2908
2909         return rbd_img_obj_exists_submit(obj_request);
2910 }
2911
2912 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2913 {
2914         struct rbd_obj_request *obj_request;
2915         struct rbd_obj_request *next_obj_request;
2916         int ret = 0;
2917
2918         dout("%s: img %p\n", __func__, img_request);
2919
2920         rbd_img_request_get(img_request);
2921         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2922                 ret = rbd_img_obj_request_submit(obj_request);
2923                 if (ret)
2924                         goto out_put_ireq;
2925         }
2926
2927 out_put_ireq:
2928         rbd_img_request_put(img_request);
2929         return ret;
2930 }
2931
2932 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2933 {
2934         struct rbd_obj_request *obj_request;
2935         struct rbd_device *rbd_dev;
2936         u64 obj_end;
2937         u64 img_xferred;
2938         int img_result;
2939
2940         rbd_assert(img_request_child_test(img_request));
2941
2942         /* First get what we need from the image request and release it */
2943
2944         obj_request = img_request->obj_request;
2945         img_xferred = img_request->xferred;
2946         img_result = img_request->result;
2947         rbd_img_request_put(img_request);
2948
2949         /*
2950          * If the overlap has become 0 (most likely because the
2951          * image has been flattened) we need to re-submit the
2952          * original request.
2953          */
2954         rbd_assert(obj_request);
2955         rbd_assert(obj_request->img_request);
2956         rbd_dev = obj_request->img_request->rbd_dev;
2957         if (!rbd_dev->parent_overlap) {
2958                 rbd_obj_request_submit(obj_request);
2959                 return;
2960         }
2961
2962         obj_request->result = img_result;
2963         if (obj_request->result)
2964                 goto out;
2965
2966         /*
2967          * We need to zero anything beyond the parent overlap
2968          * boundary.  Since rbd_img_obj_request_read_callback()
2969          * will zero anything beyond the end of a short read, an
2970          * easy way to do this is to pretend the data from the
2971          * parent came up short--ending at the overlap boundary.
2972          */
2973         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2974         obj_end = obj_request->img_offset + obj_request->length;
2975         if (obj_end > rbd_dev->parent_overlap) {
2976                 u64 xferred = 0;
2977
2978                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2979                         xferred = rbd_dev->parent_overlap -
2980                                         obj_request->img_offset;
2981
2982                 obj_request->xferred = min(img_xferred, xferred);
2983         } else {
2984                 obj_request->xferred = img_xferred;
2985         }
2986 out:
2987         rbd_img_obj_request_read_callback(obj_request);
2988         rbd_obj_request_complete(obj_request);
2989 }
2990
2991 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2992 {
2993         struct rbd_img_request *img_request;
2994         int result;
2995
2996         rbd_assert(obj_request_img_data_test(obj_request));
2997         rbd_assert(obj_request->img_request != NULL);
2998         rbd_assert(obj_request->result == (s32) -ENOENT);
2999         rbd_assert(obj_request_type_valid(obj_request->type));
3000
3001         /* rbd_read_finish(obj_request, obj_request->length); */
3002         img_request = rbd_parent_request_create(obj_request,
3003                                                 obj_request->img_offset,
3004                                                 obj_request->length);
3005         result = -ENOMEM;
3006         if (!img_request)
3007                 goto out_err;
3008
3009         if (obj_request->type == OBJ_REQUEST_BIO)
3010                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3011                                                 obj_request->bio_list);
3012         else
3013                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3014                                                 obj_request->pages);
3015         if (result)
3016                 goto out_err;
3017
3018         img_request->callback = rbd_img_parent_read_callback;
3019         result = rbd_img_request_submit(img_request);
3020         if (result)
3021                 goto out_err;
3022
3023         return;
3024 out_err:
3025         if (img_request)
3026                 rbd_img_request_put(img_request);
3027         obj_request->result = result;
3028         obj_request->xferred = 0;
3029         obj_request_done_set(obj_request);
3030 }
3031
3032 static const struct rbd_client_id rbd_empty_cid;
3033
3034 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3035                           const struct rbd_client_id *rhs)
3036 {
3037         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3038 }
3039
3040 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3041 {
3042         struct rbd_client_id cid;
3043
3044         mutex_lock(&rbd_dev->watch_mutex);
3045         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3046         cid.handle = rbd_dev->watch_cookie;
3047         mutex_unlock(&rbd_dev->watch_mutex);
3048         return cid;
3049 }
3050
3051 /*
3052  * lock_rwsem must be held for write
3053  */
3054 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3055                               const struct rbd_client_id *cid)
3056 {
3057         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3058              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3059              cid->gid, cid->handle);
3060         rbd_dev->owner_cid = *cid; /* struct */
3061 }
3062
3063 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3064 {
3065         mutex_lock(&rbd_dev->watch_mutex);
3066         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3067         mutex_unlock(&rbd_dev->watch_mutex);
3068 }
3069
3070 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3071 {
3072         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3073
3074         strcpy(rbd_dev->lock_cookie, cookie);
3075         rbd_set_owner_cid(rbd_dev, &cid);
3076         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3077 }
3078
3079 /*
3080  * lock_rwsem must be held for write
3081  */
3082 static int rbd_lock(struct rbd_device *rbd_dev)
3083 {
3084         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3085         char cookie[32];
3086         int ret;
3087
3088         WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3089                 rbd_dev->lock_cookie[0] != '\0');
3090
3091         format_lock_cookie(rbd_dev, cookie);
3092         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3093                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3094                             RBD_LOCK_TAG, "", 0);
3095         if (ret)
3096                 return ret;
3097
3098         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3099         __rbd_lock(rbd_dev, cookie);
3100         return 0;
3101 }
3102
3103 /*
3104  * lock_rwsem must be held for write
3105  */
3106 static void rbd_unlock(struct rbd_device *rbd_dev)
3107 {
3108         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3109         int ret;
3110
3111         WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3112                 rbd_dev->lock_cookie[0] == '\0');
3113
3114         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3115                               RBD_LOCK_NAME, rbd_dev->lock_cookie);
3116         if (ret && ret != -ENOENT)
3117                 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
3118
3119         /* treat errors as the image is unlocked */
3120         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3121         rbd_dev->lock_cookie[0] = '\0';
3122         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3123         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3124 }
3125
3126 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3127                                 enum rbd_notify_op notify_op,
3128                                 struct page ***preply_pages,
3129                                 size_t *preply_len)
3130 {
3131         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3132         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3133         int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3134         char buf[buf_size];
3135         void *p = buf;
3136
3137         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3138
3139         /* encode *LockPayload NotifyMessage (op + ClientId) */
3140         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3141         ceph_encode_32(&p, notify_op);
3142         ceph_encode_64(&p, cid.gid);
3143         ceph_encode_64(&p, cid.handle);
3144
3145         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3146                                 &rbd_dev->header_oloc, buf, buf_size,
3147                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3148 }
3149
3150 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3151                                enum rbd_notify_op notify_op)
3152 {
3153         struct page **reply_pages;
3154         size_t reply_len;
3155
3156         __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3157         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3158 }
3159
3160 static void rbd_notify_acquired_lock(struct work_struct *work)
3161 {
3162         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3163                                                   acquired_lock_work);
3164
3165         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3166 }
3167
3168 static void rbd_notify_released_lock(struct work_struct *work)
3169 {
3170         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3171                                                   released_lock_work);
3172
3173         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3174 }
3175
3176 static int rbd_request_lock(struct rbd_device *rbd_dev)
3177 {
3178         struct page **reply_pages;
3179         size_t reply_len;
3180         bool lock_owner_responded = false;
3181         int ret;
3182
3183         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3184
3185         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3186                                    &reply_pages, &reply_len);
3187         if (ret && ret != -ETIMEDOUT) {
3188                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3189                 goto out;
3190         }
3191
3192         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3193                 void *p = page_address(reply_pages[0]);
3194                 void *const end = p + reply_len;
3195                 u32 n;
3196
3197                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3198                 while (n--) {
3199                         u8 struct_v;
3200                         u32 len;
3201
3202                         ceph_decode_need(&p, end, 8 + 8, e_inval);
3203                         p += 8 + 8; /* skip gid and cookie */
3204
3205                         ceph_decode_32_safe(&p, end, len, e_inval);
3206                         if (!len)
3207                                 continue;
3208
3209                         if (lock_owner_responded) {
3210                                 rbd_warn(rbd_dev,
3211                                          "duplicate lock owners detected");
3212                                 ret = -EIO;
3213                                 goto out;
3214                         }
3215
3216                         lock_owner_responded = true;
3217                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3218                                                   &struct_v, &len);
3219                         if (ret) {
3220                                 rbd_warn(rbd_dev,
3221                                          "failed to decode ResponseMessage: %d",
3222                                          ret);
3223                                 goto e_inval;
3224                         }
3225
3226                         ret = ceph_decode_32(&p);
3227                 }
3228         }
3229
3230         if (!lock_owner_responded) {
3231                 rbd_warn(rbd_dev, "no lock owners detected");
3232                 ret = -ETIMEDOUT;
3233         }
3234
3235 out:
3236         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3237         return ret;
3238
3239 e_inval:
3240         ret = -EINVAL;
3241         goto out;
3242 }
3243
3244 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3245 {
3246         dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3247
3248         cancel_delayed_work(&rbd_dev->lock_dwork);
3249         if (wake_all)
3250                 wake_up_all(&rbd_dev->lock_waitq);
3251         else
3252                 wake_up(&rbd_dev->lock_waitq);
3253 }
3254
3255 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3256                                struct ceph_locker **lockers, u32 *num_lockers)
3257 {
3258         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3259         u8 lock_type;
3260         char *lock_tag;
3261         int ret;
3262
3263         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3264
3265         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3266                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3267                                  &lock_type, &lock_tag, lockers, num_lockers);
3268         if (ret)
3269                 return ret;
3270
3271         if (*num_lockers == 0) {
3272                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3273                 goto out;
3274         }
3275
3276         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3277                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3278                          lock_tag);
3279                 ret = -EBUSY;
3280                 goto out;
3281         }
3282
3283         if (lock_type == CEPH_CLS_LOCK_SHARED) {
3284                 rbd_warn(rbd_dev, "shared lock type detected");
3285                 ret = -EBUSY;
3286                 goto out;
3287         }
3288
3289         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3290                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
3291                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3292                          (*lockers)[0].id.cookie);
3293                 ret = -EBUSY;
3294                 goto out;
3295         }
3296
3297 out:
3298         kfree(lock_tag);
3299         return ret;
3300 }
3301
3302 static int find_watcher(struct rbd_device *rbd_dev,
3303                         const struct ceph_locker *locker)
3304 {
3305         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3306         struct ceph_watch_item *watchers;
3307         u32 num_watchers;
3308         u64 cookie;
3309         int i;
3310         int ret;
3311
3312         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3313                                       &rbd_dev->header_oloc, &watchers,
3314                                       &num_watchers);
3315         if (ret)
3316                 return ret;
3317
3318         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3319         for (i = 0; i < num_watchers; i++) {
3320                 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3321                             sizeof(locker->info.addr)) &&
3322                     watchers[i].cookie == cookie) {
3323                         struct rbd_client_id cid = {
3324                                 .gid = le64_to_cpu(watchers[i].name.num),
3325                                 .handle = cookie,
3326                         };
3327
3328                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3329                              rbd_dev, cid.gid, cid.handle);
3330                         rbd_set_owner_cid(rbd_dev, &cid);
3331                         ret = 1;
3332                         goto out;
3333                 }
3334         }
3335
3336         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3337         ret = 0;
3338 out:
3339         kfree(watchers);
3340         return ret;
3341 }
3342
3343 /*
3344  * lock_rwsem must be held for write
3345  */
3346 static int rbd_try_lock(struct rbd_device *rbd_dev)
3347 {
3348         struct ceph_client *client = rbd_dev->rbd_client->client;
3349         struct ceph_locker *lockers;
3350         u32 num_lockers;
3351         int ret;
3352
3353         for (;;) {
3354                 ret = rbd_lock(rbd_dev);
3355                 if (ret != -EBUSY)
3356                         return ret;
3357
3358                 /* determine if the current lock holder is still alive */
3359                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3360                 if (ret)
3361                         return ret;
3362
3363                 if (num_lockers == 0)
3364                         goto again;
3365
3366                 ret = find_watcher(rbd_dev, lockers);
3367                 if (ret) {
3368                         if (ret > 0)
3369                                 ret = 0; /* have to request lock */
3370                         goto out;
3371                 }
3372
3373                 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3374                          ENTITY_NAME(lockers[0].id.name));
3375
3376                 ret = ceph_monc_blacklist_add(&client->monc,
3377                                               &lockers[0].info.addr);
3378                 if (ret) {
3379                         rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3380                                  ENTITY_NAME(lockers[0].id.name), ret);
3381                         goto out;
3382                 }
3383
3384                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3385                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
3386                                           lockers[0].id.cookie,
3387                                           &lockers[0].id.name);
3388                 if (ret && ret != -ENOENT)
3389                         goto out;
3390
3391 again:
3392                 ceph_free_lockers(lockers, num_lockers);
3393         }
3394
3395 out:
3396         ceph_free_lockers(lockers, num_lockers);
3397         return ret;
3398 }
3399
3400 /*
3401  * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3402  */
3403 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3404                                                 int *pret)
3405 {
3406         enum rbd_lock_state lock_state;
3407
3408         down_read(&rbd_dev->lock_rwsem);
3409         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3410              rbd_dev->lock_state);
3411         if (__rbd_is_lock_owner(rbd_dev)) {
3412                 lock_state = rbd_dev->lock_state;
3413                 up_read(&rbd_dev->lock_rwsem);
3414                 return lock_state;
3415         }
3416
3417         up_read(&rbd_dev->lock_rwsem);
3418         down_write(&rbd_dev->lock_rwsem);
3419         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3420              rbd_dev->lock_state);
3421         if (!__rbd_is_lock_owner(rbd_dev)) {
3422                 *pret = rbd_try_lock(rbd_dev);
3423                 if (*pret)
3424                         rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3425         }
3426
3427         lock_state = rbd_dev->lock_state;
3428         up_write(&rbd_dev->lock_rwsem);
3429         return lock_state;
3430 }
3431
3432 static void rbd_acquire_lock(struct work_struct *work)
3433 {
3434         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3435                                             struct rbd_device, lock_dwork);
3436         enum rbd_lock_state lock_state;
3437         int ret = 0;
3438
3439         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3440 again:
3441         lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3442         if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3443                 if (lock_state == RBD_LOCK_STATE_LOCKED)
3444                         wake_requests(rbd_dev, true);
3445                 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3446                      rbd_dev, lock_state, ret);
3447                 return;
3448         }
3449
3450         ret = rbd_request_lock(rbd_dev);
3451         if (ret == -ETIMEDOUT) {
3452                 goto again; /* treat this as a dead client */
3453         } else if (ret == -EROFS) {
3454                 rbd_warn(rbd_dev, "peer will not release lock");
3455                 /*
3456                  * If this is rbd_add_acquire_lock(), we want to fail
3457                  * immediately -- reuse BLACKLISTED flag.  Otherwise we
3458                  * want to block.
3459                  */
3460                 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3461                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3462                         /* wake "rbd map --exclusive" process */
3463                         wake_requests(rbd_dev, false);
3464                 }
3465         } else if (ret < 0) {
3466                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3467                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3468                                  RBD_RETRY_DELAY);
3469         } else {
3470                 /*
3471                  * lock owner acked, but resend if we don't see them
3472                  * release the lock
3473                  */
3474                 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3475                      rbd_dev);
3476                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3477                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3478         }
3479 }
3480
3481 /*
3482  * lock_rwsem must be held for write
3483  */
3484 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3485 {
3486         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3487              rbd_dev->lock_state);
3488         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3489                 return false;
3490
3491         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3492         downgrade_write(&rbd_dev->lock_rwsem);
3493         /*
3494          * Ensure that all in-flight IO is flushed.
3495          *
3496          * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3497          * may be shared with other devices.
3498          */
3499         ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3500         up_read(&rbd_dev->lock_rwsem);
3501
3502         down_write(&rbd_dev->lock_rwsem);
3503         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3504              rbd_dev->lock_state);
3505         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3506                 return false;
3507
3508         rbd_unlock(rbd_dev);
3509         /*
3510          * Give others a chance to grab the lock - we would re-acquire
3511          * almost immediately if we got new IO during ceph_osdc_sync()
3512          * otherwise.  We need to ack our own notifications, so this
3513          * lock_dwork will be requeued from rbd_wait_state_locked()
3514          * after wake_requests() in rbd_handle_released_lock().
3515          */
3516         cancel_delayed_work(&rbd_dev->lock_dwork);
3517         return true;
3518 }
3519
3520 static void rbd_release_lock_work(struct work_struct *work)
3521 {
3522         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3523                                                   unlock_work);
3524
3525         down_write(&rbd_dev->lock_rwsem);
3526         rbd_release_lock(rbd_dev);
3527         up_write(&rbd_dev->lock_rwsem);
3528 }
3529
3530 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3531                                      void **p)
3532 {
3533         struct rbd_client_id cid = { 0 };
3534
3535         if (struct_v >= 2) {
3536                 cid.gid = ceph_decode_64(p);
3537                 cid.handle = ceph_decode_64(p);
3538         }
3539
3540         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3541              cid.handle);
3542         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3543                 down_write(&rbd_dev->lock_rwsem);
3544                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3545                         /*
3546                          * we already know that the remote client is
3547                          * the owner
3548                          */
3549                         up_write(&rbd_dev->lock_rwsem);
3550                         return;
3551                 }
3552
3553                 rbd_set_owner_cid(rbd_dev, &cid);
3554                 downgrade_write(&rbd_dev->lock_rwsem);
3555         } else {
3556                 down_read(&rbd_dev->lock_rwsem);
3557         }
3558
3559         if (!__rbd_is_lock_owner(rbd_dev))
3560                 wake_requests(rbd_dev, false);
3561         up_read(&rbd_dev->lock_rwsem);
3562 }
3563
3564 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3565                                      void **p)
3566 {
3567         struct rbd_client_id cid = { 0 };
3568
3569         if (struct_v >= 2) {
3570                 cid.gid = ceph_decode_64(p);
3571                 cid.handle = ceph_decode_64(p);
3572         }
3573
3574         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3575              cid.handle);
3576         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3577                 down_write(&rbd_dev->lock_rwsem);
3578                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3579                         dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3580                              __func__, rbd_dev, cid.gid, cid.handle,
3581                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3582                         up_write(&rbd_dev->lock_rwsem);
3583                         return;
3584                 }
3585
3586                 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3587                 downgrade_write(&rbd_dev->lock_rwsem);
3588         } else {
3589                 down_read(&rbd_dev->lock_rwsem);
3590         }
3591
3592         if (!__rbd_is_lock_owner(rbd_dev))
3593                 wake_requests(rbd_dev, false);
3594         up_read(&rbd_dev->lock_rwsem);
3595 }
3596
3597 /*
3598  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3599  * ResponseMessage is needed.
3600  */
3601 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3602                                    void **p)
3603 {
3604         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3605         struct rbd_client_id cid = { 0 };
3606         int result = 1;
3607
3608         if (struct_v >= 2) {
3609                 cid.gid = ceph_decode_64(p);
3610                 cid.handle = ceph_decode_64(p);
3611         }
3612
3613         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3614              cid.handle);
3615         if (rbd_cid_equal(&cid, &my_cid))
3616                 return result;
3617
3618         down_read(&rbd_dev->lock_rwsem);
3619         if (__rbd_is_lock_owner(rbd_dev)) {
3620                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3621                     rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3622                         goto out_unlock;
3623
3624                 /*
3625                  * encode ResponseMessage(0) so the peer can detect
3626                  * a missing owner
3627                  */
3628                 result = 0;
3629
3630                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3631                         if (!rbd_dev->opts->exclusive) {
3632                                 dout("%s rbd_dev %p queueing unlock_work\n",
3633                                      __func__, rbd_dev);
3634                                 queue_work(rbd_dev->task_wq,
3635                                            &rbd_dev->unlock_work);
3636                         } else {
3637                                 /* refuse to release the lock */
3638                                 result = -EROFS;
3639                         }
3640                 }
3641         }
3642
3643 out_unlock:
3644         up_read(&rbd_dev->lock_rwsem);
3645         return result;
3646 }
3647
3648 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3649                                      u64 notify_id, u64 cookie, s32 *result)
3650 {
3651         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3652         int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3653         char buf[buf_size];
3654         int ret;
3655
3656         if (result) {
3657                 void *p = buf;
3658
3659                 /* encode ResponseMessage */
3660                 ceph_start_encoding(&p, 1, 1,
3661                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
3662                 ceph_encode_32(&p, *result);
3663         } else {
3664                 buf_size = 0;
3665         }
3666
3667         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3668                                    &rbd_dev->header_oloc, notify_id, cookie,
3669                                    buf, buf_size);
3670         if (ret)
3671                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3672 }
3673
3674 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3675                                    u64 cookie)
3676 {
3677         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3678         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3679 }
3680
3681 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3682                                           u64 notify_id, u64 cookie, s32 result)
3683 {
3684         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3685         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3686 }
3687
3688 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3689                          u64 notifier_id, void *data, size_t data_len)
3690 {
3691         struct rbd_device *rbd_dev = arg;
3692         void *p = data;
3693         void *const end = p + data_len;
3694         u8 struct_v = 0;
3695         u32 len;
3696         u32 notify_op;
3697         int ret;
3698
3699         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3700              __func__, rbd_dev, cookie, notify_id, data_len);
3701         if (data_len) {
3702                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3703                                           &struct_v, &len);
3704                 if (ret) {
3705                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3706                                  ret);
3707                         return;
3708                 }
3709
3710                 notify_op = ceph_decode_32(&p);
3711         } else {
3712                 /* legacy notification for header updates */
3713                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3714                 len = 0;
3715         }
3716
3717         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3718         switch (notify_op) {
3719         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3720                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3721                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3722                 break;
3723         case RBD_NOTIFY_OP_RELEASED_LOCK:
3724                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3725                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3726                 break;
3727         case RBD_NOTIFY_OP_REQUEST_LOCK:
3728                 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3729                 if (ret <= 0)
3730                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3731                                                       cookie, ret);
3732                 else
3733                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3734                 break;
3735         case RBD_NOTIFY_OP_HEADER_UPDATE:
3736                 ret = rbd_dev_refresh(rbd_dev);
3737                 if (ret)
3738                         rbd_warn(rbd_dev, "refresh failed: %d", ret);
3739
3740                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3741                 break;
3742         default:
3743                 if (rbd_is_lock_owner(rbd_dev))
3744                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3745                                                       cookie, -EOPNOTSUPP);
3746                 else
3747                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3748                 break;
3749         }
3750 }
3751
3752 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3753
3754 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3755 {
3756         struct rbd_device *rbd_dev = arg;
3757
3758         rbd_warn(rbd_dev, "encountered watch error: %d", err);
3759
3760         down_write(&rbd_dev->lock_rwsem);
3761         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3762         up_write(&rbd_dev->lock_rwsem);
3763
3764         mutex_lock(&rbd_dev->watch_mutex);
3765         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3766                 __rbd_unregister_watch(rbd_dev);
3767                 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3768
3769                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3770         }
3771         mutex_unlock(&rbd_dev->watch_mutex);
3772 }
3773
3774 /*
3775  * watch_mutex must be locked
3776  */
3777 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3778 {
3779         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3780         struct ceph_osd_linger_request *handle;
3781
3782         rbd_assert(!rbd_dev->watch_handle);
3783         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3784
3785         handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3786                                  &rbd_dev->header_oloc, rbd_watch_cb,
3787                                  rbd_watch_errcb, rbd_dev);
3788         if (IS_ERR(handle))
3789                 return PTR_ERR(handle);
3790
3791         rbd_dev->watch_handle = handle;
3792         return 0;
3793 }
3794
3795 /*
3796  * watch_mutex must be locked
3797  */
3798 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3799 {
3800         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3801         int ret;
3802
3803         rbd_assert(rbd_dev->watch_handle);
3804         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3805
3806         ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3807         if (ret)
3808                 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3809
3810         rbd_dev->watch_handle = NULL;
3811 }
3812
3813 static int rbd_register_watch(struct rbd_device *rbd_dev)
3814 {
3815         int ret;
3816
3817         mutex_lock(&rbd_dev->watch_mutex);
3818         rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3819         ret = __rbd_register_watch(rbd_dev);
3820         if (ret)
3821                 goto out;
3822
3823         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3824         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3825
3826 out:
3827         mutex_unlock(&rbd_dev->watch_mutex);
3828         return ret;
3829 }
3830
3831 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3832 {
3833         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3834
3835         cancel_work_sync(&rbd_dev->acquired_lock_work);
3836         cancel_work_sync(&rbd_dev->released_lock_work);
3837         cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3838         cancel_work_sync(&rbd_dev->unlock_work);
3839 }
3840
3841 /*
3842  * header_rwsem must not be held to avoid a deadlock with
3843  * rbd_dev_refresh() when flushing notifies.
3844  */
3845 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3846 {
3847         WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3848         cancel_tasks_sync(rbd_dev);
3849
3850         mutex_lock(&rbd_dev->watch_mutex);
3851         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3852                 __rbd_unregister_watch(rbd_dev);
3853         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3854         mutex_unlock(&rbd_dev->watch_mutex);
3855
3856         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3857         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3858 }
3859
3860 /*
3861  * lock_rwsem must be held for write
3862  */
3863 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3864 {
3865         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3866         char cookie[32];
3867         int ret;
3868
3869         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3870
3871         format_lock_cookie(rbd_dev, cookie);
3872         ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3873                                   &rbd_dev->header_oloc, RBD_LOCK_NAME,
3874                                   CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3875                                   RBD_LOCK_TAG, cookie);
3876         if (ret) {
3877                 if (ret != -EOPNOTSUPP)
3878                         rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3879                                  ret);
3880
3881                 /*
3882                  * Lock cookie cannot be updated on older OSDs, so do
3883                  * a manual release and queue an acquire.
3884                  */
3885                 if (rbd_release_lock(rbd_dev))
3886                         queue_delayed_work(rbd_dev->task_wq,
3887                                            &rbd_dev->lock_dwork, 0);
3888         } else {
3889                 __rbd_lock(rbd_dev, cookie);
3890         }
3891 }
3892
3893 static void rbd_reregister_watch(struct work_struct *work)
3894 {
3895         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3896                                             struct rbd_device, watch_dwork);
3897         int ret;
3898
3899         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3900
3901         mutex_lock(&rbd_dev->watch_mutex);
3902         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3903                 mutex_unlock(&rbd_dev->watch_mutex);
3904                 return;
3905         }
3906
3907         ret = __rbd_register_watch(rbd_dev);
3908         if (ret) {
3909                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3910                 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3911                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3912                         wake_requests(rbd_dev, true);
3913                 } else {
3914                         queue_delayed_work(rbd_dev->task_wq,
3915                                            &rbd_dev->watch_dwork,
3916                                            RBD_RETRY_DELAY);
3917                 }
3918                 mutex_unlock(&rbd_dev->watch_mutex);
3919                 return;
3920         }
3921
3922         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3923         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3924         mutex_unlock(&rbd_dev->watch_mutex);
3925
3926         down_write(&rbd_dev->lock_rwsem);
3927         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3928                 rbd_reacquire_lock(rbd_dev);
3929         up_write(&rbd_dev->lock_rwsem);
3930
3931         ret = rbd_dev_refresh(rbd_dev);
3932         if (ret)
3933                 rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
3934 }
3935
3936 /*
3937  * Synchronous osd object method call.  Returns the number of bytes
3938  * returned in the outbound buffer, or a negative error code.
3939  */
3940 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3941                              struct ceph_object_id *oid,
3942                              struct ceph_object_locator *oloc,
3943                              const char *method_name,
3944                              const void *outbound,
3945                              size_t outbound_size,
3946                              void *inbound,
3947                              size_t inbound_size)
3948 {
3949         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3950         struct page *req_page = NULL;
3951         struct page *reply_page;
3952         int ret;
3953
3954         /*
3955          * Method calls are ultimately read operations.  The result
3956          * should placed into the inbound buffer provided.  They
3957          * also supply outbound data--parameters for the object
3958          * method.  Currently if this is present it will be a
3959          * snapshot id.
3960          */
3961         if (outbound) {
3962                 if (outbound_size > PAGE_SIZE)
3963                         return -E2BIG;
3964
3965                 req_page = alloc_page(GFP_KERNEL);
3966                 if (!req_page)
3967                         return -ENOMEM;
3968
3969                 memcpy(page_address(req_page), outbound, outbound_size);
3970         }
3971
3972         reply_page = alloc_page(GFP_KERNEL);
3973         if (!reply_page) {
3974                 if (req_page)
3975                         __free_page(req_page);
3976                 return -ENOMEM;
3977         }
3978
3979         ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3980                              CEPH_OSD_FLAG_READ, req_page, outbound_size,
3981                              reply_page, &inbound_size);
3982         if (!ret) {
3983                 memcpy(inbound, page_address(reply_page), inbound_size);
3984                 ret = inbound_size;
3985         }
3986
3987         if (req_page)
3988                 __free_page(req_page);
3989         __free_page(reply_page);
3990         return ret;
3991 }
3992
3993 /*
3994  * lock_rwsem must be held for read
3995  */
3996 static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3997 {
3998         DEFINE_WAIT(wait);
3999
4000         do {
4001                 /*
4002                  * Note the use of mod_delayed_work() in rbd_acquire_lock()
4003                  * and cancel_delayed_work() in wake_requests().
4004                  */
4005                 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4006                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4007                 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4008                                           TASK_UNINTERRUPTIBLE);
4009                 up_read(&rbd_dev->lock_rwsem);
4010                 schedule();
4011                 down_read(&rbd_dev->lock_rwsem);
4012         } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4013                  !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4014
4015         finish_wait(&rbd_dev->lock_waitq, &wait);
4016 }
4017
4018 static void rbd_queue_workfn(struct work_struct *work)
4019 {
4020         struct request *rq = blk_mq_rq_from_pdu(work);
4021         struct rbd_device *rbd_dev = rq->q->queuedata;
4022         struct rbd_img_request *img_request;
4023         struct ceph_snap_context *snapc = NULL;
4024         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4025         u64 length = blk_rq_bytes(rq);
4026         enum obj_operation_type op_type;
4027         u64 mapping_size;
4028         bool must_be_locked;
4029         int result;
4030
4031         switch (req_op(rq)) {
4032         case REQ_OP_DISCARD:
4033         case REQ_OP_WRITE_ZEROES:
4034                 op_type = OBJ_OP_DISCARD;
4035                 break;
4036         case REQ_OP_WRITE:
4037                 op_type = OBJ_OP_WRITE;
4038                 break;
4039         case REQ_OP_READ:
4040                 op_type = OBJ_OP_READ;
4041                 break;
4042         default:
4043                 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4044                 result = -EIO;
4045                 goto err;
4046         }
4047
4048         /* Ignore/skip any zero-length requests */
4049
4050         if (!length) {
4051                 dout("%s: zero-length request\n", __func__);
4052                 result = 0;
4053                 goto err_rq;
4054         }
4055
4056         /* Only reads are allowed to a read-only device */
4057
4058         if (op_type != OBJ_OP_READ) {
4059                 if (rbd_dev->mapping.read_only) {
4060                         result = -EROFS;
4061                         goto err_rq;
4062                 }
4063                 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4064         }
4065
4066         /*
4067          * Quit early if the mapped snapshot no longer exists.  It's
4068          * still possible the snapshot will have disappeared by the
4069          * time our request arrives at the osd, but there's no sense in
4070          * sending it if we already know.
4071          */
4072         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4073                 dout("request for non-existent snapshot");
4074                 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4075                 result = -ENXIO;
4076                 goto err_rq;
4077         }
4078
4079         if (offset && length > U64_MAX - offset + 1) {
4080                 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4081                          length);
4082                 result = -EINVAL;
4083                 goto err_rq;    /* Shouldn't happen */
4084         }
4085
4086         blk_mq_start_request(rq);
4087
4088         down_read(&rbd_dev->header_rwsem);
4089         mapping_size = rbd_dev->mapping.size;
4090         if (op_type != OBJ_OP_READ) {
4091                 snapc = rbd_dev->header.snapc;
4092                 ceph_get_snap_context(snapc);
4093         }
4094         up_read(&rbd_dev->header_rwsem);
4095
4096         if (offset + length > mapping_size) {
4097                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4098                          length, mapping_size);
4099                 result = -EIO;
4100                 goto err_rq;
4101         }
4102
4103         must_be_locked =
4104             (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
4105             (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
4106         if (must_be_locked) {
4107                 down_read(&rbd_dev->lock_rwsem);
4108                 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4109                     !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4110                         if (rbd_dev->opts->exclusive) {
4111                                 rbd_warn(rbd_dev, "exclusive lock required");
4112                                 result = -EROFS;
4113                                 goto err_unlock;
4114                         }
4115                         rbd_wait_state_locked(rbd_dev);
4116                 }
4117                 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4118                         result = -EBLACKLISTED;
4119                         goto err_unlock;
4120                 }
4121         }
4122
4123         img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4124                                              snapc);
4125         if (!img_request) {
4126                 result = -ENOMEM;
4127                 goto err_unlock;
4128         }
4129         img_request->rq = rq;
4130         snapc = NULL; /* img_request consumes a ref */
4131
4132         if (op_type == OBJ_OP_DISCARD)
4133                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4134                                               NULL);
4135         else
4136                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4137                                               rq->bio);
4138         if (result)
4139                 goto err_img_request;
4140
4141         result = rbd_img_request_submit(img_request);
4142         if (result)
4143                 goto err_img_request;
4144
4145         if (must_be_locked)
4146                 up_read(&rbd_dev->lock_rwsem);
4147         return;
4148
4149 err_img_request:
4150         rbd_img_request_put(img_request);
4151 err_unlock:
4152         if (must_be_locked)
4153                 up_read(&rbd_dev->lock_rwsem);
4154 err_rq:
4155         if (result)
4156                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4157                          obj_op_name(op_type), length, offset, result);
4158         ceph_put_snap_context(snapc);
4159 err:
4160         blk_mq_end_request(rq, errno_to_blk_status(result));
4161 }
4162
4163 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4164                 const struct blk_mq_queue_data *bd)
4165 {
4166         struct request *rq = bd->rq;
4167         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4168
4169         queue_work(rbd_wq, work);
4170         return BLK_STS_OK;
4171 }
4172
4173 static void rbd_free_disk(struct rbd_device *rbd_dev)
4174 {
4175         blk_cleanup_queue(rbd_dev->disk->queue);
4176         blk_mq_free_tag_set(&rbd_dev->tag_set);
4177         put_disk(rbd_dev->disk);
4178         rbd_dev->disk = NULL;
4179 }
4180
4181 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4182                              struct ceph_object_id *oid,
4183                              struct ceph_object_locator *oloc,
4184                              void *buf, int buf_len)
4185
4186 {
4187         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4188         struct ceph_osd_request *req;
4189         struct page **pages;
4190         int num_pages = calc_pages_for(0, buf_len);
4191         int ret;
4192
4193         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4194         if (!req)
4195                 return -ENOMEM;
4196
4197         ceph_oid_copy(&req->r_base_oid, oid);
4198         ceph_oloc_copy(&req->r_base_oloc, oloc);
4199         req->r_flags = CEPH_OSD_FLAG_READ;
4200
4201         ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4202         if (ret)
4203                 goto out_req;
4204
4205         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4206         if (IS_ERR(pages)) {
4207                 ret = PTR_ERR(pages);
4208                 goto out_req;
4209         }
4210
4211         osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4212         osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4213                                          true);
4214
4215         ceph_osdc_start_request(osdc, req, false);
4216         ret = ceph_osdc_wait_request(osdc, req);
4217         if (ret >= 0)
4218                 ceph_copy_from_page_vector(pages, buf, 0, ret);
4219
4220 out_req:
4221         ceph_osdc_put_request(req);
4222         return ret;
4223 }
4224
4225 /*
4226  * Read the complete header for the given rbd device.  On successful
4227  * return, the rbd_dev->header field will contain up-to-date
4228  * information about the image.
4229  */
4230 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4231 {
4232         struct rbd_image_header_ondisk *ondisk = NULL;
4233         u32 snap_count = 0;
4234         u64 names_size = 0;
4235         u32 want_count;
4236         int ret;
4237
4238         /*
4239          * The complete header will include an array of its 64-bit
4240          * snapshot ids, followed by the names of those snapshots as
4241          * a contiguous block of NUL-terminated strings.  Note that
4242          * the number of snapshots could change by the time we read
4243          * it in, in which case we re-read it.
4244          */
4245         do {
4246                 size_t size;
4247
4248                 kfree(ondisk);
4249
4250                 size = sizeof (*ondisk);
4251                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4252                 size += names_size;
4253                 ondisk = kmalloc(size, GFP_KERNEL);
4254                 if (!ondisk)
4255                         return -ENOMEM;
4256
4257                 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4258                                         &rbd_dev->header_oloc, ondisk, size);
4259                 if (ret < 0)
4260                         goto out;
4261                 if ((size_t)ret < size) {
4262                         ret = -ENXIO;
4263                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4264                                 size, ret);
4265                         goto out;
4266                 }
4267                 if (!rbd_dev_ondisk_valid(ondisk)) {
4268                         ret = -ENXIO;
4269                         rbd_warn(rbd_dev, "invalid header");
4270                         goto out;
4271                 }
4272
4273                 names_size = le64_to_cpu(ondisk->snap_names_len);
4274                 want_count = snap_count;
4275                 snap_count = le32_to_cpu(ondisk->snap_count);
4276         } while (snap_count != want_count);
4277
4278         ret = rbd_header_from_disk(rbd_dev, ondisk);
4279 out:
4280         kfree(ondisk);
4281
4282         return ret;
4283 }
4284
4285 /*
4286  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4287  * has disappeared from the (just updated) snapshot context.
4288  */
4289 static void rbd_exists_validate(struct rbd_device *rbd_dev)
4290 {
4291         u64 snap_id;
4292
4293         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4294                 return;
4295
4296         snap_id = rbd_dev->spec->snap_id;
4297         if (snap_id == CEPH_NOSNAP)
4298                 return;
4299
4300         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4301                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4302 }
4303
4304 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4305 {
4306         sector_t size;
4307
4308         /*
4309          * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4310          * try to update its size.  If REMOVING is set, updating size
4311          * is just useless work since the device can't be opened.
4312          */
4313         if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4314             !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4315                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4316                 dout("setting size to %llu sectors", (unsigned long long)size);
4317                 set_capacity(rbd_dev->disk, size);
4318                 revalidate_disk(rbd_dev->disk);
4319         }
4320 }
4321
4322 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4323 {
4324         u64 mapping_size;
4325         int ret;
4326
4327         down_write(&rbd_dev->header_rwsem);
4328         mapping_size = rbd_dev->mapping.size;
4329
4330         ret = rbd_dev_header_info(rbd_dev);
4331         if (ret)
4332                 goto out;
4333
4334         /*
4335          * If there is a parent, see if it has disappeared due to the
4336          * mapped image getting flattened.
4337          */
4338         if (rbd_dev->parent) {
4339                 ret = rbd_dev_v2_parent_info(rbd_dev);
4340                 if (ret)
4341                         goto out;
4342         }
4343
4344         if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4345                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4346         } else {
4347                 /* validate mapped snapshot's EXISTS flag */
4348                 rbd_exists_validate(rbd_dev);
4349         }
4350
4351 out:
4352         up_write(&rbd_dev->header_rwsem);
4353         if (!ret && mapping_size != rbd_dev->mapping.size)
4354                 rbd_dev_update_size(rbd_dev);
4355
4356         return ret;
4357 }
4358
4359 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
4360                 unsigned int hctx_idx, unsigned int numa_node)
4361 {
4362         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4363
4364         INIT_WORK(work, rbd_queue_workfn);
4365         return 0;
4366 }
4367
4368 static const struct blk_mq_ops rbd_mq_ops = {
4369         .queue_rq       = rbd_queue_rq,
4370         .init_request   = rbd_init_request,
4371 };
4372
4373 static int rbd_init_disk(struct rbd_device *rbd_dev)
4374 {
4375         struct gendisk *disk;
4376         struct request_queue *q;
4377         u64 segment_size;
4378         int err;
4379
4380         /* create gendisk info */
4381         disk = alloc_disk(single_major ?
4382                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4383                           RBD_MINORS_PER_MAJOR);
4384         if (!disk)
4385                 return -ENOMEM;
4386
4387         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4388                  rbd_dev->dev_id);
4389         disk->major = rbd_dev->major;
4390         disk->first_minor = rbd_dev->minor;
4391         if (single_major)
4392                 disk->flags |= GENHD_FL_EXT_DEVT;
4393         disk->fops = &rbd_bd_ops;
4394         disk->private_data = rbd_dev;
4395
4396         memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4397         rbd_dev->tag_set.ops = &rbd_mq_ops;
4398         rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4399         rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4400         rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4401         rbd_dev->tag_set.nr_hw_queues = 1;
4402         rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4403
4404         err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4405         if (err)
4406                 goto out_disk;
4407
4408         q = blk_mq_init_queue(&rbd_dev->tag_set);
4409         if (IS_ERR(q)) {
4410                 err = PTR_ERR(q);
4411                 goto out_tag_set;
4412         }
4413
4414         queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4415         /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4416
4417         /* set io sizes to object size */
4418         segment_size = rbd_obj_bytes(&rbd_dev->header);
4419         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4420         q->limits.max_sectors = queue_max_hw_sectors(q);
4421         blk_queue_max_segments(q, USHRT_MAX);
4422         blk_queue_max_segment_size(q, segment_size);
4423         blk_queue_io_min(q, segment_size);
4424         blk_queue_io_opt(q, segment_size);
4425
4426         /* enable the discard support */
4427         queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4428         q->limits.discard_granularity = segment_size;
4429         q->limits.discard_alignment = segment_size;
4430         blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
4431         blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
4432
4433         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4434                 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
4435
4436         /*
4437          * disk_release() expects a queue ref from add_disk() and will
4438          * put it.  Hold an extra ref until add_disk() is called.
4439          */
4440         WARN_ON(!blk_get_queue(q));
4441         disk->queue = q;
4442         q->queuedata = rbd_dev;
4443
4444         rbd_dev->disk = disk;
4445
4446         return 0;
4447 out_tag_set:
4448         blk_mq_free_tag_set(&rbd_dev->tag_set);
4449 out_disk:
4450         put_disk(disk);
4451         return err;
4452 }
4453
4454 /*
4455   sysfs
4456 */
4457
4458 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4459 {
4460         return container_of(dev, struct rbd_device, dev);
4461 }
4462
4463 static ssize_t rbd_size_show(struct device *dev,
4464                              struct device_attribute *attr, char *buf)
4465 {
4466         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4467
4468         return sprintf(buf, "%llu\n",
4469                 (unsigned long long)rbd_dev->mapping.size);
4470 }
4471
4472 /*
4473  * Note this shows the features for whatever's mapped, which is not
4474  * necessarily the base image.
4475  */
4476 static ssize_t rbd_features_show(struct device *dev,
4477                              struct device_attribute *attr, char *buf)
4478 {
4479         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4480
4481         return sprintf(buf, "0x%016llx\n",
4482                         (unsigned long long)rbd_dev->mapping.features);
4483 }
4484
4485 static ssize_t rbd_major_show(struct device *dev,
4486                               struct device_attribute *attr, char *buf)
4487 {
4488         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4489
4490         if (rbd_dev->major)
4491                 return sprintf(buf, "%d\n", rbd_dev->major);
4492
4493         return sprintf(buf, "(none)\n");
4494 }
4495
4496 static ssize_t rbd_minor_show(struct device *dev,
4497                               struct device_attribute *attr, char *buf)
4498 {
4499         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4500
4501         return sprintf(buf, "%d\n", rbd_dev->minor);
4502 }
4503
4504 static ssize_t rbd_client_addr_show(struct device *dev,
4505                                     struct device_attribute *attr, char *buf)
4506 {
4507         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4508         struct ceph_entity_addr *client_addr =
4509             ceph_client_addr(rbd_dev->rbd_client->client);
4510
4511         return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4512                        le32_to_cpu(client_addr->nonce));
4513 }
4514
4515 static ssize_t rbd_client_id_show(struct device *dev,
4516                                   struct device_attribute *attr, char *buf)
4517 {
4518         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4519
4520         return sprintf(buf, "client%lld\n",
4521                        ceph_client_gid(rbd_dev->rbd_client->client));
4522 }
4523
4524 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4525                                      struct device_attribute *attr, char *buf)
4526 {
4527         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4528
4529         return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4530 }
4531
4532 static ssize_t rbd_config_info_show(struct device *dev,
4533                                     struct device_attribute *attr, char *buf)
4534 {
4535         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4536
4537         if (!capable(CAP_SYS_ADMIN))
4538                 return -EPERM;
4539
4540         return sprintf(buf, "%s\n", rbd_dev->config_info);
4541 }
4542
4543 static ssize_t rbd_pool_show(struct device *dev,
4544                              struct device_attribute *attr, char *buf)
4545 {
4546         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4547
4548         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4549 }
4550
4551 static ssize_t rbd_pool_id_show(struct device *dev,
4552                              struct device_attribute *attr, char *buf)
4553 {
4554         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4555
4556         return sprintf(buf, "%llu\n",
4557                         (unsigned long long) rbd_dev->spec->pool_id);
4558 }
4559
4560 static ssize_t rbd_name_show(struct device *dev,
4561                              struct device_attribute *attr, char *buf)
4562 {
4563         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4564
4565         if (rbd_dev->spec->image_name)
4566                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4567
4568         return sprintf(buf, "(unknown)\n");
4569 }
4570
4571 static ssize_t rbd_image_id_show(struct device *dev,
4572                              struct device_attribute *attr, char *buf)
4573 {
4574         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4575
4576         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4577 }
4578
4579 /*
4580  * Shows the name of the currently-mapped snapshot (or
4581  * RBD_SNAP_HEAD_NAME for the base image).
4582  */
4583 static ssize_t rbd_snap_show(struct device *dev,
4584                              struct device_attribute *attr,
4585                              char *buf)
4586 {
4587         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4588
4589         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4590 }
4591
4592 static ssize_t rbd_snap_id_show(struct device *dev,
4593                                 struct device_attribute *attr, char *buf)
4594 {
4595         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4596
4597         return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4598 }
4599
4600 /*
4601  * For a v2 image, shows the chain of parent images, separated by empty
4602  * lines.  For v1 images or if there is no parent, shows "(no parent
4603  * image)".
4604  */
4605 static ssize_t rbd_parent_show(struct device *dev,
4606                                struct device_attribute *attr,
4607                                char *buf)
4608 {
4609         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4610         ssize_t count = 0;
4611
4612         if (!rbd_dev->parent)
4613                 return sprintf(buf, "(no parent image)\n");
4614
4615         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4616                 struct rbd_spec *spec = rbd_dev->parent_spec;
4617
4618                 count += sprintf(&buf[count], "%s"
4619                             "pool_id %llu\npool_name %s\n"
4620                             "image_id %s\nimage_name %s\n"
4621                             "snap_id %llu\nsnap_name %s\n"
4622                             "overlap %llu\n",
4623                             !count ? "" : "\n", /* first? */
4624                             spec->pool_id, spec->pool_name,
4625                             spec->image_id, spec->image_name ?: "(unknown)",
4626                             spec->snap_id, spec->snap_name,
4627                             rbd_dev->parent_overlap);
4628         }
4629
4630         return count;
4631 }
4632
4633 static ssize_t rbd_image_refresh(struct device *dev,
4634                                  struct device_attribute *attr,
4635                                  const char *buf,
4636                                  size_t size)
4637 {
4638         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4639         int ret;
4640
4641         if (!capable(CAP_SYS_ADMIN))
4642                 return -EPERM;
4643
4644         ret = rbd_dev_refresh(rbd_dev);
4645         if (ret)
4646                 return ret;
4647
4648         return size;
4649 }
4650
4651 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4652 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4653 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4654 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4655 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4656 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4657 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4658 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4659 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4660 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4661 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4662 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4663 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4664 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4665 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4666 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4667
4668 static struct attribute *rbd_attrs[] = {
4669         &dev_attr_size.attr,
4670         &dev_attr_features.attr,
4671         &dev_attr_major.attr,
4672         &dev_attr_minor.attr,
4673         &dev_attr_client_addr.attr,
4674         &dev_attr_client_id.attr,
4675         &dev_attr_cluster_fsid.attr,
4676         &dev_attr_config_info.attr,
4677         &dev_attr_pool.attr,
4678         &dev_attr_pool_id.attr,
4679         &dev_attr_name.attr,
4680         &dev_attr_image_id.attr,
4681         &dev_attr_current_snap.attr,
4682         &dev_attr_snap_id.attr,
4683         &dev_attr_parent.attr,
4684         &dev_attr_refresh.attr,
4685         NULL
4686 };
4687
4688 static struct attribute_group rbd_attr_group = {
4689         .attrs = rbd_attrs,
4690 };
4691
4692 static const struct attribute_group *rbd_attr_groups[] = {
4693         &rbd_attr_group,
4694         NULL
4695 };
4696
4697 static void rbd_dev_release(struct device *dev);
4698
4699 static const struct device_type rbd_device_type = {
4700         .name           = "rbd",
4701         .groups         = rbd_attr_groups,
4702         .release        = rbd_dev_release,
4703 };
4704
4705 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4706 {
4707         kref_get(&spec->kref);
4708
4709         return spec;
4710 }
4711
4712 static void rbd_spec_free(struct kref *kref);
4713 static void rbd_spec_put(struct rbd_spec *spec)
4714 {
4715         if (spec)
4716                 kref_put(&spec->kref, rbd_spec_free);
4717 }
4718
4719 static struct rbd_spec *rbd_spec_alloc(void)
4720 {
4721         struct rbd_spec *spec;
4722
4723         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4724         if (!spec)
4725                 return NULL;
4726
4727         spec->pool_id = CEPH_NOPOOL;
4728         spec->snap_id = CEPH_NOSNAP;
4729         kref_init(&spec->kref);
4730
4731         return spec;
4732 }
4733
4734 static void rbd_spec_free(struct kref *kref)
4735 {
4736         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4737
4738         kfree(spec->pool_name);
4739         kfree(spec->image_id);
4740         kfree(spec->image_name);
4741         kfree(spec->snap_name);
4742         kfree(spec);
4743 }
4744
4745 static void rbd_dev_free(struct rbd_device *rbd_dev)
4746 {
4747         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4748         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4749
4750         ceph_oid_destroy(&rbd_dev->header_oid);
4751         ceph_oloc_destroy(&rbd_dev->header_oloc);
4752         kfree(rbd_dev->config_info);
4753
4754         rbd_put_client(rbd_dev->rbd_client);
4755         rbd_spec_put(rbd_dev->spec);
4756         kfree(rbd_dev->opts);
4757         kfree(rbd_dev);
4758 }
4759
4760 static void rbd_dev_release(struct device *dev)
4761 {
4762         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4763         bool need_put = !!rbd_dev->opts;
4764
4765         if (need_put) {
4766                 destroy_workqueue(rbd_dev->task_wq);
4767                 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4768         }
4769
4770         rbd_dev_free(rbd_dev);
4771
4772         /*
4773          * This is racy, but way better than putting module outside of
4774          * the release callback.  The race window is pretty small, so
4775          * doing something similar to dm (dm-builtin.c) is overkill.
4776          */
4777         if (need_put)
4778                 module_put(THIS_MODULE);
4779 }
4780
4781 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4782                                            struct rbd_spec *spec)
4783 {
4784         struct rbd_device *rbd_dev;
4785
4786         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4787         if (!rbd_dev)
4788                 return NULL;
4789
4790         spin_lock_init(&rbd_dev->lock);
4791         INIT_LIST_HEAD(&rbd_dev->node);
4792         init_rwsem(&rbd_dev->header_rwsem);
4793
4794         rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4795         ceph_oid_init(&rbd_dev->header_oid);
4796         rbd_dev->header_oloc.pool = spec->pool_id;
4797
4798         mutex_init(&rbd_dev->watch_mutex);
4799         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4800         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4801
4802         init_rwsem(&rbd_dev->lock_rwsem);
4803         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4804         INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4805         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4806         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4807         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4808         init_waitqueue_head(&rbd_dev->lock_waitq);
4809
4810         rbd_dev->dev.bus = &rbd_bus_type;
4811         rbd_dev->dev.type = &rbd_device_type;
4812         rbd_dev->dev.parent = &rbd_root_dev;
4813         device_initialize(&rbd_dev->dev);
4814
4815         rbd_dev->rbd_client = rbdc;
4816         rbd_dev->spec = spec;
4817
4818         return rbd_dev;
4819 }
4820
4821 /*
4822  * Create a mapping rbd_dev.
4823  */
4824 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4825                                          struct rbd_spec *spec,
4826                                          struct rbd_options *opts)
4827 {
4828         struct rbd_device *rbd_dev;
4829
4830         rbd_dev = __rbd_dev_create(rbdc, spec);
4831         if (!rbd_dev)
4832                 return NULL;
4833
4834         rbd_dev->opts = opts;
4835
4836         /* get an id and fill in device name */
4837         rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4838                                          minor_to_rbd_dev_id(1 << MINORBITS),
4839                                          GFP_KERNEL);
4840         if (rbd_dev->dev_id < 0)
4841                 goto fail_rbd_dev;
4842
4843         sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4844         rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4845                                                    rbd_dev->name);
4846         if (!rbd_dev->task_wq)
4847                 goto fail_dev_id;
4848
4849         /* we have a ref from do_rbd_add() */
4850         __module_get(THIS_MODULE);
4851
4852         dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4853         return rbd_dev;
4854
4855 fail_dev_id:
4856         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4857 fail_rbd_dev:
4858         rbd_dev_free(rbd_dev);
4859         return NULL;
4860 }
4861
4862 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4863 {
4864         if (rbd_dev)
4865                 put_device(&rbd_dev->dev);
4866 }
4867
4868 /*
4869  * Get the size and object order for an image snapshot, or if
4870  * snap_id is CEPH_NOSNAP, gets this information for the base
4871  * image.
4872  */
4873 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4874                                 u8 *order, u64 *snap_size)
4875 {
4876         __le64 snapid = cpu_to_le64(snap_id);
4877         int ret;
4878         struct {
4879                 u8 order;
4880                 __le64 size;
4881         } __attribute__ ((packed)) size_buf = { 0 };
4882
4883         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4884                                   &rbd_dev->header_oloc, "get_size",
4885                                   &snapid, sizeof(snapid),
4886                                   &size_buf, sizeof(size_buf));
4887         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4888         if (ret < 0)
4889                 return ret;
4890         if (ret < sizeof (size_buf))
4891                 return -ERANGE;
4892
4893         if (order) {
4894                 *order = size_buf.order;
4895                 dout("  order %u", (unsigned int)*order);
4896         }
4897         *snap_size = le64_to_cpu(size_buf.size);
4898
4899         dout("  snap_id 0x%016llx snap_size = %llu\n",
4900                 (unsigned long long)snap_id,
4901                 (unsigned long long)*snap_size);
4902
4903         return 0;
4904 }
4905
4906 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4907 {
4908         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4909                                         &rbd_dev->header.obj_order,
4910                                         &rbd_dev->header.image_size);
4911 }
4912
4913 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4914 {
4915         void *reply_buf;
4916         int ret;
4917         void *p;
4918
4919         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4920         if (!reply_buf)
4921                 return -ENOMEM;
4922
4923         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4924                                   &rbd_dev->header_oloc, "get_object_prefix",
4925                                   NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4926         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4927         if (ret < 0)
4928                 goto out;
4929
4930         p = reply_buf;
4931         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4932                                                 p + ret, NULL, GFP_NOIO);
4933         ret = 0;
4934
4935         if (IS_ERR(rbd_dev->header.object_prefix)) {
4936                 ret = PTR_ERR(rbd_dev->header.object_prefix);
4937                 rbd_dev->header.object_prefix = NULL;
4938         } else {
4939                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
4940         }
4941 out:
4942         kfree(reply_buf);
4943
4944         return ret;
4945 }
4946
4947 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4948                 u64 *snap_features)
4949 {
4950         __le64 snapid = cpu_to_le64(snap_id);
4951         struct {
4952                 __le64 features;
4953                 __le64 incompat;
4954         } __attribute__ ((packed)) features_buf = { 0 };
4955         u64 unsup;
4956         int ret;
4957
4958         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4959                                   &rbd_dev->header_oloc, "get_features",
4960                                   &snapid, sizeof(snapid),
4961                                   &features_buf, sizeof(features_buf));
4962         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4963         if (ret < 0)
4964                 return ret;
4965         if (ret < sizeof (features_buf))
4966                 return -ERANGE;
4967
4968         unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4969         if (unsup) {
4970                 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4971                          unsup);
4972                 return -ENXIO;
4973         }
4974
4975         *snap_features = le64_to_cpu(features_buf.features);
4976
4977         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4978                 (unsigned long long)snap_id,
4979                 (unsigned long long)*snap_features,
4980                 (unsigned long long)le64_to_cpu(features_buf.incompat));
4981
4982         return 0;
4983 }
4984
4985 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4986 {
4987         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4988                                                 &rbd_dev->header.features);
4989 }
4990
4991 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4992 {
4993         struct rbd_spec *parent_spec;
4994         size_t size;
4995         void *reply_buf = NULL;
4996         __le64 snapid;
4997         void *p;
4998         void *end;
4999         u64 pool_id;
5000         char *image_id;
5001         u64 snap_id;
5002         u64 overlap;
5003         int ret;
5004
5005         parent_spec = rbd_spec_alloc();
5006         if (!parent_spec)
5007                 return -ENOMEM;
5008
5009         size = sizeof (__le64) +                                /* pool_id */
5010                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
5011                 sizeof (__le64) +                               /* snap_id */
5012                 sizeof (__le64);                                /* overlap */
5013         reply_buf = kmalloc(size, GFP_KERNEL);
5014         if (!reply_buf) {
5015                 ret = -ENOMEM;
5016                 goto out_err;
5017         }
5018
5019         snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5020         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5021                                   &rbd_dev->header_oloc, "get_parent",
5022                                   &snapid, sizeof(snapid), reply_buf, size);
5023         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5024         if (ret < 0)
5025                 goto out_err;
5026
5027         p = reply_buf;
5028         end = reply_buf + ret;
5029         ret = -ERANGE;
5030         ceph_decode_64_safe(&p, end, pool_id, out_err);
5031         if (pool_id == CEPH_NOPOOL) {
5032                 /*
5033                  * Either the parent never existed, or we have
5034                  * record of it but the image got flattened so it no
5035                  * longer has a parent.  When the parent of a
5036                  * layered image disappears we immediately set the
5037                  * overlap to 0.  The effect of this is that all new
5038                  * requests will be treated as if the image had no
5039                  * parent.
5040                  */
5041                 if (rbd_dev->parent_overlap) {
5042                         rbd_dev->parent_overlap = 0;
5043                         rbd_dev_parent_put(rbd_dev);
5044                         pr_info("%s: clone image has been flattened\n",
5045                                 rbd_dev->disk->disk_name);
5046                 }
5047
5048                 goto out;       /* No parent?  No problem. */
5049         }
5050
5051         /* The ceph file layout needs to fit pool id in 32 bits */
5052
5053         ret = -EIO;
5054         if (pool_id > (u64)U32_MAX) {
5055                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5056                         (unsigned long long)pool_id, U32_MAX);
5057                 goto out_err;
5058         }
5059
5060         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5061         if (IS_ERR(image_id)) {
5062                 ret = PTR_ERR(image_id);
5063                 goto out_err;
5064         }
5065         ceph_decode_64_safe(&p, end, snap_id, out_err);
5066         ceph_decode_64_safe(&p, end, overlap, out_err);
5067
5068         /*
5069          * The parent won't change (except when the clone is
5070          * flattened, already handled that).  So we only need to
5071          * record the parent spec we have not already done so.
5072          */
5073         if (!rbd_dev->parent_spec) {
5074                 parent_spec->pool_id = pool_id;
5075                 parent_spec->image_id = image_id;
5076                 parent_spec->snap_id = snap_id;
5077                 rbd_dev->parent_spec = parent_spec;
5078                 parent_spec = NULL;     /* rbd_dev now owns this */
5079         } else {
5080                 kfree(image_id);
5081         }
5082
5083         /*
5084          * We always update the parent overlap.  If it's zero we issue
5085          * a warning, as we will proceed as if there was no parent.
5086          */
5087         if (!overlap) {
5088                 if (parent_spec) {
5089                         /* refresh, careful to warn just once */
5090                         if (rbd_dev->parent_overlap)
5091                                 rbd_warn(rbd_dev,
5092                                     "clone now standalone (overlap became 0)");
5093                 } else {
5094                         /* initial probe */
5095                         rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5096                 }
5097         }
5098         rbd_dev->parent_overlap = overlap;
5099
5100 out:
5101         ret = 0;
5102 out_err:
5103         kfree(reply_buf);
5104         rbd_spec_put(parent_spec);
5105
5106         return ret;
5107 }
5108
5109 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5110 {
5111         struct {
5112                 __le64 stripe_unit;
5113                 __le64 stripe_count;
5114         } __attribute__ ((packed)) striping_info_buf = { 0 };
5115         size_t size = sizeof (striping_info_buf);
5116         void *p;
5117         u64 obj_size;
5118         u64 stripe_unit;
5119         u64 stripe_count;
5120         int ret;
5121
5122         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5123                                 &rbd_dev->header_oloc, "get_stripe_unit_count",
5124                                 NULL, 0, &striping_info_buf, size);
5125         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5126         if (ret < 0)
5127                 return ret;
5128         if (ret < size)
5129                 return -ERANGE;
5130
5131         /*
5132          * We don't actually support the "fancy striping" feature
5133          * (STRIPINGV2) yet, but if the striping sizes are the
5134          * defaults the behavior is the same as before.  So find
5135          * out, and only fail if the image has non-default values.
5136          */
5137         ret = -EINVAL;
5138         obj_size = rbd_obj_bytes(&rbd_dev->header);
5139         p = &striping_info_buf;
5140         stripe_unit = ceph_decode_64(&p);
5141         if (stripe_unit != obj_size) {
5142                 rbd_warn(rbd_dev, "unsupported stripe unit "
5143                                 "(got %llu want %llu)",
5144                                 stripe_unit, obj_size);
5145                 return -EINVAL;
5146         }
5147         stripe_count = ceph_decode_64(&p);
5148         if (stripe_count != 1) {
5149                 rbd_warn(rbd_dev, "unsupported stripe count "
5150                                 "(got %llu want 1)", stripe_count);
5151                 return -EINVAL;
5152         }
5153         rbd_dev->header.stripe_unit = stripe_unit;
5154         rbd_dev->header.stripe_count = stripe_count;
5155
5156         return 0;
5157 }
5158
5159 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5160 {
5161         __le64 data_pool_id;
5162         int ret;
5163
5164         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5165                                   &rbd_dev->header_oloc, "get_data_pool",
5166                                   NULL, 0, &data_pool_id, sizeof(data_pool_id));
5167         if (ret < 0)
5168                 return ret;
5169         if (ret < sizeof(data_pool_id))
5170                 return -EBADMSG;
5171
5172         rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5173         WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5174         return 0;
5175 }
5176
5177 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5178 {
5179         CEPH_DEFINE_OID_ONSTACK(oid);
5180         size_t image_id_size;
5181         char *image_id;
5182         void *p;
5183         void *end;
5184         size_t size;
5185         void *reply_buf = NULL;
5186         size_t len = 0;
5187         char *image_name = NULL;
5188         int ret;
5189
5190         rbd_assert(!rbd_dev->spec->image_name);
5191
5192         len = strlen(rbd_dev->spec->image_id);
5193         image_id_size = sizeof (__le32) + len;
5194         image_id = kmalloc(image_id_size, GFP_KERNEL);
5195         if (!image_id)
5196                 return NULL;
5197
5198         p = image_id;
5199         end = image_id + image_id_size;
5200         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5201
5202         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5203         reply_buf = kmalloc(size, GFP_KERNEL);
5204         if (!reply_buf)
5205                 goto out;
5206
5207         ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5208         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5209                                   "dir_get_name", image_id, image_id_size,
5210                                   reply_buf, size);
5211         if (ret < 0)
5212                 goto out;
5213         p = reply_buf;
5214         end = reply_buf + ret;
5215
5216         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5217         if (IS_ERR(image_name))
5218                 image_name = NULL;
5219         else
5220                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5221 out:
5222         kfree(reply_buf);
5223         kfree(image_id);
5224
5225         return image_name;
5226 }
5227
5228 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5229 {
5230         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5231         const char *snap_name;
5232         u32 which = 0;
5233
5234         /* Skip over names until we find the one we are looking for */
5235
5236         snap_name = rbd_dev->header.snap_names;
5237         while (which < snapc->num_snaps) {
5238                 if (!strcmp(name, snap_name))
5239                         return snapc->snaps[which];
5240                 snap_name += strlen(snap_name) + 1;
5241                 which++;
5242         }
5243         return CEPH_NOSNAP;
5244 }
5245
5246 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5247 {
5248         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5249         u32 which;
5250         bool found = false;
5251         u64 snap_id;
5252
5253         for (which = 0; !found && which < snapc->num_snaps; which++) {
5254                 const char *snap_name;
5255
5256                 snap_id = snapc->snaps[which];
5257                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5258                 if (IS_ERR(snap_name)) {
5259                         /* ignore no-longer existing snapshots */
5260                         if (PTR_ERR(snap_name) == -ENOENT)
5261                                 continue;
5262                         else
5263                                 break;
5264                 }
5265                 found = !strcmp(name, snap_name);
5266                 kfree(snap_name);
5267         }
5268         return found ? snap_id : CEPH_NOSNAP;
5269 }
5270
5271 /*
5272  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5273  * no snapshot by that name is found, or if an error occurs.
5274  */
5275 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5276 {
5277         if (rbd_dev->image_format == 1)
5278                 return rbd_v1_snap_id_by_name(rbd_dev, name);
5279
5280         return rbd_v2_snap_id_by_name(rbd_dev, name);
5281 }
5282
5283 /*
5284  * An image being mapped will have everything but the snap id.
5285  */
5286 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5287 {
5288         struct rbd_spec *spec = rbd_dev->spec;
5289
5290         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5291         rbd_assert(spec->image_id && spec->image_name);
5292         rbd_assert(spec->snap_name);
5293
5294         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5295                 u64 snap_id;
5296
5297                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5298                 if (snap_id == CEPH_NOSNAP)
5299                         return -ENOENT;
5300
5301                 spec->snap_id = snap_id;
5302         } else {
5303                 spec->snap_id = CEPH_NOSNAP;
5304         }
5305
5306         return 0;
5307 }
5308
5309 /*
5310  * A parent image will have all ids but none of the names.
5311  *
5312  * All names in an rbd spec are dynamically allocated.  It's OK if we
5313  * can't figure out the name for an image id.
5314  */
5315 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5316 {
5317         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5318         struct rbd_spec *spec = rbd_dev->spec;
5319         const char *pool_name;
5320         const char *image_name;
5321         const char *snap_name;
5322         int ret;
5323
5324         rbd_assert(spec->pool_id != CEPH_NOPOOL);
5325         rbd_assert(spec->image_id);
5326         rbd_assert(spec->snap_id != CEPH_NOSNAP);
5327
5328         /* Get the pool name; we have to make our own copy of this */
5329
5330         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5331         if (!pool_name) {
5332                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5333                 return -EIO;
5334         }
5335         pool_name = kstrdup(pool_name, GFP_KERNEL);
5336         if (!pool_name)
5337                 return -ENOMEM;
5338
5339         /* Fetch the image name; tolerate failure here */
5340
5341         image_name = rbd_dev_image_name(rbd_dev);
5342         if (!image_name)
5343                 rbd_warn(rbd_dev, "unable to get image name");
5344
5345         /* Fetch the snapshot name */
5346
5347         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5348         if (IS_ERR(snap_name)) {
5349                 ret = PTR_ERR(snap_name);
5350                 goto out_err;
5351         }
5352
5353         spec->pool_name = pool_name;
5354         spec->image_name = image_name;
5355         spec->snap_name = snap_name;
5356
5357         return 0;
5358
5359 out_err:
5360         kfree(image_name);
5361         kfree(pool_name);
5362         return ret;
5363 }
5364
5365 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5366 {
5367         size_t size;
5368         int ret;
5369         void *reply_buf;
5370         void *p;
5371         void *end;
5372         u64 seq;
5373         u32 snap_count;
5374         struct ceph_snap_context *snapc;
5375         u32 i;
5376
5377         /*
5378          * We'll need room for the seq value (maximum snapshot id),
5379          * snapshot count, and array of that many snapshot ids.
5380          * For now we have a fixed upper limit on the number we're
5381          * prepared to receive.
5382          */
5383         size = sizeof (__le64) + sizeof (__le32) +
5384                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
5385         reply_buf = kzalloc(size, GFP_KERNEL);
5386         if (!reply_buf)
5387                 return -ENOMEM;
5388
5389         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5390                                   &rbd_dev->header_oloc, "get_snapcontext",
5391                                   NULL, 0, reply_buf, size);
5392         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5393         if (ret < 0)
5394                 goto out;
5395
5396         p = reply_buf;
5397         end = reply_buf + ret;
5398         ret = -ERANGE;
5399         ceph_decode_64_safe(&p, end, seq, out);
5400         ceph_decode_32_safe(&p, end, snap_count, out);
5401
5402         /*
5403          * Make sure the reported number of snapshot ids wouldn't go
5404          * beyond the end of our buffer.  But before checking that,
5405          * make sure the computed size of the snapshot context we
5406          * allocate is representable in a size_t.
5407          */
5408         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5409                                  / sizeof (u64)) {
5410                 ret = -EINVAL;
5411                 goto out;
5412         }
5413         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5414                 goto out;
5415         ret = 0;
5416
5417         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5418         if (!snapc) {
5419                 ret = -ENOMEM;
5420                 goto out;
5421         }
5422         snapc->seq = seq;
5423         for (i = 0; i < snap_count; i++)
5424                 snapc->snaps[i] = ceph_decode_64(&p);
5425
5426         ceph_put_snap_context(rbd_dev->header.snapc);
5427         rbd_dev->header.snapc = snapc;
5428
5429         dout("  snap context seq = %llu, snap_count = %u\n",
5430                 (unsigned long long)seq, (unsigned int)snap_count);
5431 out:
5432         kfree(reply_buf);
5433
5434         return ret;
5435 }
5436
5437 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5438                                         u64 snap_id)
5439 {
5440         size_t size;
5441         void *reply_buf;
5442         __le64 snapid;
5443         int ret;
5444         void *p;
5445         void *end;
5446         char *snap_name;
5447
5448         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5449         reply_buf = kmalloc(size, GFP_KERNEL);
5450         if (!reply_buf)
5451                 return ERR_PTR(-ENOMEM);
5452
5453         snapid = cpu_to_le64(snap_id);
5454         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5455                                   &rbd_dev->header_oloc, "get_snapshot_name",
5456                                   &snapid, sizeof(snapid), reply_buf, size);
5457         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5458         if (ret < 0) {
5459                 snap_name = ERR_PTR(ret);
5460                 goto out;
5461         }
5462
5463         p = reply_buf;
5464         end = reply_buf + ret;
5465         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5466         if (IS_ERR(snap_name))
5467                 goto out;
5468
5469         dout("  snap_id 0x%016llx snap_name = %s\n",
5470                 (unsigned long long)snap_id, snap_name);
5471 out:
5472         kfree(reply_buf);
5473
5474         return snap_name;
5475 }
5476
5477 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5478 {
5479         bool first_time = rbd_dev->header.object_prefix == NULL;
5480         int ret;
5481
5482         ret = rbd_dev_v2_image_size(rbd_dev);
5483         if (ret)
5484                 return ret;
5485
5486         if (first_time) {
5487                 ret = rbd_dev_v2_header_onetime(rbd_dev);
5488                 if (ret)
5489                         return ret;
5490         }
5491
5492         ret = rbd_dev_v2_snap_context(rbd_dev);
5493         if (ret && first_time) {
5494                 kfree(rbd_dev->header.object_prefix);
5495                 rbd_dev->header.object_prefix = NULL;
5496         }
5497
5498         return ret;
5499 }
5500
5501 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5502 {
5503         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5504
5505         if (rbd_dev->image_format == 1)
5506                 return rbd_dev_v1_header_info(rbd_dev);
5507
5508         return rbd_dev_v2_header_info(rbd_dev);
5509 }
5510
5511 /*
5512  * Skips over white space at *buf, and updates *buf to point to the
5513  * first found non-space character (if any). Returns the length of
5514  * the token (string of non-white space characters) found.  Note
5515  * that *buf must be terminated with '\0'.
5516  */
5517 static inline size_t next_token(const char **buf)
5518 {
5519         /*
5520         * These are the characters that produce nonzero for
5521         * isspace() in the "C" and "POSIX" locales.
5522         */
5523         const char *spaces = " \f\n\r\t\v";
5524
5525         *buf += strspn(*buf, spaces);   /* Find start of token */
5526
5527         return strcspn(*buf, spaces);   /* Return token length */
5528 }
5529
5530 /*
5531  * Finds the next token in *buf, dynamically allocates a buffer big
5532  * enough to hold a copy of it, and copies the token into the new
5533  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
5534  * that a duplicate buffer is created even for a zero-length token.
5535  *
5536  * Returns a pointer to the newly-allocated duplicate, or a null
5537  * pointer if memory for the duplicate was not available.  If
5538  * the lenp argument is a non-null pointer, the length of the token
5539  * (not including the '\0') is returned in *lenp.
5540  *
5541  * If successful, the *buf pointer will be updated to point beyond
5542  * the end of the found token.
5543  *
5544  * Note: uses GFP_KERNEL for allocation.
5545  */
5546 static inline char *dup_token(const char **buf, size_t *lenp)
5547 {
5548         char *dup;
5549         size_t len;
5550
5551         len = next_token(buf);
5552         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5553         if (!dup)
5554                 return NULL;
5555         *(dup + len) = '\0';
5556         *buf += len;
5557
5558         if (lenp)
5559                 *lenp = len;
5560
5561         return dup;
5562 }
5563
5564 /*
5565  * Parse the options provided for an "rbd add" (i.e., rbd image
5566  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
5567  * and the data written is passed here via a NUL-terminated buffer.
5568  * Returns 0 if successful or an error code otherwise.
5569  *
5570  * The information extracted from these options is recorded in
5571  * the other parameters which return dynamically-allocated
5572  * structures:
5573  *  ceph_opts
5574  *      The address of a pointer that will refer to a ceph options
5575  *      structure.  Caller must release the returned pointer using
5576  *      ceph_destroy_options() when it is no longer needed.
5577  *  rbd_opts
5578  *      Address of an rbd options pointer.  Fully initialized by
5579  *      this function; caller must release with kfree().
5580  *  spec
5581  *      Address of an rbd image specification pointer.  Fully
5582  *      initialized by this function based on parsed options.
5583  *      Caller must release with rbd_spec_put().
5584  *
5585  * The options passed take this form:
5586  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
5587  * where:
5588  *  <mon_addrs>
5589  *      A comma-separated list of one or more monitor addresses.
5590  *      A monitor address is an ip address, optionally followed
5591  *      by a port number (separated by a colon).
5592  *        I.e.:  ip1[:port1][,ip2[:port2]...]
5593  *  <options>
5594  *      A comma-separated list of ceph and/or rbd options.
5595  *  <pool_name>
5596  *      The name of the rados pool containing the rbd image.
5597  *  <image_name>
5598  *      The name of the image in that pool to map.
5599  *  <snap_id>
5600  *      An optional snapshot id.  If provided, the mapping will
5601  *      present data from the image at the time that snapshot was
5602  *      created.  The image head is used if no snapshot id is
5603  *      provided.  Snapshot mappings are always read-only.
5604  */
5605 static int rbd_add_parse_args(const char *buf,
5606                                 struct ceph_options **ceph_opts,
5607                                 struct rbd_options **opts,
5608                                 struct rbd_spec **rbd_spec)
5609 {
5610         size_t len;
5611         char *options;
5612         const char *mon_addrs;
5613         char *snap_name;
5614         size_t mon_addrs_size;
5615         struct rbd_spec *spec = NULL;
5616         struct rbd_options *rbd_opts = NULL;
5617         struct ceph_options *copts;
5618         int ret;
5619
5620         /* The first four tokens are required */
5621
5622         len = next_token(&buf);
5623         if (!len) {
5624                 rbd_warn(NULL, "no monitor address(es) provided");
5625                 return -EINVAL;
5626         }
5627         mon_addrs = buf;
5628         mon_addrs_size = len + 1;
5629         buf += len;
5630
5631         ret = -EINVAL;
5632         options = dup_token(&buf, NULL);
5633         if (!options)
5634                 return -ENOMEM;
5635         if (!*options) {
5636                 rbd_warn(NULL, "no options provided");
5637                 goto out_err;
5638         }
5639
5640         spec = rbd_spec_alloc();
5641         if (!spec)
5642                 goto out_mem;
5643
5644         spec->pool_name = dup_token(&buf, NULL);
5645         if (!spec->pool_name)
5646                 goto out_mem;
5647         if (!*spec->pool_name) {
5648                 rbd_warn(NULL, "no pool name provided");
5649                 goto out_err;
5650         }
5651
5652         spec->image_name = dup_token(&buf, NULL);
5653         if (!spec->image_name)
5654                 goto out_mem;
5655         if (!*spec->image_name) {
5656                 rbd_warn(NULL, "no image name provided");
5657                 goto out_err;
5658         }
5659
5660         /*
5661          * Snapshot name is optional; default is to use "-"
5662          * (indicating the head/no snapshot).
5663          */
5664         len = next_token(&buf);
5665         if (!len) {
5666                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5667                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5668         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5669                 ret = -ENAMETOOLONG;
5670                 goto out_err;
5671         }
5672         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5673         if (!snap_name)
5674                 goto out_mem;
5675         *(snap_name + len) = '\0';
5676         spec->snap_name = snap_name;
5677
5678         /* Initialize all rbd options to the defaults */
5679
5680         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5681         if (!rbd_opts)
5682                 goto out_mem;
5683
5684         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5685         rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5686         rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5687         rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
5688
5689         copts = ceph_parse_options(options, mon_addrs,
5690                                         mon_addrs + mon_addrs_size - 1,
5691                                         parse_rbd_opts_token, rbd_opts);
5692         if (IS_ERR(copts)) {
5693                 ret = PTR_ERR(copts);
5694                 goto out_err;
5695         }
5696         kfree(options);
5697
5698         *ceph_opts = copts;
5699         *opts = rbd_opts;
5700         *rbd_spec = spec;
5701
5702         return 0;
5703 out_mem:
5704         ret = -ENOMEM;
5705 out_err:
5706         kfree(rbd_opts);
5707         rbd_spec_put(spec);
5708         kfree(options);
5709
5710         return ret;
5711 }
5712
5713 /*
5714  * Return pool id (>= 0) or a negative error code.
5715  */
5716 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5717 {
5718         struct ceph_options *opts = rbdc->client->options;
5719         u64 newest_epoch;
5720         int tries = 0;
5721         int ret;
5722
5723 again:
5724         ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5725         if (ret == -ENOENT && tries++ < 1) {
5726                 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5727                                             &newest_epoch);
5728                 if (ret < 0)
5729                         return ret;
5730
5731                 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5732                         ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5733                         (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5734                                                      newest_epoch,
5735                                                      opts->mount_timeout);
5736                         goto again;
5737                 } else {
5738                         /* the osdmap we have is new enough */
5739                         return -ENOENT;
5740                 }
5741         }
5742
5743         return ret;
5744 }
5745
5746 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5747 {
5748         down_write(&rbd_dev->lock_rwsem);
5749         if (__rbd_is_lock_owner(rbd_dev))
5750                 rbd_unlock(rbd_dev);
5751         up_write(&rbd_dev->lock_rwsem);
5752 }
5753
5754 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5755 {
5756         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5757                 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5758                 return -EINVAL;
5759         }
5760
5761         /* FIXME: "rbd map --exclusive" should be in interruptible */
5762         down_read(&rbd_dev->lock_rwsem);
5763         rbd_wait_state_locked(rbd_dev);
5764         up_read(&rbd_dev->lock_rwsem);
5765         if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5766                 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5767                 return -EROFS;
5768         }
5769
5770         return 0;
5771 }
5772
5773 /*
5774  * An rbd format 2 image has a unique identifier, distinct from the
5775  * name given to it by the user.  Internally, that identifier is
5776  * what's used to specify the names of objects related to the image.
5777  *
5778  * A special "rbd id" object is used to map an rbd image name to its
5779  * id.  If that object doesn't exist, then there is no v2 rbd image
5780  * with the supplied name.
5781  *
5782  * This function will record the given rbd_dev's image_id field if
5783  * it can be determined, and in that case will return 0.  If any
5784  * errors occur a negative errno will be returned and the rbd_dev's
5785  * image_id field will be unchanged (and should be NULL).
5786  */
5787 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5788 {
5789         int ret;
5790         size_t size;
5791         CEPH_DEFINE_OID_ONSTACK(oid);
5792         void *response;
5793         char *image_id;
5794
5795         /*
5796          * When probing a parent image, the image id is already
5797          * known (and the image name likely is not).  There's no
5798          * need to fetch the image id again in this case.  We
5799          * do still need to set the image format though.
5800          */
5801         if (rbd_dev->spec->image_id) {
5802                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5803
5804                 return 0;
5805         }
5806
5807         /*
5808          * First, see if the format 2 image id file exists, and if
5809          * so, get the image's persistent id from it.
5810          */
5811         ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5812                                rbd_dev->spec->image_name);
5813         if (ret)
5814                 return ret;
5815
5816         dout("rbd id object name is %s\n", oid.name);
5817
5818         /* Response will be an encoded string, which includes a length */
5819
5820         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5821         response = kzalloc(size, GFP_NOIO);
5822         if (!response) {
5823                 ret = -ENOMEM;
5824                 goto out;
5825         }
5826
5827         /* If it doesn't exist we'll assume it's a format 1 image */
5828
5829         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5830                                   "get_id", NULL, 0,
5831                                   response, RBD_IMAGE_ID_LEN_MAX);
5832         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5833         if (ret == -ENOENT) {
5834                 image_id = kstrdup("", GFP_KERNEL);
5835                 ret = image_id ? 0 : -ENOMEM;
5836                 if (!ret)
5837                         rbd_dev->image_format = 1;
5838         } else if (ret >= 0) {
5839                 void *p = response;
5840
5841                 image_id = ceph_extract_encoded_string(&p, p + ret,
5842                                                 NULL, GFP_NOIO);
5843                 ret = PTR_ERR_OR_ZERO(image_id);
5844                 if (!ret)
5845                         rbd_dev->image_format = 2;
5846         }
5847
5848         if (!ret) {
5849                 rbd_dev->spec->image_id = image_id;
5850                 dout("image_id is %s\n", image_id);
5851         }
5852 out:
5853         kfree(response);
5854         ceph_oid_destroy(&oid);
5855         return ret;
5856 }
5857
5858 /*
5859  * Undo whatever state changes are made by v1 or v2 header info
5860  * call.
5861  */
5862 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5863 {
5864         struct rbd_image_header *header;
5865
5866         rbd_dev_parent_put(rbd_dev);
5867
5868         /* Free dynamic fields from the header, then zero it out */
5869
5870         header = &rbd_dev->header;
5871         ceph_put_snap_context(header->snapc);
5872         kfree(header->snap_sizes);
5873         kfree(header->snap_names);
5874         kfree(header->object_prefix);
5875         memset(header, 0, sizeof (*header));
5876 }
5877
5878 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5879 {
5880         int ret;
5881
5882         ret = rbd_dev_v2_object_prefix(rbd_dev);
5883         if (ret)
5884                 goto out_err;
5885
5886         /*
5887          * Get the and check features for the image.  Currently the
5888          * features are assumed to never change.
5889          */
5890         ret = rbd_dev_v2_features(rbd_dev);
5891         if (ret)
5892                 goto out_err;
5893
5894         /* If the image supports fancy striping, get its parameters */
5895
5896         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5897                 ret = rbd_dev_v2_striping_info(rbd_dev);
5898                 if (ret < 0)
5899                         goto out_err;
5900         }
5901
5902         if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5903                 ret = rbd_dev_v2_data_pool(rbd_dev);
5904                 if (ret)
5905                         goto out_err;
5906         }
5907
5908         rbd_init_layout(rbd_dev);
5909         return 0;
5910
5911 out_err:
5912         rbd_dev->header.features = 0;
5913         kfree(rbd_dev->header.object_prefix);
5914         rbd_dev->header.object_prefix = NULL;
5915         return ret;
5916 }
5917
5918 /*
5919  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5920  * rbd_dev_image_probe() recursion depth, which means it's also the
5921  * length of the already discovered part of the parent chain.
5922  */
5923 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5924 {
5925         struct rbd_device *parent = NULL;
5926         int ret;
5927
5928         if (!rbd_dev->parent_spec)
5929                 return 0;
5930
5931         if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5932                 pr_info("parent chain is too long (%d)\n", depth);
5933                 ret = -EINVAL;
5934                 goto out_err;
5935         }
5936
5937         parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5938         if (!parent) {
5939                 ret = -ENOMEM;
5940                 goto out_err;
5941         }
5942
5943         /*
5944          * Images related by parent/child relationships always share
5945          * rbd_client and spec/parent_spec, so bump their refcounts.
5946          */
5947         __rbd_get_client(rbd_dev->rbd_client);
5948         rbd_spec_get(rbd_dev->parent_spec);
5949
5950         ret = rbd_dev_image_probe(parent, depth);
5951         if (ret < 0)
5952                 goto out_err;
5953
5954         rbd_dev->parent = parent;
5955         atomic_set(&rbd_dev->parent_ref, 1);
5956         return 0;
5957
5958 out_err:
5959         rbd_dev_unparent(rbd_dev);
5960         rbd_dev_destroy(parent);
5961         return ret;
5962 }
5963
5964 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5965 {
5966         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5967         rbd_dev_mapping_clear(rbd_dev);
5968         rbd_free_disk(rbd_dev);
5969         if (!single_major)
5970                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5971 }
5972
5973 /*
5974  * rbd_dev->header_rwsem must be locked for write and will be unlocked
5975  * upon return.
5976  */
5977 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5978 {
5979         int ret;
5980
5981         /* Record our major and minor device numbers. */
5982
5983         if (!single_major) {
5984                 ret = register_blkdev(0, rbd_dev->name);
5985                 if (ret < 0)
5986                         goto err_out_unlock;
5987
5988                 rbd_dev->major = ret;
5989                 rbd_dev->minor = 0;
5990         } else {
5991                 rbd_dev->major = rbd_major;
5992                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5993         }
5994
5995         /* Set up the blkdev mapping. */
5996
5997         ret = rbd_init_disk(rbd_dev);
5998         if (ret)
5999                 goto err_out_blkdev;
6000
6001         ret = rbd_dev_mapping_set(rbd_dev);
6002         if (ret)
6003                 goto err_out_disk;
6004
6005         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6006         set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
6007
6008         ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6009         if (ret)
6010                 goto err_out_mapping;
6011
6012         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6013         up_write(&rbd_dev->header_rwsem);
6014         return 0;
6015
6016 err_out_mapping:
6017         rbd_dev_mapping_clear(rbd_dev);
6018 err_out_disk:
6019         rbd_free_disk(rbd_dev);
6020 err_out_blkdev:
6021         if (!single_major)
6022                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6023 err_out_unlock:
6024         up_write(&rbd_dev->header_rwsem);
6025         return ret;
6026 }
6027
6028 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6029 {
6030         struct rbd_spec *spec = rbd_dev->spec;
6031         int ret;
6032
6033         /* Record the header object name for this rbd image. */
6034
6035         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6036         if (rbd_dev->image_format == 1)
6037                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6038                                        spec->image_name, RBD_SUFFIX);
6039         else
6040                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6041                                        RBD_HEADER_PREFIX, spec->image_id);
6042
6043         return ret;
6044 }
6045
6046 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6047 {
6048         if (rbd_dev->opts)
6049                 rbd_unregister_watch(rbd_dev);
6050
6051         rbd_dev_unprobe(rbd_dev);
6052         rbd_dev->image_format = 0;
6053         kfree(rbd_dev->spec->image_id);
6054         rbd_dev->spec->image_id = NULL;
6055 }
6056
6057 /*
6058  * Probe for the existence of the header object for the given rbd
6059  * device.  If this image is the one being mapped (i.e., not a
6060  * parent), initiate a watch on its header object before using that
6061  * object to get detailed information about the rbd image.
6062  *
6063  * On success, returns with header_rwsem held for write if called
6064  * with @depth == 0.
6065  */
6066 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6067 {
6068         int ret;
6069
6070         /*
6071          * Get the id from the image id object.  Unless there's an
6072          * error, rbd_dev->spec->image_id will be filled in with
6073          * a dynamically-allocated string, and rbd_dev->image_format
6074          * will be set to either 1 or 2.
6075          */
6076         ret = rbd_dev_image_id(rbd_dev);
6077         if (ret)
6078                 return ret;
6079
6080         ret = rbd_dev_header_name(rbd_dev);
6081         if (ret)
6082                 goto err_out_format;
6083
6084         if (!depth) {
6085                 ret = rbd_register_watch(rbd_dev);
6086                 if (ret) {
6087                         if (ret == -ENOENT)
6088                                 pr_info("image %s/%s does not exist\n",
6089                                         rbd_dev->spec->pool_name,
6090                                         rbd_dev->spec->image_name);
6091                         goto err_out_format;
6092                 }
6093         }
6094
6095         if (!depth)
6096                 down_write(&rbd_dev->header_rwsem);
6097
6098         ret = rbd_dev_header_info(rbd_dev);
6099         if (ret)
6100                 goto err_out_probe;
6101
6102         /*
6103          * If this image is the one being mapped, we have pool name and
6104          * id, image name and id, and snap name - need to fill snap id.
6105          * Otherwise this is a parent image, identified by pool, image
6106          * and snap ids - need to fill in names for those ids.
6107          */
6108         if (!depth)
6109                 ret = rbd_spec_fill_snap_id(rbd_dev);
6110         else
6111                 ret = rbd_spec_fill_names(rbd_dev);
6112         if (ret) {
6113                 if (ret == -ENOENT)
6114                         pr_info("snap %s/%s@%s does not exist\n",
6115                                 rbd_dev->spec->pool_name,
6116                                 rbd_dev->spec->image_name,
6117                                 rbd_dev->spec->snap_name);
6118                 goto err_out_probe;
6119         }
6120
6121         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6122                 ret = rbd_dev_v2_parent_info(rbd_dev);
6123                 if (ret)
6124                         goto err_out_probe;
6125
6126                 /*
6127                  * Need to warn users if this image is the one being
6128                  * mapped and has a parent.
6129                  */
6130                 if (!depth && rbd_dev->parent_spec)
6131                         rbd_warn(rbd_dev,
6132                                  "WARNING: kernel layering is EXPERIMENTAL!");
6133         }
6134
6135         ret = rbd_dev_probe_parent(rbd_dev, depth);
6136         if (ret)
6137                 goto err_out_probe;
6138
6139         dout("discovered format %u image, header name is %s\n",
6140                 rbd_dev->image_format, rbd_dev->header_oid.name);
6141         return 0;
6142
6143 err_out_probe:
6144         if (!depth)
6145                 up_write(&rbd_dev->header_rwsem);
6146         if (!depth)
6147                 rbd_unregister_watch(rbd_dev);
6148         rbd_dev_unprobe(rbd_dev);
6149 err_out_format:
6150         rbd_dev->image_format = 0;
6151         kfree(rbd_dev->spec->image_id);
6152         rbd_dev->spec->image_id = NULL;
6153         return ret;
6154 }
6155
6156 static ssize_t do_rbd_add(struct bus_type *bus,
6157                           const char *buf,
6158                           size_t count)
6159 {
6160         struct rbd_device *rbd_dev = NULL;
6161         struct ceph_options *ceph_opts = NULL;
6162         struct rbd_options *rbd_opts = NULL;
6163         struct rbd_spec *spec = NULL;
6164         struct rbd_client *rbdc;
6165         bool read_only;
6166         int rc;
6167
6168         if (!capable(CAP_SYS_ADMIN))
6169                 return -EPERM;
6170
6171         if (!try_module_get(THIS_MODULE))
6172                 return -ENODEV;
6173
6174         /* parse add command */
6175         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6176         if (rc < 0)
6177                 goto out;
6178
6179         rbdc = rbd_get_client(ceph_opts);
6180         if (IS_ERR(rbdc)) {
6181                 rc = PTR_ERR(rbdc);
6182                 goto err_out_args;
6183         }
6184
6185         /* pick the pool */
6186         rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
6187         if (rc < 0) {
6188                 if (rc == -ENOENT)
6189                         pr_info("pool %s does not exist\n", spec->pool_name);
6190                 goto err_out_client;
6191         }
6192         spec->pool_id = (u64)rc;
6193
6194         rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
6195         if (!rbd_dev) {
6196                 rc = -ENOMEM;
6197                 goto err_out_client;
6198         }
6199         rbdc = NULL;            /* rbd_dev now owns this */
6200         spec = NULL;            /* rbd_dev now owns this */
6201         rbd_opts = NULL;        /* rbd_dev now owns this */
6202
6203         rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6204         if (!rbd_dev->config_info) {
6205                 rc = -ENOMEM;
6206                 goto err_out_rbd_dev;
6207         }
6208
6209         rc = rbd_dev_image_probe(rbd_dev, 0);
6210         if (rc < 0)
6211                 goto err_out_rbd_dev;
6212
6213         /* If we are mapping a snapshot it must be marked read-only */
6214
6215         read_only = rbd_dev->opts->read_only;
6216         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6217                 read_only = true;
6218         rbd_dev->mapping.read_only = read_only;
6219
6220         rc = rbd_dev_device_setup(rbd_dev);
6221         if (rc)
6222                 goto err_out_image_probe;
6223
6224         if (rbd_dev->opts->exclusive) {
6225                 rc = rbd_add_acquire_lock(rbd_dev);
6226                 if (rc)
6227                         goto err_out_device_setup;
6228         }
6229
6230         /* Everything's ready.  Announce the disk to the world. */
6231
6232         rc = device_add(&rbd_dev->dev);
6233         if (rc)
6234                 goto err_out_image_lock;
6235
6236         add_disk(rbd_dev->disk);
6237         /* see rbd_init_disk() */
6238         blk_put_queue(rbd_dev->disk->queue);
6239
6240         spin_lock(&rbd_dev_list_lock);
6241         list_add_tail(&rbd_dev->node, &rbd_dev_list);
6242         spin_unlock(&rbd_dev_list_lock);
6243
6244         pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6245                 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6246                 rbd_dev->header.features);
6247         rc = count;
6248 out:
6249         module_put(THIS_MODULE);
6250         return rc;
6251
6252 err_out_image_lock:
6253         rbd_dev_image_unlock(rbd_dev);
6254 err_out_device_setup:
6255         rbd_dev_device_release(rbd_dev);
6256 err_out_image_probe:
6257         rbd_dev_image_release(rbd_dev);
6258 err_out_rbd_dev:
6259         rbd_dev_destroy(rbd_dev);
6260 err_out_client:
6261         rbd_put_client(rbdc);
6262 err_out_args:
6263         rbd_spec_put(spec);
6264         kfree(rbd_opts);
6265         goto out;
6266 }
6267
6268 static ssize_t rbd_add(struct bus_type *bus,
6269                        const char *buf,
6270                        size_t count)
6271 {
6272         if (single_major)
6273                 return -EINVAL;
6274
6275         return do_rbd_add(bus, buf, count);
6276 }
6277
6278 static ssize_t rbd_add_single_major(struct bus_type *bus,
6279                                     const char *buf,
6280                                     size_t count)
6281 {
6282         return do_rbd_add(bus, buf, count);
6283 }
6284
6285 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6286 {
6287         while (rbd_dev->parent) {
6288                 struct rbd_device *first = rbd_dev;
6289                 struct rbd_device *second = first->parent;
6290                 struct rbd_device *third;
6291
6292                 /*
6293                  * Follow to the parent with no grandparent and
6294                  * remove it.
6295                  */
6296                 while (second && (third = second->parent)) {
6297                         first = second;
6298                         second = third;
6299                 }
6300                 rbd_assert(second);
6301                 rbd_dev_image_release(second);
6302                 rbd_dev_destroy(second);
6303                 first->parent = NULL;
6304                 first->parent_overlap = 0;
6305
6306                 rbd_assert(first->parent_spec);
6307                 rbd_spec_put(first->parent_spec);
6308                 first->parent_spec = NULL;
6309         }
6310 }
6311
6312 static ssize_t do_rbd_remove(struct bus_type *bus,
6313                              const char *buf,
6314                              size_t count)
6315 {
6316         struct rbd_device *rbd_dev = NULL;
6317         struct list_head *tmp;
6318         int dev_id;
6319         char opt_buf[6];
6320         bool force = false;
6321         int ret;
6322
6323         if (!capable(CAP_SYS_ADMIN))
6324                 return -EPERM;
6325
6326         dev_id = -1;
6327         opt_buf[0] = '\0';
6328         sscanf(buf, "%d %5s", &dev_id, opt_buf);
6329         if (dev_id < 0) {
6330                 pr_err("dev_id out of range\n");
6331                 return -EINVAL;
6332         }
6333         if (opt_buf[0] != '\0') {
6334                 if (!strcmp(opt_buf, "force")) {
6335                         force = true;
6336                 } else {
6337                         pr_err("bad remove option at '%s'\n", opt_buf);
6338                         return -EINVAL;
6339                 }
6340         }
6341
6342         ret = -ENOENT;
6343         spin_lock(&rbd_dev_list_lock);
6344         list_for_each(tmp, &rbd_dev_list) {
6345                 rbd_dev = list_entry(tmp, struct rbd_device, node);
6346                 if (rbd_dev->dev_id == dev_id) {
6347                         ret = 0;
6348                         break;
6349                 }
6350         }
6351         if (!ret) {
6352                 spin_lock_irq(&rbd_dev->lock);
6353                 if (rbd_dev->open_count && !force)
6354                         ret = -EBUSY;
6355                 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6356                                           &rbd_dev->flags))
6357                         ret = -EINPROGRESS;
6358                 spin_unlock_irq(&rbd_dev->lock);
6359         }
6360         spin_unlock(&rbd_dev_list_lock);
6361         if (ret)
6362                 return ret;
6363
6364         if (force) {
6365                 /*
6366                  * Prevent new IO from being queued and wait for existing
6367                  * IO to complete/fail.
6368                  */
6369                 blk_mq_freeze_queue(rbd_dev->disk->queue);
6370                 blk_set_queue_dying(rbd_dev->disk->queue);
6371         }
6372
6373         del_gendisk(rbd_dev->disk);
6374         spin_lock(&rbd_dev_list_lock);
6375         list_del_init(&rbd_dev->node);
6376         spin_unlock(&rbd_dev_list_lock);
6377         device_del(&rbd_dev->dev);
6378
6379         rbd_dev_image_unlock(rbd_dev);
6380         rbd_dev_device_release(rbd_dev);
6381         rbd_dev_image_release(rbd_dev);
6382         rbd_dev_destroy(rbd_dev);
6383         return count;
6384 }
6385
6386 static ssize_t rbd_remove(struct bus_type *bus,
6387                           const char *buf,
6388                           size_t count)
6389 {
6390         if (single_major)
6391                 return -EINVAL;
6392
6393         return do_rbd_remove(bus, buf, count);
6394 }
6395
6396 static ssize_t rbd_remove_single_major(struct bus_type *bus,
6397                                        const char *buf,
6398                                        size_t count)
6399 {
6400         return do_rbd_remove(bus, buf, count);
6401 }
6402
6403 /*
6404  * create control files in sysfs
6405  * /sys/bus/rbd/...
6406  */
6407 static int rbd_sysfs_init(void)
6408 {
6409         int ret;
6410
6411         ret = device_register(&rbd_root_dev);
6412         if (ret < 0)
6413                 return ret;
6414
6415         ret = bus_register(&rbd_bus_type);
6416         if (ret < 0)
6417                 device_unregister(&rbd_root_dev);
6418
6419         return ret;
6420 }
6421
6422 static void rbd_sysfs_cleanup(void)
6423 {
6424         bus_unregister(&rbd_bus_type);
6425         device_unregister(&rbd_root_dev);
6426 }
6427
6428 static int rbd_slab_init(void)
6429 {
6430         rbd_assert(!rbd_img_request_cache);
6431         rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
6432         if (!rbd_img_request_cache)
6433                 return -ENOMEM;
6434
6435         rbd_assert(!rbd_obj_request_cache);
6436         rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
6437         if (!rbd_obj_request_cache)
6438                 goto out_err;
6439
6440         rbd_assert(!rbd_bio_clone);
6441         rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0);
6442         if (!rbd_bio_clone)
6443                 goto out_err_clone;
6444
6445         return 0;
6446
6447 out_err_clone:
6448         kmem_cache_destroy(rbd_obj_request_cache);
6449         rbd_obj_request_cache = NULL;
6450 out_err:
6451         kmem_cache_destroy(rbd_img_request_cache);
6452         rbd_img_request_cache = NULL;
6453         return -ENOMEM;
6454 }
6455
6456 static void rbd_slab_exit(void)
6457 {
6458         rbd_assert(rbd_obj_request_cache);
6459         kmem_cache_destroy(rbd_obj_request_cache);
6460         rbd_obj_request_cache = NULL;
6461
6462         rbd_assert(rbd_img_request_cache);
6463         kmem_cache_destroy(rbd_img_request_cache);
6464         rbd_img_request_cache = NULL;
6465
6466         rbd_assert(rbd_bio_clone);
6467         bioset_free(rbd_bio_clone);
6468         rbd_bio_clone = NULL;
6469 }
6470
6471 static int __init rbd_init(void)
6472 {
6473         int rc;
6474
6475         if (!libceph_compatible(NULL)) {
6476                 rbd_warn(NULL, "libceph incompatibility (quitting)");
6477                 return -EINVAL;
6478         }
6479
6480         rc = rbd_slab_init();
6481         if (rc)
6482                 return rc;
6483
6484         /*
6485          * The number of active work items is limited by the number of
6486          * rbd devices * queue depth, so leave @max_active at default.
6487          */
6488         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6489         if (!rbd_wq) {
6490                 rc = -ENOMEM;
6491                 goto err_out_slab;
6492         }
6493
6494         if (single_major) {
6495                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6496                 if (rbd_major < 0) {
6497                         rc = rbd_major;
6498                         goto err_out_wq;
6499                 }
6500         }
6501
6502         rc = rbd_sysfs_init();
6503         if (rc)
6504                 goto err_out_blkdev;
6505
6506         if (single_major)
6507                 pr_info("loaded (major %d)\n", rbd_major);
6508         else
6509                 pr_info("loaded\n");
6510
6511         return 0;
6512
6513 err_out_blkdev:
6514         if (single_major)
6515                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6516 err_out_wq:
6517         destroy_workqueue(rbd_wq);
6518 err_out_slab:
6519         rbd_slab_exit();
6520         return rc;
6521 }
6522
6523 static void __exit rbd_exit(void)
6524 {
6525         ida_destroy(&rbd_dev_id_ida);
6526         rbd_sysfs_cleanup();
6527         if (single_major)
6528                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6529         destroy_workqueue(rbd_wq);
6530         rbd_slab_exit();
6531 }
6532
6533 module_init(rbd_init);
6534 module_exit(rbd_exit);
6535
6536 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
6537 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6538 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
6539 /* following authorship retained from original osdblk.c */
6540 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6541
6542 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
6543 MODULE_LICENSE("GPL");