GNU Linux-libre 4.14.332-gnu1
drivers/block/rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
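/*
 * Quick illustration (not part of the original source): the sysfs ABI
 * document referenced above describes mapping an image by writing a
 * spec to the bus "add" attribute, along the lines of:
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> mypool myimage -" \
 *         > /sys/bus/rbd/add
 *
 * The monitor addresses, options, pool, image and snapshot name (with
 * "-" denoting the image head) are parsed by rbd_add() below; the
 * authoritative syntax is the documentation file above, and the
 * addresses and names here are placeholders.
 */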
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/decode.h>
36 #include <linux/parser.h>
37 #include <linux/bsearch.h>
38
39 #include <linux/kernel.h>
40 #include <linux/device.h>
41 #include <linux/module.h>
42 #include <linux/blk-mq.h>
43 #include <linux/fs.h>
44 #include <linux/blkdev.h>
45 #include <linux/slab.h>
46 #include <linux/idr.h>
47 #include <linux/workqueue.h>
48
49 #include "rbd_types.h"
50
51 #define RBD_DEBUG       /* Activate rbd_assert() calls */
52
53 /*
54  * Increment the given counter and return its value prior to the
55  * increment.  If the counter is already 0 it will not be incremented.
56  * If the counter is already at its maximum value, -EINVAL is
57  * returned without updating it.
58  */
59 static int atomic_inc_return_safe(atomic_t *v)
60 {
61         unsigned int counter;
62
63         counter = (unsigned int)__atomic_add_unless(v, 1, 0);
64         if (counter <= (unsigned int)INT_MAX)
65                 return (int)counter;
66
67         atomic_dec(v);
68
69         return -EINVAL;
70 }
71
72 /* Decrement the counter.  Return the resulting value, or -EINVAL */
73 static int atomic_dec_return_safe(atomic_t *v)
74 {
75         int counter;
76
77         counter = atomic_dec_return(v);
78         if (counter >= 0)
79                 return counter;
80
81         atomic_inc(v);
82
83         return -EINVAL;
84 }
85
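/*
 * Usage sketch (illustrative, not from the original source): these two
 * helpers implement a saturating reference count, used later in this
 * file for counters such as rbd_dev->parent_ref.  A caller typically
 * does:
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0)
 *		... got a reference ...
 *	else
 *		... count was already 0, or had overflowed ...
 *
 * Once the count has dropped to 0 it stays there, so a stale reference
 * can never be resurrected by a racing increment.
 */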
86 #define RBD_DRV_NAME "rbd"
87
88 #define RBD_MINORS_PER_MAJOR            256
89 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
90
91 #define RBD_MAX_PARENT_CHAIN_LEN        16
92
93 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
94 #define RBD_MAX_SNAP_NAME_LEN   \
95                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
96
97 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
98
99 #define RBD_SNAP_HEAD_NAME      "-"
100
101 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
102
103 /* This allows a single page to hold an image name sent by OSD */
104 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
105 #define RBD_IMAGE_ID_LEN_MAX    64
106
107 #define RBD_OBJ_PREFIX_LEN_MAX  64
108
109 #define RBD_NOTIFY_TIMEOUT      5       /* seconds */
110 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
111
112 /* Feature bits */
113
114 #define RBD_FEATURE_LAYERING            (1ULL<<0)
115 #define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
116 #define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
117 #define RBD_FEATURE_DATA_POOL           (1ULL<<7)
118 #define RBD_FEATURE_OPERATIONS          (1ULL<<8)
119
120 #define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
121                                  RBD_FEATURE_STRIPINGV2 |       \
122                                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
123                                  RBD_FEATURE_DATA_POOL |        \
124                                  RBD_FEATURE_OPERATIONS)
125
126 /* Features supported by this (client software) implementation. */
127
128 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
129
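/*
 * Worked example (illustrative): an image created with layering and
 * exclusive-lock enabled reports features 0x5 (bits 0 and 2 above).
 * Mapping is expected to be refused if an image carries any feature
 * bit outside RBD_FEATURES_SUPPORTED, since the driver cannot honor
 * semantics it does not know about.
 */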
130 /*
131  * An RBD device name will be "rbd#", where the "rbd" comes from
132  * RBD_DRV_NAME above, and # is a unique integer identifier.
133  */
134 #define DEV_NAME_LEN            32
135
136 /*
137  * block device image metadata (in-memory version)
138  */
139 struct rbd_image_header {
140         /* These six fields never change for a given rbd image */
141         char *object_prefix;
142         __u8 obj_order;
143         u64 stripe_unit;
144         u64 stripe_count;
145         s64 data_pool_id;
146         u64 features;           /* Might be changeable someday? */
147
148         /* The remaining fields need to be updated occasionally */
149         u64 image_size;
150         struct ceph_snap_context *snapc;
151         char *snap_names;       /* format 1 only */
152         u64 *snap_sizes;        /* format 1 only */
153 };
154
155 /*
156  * An rbd image specification.
157  *
158  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
159  * identify an image.  Each rbd_dev structure includes a pointer to
160  * an rbd_spec structure that encapsulates this identity.
161  *
162  * Each of the id's in an rbd_spec has an associated name.  For a
163  * user-mapped image, the names are supplied and the id's associated
164  * with them are looked up.  For a layered image, a parent image is
165  * defined by the tuple, and the names are looked up.
166  *
167  * An rbd_dev structure contains a parent_spec pointer which is
168  * non-null if the image it represents is a child in a layered
169  * image.  This pointer will refer to the rbd_spec structure used
170  * by the parent rbd_dev for its own identity (i.e., the structure
171  * is shared between the parent and child).
172  *
173  * Since these structures are populated once, during the discovery
174  * phase of image construction, they are effectively immutable so
175  * we make no effort to synchronize access to them.
176  *
177  * Note that code herein does not assume the image name is known (it
178  * could be a null pointer).
179  */
180 struct rbd_spec {
181         u64             pool_id;
182         const char      *pool_name;
183
184         const char      *image_id;
185         const char      *image_name;
186
187         u64             snap_id;
188         const char      *snap_name;
189
190         struct kref     kref;
191 };
192
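/*
 * Example (illustrative): mapping pool "mypool", image "myimage",
 * snapshot "snap1" yields a spec holding those three names plus the
 * pool, image and snapshot ids looked up for them; if "myimage" is a
 * clone, its rbd_dev additionally points at a parent_spec identifying
 * the parent image and the snapshot it was cloned at.
 */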
193 /*
194  * an instance of the client.  multiple devices may share an rbd client.
195  */
196 struct rbd_client {
197         struct ceph_client      *client;
198         struct kref             kref;
199         struct list_head        node;
200 };
201
202 struct rbd_img_request;
203 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
204
205 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
206
207 struct rbd_obj_request;
208 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
209
210 enum obj_request_type {
211         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
212 };
213
214 enum obj_operation_type {
215         OBJ_OP_WRITE,
216         OBJ_OP_READ,
217         OBJ_OP_DISCARD,
218 };
219
220 enum obj_req_flags {
221         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
222         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
223         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
224         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
225 };
226
227 struct rbd_obj_request {
228         u64                     object_no;
229         u64                     offset;         /* object start byte */
230         u64                     length;         /* bytes from offset */
231         unsigned long           flags;
232
233         /*
234          * An object request associated with an image will have its
235          * img_data flag set; a standalone object request will not.
236          *
237          * A standalone object request will have which == BAD_WHICH
238          * and a null obj_request pointer.
239          *
240          * An object request initiated in support of a layered image
241          * object (to check for its existence before a write) will
242          * have which == BAD_WHICH and a non-null obj_request pointer.
243          *
244          * Finally, an object request for rbd image data will have
245          * which != BAD_WHICH, and will have a non-null img_request
246          * pointer.  The value of which will be in the range
247          * 0..(img_request->obj_request_count-1).
248          */
249         union {
250                 struct rbd_obj_request  *obj_request;   /* STAT op */
251                 struct {
252                         struct rbd_img_request  *img_request;
253                         u64                     img_offset;
254                         /* links for img_request->obj_requests list */
255                         struct list_head        links;
256                 };
257         };
258         u32                     which;          /* posn image request list */
259
260         enum obj_request_type   type;
261         union {
262                 struct bio      *bio_list;
263                 struct {
264                         struct page     **pages;
265                         u32             page_count;
266                 };
267         };
268         struct page             **copyup_pages;
269         u32                     copyup_page_count;
270
271         struct ceph_osd_request *osd_req;
272
273         u64                     xferred;        /* bytes transferred */
274         int                     result;
275
276         rbd_obj_callback_t      callback;
277         struct completion       completion;
278
279         struct kref             kref;
280 };
281
282 enum img_req_flags {
283         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
284         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
285         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
286         IMG_REQ_DISCARD,        /* discard: normal = 0, discard request = 1 */
287 };
288
289 struct rbd_img_request {
290         struct rbd_device       *rbd_dev;
291         u64                     offset; /* starting image byte offset */
292         u64                     length; /* byte count from offset */
293         unsigned long           flags;
294         union {
295                 u64                     snap_id;        /* for reads */
296                 struct ceph_snap_context *snapc;        /* for writes */
297         };
298         union {
299                 struct request          *rq;            /* block request */
300                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
301         };
302         struct page             **copyup_pages;
303         u32                     copyup_page_count;
304         spinlock_t              completion_lock;/* protects next_completion */
305         u32                     next_completion;
306         rbd_img_callback_t      callback;
307         u64                     xferred;/* aggregate bytes transferred */
308         int                     result; /* first nonzero obj_request result */
309
310         u32                     obj_request_count;
311         struct list_head        obj_requests;   /* rbd_obj_request structs */
312
313         struct kref             kref;
314 };
315
316 #define for_each_obj_request(ireq, oreq) \
317         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
318 #define for_each_obj_request_from(ireq, oreq) \
319         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
320 #define for_each_obj_request_safe(ireq, oreq, n) \
321         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
322
323 enum rbd_watch_state {
324         RBD_WATCH_STATE_UNREGISTERED,
325         RBD_WATCH_STATE_REGISTERED,
326         RBD_WATCH_STATE_ERROR,
327 };
328
329 enum rbd_lock_state {
330         RBD_LOCK_STATE_UNLOCKED,
331         RBD_LOCK_STATE_LOCKED,
332         RBD_LOCK_STATE_RELEASING,
333 };
334
335 /* WatchNotify::ClientId */
336 struct rbd_client_id {
337         u64 gid;
338         u64 handle;
339 };
340
341 struct rbd_mapping {
342         u64                     size;
343         u64                     features;
344         bool                    read_only;
345 };
346
347 /*
348  * a single device
349  */
350 struct rbd_device {
351         int                     dev_id;         /* blkdev unique id */
352
353         int                     major;          /* blkdev assigned major */
354         int                     minor;
355         struct gendisk          *disk;          /* blkdev's gendisk and rq */
356
357         u32                     image_format;   /* Either 1 or 2 */
358         struct rbd_client       *rbd_client;
359
360         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
361
362         spinlock_t              lock;           /* queue, flags, open_count */
363
364         struct rbd_image_header header;
365         unsigned long           flags;          /* possibly lock protected */
366         struct rbd_spec         *spec;
367         struct rbd_options      *opts;
368         char                    *config_info;   /* add{,_single_major} string */
369
370         struct ceph_object_id   header_oid;
371         struct ceph_object_locator header_oloc;
372
373         struct ceph_file_layout layout;         /* used for all rbd requests */
374
375         struct mutex            watch_mutex;
376         enum rbd_watch_state    watch_state;
377         struct ceph_osd_linger_request *watch_handle;
378         u64                     watch_cookie;
379         struct delayed_work     watch_dwork;
380
381         struct rw_semaphore     lock_rwsem;
382         enum rbd_lock_state     lock_state;
383         char                    lock_cookie[32];
384         struct rbd_client_id    owner_cid;
385         struct work_struct      acquired_lock_work;
386         struct work_struct      released_lock_work;
387         struct delayed_work     lock_dwork;
388         struct work_struct      unlock_work;
389         wait_queue_head_t       lock_waitq;
390
391         struct workqueue_struct *task_wq;
392
393         struct rbd_spec         *parent_spec;
394         u64                     parent_overlap;
395         atomic_t                parent_ref;
396         struct rbd_device       *parent;
397
398         /* Block layer tags. */
399         struct blk_mq_tag_set   tag_set;
400
401         /* protects updating the header */
402         struct rw_semaphore     header_rwsem;
403
404         struct rbd_mapping      mapping;
405
406         struct list_head        node;
407
408         /* sysfs related */
409         struct device           dev;
410         unsigned long           open_count;     /* protected by lock */
411 };
412
413 /*
414  * Flag bits for rbd_dev->flags:
415  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
416  *   by rbd_dev->lock
417  * - BLACKLISTED is protected by rbd_dev->lock_rwsem
418  */
419 enum rbd_dev_flags {
420         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
421         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
422         RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
423 };
424
425 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
426
427 static LIST_HEAD(rbd_dev_list);    /* devices */
428 static DEFINE_SPINLOCK(rbd_dev_list_lock);
429
430 static LIST_HEAD(rbd_client_list);              /* clients */
431 static DEFINE_SPINLOCK(rbd_client_list_lock);
432
433 /* Slab caches for frequently-allocated structures */
434
435 static struct kmem_cache        *rbd_img_request_cache;
436 static struct kmem_cache        *rbd_obj_request_cache;
437
438 static struct bio_set           *rbd_bio_clone;
439
440 static int rbd_major;
441 static DEFINE_IDA(rbd_dev_id_ida);
442
443 static struct workqueue_struct *rbd_wq;
444
445 /*
446  * Default to false for now, as single-major mode requires version >= 0.75
447  * of the userspace rbd utility.
448  */
449 static bool single_major = false;
450 module_param(single_major, bool, S_IRUGO);
451 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
452
453 static int rbd_img_request_submit(struct rbd_img_request *img_request);
454
455 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
456                        size_t count);
457 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
458                           size_t count);
459 static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
460                                     size_t count);
461 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
462                                        size_t count);
463 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
464 static void rbd_spec_put(struct rbd_spec *spec);
465
466 static int rbd_dev_id_to_minor(int dev_id)
467 {
468         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
469 }
470
471 static int minor_to_rbd_dev_id(int minor)
472 {
473         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
474 }
475
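/*
 * Worked example (illustrative): with RBD_SINGLE_MAJOR_PART_SHIFT == 4,
 * each device owns 16 minors for its partitions in single-major mode:
 * dev_id 3 maps to first minor 3 << 4 == 48, and any minor in 48..63
 * maps back to dev_id 3 via the right shift above.
 */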
476 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
477 {
478         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
479                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
480 }
481
482 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
483 {
484         bool is_lock_owner;
485
486         down_read(&rbd_dev->lock_rwsem);
487         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
488         up_read(&rbd_dev->lock_rwsem);
489         return is_lock_owner;
490 }
491
492 static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
493 {
494         return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
495 }
496
497 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
498 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
499 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
500 static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
501 static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);
502
503 static struct attribute *rbd_bus_attrs[] = {
504         &bus_attr_add.attr,
505         &bus_attr_remove.attr,
506         &bus_attr_add_single_major.attr,
507         &bus_attr_remove_single_major.attr,
508         &bus_attr_supported_features.attr,
509         NULL,
510 };
511
512 static umode_t rbd_bus_is_visible(struct kobject *kobj,
513                                   struct attribute *attr, int index)
514 {
515         if (!single_major &&
516             (attr == &bus_attr_add_single_major.attr ||
517              attr == &bus_attr_remove_single_major.attr))
518                 return 0;
519
520         return attr->mode;
521 }
522
523 static const struct attribute_group rbd_bus_group = {
524         .attrs = rbd_bus_attrs,
525         .is_visible = rbd_bus_is_visible,
526 };
527 __ATTRIBUTE_GROUPS(rbd_bus);
528
529 static struct bus_type rbd_bus_type = {
530         .name           = "rbd",
531         .bus_groups     = rbd_bus_groups,
532 };
533
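/*
 * Resulting sysfs layout (illustrative): with the bus named "rbd" and
 * the attribute group above, the control files are expected to appear
 * as /sys/bus/rbd/add, /sys/bus/rbd/remove and
 * /sys/bus/rbd/supported_features, plus the *_single_major variants
 * when the single_major module parameter is set.
 */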
534 static void rbd_root_dev_release(struct device *dev)
535 {
536 }
537
538 static struct device rbd_root_dev = {
539         .init_name =    "rbd",
540         .release =      rbd_root_dev_release,
541 };
542
543 static __printf(2, 3)
544 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
545 {
546         struct va_format vaf;
547         va_list args;
548
549         va_start(args, fmt);
550         vaf.fmt = fmt;
551         vaf.va = &args;
552
553         if (!rbd_dev)
554                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
555         else if (rbd_dev->disk)
556                 printk(KERN_WARNING "%s: %s: %pV\n",
557                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
558         else if (rbd_dev->spec && rbd_dev->spec->image_name)
559                 printk(KERN_WARNING "%s: image %s: %pV\n",
560                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
561         else if (rbd_dev->spec && rbd_dev->spec->image_id)
562                 printk(KERN_WARNING "%s: id %s: %pV\n",
563                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
564         else    /* punt */
565                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
566                         RBD_DRV_NAME, rbd_dev, &vaf);
567         va_end(args);
568 }
569
570 #ifdef RBD_DEBUG
571 #define rbd_assert(expr)                                                \
572                 if (unlikely(!(expr))) {                                \
573                         printk(KERN_ERR "\nAssertion failure in %s() "  \
574                                                 "at line %d:\n\n"       \
575                                         "\trbd_assert(%s);\n\n",        \
576                                         __func__, __LINE__, #expr);     \
577                         BUG();                                          \
578                 }
579 #else /* !RBD_DEBUG */
580 #  define rbd_assert(expr)      ((void) 0)
581 #endif /* !RBD_DEBUG */
582
583 static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
584 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
585 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
586 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
587
588 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
589 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
590 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
591 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
592 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
593                                         u64 snap_id);
594 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
595                                 u8 *order, u64 *snap_size);
596 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
597                 u64 *snap_features);
598
599 static int rbd_open(struct block_device *bdev, fmode_t mode)
600 {
601         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602         bool removing = false;
603
604         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
605                 return -EROFS;
606
607         spin_lock_irq(&rbd_dev->lock);
608         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
609                 removing = true;
610         else
611                 rbd_dev->open_count++;
612         spin_unlock_irq(&rbd_dev->lock);
613         if (removing)
614                 return -ENOENT;
615
616         (void) get_device(&rbd_dev->dev);
617
618         return 0;
619 }
620
621 static void rbd_release(struct gendisk *disk, fmode_t mode)
622 {
623         struct rbd_device *rbd_dev = disk->private_data;
624         unsigned long open_count_before;
625
626         spin_lock_irq(&rbd_dev->lock);
627         open_count_before = rbd_dev->open_count--;
628         spin_unlock_irq(&rbd_dev->lock);
629         rbd_assert(open_count_before > 0);
630
631         put_device(&rbd_dev->dev);
632 }
633
634 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
635 {
636         int ret = 0;
637         int val;
638         bool ro;
639         bool ro_changed = false;
640
641         /* get_user() may sleep, so call it before taking rbd_dev->lock */
642         if (get_user(val, (int __user *)(arg)))
643                 return -EFAULT;
644
645         ro = val ? true : false;
646         /* Snapshots are read-only; refuse to make the mapping writable */
647         if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
648                 return -EROFS;
649
650         spin_lock_irq(&rbd_dev->lock);
651         /* refuse if anyone else currently has this device open */
652         if (rbd_dev->open_count > 1) {
653                 ret = -EBUSY;
654                 goto out;
655         }
656
657         if (rbd_dev->mapping.read_only != ro) {
658                 rbd_dev->mapping.read_only = ro;
659                 ro_changed = true;
660         }
661
662 out:
663         spin_unlock_irq(&rbd_dev->lock);
664         /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
665         if (ret == 0 && ro_changed)
666                 set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
667
668         return ret;
669 }
670
671 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
672                         unsigned int cmd, unsigned long arg)
673 {
674         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
675         int ret = 0;
676
677         switch (cmd) {
678         case BLKROSET:
679                 ret = rbd_ioctl_set_ro(rbd_dev, arg);
680                 break;
681         default:
682                 ret = -ENOTTY;
683         }
684
685         return ret;
686 }
687
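/*
 * Example (illustrative): "blockdev --setro /dev/rbd0" and
 * "blockdev --setrw /dev/rbd0" issue BLKROSET, handled by
 * rbd_ioctl_set_ro() above; clearing read-only on a snapshot mapping
 * fails with -EROFS, and any change fails with -EBUSY while another
 * opener holds the device.
 */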
688 #ifdef CONFIG_COMPAT
689 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
690                                 unsigned int cmd, unsigned long arg)
691 {
692         return rbd_ioctl(bdev, mode, cmd, arg);
693 }
694 #endif /* CONFIG_COMPAT */
695
696 static const struct block_device_operations rbd_bd_ops = {
697         .owner                  = THIS_MODULE,
698         .open                   = rbd_open,
699         .release                = rbd_release,
700         .ioctl                  = rbd_ioctl,
701 #ifdef CONFIG_COMPAT
702         .compat_ioctl           = rbd_compat_ioctl,
703 #endif
704 };
705
706 /*
707  * Initialize an rbd client instance.  Success or not, this function
708  * consumes ceph_opts.  Caller holds client_mutex.
709  */
710 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
711 {
712         struct rbd_client *rbdc;
713         int ret = -ENOMEM;
714
715         dout("%s:\n", __func__);
716         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
717         if (!rbdc)
718                 goto out_opt;
719
720         kref_init(&rbdc->kref);
721         INIT_LIST_HEAD(&rbdc->node);
722
723         rbdc->client = ceph_create_client(ceph_opts, rbdc);
724         if (IS_ERR(rbdc->client))
725                 goto out_rbdc;
726         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
727
728         ret = ceph_open_session(rbdc->client);
729         if (ret < 0)
730                 goto out_client;
731
732         spin_lock(&rbd_client_list_lock);
733         list_add_tail(&rbdc->node, &rbd_client_list);
734         spin_unlock(&rbd_client_list_lock);
735
736         dout("%s: rbdc %p\n", __func__, rbdc);
737
738         return rbdc;
739 out_client:
740         ceph_destroy_client(rbdc->client);
741 out_rbdc:
742         kfree(rbdc);
743 out_opt:
744         if (ceph_opts)
745                 ceph_destroy_options(ceph_opts);
746         dout("%s: error %d\n", __func__, ret);
747
748         return ERR_PTR(ret);
749 }
750
751 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
752 {
753         kref_get(&rbdc->kref);
754
755         return rbdc;
756 }
757
758 /*
759  * Find a ceph client with specific addr and configuration.  If
760  * found, bump its reference count.
761  */
762 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
763 {
764         struct rbd_client *client_node;
765         bool found = false;
766
767         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
768                 return NULL;
769
770         spin_lock(&rbd_client_list_lock);
771         list_for_each_entry(client_node, &rbd_client_list, node) {
772                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
773                         __rbd_get_client(client_node);
774
775                         found = true;
776                         break;
777                 }
778         }
779         spin_unlock(&rbd_client_list_lock);
780
781         return found ? client_node : NULL;
782 }
783
784 /*
785  * (Per device) rbd map options
786  */
787 enum {
788         Opt_queue_depth,
789         Opt_last_int,
790         /* int args above */
791         Opt_last_string,
792         /* string args above */
793         Opt_read_only,
794         Opt_read_write,
795         Opt_lock_on_read,
796         Opt_exclusive,
797         Opt_err
798 };
799
800 static match_table_t rbd_opts_tokens = {
801         {Opt_queue_depth, "queue_depth=%d"},
802         /* int args above */
803         /* string args above */
804         {Opt_read_only, "read_only"},
805         {Opt_read_only, "ro"},          /* Alternate spelling */
806         {Opt_read_write, "read_write"},
807         {Opt_read_write, "rw"},         /* Alternate spelling */
808         {Opt_lock_on_read, "lock_on_read"},
809         {Opt_exclusive, "exclusive"},
810         {Opt_err, NULL}
811 };
812
813 struct rbd_options {
814         int     queue_depth;
815         bool    read_only;
816         bool    lock_on_read;
817         bool    exclusive;
818 };
819
820 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
821 #define RBD_READ_ONLY_DEFAULT   false
822 #define RBD_LOCK_ON_READ_DEFAULT false
823 #define RBD_EXCLUSIVE_DEFAULT   false
824
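/*
 * Example (illustrative): a map-time option string such as
 *
 *	queue_depth=128,lock_on_read,read_only
 *
 * is roughly split on commas by the option-parsing path, and each
 * token libceph does not consume is handed to parse_rbd_opts_token()
 * below, leaving queue_depth at 128 and the read_only and lock_on_read
 * flags set in struct rbd_options.  The option names come from
 * rbd_opts_tokens above; the values are made up for the example.
 */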
825 static int parse_rbd_opts_token(char *c, void *private)
826 {
827         struct rbd_options *rbd_opts = private;
828         substring_t argstr[MAX_OPT_ARGS];
829         int token, intval, ret;
830
831         token = match_token(c, rbd_opts_tokens, argstr);
832         if (token < Opt_last_int) {
833                 ret = match_int(&argstr[0], &intval);
834                 if (ret < 0) {
835                         pr_err("bad mount option arg (not int) at '%s'\n", c);
836                         return ret;
837                 }
838                 dout("got int token %d val %d\n", token, intval);
839         } else if (token > Opt_last_int && token < Opt_last_string) {
840                 dout("got string token %d val %s\n", token, argstr[0].from);
841         } else {
842                 dout("got token %d\n", token);
843         }
844
845         switch (token) {
846         case Opt_queue_depth:
847                 if (intval < 1) {
848                         pr_err("queue_depth out of range\n");
849                         return -EINVAL;
850                 }
851                 rbd_opts->queue_depth = intval;
852                 break;
853         case Opt_read_only:
854                 rbd_opts->read_only = true;
855                 break;
856         case Opt_read_write:
857                 rbd_opts->read_only = false;
858                 break;
859         case Opt_lock_on_read:
860                 rbd_opts->lock_on_read = true;
861                 break;
862         case Opt_exclusive:
863                 rbd_opts->exclusive = true;
864                 break;
865         default:
866                 /* libceph prints "bad option" msg */
867                 return -EINVAL;
868         }
869
870         return 0;
871 }
872
873 static char* obj_op_name(enum obj_operation_type op_type)
874 {
875         switch (op_type) {
876         case OBJ_OP_READ:
877                 return "read";
878         case OBJ_OP_WRITE:
879                 return "write";
880         case OBJ_OP_DISCARD:
881                 return "discard";
882         default:
883                 return "???";
884         }
885 }
886
887 /*
888  * Get a ceph client with specific addr and configuration; if one does
889  * not exist, create it.  Either way, ceph_opts is consumed by this
890  * function.
891  */
892 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
893 {
894         struct rbd_client *rbdc;
895
896         mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
897         rbdc = rbd_client_find(ceph_opts);
898         if (rbdc)       /* using an existing client */
899                 ceph_destroy_options(ceph_opts);
900         else
901                 rbdc = rbd_client_create(ceph_opts);
902         mutex_unlock(&client_mutex);
903
904         return rbdc;
905 }
906
907 /*
908  * Destroy ceph client
909  *
910  * rbd_client_list_lock is taken here, so the caller must not hold it.
911  */
912 static void rbd_client_release(struct kref *kref)
913 {
914         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
915
916         dout("%s: rbdc %p\n", __func__, rbdc);
917         spin_lock(&rbd_client_list_lock);
918         list_del(&rbdc->node);
919         spin_unlock(&rbd_client_list_lock);
920
921         ceph_destroy_client(rbdc->client);
922         kfree(rbdc);
923 }
924
925 /*
926  * Drop reference to ceph client node. If it's not referenced anymore, release
927  * it.
928  */
929 static void rbd_put_client(struct rbd_client *rbdc)
930 {
931         if (rbdc)
932                 kref_put(&rbdc->kref, rbd_client_release);
933 }
934
935 static bool rbd_image_format_valid(u32 image_format)
936 {
937         return image_format == 1 || image_format == 2;
938 }
939
940 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
941 {
942         size_t size;
943         u32 snap_count;
944
945         /* The header has to start with the magic rbd header text */
946         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
947                 return false;
948
949         /* The bio layer requires at least sector-sized I/O */
950
951         if (ondisk->options.order < SECTOR_SHIFT)
952                 return false;
953
954         /* If we use u64 in a few spots we may be able to loosen this */
955
956         if (ondisk->options.order > 8 * sizeof (int) - 1)
957                 return false;
958
959         /*
960          * The size of a snapshot header has to fit in a size_t, and
961          * that limits the number of snapshots.
962          */
963         snap_count = le32_to_cpu(ondisk->snap_count);
964         size = SIZE_MAX - sizeof (struct ceph_snap_context);
965         if (snap_count > size / sizeof (__le64))
966                 return false;
967
968         /*
969          * Not only that, but the size of the entire snapshot
970          * header must also be representable in a size_t.
971          */
972         size -= snap_count * sizeof (__le64);
973         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
974                 return false;
975
976         return true;
977 }
978
979 /*
980  * returns the size of an object in the image
981  */
982 static u32 rbd_obj_bytes(struct rbd_image_header *header)
983 {
984         return 1U << header->obj_order;
985 }
986
987 static void rbd_init_layout(struct rbd_device *rbd_dev)
988 {
989         if (rbd_dev->header.stripe_unit == 0 ||
990             rbd_dev->header.stripe_count == 0) {
991                 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
992                 rbd_dev->header.stripe_count = 1;
993         }
994
995         rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
996         rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
997         rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
998         rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
999                           rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1000         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1001 }
1002
1003 /*
1004  * Fill an rbd image header with information from the given format 1
1005  * on-disk header.
1006  */
1007 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1008                                  struct rbd_image_header_ondisk *ondisk)
1009 {
1010         struct rbd_image_header *header = &rbd_dev->header;
1011         bool first_time = header->object_prefix == NULL;
1012         struct ceph_snap_context *snapc;
1013         char *object_prefix = NULL;
1014         char *snap_names = NULL;
1015         u64 *snap_sizes = NULL;
1016         u32 snap_count;
1017         int ret = -ENOMEM;
1018         u32 i;
1019
1020         /* Allocate this now to avoid having to handle failure below */
1021
1022         if (first_time) {
1023                 object_prefix = kstrndup(ondisk->object_prefix,
1024                                          sizeof(ondisk->object_prefix),
1025                                          GFP_KERNEL);
1026                 if (!object_prefix)
1027                         return -ENOMEM;
1028         }
1029
1030         /* Allocate the snapshot context and fill it in */
1031
1032         snap_count = le32_to_cpu(ondisk->snap_count);
1033         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1034         if (!snapc)
1035                 goto out_err;
1036         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1037         if (snap_count) {
1038                 struct rbd_image_snap_ondisk *snaps;
1039                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1040
1041                 /* We'll keep a copy of the snapshot names... */
1042
1043                 if (snap_names_len > (u64)SIZE_MAX)
1044                         goto out_2big;
1045                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1046                 if (!snap_names)
1047                         goto out_err;
1048
1049                 /* ...as well as the array of their sizes. */
1050                 snap_sizes = kmalloc_array(snap_count,
1051                                            sizeof(*header->snap_sizes),
1052                                            GFP_KERNEL);
1053                 if (!snap_sizes)
1054                         goto out_err;
1055
1056                 /*
1057                  * Copy the names, and fill in each snapshot's id
1058                  * and size.
1059                  *
1060                  * Note that rbd_dev_v1_header_info() guarantees the
1061                  * ondisk buffer we're working with has
1062                  * snap_names_len bytes beyond the end of the
1063          * snapshot id array, so this memcpy() is safe.
1064                  */
1065                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1066                 snaps = ondisk->snaps;
1067                 for (i = 0; i < snap_count; i++) {
1068                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1069                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1070                 }
1071         }
1072
1073         /* We can't fail from this point on, so fill in the header */
1074
1075         if (first_time) {
1076                 header->object_prefix = object_prefix;
1077                 header->obj_order = ondisk->options.order;
1078                 rbd_init_layout(rbd_dev);
1079         } else {
1080                 ceph_put_snap_context(header->snapc);
1081                 kfree(header->snap_names);
1082                 kfree(header->snap_sizes);
1083         }
1084
1085         /* The remaining fields always get updated (when we refresh) */
1086
1087         header->image_size = le64_to_cpu(ondisk->image_size);
1088         header->snapc = snapc;
1089         header->snap_names = snap_names;
1090         header->snap_sizes = snap_sizes;
1091
1092         return 0;
1093 out_2big:
1094         ret = -EIO;
1095 out_err:
1096         kfree(snap_sizes);
1097         kfree(snap_names);
1098         ceph_put_snap_context(snapc);
1099         kfree(object_prefix);
1100
1101         return ret;
1102 }
1103
1104 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1105 {
1106         const char *snap_name;
1107
1108         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1109
1110         /* Skip over names until we find the one we are looking for */
1111
1112         snap_name = rbd_dev->header.snap_names;
1113         while (which--)
1114                 snap_name += strlen(snap_name) + 1;
1115
1116         return kstrdup(snap_name, GFP_KERNEL);
1117 }
1118
1119 /*
1120  * Snapshot id comparison function for use with qsort()/bsearch().
1121  * Note that the result sorts snapshots in *descending* order.
1122  */
1123 static int snapid_compare_reverse(const void *s1, const void *s2)
1124 {
1125         u64 snap_id1 = *(u64 *)s1;
1126         u64 snap_id2 = *(u64 *)s2;
1127
1128         if (snap_id1 < snap_id2)
1129                 return 1;
1130         return snap_id1 == snap_id2 ? 0 : -1;
1131 }
1132
1133 /*
1134  * Search a snapshot context to see if the given snapshot id is
1135  * present.
1136  *
1137  * Returns the position of the snapshot id in the array if it's found,
1138  * or BAD_SNAP_INDEX otherwise.
1139  *
1140  * Note: The snapshot array is kept sorted (by the osd) in
1141  * reverse order, highest snapshot id first.
1142  */
1143 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1144 {
1145         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1146         u64 *found;
1147
1148         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1149                                 sizeof (snap_id), snapid_compare_reverse);
1150
1151         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1152 }
1153
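/*
 * Example (illustrative): for a snapshot context whose snaps[] array
 * holds { 40, 30, 10 } (descending, as noted above), looking up
 * snap_id 30 returns index 1, while snap_id 20 is absent and yields
 * BAD_SNAP_INDEX.
 */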
1154 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1155                                         u64 snap_id)
1156 {
1157         u32 which;
1158         const char *snap_name;
1159
1160         which = rbd_dev_snap_index(rbd_dev, snap_id);
1161         if (which == BAD_SNAP_INDEX)
1162                 return ERR_PTR(-ENOENT);
1163
1164         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1165         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1166 }
1167
1168 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1169 {
1170         if (snap_id == CEPH_NOSNAP)
1171                 return RBD_SNAP_HEAD_NAME;
1172
1173         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1174         if (rbd_dev->image_format == 1)
1175                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1176
1177         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1178 }
1179
1180 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1181                                 u64 *snap_size)
1182 {
1183         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1184         if (snap_id == CEPH_NOSNAP) {
1185                 *snap_size = rbd_dev->header.image_size;
1186         } else if (rbd_dev->image_format == 1) {
1187                 u32 which;
1188
1189                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1190                 if (which == BAD_SNAP_INDEX)
1191                         return -ENOENT;
1192
1193                 *snap_size = rbd_dev->header.snap_sizes[which];
1194         } else {
1195                 u64 size = 0;
1196                 int ret;
1197
1198                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1199                 if (ret)
1200                         return ret;
1201
1202                 *snap_size = size;
1203         }
1204         return 0;
1205 }
1206
1207 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1208                         u64 *snap_features)
1209 {
1210         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1211         if (snap_id == CEPH_NOSNAP) {
1212                 *snap_features = rbd_dev->header.features;
1213         } else if (rbd_dev->image_format == 1) {
1214                 *snap_features = 0;     /* No features for format 1 */
1215         } else {
1216                 u64 features = 0;
1217                 int ret;
1218
1219                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1220                 if (ret)
1221                         return ret;
1222
1223                 *snap_features = features;
1224         }
1225         return 0;
1226 }
1227
1228 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1229 {
1230         u64 snap_id = rbd_dev->spec->snap_id;
1231         u64 size = 0;
1232         u64 features = 0;
1233         int ret;
1234
1235         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1236         if (ret)
1237                 return ret;
1238         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1239         if (ret)
1240                 return ret;
1241
1242         rbd_dev->mapping.size = size;
1243         rbd_dev->mapping.features = features;
1244
1245         return 0;
1246 }
1247
1248 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1249 {
1250         rbd_dev->mapping.size = 0;
1251         rbd_dev->mapping.features = 0;
1252 }
1253
1254 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1255 {
1256         u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1257
1258         return offset & (segment_size - 1);
1259 }
1260
1261 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1262                                 u64 offset, u64 length)
1263 {
1264         u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1265
1266         offset &= segment_size - 1;
1267
1268         rbd_assert(length <= U64_MAX - offset);
1269         if (offset + length > segment_size)
1270                 length = segment_size - offset;
1271
1272         return length;
1273 }
1274
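/*
 * Worked example (illustrative): with the common 4 MiB object size
 * (obj_order 22), an image offset of 5 MiB gives rbd_segment_offset()
 * == 1 MiB, and a 4 MiB request starting there is clamped by
 * rbd_segment_length() to 3 MiB so that it stops at the object
 * boundary.
 */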
1275 /*
1276  * bio helpers
1277  */
1278
1279 static void bio_chain_put(struct bio *chain)
1280 {
1281         struct bio *tmp;
1282
1283         while (chain) {
1284                 tmp = chain;
1285                 chain = chain->bi_next;
1286                 bio_put(tmp);
1287         }
1288 }
1289
1290 /*
1291  * zeros a bio chain, starting at a specific offset
1292  */
1293 static void zero_bio_chain(struct bio *chain, int start_ofs)
1294 {
1295         struct bio_vec bv;
1296         struct bvec_iter iter;
1297         unsigned long flags;
1298         void *buf;
1299         int pos = 0;
1300
1301         while (chain) {
1302                 bio_for_each_segment(bv, chain, iter) {
1303                         if (pos + bv.bv_len > start_ofs) {
1304                                 int remainder = max(start_ofs - pos, 0);
1305                                 buf = bvec_kmap_irq(&bv, &flags);
1306                                 memset(buf + remainder, 0,
1307                                        bv.bv_len - remainder);
1308                                 flush_dcache_page(bv.bv_page);
1309                                 bvec_kunmap_irq(buf, &flags);
1310                         }
1311                         pos += bv.bv_len;
1312                 }
1313
1314                 chain = chain->bi_next;
1315         }
1316 }
1317
1318 /*
1319  * similar to zero_bio_chain(), zeros data defined by a page array,
1320  * starting at the given byte offset from the start of the array and
1321  * continuing up to the given end offset.  The pages array is
1322  * assumed to be big enough to hold all bytes up to the end.
1323  */
1324 static void zero_pages(struct page **pages, u64 offset, u64 end)
1325 {
1326         struct page **page = &pages[offset >> PAGE_SHIFT];
1327
1328         rbd_assert(end > offset);
1329         rbd_assert(end - offset <= (u64)SIZE_MAX);
1330         while (offset < end) {
1331                 size_t page_offset;
1332                 size_t length;
1333                 unsigned long flags;
1334                 void *kaddr;
1335
1336                 page_offset = offset & ~PAGE_MASK;
1337                 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1338                 local_irq_save(flags);
1339                 kaddr = kmap_atomic(*page);
1340                 memset(kaddr + page_offset, 0, length);
1341                 flush_dcache_page(*page);
1342                 kunmap_atomic(kaddr);
1343                 local_irq_restore(flags);
1344
1345                 offset += length;
1346                 page++;
1347         }
1348 }
1349
1350 /*
1351  * Clone a portion of a bio, starting at the given byte offset
1352  * and continuing for the number of bytes indicated.
1353  */
1354 static struct bio *bio_clone_range(struct bio *bio_src,
1355                                         unsigned int offset,
1356                                         unsigned int len,
1357                                         gfp_t gfpmask)
1358 {
1359         struct bio *bio;
1360
1361         bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone);
1362         if (!bio)
1363                 return NULL;    /* ENOMEM */
1364
1365         bio_advance(bio, offset);
1366         bio->bi_iter.bi_size = len;
1367
1368         return bio;
1369 }
1370
1371 /*
1372  * Clone a portion of a bio chain, starting at the given byte offset
1373  * into the first bio in the source chain and continuing for the
1374  * number of bytes indicated.  The result is another bio chain of
1375  * exactly the given length, or a null pointer on error.
1376  *
1377  * The bio_src and offset parameters are both in-out.  On entry they
1378  * refer to the first source bio and the offset into that bio where
1379  * the start of data to be cloned is located.
1380  *
1381  * On return, *bio_src is updated to refer to the bio in the source
1382  * chain that contains the first un-cloned byte, and *offset will
1383  * contain the offset of that byte within that bio.
1384  */
1385 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1386                                         unsigned int *offset,
1387                                         unsigned int len,
1388                                         gfp_t gfpmask)
1389 {
1390         struct bio *bi = *bio_src;
1391         unsigned int off = *offset;
1392         struct bio *chain = NULL;
1393         struct bio **end;
1394
1395         /* Build up a chain of clone bios up to the limit */
1396
1397         if (!bi || off >= bi->bi_iter.bi_size || !len)
1398                 return NULL;            /* Nothing to clone */
1399
1400         end = &chain;
1401         while (len) {
1402                 unsigned int bi_size;
1403                 struct bio *bio;
1404
1405                 if (!bi) {
1406                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1407                         goto out_err;   /* EINVAL; ran out of bio's */
1408                 }
1409                 bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1410                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1411                 if (!bio)
1412                         goto out_err;   /* ENOMEM */
1413
1414                 *end = bio;
1415                 end = &bio->bi_next;
1416
1417                 off += bi_size;
1418                 if (off == bi->bi_iter.bi_size) {
1419                         bi = bi->bi_next;
1420                         off = 0;
1421                 }
1422                 len -= bi_size;
1423         }
1424         *bio_src = bi;
1425         *offset = off;
1426
1427         return chain;
1428 out_err:
1429         bio_chain_put(chain);
1430
1431         return NULL;
1432 }
1433
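/*
 * Worked example (illustrative): if *bio_src points at a 2 MiB bio
 * chained to a second 2 MiB bio and *offset is 1 MiB, asking for
 * len == 2 MiB produces a chain of two clones (the last 1 MiB of the
 * first bio, then the first 1 MiB of the second).  On return *bio_src
 * points at the second bio and *offset is 1 MiB, ready for the next
 * call.
 */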
1434 /*
1435  * The default/initial value for all object request flags is 0.  For
1436  * each flag, once its value is set to 1 it is never reset to 0
1437  * again.
1438  */
1439 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1440 {
1441         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1442                 struct rbd_device *rbd_dev;
1443
1444                 rbd_dev = obj_request->img_request->rbd_dev;
1445                 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1446                         obj_request);
1447         }
1448 }
1449
1450 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1451 {
1452         smp_mb();
1453         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1454 }
1455
1456 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1457 {
1458         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1459                 struct rbd_device *rbd_dev = NULL;
1460
1461                 if (obj_request_img_data_test(obj_request))
1462                         rbd_dev = obj_request->img_request->rbd_dev;
1463                 rbd_warn(rbd_dev, "obj_request %p already marked done",
1464                         obj_request);
1465         }
1466 }
1467
1468 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1469 {
1470         smp_mb();
1471         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1472 }
1473
1474 /*
1475  * This sets the KNOWN flag after (possibly) setting the EXISTS
1476  * flag.  The latter is set based on the "exists" value provided.
1477  *
1478  * Note that for our purposes once an object exists it never goes
1479  * away again.  It's possible that the responses to two existence
1480  * checks are separated by the creation of the target object, and
1481  * the first ("doesn't exist") response arrives *after* the second
1482  * ("does exist").  In that case we ignore the one that arrives later.
1483  */
1484 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1485                                 bool exists)
1486 {
1487         if (exists)
1488                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1489         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1490         smp_mb();
1491 }
1492
1493 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1494 {
1495         smp_mb();
1496         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1497 }
1498
1499 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1500 {
1501         smp_mb();
1502         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1503 }
1504
1505 static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1506 {
1507         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1508
1509         return obj_request->img_offset <
1510             round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1511 }
1512
1513 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1514 {
1515         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1516                 kref_read(&obj_request->kref));
1517         kref_get(&obj_request->kref);
1518 }
1519
1520 static void rbd_obj_request_destroy(struct kref *kref);
1521 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1522 {
1523         rbd_assert(obj_request != NULL);
1524         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1525                 kref_read(&obj_request->kref));
1526         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1527 }
1528
1529 static void rbd_img_request_get(struct rbd_img_request *img_request)
1530 {
1531         dout("%s: img %p (was %d)\n", __func__, img_request,
1532              kref_read(&img_request->kref));
1533         kref_get(&img_request->kref);
1534 }
1535
1536 static bool img_request_child_test(struct rbd_img_request *img_request);
1537 static void rbd_parent_request_destroy(struct kref *kref);
1538 static void rbd_img_request_destroy(struct kref *kref);
1539 static void rbd_img_request_put(struct rbd_img_request *img_request)
1540 {
1541         rbd_assert(img_request != NULL);
1542         dout("%s: img %p (was %d)\n", __func__, img_request,
1543                 kref_read(&img_request->kref));
1544         if (img_request_child_test(img_request))
1545                 kref_put(&img_request->kref, rbd_parent_request_destroy);
1546         else
1547                 kref_put(&img_request->kref, rbd_img_request_destroy);
1548 }
1549
1550 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1551                                         struct rbd_obj_request *obj_request)
1552 {
1553         rbd_assert(obj_request->img_request == NULL);
1554
1555         /* Image request now owns object's original reference */
1556         obj_request->img_request = img_request;
1557         obj_request->which = img_request->obj_request_count;
1558         rbd_assert(!obj_request_img_data_test(obj_request));
1559         obj_request_img_data_set(obj_request);
1560         rbd_assert(obj_request->which != BAD_WHICH);
1561         img_request->obj_request_count++;
1562         list_add_tail(&obj_request->links, &img_request->obj_requests);
1563         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1564                 obj_request->which);
1565 }
1566
1567 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1568                                         struct rbd_obj_request *obj_request)
1569 {
1570         rbd_assert(obj_request->which != BAD_WHICH);
1571
1572         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1573                 obj_request->which);
1574         list_del(&obj_request->links);
1575         rbd_assert(img_request->obj_request_count > 0);
1576         img_request->obj_request_count--;
1577         rbd_assert(obj_request->which == img_request->obj_request_count);
1578         obj_request->which = BAD_WHICH;
1579         rbd_assert(obj_request_img_data_test(obj_request));
1580         rbd_assert(obj_request->img_request == img_request);
1581         obj_request->img_request = NULL;
1582         obj_request->callback = NULL;
1583         rbd_obj_request_put(obj_request);
1584 }
1585
1586 static bool obj_request_type_valid(enum obj_request_type type)
1587 {
1588         switch (type) {
1589         case OBJ_REQUEST_NODATA:
1590         case OBJ_REQUEST_BIO:
1591         case OBJ_REQUEST_PAGES:
1592                 return true;
1593         default:
1594                 return false;
1595         }
1596 }
1597
1598 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
1599
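/*
 * Start the osd request underlying an object request.  An object
 * request that is part of an image request takes an extra reference
 * on that image request here; it is dropped in rbd_img_obj_callback().
 */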
1600 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1601 {
1602         struct ceph_osd_request *osd_req = obj_request->osd_req;
1603
1604         dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1605              obj_request, obj_request->object_no, obj_request->offset,
1606              obj_request->length, osd_req);
1607         if (obj_request_img_data_test(obj_request)) {
1608                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1609                 rbd_img_request_get(obj_request->img_request);
1610         }
1611         ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1612 }
1613
1614 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1615 {
1616
1617         dout("%s: img %p\n", __func__, img_request);
1618
1619         /*
1620          * If no error occurred, compute the aggregate transfer
1621          * count for the image request.  We could instead use
1622          * atomic64_cmpxchg() to update it as each object request
1623          * completes; not clear which way is better off hand.
1624          */
1625         if (!img_request->result) {
1626                 struct rbd_obj_request *obj_request;
1627                 u64 xferred = 0;
1628
1629                 for_each_obj_request(img_request, obj_request)
1630                         xferred += obj_request->xferred;
1631                 img_request->xferred = xferred;
1632         }
1633
1634         if (img_request->callback)
1635                 img_request->callback(img_request);
1636         else
1637                 rbd_img_request_put(img_request);
1638 }
1639
1640 /*
1641  * The default/initial value for all image request flags is 0.  Each
1642  * is conditionally set to 1 at image request initialization time
1643  * and currently never changes thereafter.
1644  */
1645 static void img_request_write_set(struct rbd_img_request *img_request)
1646 {
1647         set_bit(IMG_REQ_WRITE, &img_request->flags);
1648         smp_mb();
1649 }
1650
1651 static bool img_request_write_test(struct rbd_img_request *img_request)
1652 {
1653         smp_mb();
1654         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1655 }
1656
1657 /*
1658  * Set the discard flag when the img_request is a discard request
1659  */
1660 static void img_request_discard_set(struct rbd_img_request *img_request)
1661 {
1662         set_bit(IMG_REQ_DISCARD, &img_request->flags);
1663         smp_mb();
1664 }
1665
1666 static bool img_request_discard_test(struct rbd_img_request *img_request)
1667 {
1668         smp_mb();
1669         return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1670 }
1671
1672 static void img_request_child_set(struct rbd_img_request *img_request)
1673 {
1674         set_bit(IMG_REQ_CHILD, &img_request->flags);
1675         smp_mb();
1676 }
1677
1678 static void img_request_child_clear(struct rbd_img_request *img_request)
1679 {
1680         clear_bit(IMG_REQ_CHILD, &img_request->flags);
1681         smp_mb();
1682 }
1683
1684 static bool img_request_child_test(struct rbd_img_request *img_request)
1685 {
1686         smp_mb();
1687         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1688 }
1689
1690 static void img_request_layered_set(struct rbd_img_request *img_request)
1691 {
1692         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1693         smp_mb();
1694 }
1695
1696 static void img_request_layered_clear(struct rbd_img_request *img_request)
1697 {
1698         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1699         smp_mb();
1700 }
1701
1702 static bool img_request_layered_test(struct rbd_img_request *img_request)
1703 {
1704         smp_mb();
1705         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1706 }
1707
1708 static enum obj_operation_type
1709 rbd_img_request_op_type(struct rbd_img_request *img_request)
1710 {
1711         if (img_request_write_test(img_request))
1712                 return OBJ_OP_WRITE;
1713         else if (img_request_discard_test(img_request))
1714                 return OBJ_OP_DISCARD;
1715         else
1716                 return OBJ_OP_READ;
1717 }
1718
1719 static void
1720 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1721 {
1722         u64 xferred = obj_request->xferred;
1723         u64 length = obj_request->length;
1724
1725         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1726                 obj_request, obj_request->img_request, obj_request->result,
1727                 xferred, length);
1728         /*
1729          * ENOENT means a hole in the image.  We zero-fill the entire
1730          * length of the request.  A short read also implies zero-fill
1731          * to the end of the request.  An error requires the whole
1732          * length of the request to be reported finished with an error
1733          * to the block layer.  In each case we update the xferred
1734          * count to indicate the whole request was satisfied.
1735          */
1736         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1737         if (obj_request->result == -ENOENT) {
1738                 if (obj_request->type == OBJ_REQUEST_BIO)
1739                         zero_bio_chain(obj_request->bio_list, 0);
1740                 else
1741                         zero_pages(obj_request->pages, 0, length);
1742                 obj_request->result = 0;
1743         } else if (xferred < length && !obj_request->result) {
1744                 if (obj_request->type == OBJ_REQUEST_BIO)
1745                         zero_bio_chain(obj_request->bio_list, xferred);
1746                 else
1747                         zero_pages(obj_request->pages, xferred, length);
1748         }
1749         obj_request->xferred = length;
1750         obj_request_done_set(obj_request);
1751 }
1752
1753 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1754 {
1755         dout("%s: obj %p cb %p\n", __func__, obj_request,
1756                 obj_request->callback);
1757         if (obj_request->callback)
1758                 obj_request->callback(obj_request);
1759         else
1760                 complete_all(&obj_request->completion);
1761 }
1762
1763 static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1764 {
1765         obj_request->result = err;
1766         obj_request->xferred = 0;
1767         /*
1768          * kludge - mirror rbd_obj_request_submit() to match a put in
1769          * rbd_img_obj_callback()
1770          */
1771         if (obj_request_img_data_test(obj_request)) {
1772                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1773                 rbd_img_request_get(obj_request->img_request);
1774         }
1775         obj_request_done_set(obj_request);
1776         rbd_obj_request_complete(obj_request);
1777 }
1778
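/*
 * Handle completion of a read.  A read that returns -ENOENT on a
 * layered image within the parent overlap is redirected to the parent
 * image; otherwise the result is handed to the image-level read
 * callback (or the request is simply marked done).
 */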
1779 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1780 {
1781         struct rbd_img_request *img_request = NULL;
1782         struct rbd_device *rbd_dev = NULL;
1783         bool layered = false;
1784
1785         if (obj_request_img_data_test(obj_request)) {
1786                 img_request = obj_request->img_request;
1787                 layered = img_request && img_request_layered_test(img_request);
1788                 rbd_dev = img_request->rbd_dev;
1789         }
1790
1791         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1792                 obj_request, img_request, obj_request->result,
1793                 obj_request->xferred, obj_request->length);
1794         if (layered && obj_request->result == -ENOENT &&
1795                         obj_request->img_offset < rbd_dev->parent_overlap)
1796                 rbd_img_parent_read(obj_request);
1797         else if (img_request)
1798                 rbd_img_obj_request_read_callback(obj_request);
1799         else
1800                 obj_request_done_set(obj_request);
1801 }
1802
1803 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1804 {
1805         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1806                 obj_request->result, obj_request->length);
1807         /*
1808          * There is no such thing as a successful short write.  Set
1809          * it to our originally-requested length.
1810          */
1811         obj_request->xferred = obj_request->length;
1812         obj_request_done_set(obj_request);
1813 }
1814
1815 static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1816 {
1817         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1818                 obj_request->result, obj_request->length);
1819         /*
1820          * There is no such thing as a successful short discard.  Set
1821          * it to our originally-requested length.
1822          */
1823         obj_request->xferred = obj_request->length;
1824         /* discarding a non-existent object is not a problem */
1825         if (obj_request->result == -ENOENT)
1826                 obj_request->result = 0;
1827         obj_request_done_set(obj_request);
1828 }
1829
1830 /*
1831  * For a simple stat call there's nothing to do.  We'll do more if
1832  * this is part of a write sequence for a layered image.
1833  */
1834 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1835 {
1836         dout("%s: obj %p\n", __func__, obj_request);
1837         obj_request_done_set(obj_request);
1838 }
1839
1840 static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1841 {
1842         dout("%s: obj %p\n", __func__, obj_request);
1843
1844         if (obj_request_img_data_test(obj_request))
1845                 rbd_osd_copyup_callback(obj_request);
1846         else
1847                 obj_request_done_set(obj_request);
1848 }
1849
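/*
 * Completion callback for all rbd osd requests.  Record the result
 * and transfer count, dispatch based on the opcode of the first op,
 * and complete the object request if it is done.
 */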
1850 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1851 {
1852         struct rbd_obj_request *obj_request = osd_req->r_priv;
1853         u16 opcode;
1854
1855         dout("%s: osd_req %p\n", __func__, osd_req);
1856         rbd_assert(osd_req == obj_request->osd_req);
1857         if (obj_request_img_data_test(obj_request)) {
1858                 rbd_assert(obj_request->img_request);
1859                 rbd_assert(obj_request->which != BAD_WHICH);
1860         } else {
1861                 rbd_assert(obj_request->which == BAD_WHICH);
1862         }
1863
1864         if (osd_req->r_result < 0)
1865                 obj_request->result = osd_req->r_result;
1866
1867         /*
1868          * We support a 64-bit length, but ultimately it has to be
1869          * passed to the block layer, which just supports a 32-bit
1870          * length field.
1871          */
1872         obj_request->xferred = osd_req->r_ops[0].outdata_len;
1873         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1874
1875         opcode = osd_req->r_ops[0].op;
1876         switch (opcode) {
1877         case CEPH_OSD_OP_READ:
1878                 rbd_osd_read_callback(obj_request);
1879                 break;
1880         case CEPH_OSD_OP_SETALLOCHINT:
1881                 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1882                            osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
1883                 /* fall through */
1884         case CEPH_OSD_OP_WRITE:
1885         case CEPH_OSD_OP_WRITEFULL:
1886                 rbd_osd_write_callback(obj_request);
1887                 break;
1888         case CEPH_OSD_OP_STAT:
1889                 rbd_osd_stat_callback(obj_request);
1890                 break;
1891         case CEPH_OSD_OP_DELETE:
1892         case CEPH_OSD_OP_TRUNCATE:
1893         case CEPH_OSD_OP_ZERO:
1894                 rbd_osd_discard_callback(obj_request);
1895                 break;
1896         case CEPH_OSD_OP_CALL:
1897                 rbd_osd_call_callback(obj_request);
1898                 break;
1899         default:
1900                 rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
1901                          obj_request->object_no, opcode);
1902                 break;
1903         }
1904
1905         if (obj_request_done_test(obj_request))
1906                 rbd_obj_request_complete(obj_request);
1907 }
1908
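/* Reads are directed at the snapshot recorded in the image request. */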
1909 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1910 {
1911         struct ceph_osd_request *osd_req = obj_request->osd_req;
1912
1913         rbd_assert(obj_request_img_data_test(obj_request));
1914         osd_req->r_snapid = obj_request->img_request->snap_id;
1915 }
1916
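/* Writes carry a modification time and the offset of the data written. */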
1917 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1918 {
1919         struct ceph_osd_request *osd_req = obj_request->osd_req;
1920
1921         ktime_get_real_ts(&osd_req->r_mtime);
1922         osd_req->r_data_offset = obj_request->offset;
1923 }
1924
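/*
 * Allocate an osd request addressed at the object backing the given
 * object request, naming it according to the format 1 or format 2
 * data object naming convention.
 */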
1925 static struct ceph_osd_request *
1926 __rbd_osd_req_create(struct rbd_device *rbd_dev,
1927                      struct ceph_snap_context *snapc,
1928                      int num_ops, unsigned int flags,
1929                      struct rbd_obj_request *obj_request)
1930 {
1931         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1932         struct ceph_osd_request *req;
1933         const char *name_format = rbd_dev->image_format == 1 ?
1934                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1935
1936         req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1937         if (!req)
1938                 return NULL;
1939
1940         req->r_flags = flags;
1941         req->r_callback = rbd_osd_req_callback;
1942         req->r_priv = obj_request;
1943
1944         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1945         if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1946                         rbd_dev->header.object_prefix, obj_request->object_no))
1947                 goto err_req;
1948
1949         if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1950                 goto err_req;
1951
1952         return req;
1953
1954 err_req:
1955         ceph_osdc_put_request(req);
1956         return NULL;
1957 }
1958
1959 /*
1960  * Create an osd request.  A read request has one osd op (read).
1961  * A write request has either one (watch) or two (hint+write) osd ops.
1962  * (All rbd data writes are prefixed with an allocation hint op, but
1963  * technically osd watch is a write request, hence this distinction.)
1964  */
1965 static struct ceph_osd_request *rbd_osd_req_create(
1966                                         struct rbd_device *rbd_dev,
1967                                         enum obj_operation_type op_type,
1968                                         unsigned int num_ops,
1969                                         struct rbd_obj_request *obj_request)
1970 {
1971         struct ceph_snap_context *snapc = NULL;
1972
1973         if (obj_request_img_data_test(obj_request) &&
1974                 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1975                 struct rbd_img_request *img_request = obj_request->img_request;
1976                 if (op_type == OBJ_OP_WRITE) {
1977                         rbd_assert(img_request_write_test(img_request));
1978                 } else {
1979                         rbd_assert(img_request_discard_test(img_request));
1980                 }
1981                 snapc = img_request->snapc;
1982         }
1983
1984         rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
1985
1986         return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
1987             (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
1988             CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
1989 }
1990
1991 /*
1992  * Create a copyup osd request based on the information in the object
1993  * request supplied.  A copyup request has two or three osd ops: a
1994  * copyup method call, potentially a hint op, and a write or truncate
1995  * or zero op.
1996  */
1997 static struct ceph_osd_request *
1998 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1999 {
2000         struct rbd_img_request *img_request;
2001         int num_osd_ops = 3;
2002
2003         rbd_assert(obj_request_img_data_test(obj_request));
2004         img_request = obj_request->img_request;
2005         rbd_assert(img_request);
2006         rbd_assert(img_request_write_test(img_request) ||
2007                         img_request_discard_test(img_request));
2008
2009         if (img_request_discard_test(img_request))
2010                 num_osd_ops = 2;
2011
2012         return __rbd_osd_req_create(img_request->rbd_dev,
2013                                     img_request->snapc, num_osd_ops,
2014                                     CEPH_OSD_FLAG_WRITE, obj_request);
2015 }
2016
2017 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2018 {
2019         ceph_osdc_put_request(osd_req);
2020 }
2021
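/*
 * Allocate and initialize an object request of the given type.  It
 * starts out with no osd request and not yet attached to any image
 * request (which == BAD_WHICH).
 */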
2022 static struct rbd_obj_request *
2023 rbd_obj_request_create(enum obj_request_type type)
2024 {
2025         struct rbd_obj_request *obj_request;
2026
2027         rbd_assert(obj_request_type_valid(type));
2028
2029         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
2030         if (!obj_request)
2031                 return NULL;
2032
2033         obj_request->which = BAD_WHICH;
2034         obj_request->type = type;
2035         INIT_LIST_HEAD(&obj_request->links);
2036         init_completion(&obj_request->completion);
2037         kref_init(&obj_request->kref);
2038
2039         dout("%s %p\n", __func__, obj_request);
2040         return obj_request;
2041 }
2042
2043 static void rbd_obj_request_destroy(struct kref *kref)
2044 {
2045         struct rbd_obj_request *obj_request;
2046
2047         obj_request = container_of(kref, struct rbd_obj_request, kref);
2048
2049         dout("%s: obj %p\n", __func__, obj_request);
2050
2051         rbd_assert(obj_request->img_request == NULL);
2052         rbd_assert(obj_request->which == BAD_WHICH);
2053
2054         if (obj_request->osd_req)
2055                 rbd_osd_req_destroy(obj_request->osd_req);
2056
2057         rbd_assert(obj_request_type_valid(obj_request->type));
2058         switch (obj_request->type) {
2059         case OBJ_REQUEST_NODATA:
2060                 break;          /* Nothing to do */
2061         case OBJ_REQUEST_BIO:
2062                 if (obj_request->bio_list)
2063                         bio_chain_put(obj_request->bio_list);
2064                 break;
2065         case OBJ_REQUEST_PAGES:
2066                 /* img_data requests don't own their page array */
2067                 if (obj_request->pages &&
2068                     !obj_request_img_data_test(obj_request))
2069                         ceph_release_page_vector(obj_request->pages,
2070                                                 obj_request->page_count);
2071                 break;
2072         }
2073
2074         kmem_cache_free(rbd_obj_request_cache, obj_request);
2075 }
2076
2077 /* It's OK to call this for a device with no parent */
2078
2079 static void rbd_spec_put(struct rbd_spec *spec);
2080 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2081 {
2082         rbd_dev_remove_parent(rbd_dev);
2083         rbd_spec_put(rbd_dev->parent_spec);
2084         rbd_dev->parent_spec = NULL;
2085         rbd_dev->parent_overlap = 0;
2086 }
2087
2088 /*
2089  * Parent image reference counting is used to determine when an
2090  * image's parent fields can be safely torn down--after there are no
2091  * more in-flight requests to the parent image.  When the last
2092  * reference is dropped, cleaning them up is safe.
2093  */
2094 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2095 {
2096         int counter;
2097
2098         if (!rbd_dev->parent_spec)
2099                 return;
2100
2101         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2102         if (counter > 0)
2103                 return;
2104
2105         /* Last reference; clean up parent data structures */
2106
2107         if (!counter)
2108                 rbd_dev_unparent(rbd_dev);
2109         else
2110                 rbd_warn(rbd_dev, "parent reference underflow");
2111 }
2112
2113 /*
2114  * If an image has a non-zero parent overlap, get a reference to its
2115  * parent.
2116  *
2117  * Returns true if the rbd device has a parent with a non-zero
2118  * overlap and a reference for it was successfully taken, or
2119  * false otherwise.
2120  */
2121 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2122 {
2123         int counter = 0;
2124
2125         if (!rbd_dev->parent_spec)
2126                 return false;
2127
2128         down_read(&rbd_dev->header_rwsem);
2129         if (rbd_dev->parent_overlap)
2130                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2131         up_read(&rbd_dev->header_rwsem);
2132
2133         if (counter < 0)
2134                 rbd_warn(rbd_dev, "parent reference overflow");
2135
2136         return counter > 0;
2137 }
2138
2139 /*
2140  * Caller is responsible for filling in the list of object requests
2141  * that comprises the image request, and the Linux request pointer
2142  * (if there is one).
2143  */
2144 static struct rbd_img_request *rbd_img_request_create(
2145                                         struct rbd_device *rbd_dev,
2146                                         u64 offset, u64 length,
2147                                         enum obj_operation_type op_type,
2148                                         struct ceph_snap_context *snapc)
2149 {
2150         struct rbd_img_request *img_request;
2151
2152         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2153         if (!img_request)
2154                 return NULL;
2155
2156         img_request->rq = NULL;
2157         img_request->rbd_dev = rbd_dev;
2158         img_request->offset = offset;
2159         img_request->length = length;
2160         img_request->flags = 0;
2161         if (op_type == OBJ_OP_DISCARD) {
2162                 img_request_discard_set(img_request);
2163                 img_request->snapc = snapc;
2164         } else if (op_type == OBJ_OP_WRITE) {
2165                 img_request_write_set(img_request);
2166                 img_request->snapc = snapc;
2167         } else {
2168                 img_request->snap_id = rbd_dev->spec->snap_id;
2169         }
2170         if (rbd_dev_parent_get(rbd_dev))
2171                 img_request_layered_set(img_request);
2172         spin_lock_init(&img_request->completion_lock);
2173         img_request->next_completion = 0;
2174         img_request->callback = NULL;
2175         img_request->result = 0;
2176         img_request->obj_request_count = 0;
2177         INIT_LIST_HEAD(&img_request->obj_requests);
2178         kref_init(&img_request->kref);
2179
2180         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2181                 obj_op_name(op_type), offset, length, img_request);
2182
2183         return img_request;
2184 }
2185
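/*
 * Free an image request:  drop all of its object requests, the parent
 * reference taken for layered requests, and the snapshot context held
 * for writes and discards.
 */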
2186 static void rbd_img_request_destroy(struct kref *kref)
2187 {
2188         struct rbd_img_request *img_request;
2189         struct rbd_obj_request *obj_request;
2190         struct rbd_obj_request *next_obj_request;
2191
2192         img_request = container_of(kref, struct rbd_img_request, kref);
2193
2194         dout("%s: img %p\n", __func__, img_request);
2195
2196         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2197                 rbd_img_obj_request_del(img_request, obj_request);
2198         rbd_assert(img_request->obj_request_count == 0);
2199
2200         if (img_request_layered_test(img_request)) {
2201                 img_request_layered_clear(img_request);
2202                 rbd_dev_parent_put(img_request->rbd_dev);
2203         }
2204
2205         if (img_request_write_test(img_request) ||
2206                 img_request_discard_test(img_request))
2207                 ceph_put_snap_context(img_request->snapc);
2208
2209         kmem_cache_free(rbd_img_request_cache, img_request);
2210 }
2211
2212 static struct rbd_img_request *rbd_parent_request_create(
2213                                         struct rbd_obj_request *obj_request,
2214                                         u64 img_offset, u64 length)
2215 {
2216         struct rbd_img_request *parent_request;
2217         struct rbd_device *rbd_dev;
2218
2219         rbd_assert(obj_request->img_request);
2220         rbd_dev = obj_request->img_request->rbd_dev;
2221
2222         parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2223                                                 length, OBJ_OP_READ, NULL);
2224         if (!parent_request)
2225                 return NULL;
2226
2227         img_request_child_set(parent_request);
2228         rbd_obj_request_get(obj_request);
2229         parent_request->obj_request = obj_request;
2230
2231         return parent_request;
2232 }
2233
2234 static void rbd_parent_request_destroy(struct kref *kref)
2235 {
2236         struct rbd_img_request *parent_request;
2237         struct rbd_obj_request *orig_request;
2238
2239         parent_request = container_of(kref, struct rbd_img_request, kref);
2240         orig_request = parent_request->obj_request;
2241
2242         parent_request->obj_request = NULL;
2243         rbd_obj_request_put(orig_request);
2244         img_request_child_clear(parent_request);
2245
2246         rbd_img_request_destroy(kref);
2247 }
2248
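/*
 * Account for a completed object request toward its image request.
 * For child (parent read) requests this only tracks whether more
 * object requests remain; for top-level requests the completed bytes
 * are reported to the block layer.  Returns true if the image request
 * is not yet fully completed.
 */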
2249 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2250 {
2251         struct rbd_img_request *img_request;
2252         unsigned int xferred;
2253         int result;
2254         bool more;
2255
2256         rbd_assert(obj_request_img_data_test(obj_request));
2257         img_request = obj_request->img_request;
2258
2259         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2260         xferred = (unsigned int)obj_request->xferred;
2261         result = obj_request->result;
2262         if (result) {
2263                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2264                 enum obj_operation_type op_type;
2265
2266                 if (img_request_discard_test(img_request))
2267                         op_type = OBJ_OP_DISCARD;
2268                 else if (img_request_write_test(img_request))
2269                         op_type = OBJ_OP_WRITE;
2270                 else
2271                         op_type = OBJ_OP_READ;
2272
2273                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2274                         obj_op_name(op_type), obj_request->length,
2275                         obj_request->img_offset, obj_request->offset);
2276                 rbd_warn(rbd_dev, "  result %d xferred %x",
2277                         result, xferred);
2278                 if (!img_request->result)
2279                         img_request->result = result;
2280                 /*
2281                  * Need to end I/O on the entire obj_request worth of
2282                  * bytes in case of error.
2283                  */
2284                 xferred = obj_request->length;
2285         }
2286
2287         if (img_request_child_test(img_request)) {
2288                 rbd_assert(img_request->obj_request != NULL);
2289                 more = obj_request->which < img_request->obj_request_count - 1;
2290         } else {
2291                 blk_status_t status = errno_to_blk_status(result);
2292
2293                 rbd_assert(img_request->rq != NULL);
2294
2295                 more = blk_update_request(img_request->rq, status, xferred);
2296                 if (!more)
2297                         __blk_mq_end_request(img_request->rq, status);
2298         }
2299
2300         return more;
2301 }
2302
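/*
 * Completion callback for object requests that are part of an image
 * request.  Object requests may complete in any order, but they are
 * processed strictly in order here, starting at next_completion, so
 * results reach the block layer in order.  Also drops the image
 * request reference taken when the object request was submitted.
 */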
2303 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2304 {
2305         struct rbd_img_request *img_request;
2306         u32 which = obj_request->which;
2307         bool more = true;
2308
2309         rbd_assert(obj_request_img_data_test(obj_request));
2310         img_request = obj_request->img_request;
2311
2312         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2313         rbd_assert(img_request != NULL);
2314         rbd_assert(img_request->obj_request_count > 0);
2315         rbd_assert(which != BAD_WHICH);
2316         rbd_assert(which < img_request->obj_request_count);
2317
2318         spin_lock_irq(&img_request->completion_lock);
2319         if (which != img_request->next_completion)
2320                 goto out;
2321
2322         for_each_obj_request_from(img_request, obj_request) {
2323                 rbd_assert(more);
2324                 rbd_assert(which < img_request->obj_request_count);
2325
2326                 if (!obj_request_done_test(obj_request))
2327                         break;
2328                 more = rbd_img_obj_end_request(obj_request);
2329                 which++;
2330         }
2331
2332         rbd_assert(more ^ (which == img_request->obj_request_count));
2333         img_request->next_completion = which;
2334 out:
2335         spin_unlock_irq(&img_request->completion_lock);
2336         rbd_img_request_put(img_request);
2337
2338         if (!more)
2339                 rbd_img_request_complete(img_request);
2340 }
2341
2342 /*
2343  * Add individual osd ops to the given ceph_osd_request and prepare
2344  * them for submission.  num_ops is the number of osd ops already
2345  * added to the given osd request.
2346  */
2347 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2348                                 struct ceph_osd_request *osd_request,
2349                                 enum obj_operation_type op_type,
2350                                 unsigned int num_ops)
2351 {
2352         struct rbd_img_request *img_request = obj_request->img_request;
2353         struct rbd_device *rbd_dev = img_request->rbd_dev;
2354         u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2355         u64 offset = obj_request->offset;
2356         u64 length = obj_request->length;
2357         u64 img_end;
2358         u16 opcode;
2359
2360         if (op_type == OBJ_OP_DISCARD) {
2361                 if (!offset && length == object_size &&
2362                     (!img_request_layered_test(img_request) ||
2363                      !obj_request_overlaps_parent(obj_request))) {
2364                         opcode = CEPH_OSD_OP_DELETE;
2365                 } else if ((offset + length == object_size)) {
2366                         opcode = CEPH_OSD_OP_TRUNCATE;
2367                 } else {
2368                         down_read(&rbd_dev->header_rwsem);
2369                         img_end = rbd_dev->header.image_size;
2370                         up_read(&rbd_dev->header_rwsem);
2371
2372                         if (obj_request->img_offset + length == img_end)
2373                                 opcode = CEPH_OSD_OP_TRUNCATE;
2374                         else
2375                                 opcode = CEPH_OSD_OP_ZERO;
2376                 }
2377         } else if (op_type == OBJ_OP_WRITE) {
2378                 if (!offset && length == object_size)
2379                         opcode = CEPH_OSD_OP_WRITEFULL;
2380                 else
2381                         opcode = CEPH_OSD_OP_WRITE;
2382                 osd_req_op_alloc_hint_init(osd_request, num_ops,
2383                                         object_size, object_size);
2384                 num_ops++;
2385         } else {
2386                 opcode = CEPH_OSD_OP_READ;
2387         }
2388
2389         if (opcode == CEPH_OSD_OP_DELETE)
2390                 osd_req_op_init(osd_request, num_ops, opcode, 0);
2391         else
2392                 osd_req_op_extent_init(osd_request, num_ops, opcode,
2393                                        offset, length, 0, 0);
2394
2395         if (obj_request->type == OBJ_REQUEST_BIO)
2396                 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2397                                         obj_request->bio_list, length);
2398         else if (obj_request->type == OBJ_REQUEST_PAGES)
2399                 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2400                                         obj_request->pages, length,
2401                                         offset & ~PAGE_MASK, false, false);
2402
2403         /* Discards are also writes */
2404         if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2405                 rbd_osd_req_format_write(obj_request);
2406         else
2407                 rbd_osd_req_format_read(obj_request);
2408 }
2409
2410 /*
2411  * Split up an image request into one or more object requests, each
2412  * to a different object.  The "type" parameter indicates whether
2413  * "data_desc" is the pointer to the head of a list of bio
2414  * structures, or the base of a page array.  In either case this
2415  * function assumes data_desc describes memory sufficient to hold
2416  * all data described by the image request.
2417  */
2418 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2419                                         enum obj_request_type type,
2420                                         void *data_desc)
2421 {
2422         struct rbd_device *rbd_dev = img_request->rbd_dev;
2423         struct rbd_obj_request *obj_request = NULL;
2424         struct rbd_obj_request *next_obj_request;
2425         struct bio *bio_list = NULL;
2426         unsigned int bio_offset = 0;
2427         struct page **pages = NULL;
2428         enum obj_operation_type op_type;
2429         u64 img_offset;
2430         u64 resid;
2431
2432         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2433                 (int)type, data_desc);
2434
2435         img_offset = img_request->offset;
2436         resid = img_request->length;
2437         rbd_assert(resid > 0);
2438         op_type = rbd_img_request_op_type(img_request);
2439
2440         if (type == OBJ_REQUEST_BIO) {
2441                 bio_list = data_desc;
2442                 rbd_assert(img_offset ==
2443                            bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2444         } else if (type == OBJ_REQUEST_PAGES) {
2445                 pages = data_desc;
2446         }
2447
2448         while (resid) {
2449                 struct ceph_osd_request *osd_req;
2450                 u64 object_no = img_offset >> rbd_dev->header.obj_order;
2451                 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2452                 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
2453
2454                 obj_request = rbd_obj_request_create(type);
2455                 if (!obj_request)
2456                         goto out_unwind;
2457
2458                 obj_request->object_no = object_no;
2459                 obj_request->offset = offset;
2460                 obj_request->length = length;
2461
2462                 /*
2463                  * set obj_request->img_request before creating the
2464                  * osd_request so that it gets the right snapc
2465                  */
2466                 rbd_img_obj_request_add(img_request, obj_request);
2467
2468                 if (type == OBJ_REQUEST_BIO) {
2469                         unsigned int clone_size;
2470
2471                         rbd_assert(length <= (u64)UINT_MAX);
2472                         clone_size = (unsigned int)length;
2473                         obj_request->bio_list =
2474                                         bio_chain_clone_range(&bio_list,
2475                                                                 &bio_offset,
2476                                                                 clone_size,
2477                                                                 GFP_NOIO);
2478                         if (!obj_request->bio_list)
2479                                 goto out_unwind;
2480                 } else if (type == OBJ_REQUEST_PAGES) {
2481                         unsigned int page_count;
2482
2483                         obj_request->pages = pages;
2484                         page_count = (u32)calc_pages_for(offset, length);
2485                         obj_request->page_count = page_count;
2486                         if ((offset + length) & ~PAGE_MASK)
2487                                 page_count--;   /* more on last page */
2488                         pages += page_count;
2489                 }
2490
2491                 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2492                                         (op_type == OBJ_OP_WRITE) ? 2 : 1,
2493                                         obj_request);
2494                 if (!osd_req)
2495                         goto out_unwind;
2496
2497                 obj_request->osd_req = osd_req;
2498                 obj_request->callback = rbd_img_obj_callback;
2499                 obj_request->img_offset = img_offset;
2500
2501                 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2502
2503                 img_offset += length;
2504                 resid -= length;
2505         }
2506
2507         return 0;
2508
2509 out_unwind:
2510         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2511                 rbd_img_obj_request_del(img_request, obj_request);
2512
2513         return -ENOMEM;
2514 }
2515
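/*
 * Completion handler for the copyup method call of a layered write.
 * The pages holding the parent data are no longer needed, and a
 * successful request accounts for the full length of the original
 * write.
 */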
2516 static void
2517 rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2518 {
2519         struct rbd_img_request *img_request;
2520         struct rbd_device *rbd_dev;
2521         struct page **pages;
2522         u32 page_count;
2523
2524         dout("%s: obj %p\n", __func__, obj_request);
2525
2526         rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2527                 obj_request->type == OBJ_REQUEST_NODATA);
2528         rbd_assert(obj_request_img_data_test(obj_request));
2529         img_request = obj_request->img_request;
2530         rbd_assert(img_request);
2531
2532         rbd_dev = img_request->rbd_dev;
2533         rbd_assert(rbd_dev);
2534
2535         pages = obj_request->copyup_pages;
2536         rbd_assert(pages != NULL);
2537         obj_request->copyup_pages = NULL;
2538         page_count = obj_request->copyup_page_count;
2539         rbd_assert(page_count);
2540         obj_request->copyup_page_count = 0;
2541         ceph_release_page_vector(pages, page_count);
2542
2543         /*
2544          * We want the transfer count to reflect the size of the
2545          * original write request.  There is no such thing as a
2546          * successful short write, so if the request was successful
2547          * we can just set it to the originally-requested length.
2548          */
2549         if (!obj_request->result)
2550                 obj_request->xferred = obj_request->length;
2551
2552         obj_request_done_set(obj_request);
2553 }
2554
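/*
 * Called when the read of parent data needed for a copyup completes.
 * If the parent overlap has since become zero, just resubmit the
 * original request; otherwise replace the original osd request with a
 * copyup request carrying the parent data plus the original write (or
 * discard) ops, and submit that instead.
 */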
2555 static void
2556 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2557 {
2558         struct rbd_obj_request *orig_request;
2559         struct ceph_osd_request *osd_req;
2560         struct rbd_device *rbd_dev;
2561         struct page **pages;
2562         enum obj_operation_type op_type;
2563         u32 page_count;
2564         int img_result;
2565         u64 parent_length;
2566
2567         rbd_assert(img_request_child_test(img_request));
2568
2569         /* First get what we need from the image request */
2570
2571         pages = img_request->copyup_pages;
2572         rbd_assert(pages != NULL);
2573         img_request->copyup_pages = NULL;
2574         page_count = img_request->copyup_page_count;
2575         rbd_assert(page_count);
2576         img_request->copyup_page_count = 0;
2577
2578         orig_request = img_request->obj_request;
2579         rbd_assert(orig_request != NULL);
2580         rbd_assert(obj_request_type_valid(orig_request->type));
2581         img_result = img_request->result;
2582         parent_length = img_request->length;
2583         rbd_assert(img_result || parent_length == img_request->xferred);
2584         rbd_img_request_put(img_request);
2585
2586         rbd_assert(orig_request->img_request);
2587         rbd_dev = orig_request->img_request->rbd_dev;
2588         rbd_assert(rbd_dev);
2589
2590         /*
2591          * If the overlap has become 0 (most likely because the
2592          * image has been flattened) we need to free the pages
2593          * and re-submit the original write request.
2594          */
2595         if (!rbd_dev->parent_overlap) {
2596                 ceph_release_page_vector(pages, page_count);
2597                 rbd_obj_request_submit(orig_request);
2598                 return;
2599         }
2600
2601         if (img_result)
2602                 goto out_err;
2603
2604         /*
2605          * The original osd request is of no use to us any more.
2606          * We need a new one that can hold the three ops in a copyup
2607          * request.  Allocate the new copyup osd request for the
2608          * original request, and release the old one.
2609          */
2610         img_result = -ENOMEM;
2611         osd_req = rbd_osd_req_create_copyup(orig_request);
2612         if (!osd_req)
2613                 goto out_err;
2614         rbd_osd_req_destroy(orig_request->osd_req);
2615         orig_request->osd_req = osd_req;
2616         orig_request->copyup_pages = pages;
2617         orig_request->copyup_page_count = page_count;
2618
2619         /* Initialize the copyup op */
2620
2621         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2622         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2623                                                 false, false);
2624
2625         /* Add the other op(s) */
2626
2627         op_type = rbd_img_request_op_type(orig_request->img_request);
2628         rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2629
2630         /* All set, send it off. */
2631
2632         rbd_obj_request_submit(orig_request);
2633         return;
2634
2635 out_err:
2636         ceph_release_page_vector(pages, page_count);
2637         rbd_obj_request_error(orig_request, img_result);
2638 }
2639
2640 /*
2641  * Read from the parent image the range of data that covers the
2642  * entire target of the given object request.  This is used for
2643  * satisfying a layered image write request when the target of an
2644  * object request from the image request does not exist.
2645  *
2646  * A page array big enough to hold the returned data is allocated
2647  * and supplied to rbd_img_request_fill() as the "data descriptor."
2648  * When the read completes, this page array will be transferred to
2649  * the original object request for the copyup operation.
2650  *
2651  * If an error occurs, it is recorded as the result of the original
2652  * object request in rbd_img_obj_exists_callback().
2653  */
2654 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2655 {
2656         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2657         struct rbd_img_request *parent_request = NULL;
2658         u64 img_offset;
2659         u64 length;
2660         struct page **pages = NULL;
2661         u32 page_count;
2662         int result;
2663
2664         rbd_assert(rbd_dev->parent != NULL);
2665
2666         /*
2667          * Determine the byte range covered by the object in the
2668          * child image to which the original request was to be sent.
2669          */
2670         img_offset = obj_request->img_offset - obj_request->offset;
2671         length = rbd_obj_bytes(&rbd_dev->header);
2672
2673         /*
2674          * There is no defined parent data beyond the parent
2675          * overlap, so limit what we read at that boundary if
2676          * necessary.
2677          */
2678         if (img_offset + length > rbd_dev->parent_overlap) {
2679                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2680                 length = rbd_dev->parent_overlap - img_offset;
2681         }
2682
2683         /*
2684          * Allocate a page array big enough to receive the data read
2685          * from the parent.
2686          */
2687         page_count = (u32)calc_pages_for(0, length);
2688         pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2689         if (IS_ERR(pages)) {
2690                 result = PTR_ERR(pages);
2691                 pages = NULL;
2692                 goto out_err;
2693         }
2694
2695         result = -ENOMEM;
2696         parent_request = rbd_parent_request_create(obj_request,
2697                                                 img_offset, length);
2698         if (!parent_request)
2699                 goto out_err;
2700
2701         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2702         if (result)
2703                 goto out_err;
2704
2705         parent_request->copyup_pages = pages;
2706         parent_request->copyup_page_count = page_count;
2707         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2708
2709         result = rbd_img_request_submit(parent_request);
2710         if (!result)
2711                 return 0;
2712
2713         parent_request->copyup_pages = NULL;
2714         parent_request->copyup_page_count = 0;
2715         parent_request->obj_request = NULL;
2716         rbd_obj_request_put(obj_request);
2717 out_err:
2718         if (pages)
2719                 ceph_release_page_vector(pages, page_count);
2720         if (parent_request)
2721                 rbd_img_request_put(parent_request);
2722         return result;
2723 }
2724
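/*
 * Called when the STAT request issued by rbd_img_obj_exists_submit()
 * completes.  Record whether the target object exists and resubmit
 * the original object request.
 */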
2725 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2726 {
2727         struct rbd_obj_request *orig_request;
2728         struct rbd_device *rbd_dev;
2729         int result;
2730
2731         rbd_assert(!obj_request_img_data_test(obj_request));
2732
2733         /*
2734          * All we need from the object request is the original
2735          * request and the result of the STAT op.  Grab those, then
2736          * we're done with the request.
2737          */
2738         orig_request = obj_request->obj_request;
2739         obj_request->obj_request = NULL;
2740         rbd_obj_request_put(orig_request);
2741         rbd_assert(orig_request);
2742         rbd_assert(orig_request->img_request);
2743
2744         result = obj_request->result;
2745         obj_request->result = 0;
2746
2747         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2748                 obj_request, orig_request, result,
2749                 obj_request->xferred, obj_request->length);
2750         rbd_obj_request_put(obj_request);
2751
2752         /*
2753          * If the overlap has become 0 (most likely because the
2754          * image has been flattened) we need to re-submit the
2755          * original request.
2756          */
2757         rbd_dev = orig_request->img_request->rbd_dev;
2758         if (!rbd_dev->parent_overlap) {
2759                 rbd_obj_request_submit(orig_request);
2760                 return;
2761         }
2762
2763         /*
2764          * Our only purpose here is to determine whether the object
2765          * exists, and we don't want to treat the non-existence as
2766          * an error.  If something else comes back, transfer the
2767          * error to the original request and complete it now.
2768          */
2769         if (!result) {
2770                 obj_request_existence_set(orig_request, true);
2771         } else if (result == -ENOENT) {
2772                 obj_request_existence_set(orig_request, false);
2773         } else {
2774                 goto fail_orig_request;
2775         }
2776
2777         /*
2778          * Resubmit the original request now that we have recorded
2779          * whether the target object exists.
2780          */
2781         result = rbd_img_obj_request_submit(orig_request);
2782         if (result)
2783                 goto fail_orig_request;
2784
2785         return;
2786
2787 fail_orig_request:
2788         rbd_obj_request_error(orig_request, result);
2789 }
2790
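/*
 * Issue a STAT op against the target object of a layered write to
 * find out whether it already exists, and therefore whether a copyup
 * from the parent is needed before the write can proceed.
 */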
2791 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2792 {
2793         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2794         struct rbd_obj_request *stat_request;
2795         struct page **pages;
2796         u32 page_count;
2797         size_t size;
2798         int ret;
2799
2800         stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
2801         if (!stat_request)
2802                 return -ENOMEM;
2803
2804         stat_request->object_no = obj_request->object_no;
2805
2806         stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2807                                                    stat_request);
2808         if (!stat_request->osd_req) {
2809                 ret = -ENOMEM;
2810                 goto fail_stat_request;
2811         }
2812
2813         /*
2814          * The response data for a STAT call consists of:
2815          *     le64 length;
2816          *     struct {
2817          *         le32 tv_sec;
2818          *         le32 tv_nsec;
2819          *     } mtime;
2820          */
2821         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2822         page_count = (u32)calc_pages_for(0, size);
2823         pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2824         if (IS_ERR(pages)) {
2825                 ret = PTR_ERR(pages);
2826                 goto fail_stat_request;
2827         }
2828
2829         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2830         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2831                                      false, false);
2832
2833         rbd_obj_request_get(obj_request);
2834         stat_request->obj_request = obj_request;
2835         stat_request->pages = pages;
2836         stat_request->page_count = page_count;
2837         stat_request->callback = rbd_img_obj_exists_callback;
2838
2839         rbd_obj_request_submit(stat_request);
2840         return 0;
2841
2842 fail_stat_request:
2843         rbd_obj_request_put(stat_request);
2844         return ret;
2845 }
2846
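/*
 * Return true if this object request can be submitted to the OSD
 * as is:  reads, non-layered writes, layered writes outside the
 * parent overlap or covering a whole object, and writes to objects
 * already known to exist.
 */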
2847 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2848 {
2849         struct rbd_img_request *img_request = obj_request->img_request;
2850         struct rbd_device *rbd_dev = img_request->rbd_dev;
2851
2852         /* Reads */
2853         if (!img_request_write_test(img_request) &&
2854             !img_request_discard_test(img_request))
2855                 return true;
2856
2857         /* Non-layered writes */
2858         if (!img_request_layered_test(img_request))
2859                 return true;
2860
2861         /*
2862          * Layered writes outside of the parent overlap range don't
2863          * share any data with the parent.
2864          */
2865         if (!obj_request_overlaps_parent(obj_request))
2866                 return true;
2867
2868         /*
2869          * Entire-object layered writes - we will overwrite whatever
2870          * parent data there is anyway.
2871          */
2872         if (!obj_request->offset &&
2873             obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2874                 return true;
2875
2876         /*
2877          * If the object is known to already exist, its parent data has
2878          * already been copied.
2879          */
2880         if (obj_request_known_test(obj_request) &&
2881             obj_request_exists_test(obj_request))
2882                 return true;
2883
2884         return false;
2885 }
2886
2887 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2888 {
2889         rbd_assert(obj_request_img_data_test(obj_request));
2890         rbd_assert(obj_request_type_valid(obj_request->type));
2891         rbd_assert(obj_request->img_request);
2892
2893         if (img_obj_request_simple(obj_request)) {
2894                 rbd_obj_request_submit(obj_request);
2895                 return 0;
2896         }
2897
2898         /*
2899          * It's a layered write.  The target object might exist but
2900          * we may not know that yet.  If we know it doesn't exist,
2901          * start by reading the data for the full target object from
2902          * the parent so we can use it for a copyup to the target.
2903          */
2904         if (obj_request_known_test(obj_request))
2905                 return rbd_img_obj_parent_read_full(obj_request);
2906
2907         /* We don't know whether the target exists.  Go find out. */
2908
2909         return rbd_img_obj_exists_submit(obj_request);
2910 }
2911
2912 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2913 {
2914         struct rbd_obj_request *obj_request;
2915         struct rbd_obj_request *next_obj_request;
2916         int ret = 0;
2917
2918         dout("%s: img %p\n", __func__, img_request);
2919
2920         rbd_img_request_get(img_request);
2921         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2922                 ret = rbd_img_obj_request_submit(obj_request);
2923                 if (ret)
2924                         goto out_put_ireq;
2925         }
2926
2927 out_put_ireq:
2928         rbd_img_request_put(img_request);
2929         return ret;
2930 }
2931
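/*
 * Completion callback for a read redirected to the parent image.
 * Copy the result back to the originating object request, capping the
 * transfer count at the parent overlap so anything beyond it gets
 * zero filled by the read callback.  If the overlap has become zero,
 * the original request is simply resubmitted.
 */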
2932 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2933 {
2934         struct rbd_obj_request *obj_request;
2935         struct rbd_device *rbd_dev;
2936         u64 obj_end;
2937         u64 img_xferred;
2938         int img_result;
2939
2940         rbd_assert(img_request_child_test(img_request));
2941
2942         /* First get what we need from the image request and release it */
2943
2944         obj_request = img_request->obj_request;
2945         img_xferred = img_request->xferred;
2946         img_result = img_request->result;
2947         rbd_img_request_put(img_request);
2948
2949         /*
2950          * If the overlap has become 0 (most likely because the
2951          * image has been flattened) we need to re-submit the
2952          * original request.
2953          */
2954         rbd_assert(obj_request);
2955         rbd_assert(obj_request->img_request);
2956         rbd_dev = obj_request->img_request->rbd_dev;
2957         if (!rbd_dev->parent_overlap) {
2958                 rbd_obj_request_submit(obj_request);
2959                 return;
2960         }
2961
2962         obj_request->result = img_result;
2963         if (obj_request->result)
2964                 goto out;
2965
2966         /*
2967          * We need to zero anything beyond the parent overlap
2968          * boundary.  Since rbd_img_obj_request_read_callback()
2969          * will zero anything beyond the end of a short read, an
2970          * easy way to do this is to pretend the data from the
2971          * parent came up short--ending at the overlap boundary.
2972          */
2973         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2974         obj_end = obj_request->img_offset + obj_request->length;
2975         if (obj_end > rbd_dev->parent_overlap) {
2976                 u64 xferred = 0;
2977
2978                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2979                         xferred = rbd_dev->parent_overlap -
2980                                         obj_request->img_offset;
2981
2982                 obj_request->xferred = min(img_xferred, xferred);
2983         } else {
2984                 obj_request->xferred = img_xferred;
2985         }
2986 out:
2987         rbd_img_obj_request_read_callback(obj_request);
2988         rbd_obj_request_complete(obj_request);
2989 }
2990
2991 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2992 {
2993         struct rbd_img_request *img_request;
2994         int result;
2995
2996         rbd_assert(obj_request_img_data_test(obj_request));
2997         rbd_assert(obj_request->img_request != NULL);
2998         rbd_assert(obj_request->result == (s32) -ENOENT);
2999         rbd_assert(obj_request_type_valid(obj_request->type));
3000
3001         /* rbd_read_finish(obj_request, obj_request->length); */
3002         img_request = rbd_parent_request_create(obj_request,
3003                                                 obj_request->img_offset,
3004                                                 obj_request->length);
3005         result = -ENOMEM;
3006         if (!img_request)
3007                 goto out_err;
3008
3009         if (obj_request->type == OBJ_REQUEST_BIO)
3010                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3011                                                 obj_request->bio_list);
3012         else
3013                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3014                                                 obj_request->pages);
3015         if (result)
3016                 goto out_err;
3017
3018         img_request->callback = rbd_img_parent_read_callback;
3019         result = rbd_img_request_submit(img_request);
3020         if (result)
3021                 goto out_err;
3022
3023         return;
3024 out_err:
3025         if (img_request)
3026                 rbd_img_request_put(img_request);
3027         obj_request->result = result;
3028         obj_request->xferred = 0;
3029         obj_request_done_set(obj_request);
3030 }
3031
3032 static const struct rbd_client_id rbd_empty_cid;
3033
3034 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3035                           const struct rbd_client_id *rhs)
3036 {
3037         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3038 }
3039
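/*
 * Return the client id (entity gid + current watch cookie) that
 * identifies this instance for exclusive-lock purposes.
 */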
3040 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3041 {
3042         struct rbd_client_id cid;
3043
3044         mutex_lock(&rbd_dev->watch_mutex);
3045         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3046         cid.handle = rbd_dev->watch_cookie;
3047         mutex_unlock(&rbd_dev->watch_mutex);
3048         return cid;
3049 }
3050
3051 /*
3052  * lock_rwsem must be held for write
3053  */
3054 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3055                               const struct rbd_client_id *cid)
3056 {
3057         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3058              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3059              cid->gid, cid->handle);
3060         rbd_dev->owner_cid = *cid; /* struct */
3061 }
3062
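/* Build the lock cookie string from the cookie prefix and the watch cookie. */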
3063 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3064 {
3065         mutex_lock(&rbd_dev->watch_mutex);
3066         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3067         mutex_unlock(&rbd_dev->watch_mutex);
3068 }
3069
3070 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3071 {
3072         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3073
3074         strcpy(rbd_dev->lock_cookie, cookie);
3075         rbd_set_owner_cid(rbd_dev, &cid);
3076         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3077 }
3078
3079 /*
3080  * lock_rwsem must be held for write
3081  */
3082 static int rbd_lock(struct rbd_device *rbd_dev)
3083 {
3084         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3085         char cookie[32];
3086         int ret;
3087
3088         WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3089                 rbd_dev->lock_cookie[0] != '\0');
3090
3091         format_lock_cookie(rbd_dev, cookie);
3092         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3093                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3094                             RBD_LOCK_TAG, "", 0);
3095         if (ret)
3096                 return ret;
3097
3098         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3099         __rbd_lock(rbd_dev, cookie);
3100         return 0;
3101 }
3102
3103 /*
3104  * lock_rwsem must be held for write
3105  */
3106 static void rbd_unlock(struct rbd_device *rbd_dev)
3107 {
3108         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3109         int ret;
3110
3111         WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3112                 rbd_dev->lock_cookie[0] == '\0');
3113
3114         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3115                               RBD_LOCK_NAME, rbd_dev->lock_cookie);
3116         if (ret && ret != -ENOENT)
3117                 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
3118
3119         /* treat errors as if the image had been unlocked */
3120         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3121         rbd_dev->lock_cookie[0] = '\0';
3122         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3123         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3124 }
3125
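/*
 * Send a NotifyMessage (notify op + our client id) on the header
 * object, optionally returning the raw acks via *preply_pages and
 * *preply_len.
 */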
3126 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3127                                 enum rbd_notify_op notify_op,
3128                                 struct page ***preply_pages,
3129                                 size_t *preply_len)
3130 {
3131         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3132         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3133         int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3134         char buf[buf_size];
3135         void *p = buf;
3136
3137         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3138
3139         /* encode *LockPayload NotifyMessage (op + ClientId) */
3140         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3141         ceph_encode_32(&p, notify_op);
3142         ceph_encode_64(&p, cid.gid);
3143         ceph_encode_64(&p, cid.handle);
3144
3145         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3146                                 &rbd_dev->header_oloc, buf, buf_size,
3147                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3148 }
3149
3150 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3151                                enum rbd_notify_op notify_op)
3152 {
3153         struct page **reply_pages;
3154         size_t reply_len;
3155
3156         __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3157         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3158 }
3159
3160 static void rbd_notify_acquired_lock(struct work_struct *work)
3161 {
3162         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3163                                                   acquired_lock_work);
3164
3165         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3166 }
3167
3168 static void rbd_notify_released_lock(struct work_struct *work)
3169 {
3170         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3171                                                   released_lock_work);
3172
3173         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3174 }
3175
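/*
 * Ask the current lock owner to release the lock by sending a
 * REQUEST_LOCK notification and decoding the owner's ResponseMessage
 * from the acks.  Returns the owner's result (e.g. -EROFS if it
 * refuses to release), -ETIMEDOUT if no owner responded, or -EIO if
 * more than one owner responded.
 */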
3176 static int rbd_request_lock(struct rbd_device *rbd_dev)
3177 {
3178         struct page **reply_pages;
3179         size_t reply_len;
3180         bool lock_owner_responded = false;
3181         int ret;
3182
3183         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3184
3185         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3186                                    &reply_pages, &reply_len);
3187         if (ret && ret != -ETIMEDOUT) {
3188                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3189                 goto out;
3190         }
3191
3192         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3193                 void *p = page_address(reply_pages[0]);
3194                 void *const end = p + reply_len;
3195                 u32 n;
3196
3197                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3198                 while (n--) {
3199                         u8 struct_v;
3200                         u32 len;
3201
3202                         ceph_decode_need(&p, end, 8 + 8, e_inval);
3203                         p += 8 + 8; /* skip gid and cookie */
3204
3205                         ceph_decode_32_safe(&p, end, len, e_inval);
3206                         if (!len)
3207                                 continue;
3208
3209                         if (lock_owner_responded) {
3210                                 rbd_warn(rbd_dev,
3211                                          "duplicate lock owners detected");
3212                                 ret = -EIO;
3213                                 goto out;
3214                         }
3215
3216                         lock_owner_responded = true;
3217                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3218                                                   &struct_v, &len);
3219                         if (ret) {
3220                                 rbd_warn(rbd_dev,
3221                                          "failed to decode ResponseMessage: %d",
3222                                          ret);
3223                                 goto e_inval;
3224                         }
3225
3226                         ret = ceph_decode_32(&p);
3227                 }
3228         }
3229
3230         if (!lock_owner_responded) {
3231                 rbd_warn(rbd_dev, "no lock owners detected");
3232                 ret = -ETIMEDOUT;
3233         }
3234
3235 out:
3236         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3237         return ret;
3238
3239 e_inval:
3240         ret = -EINVAL;
3241         goto out;
3242 }
3243
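/*
 * Cancel any pending lock acquisition attempt and wake tasks waiting
 * in rbd_wait_state_locked().
 */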
3244 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3245 {
3246         dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3247
3248         cancel_delayed_work(&rbd_dev->lock_dwork);
3249         if (wake_all)
3250                 wake_up_all(&rbd_dev->lock_waitq);
3251         else
3252                 wake_up(&rbd_dev->lock_waitq);
3253 }
3254
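/*
 * Fetch lock info for the header object and verify that the lock was
 * taken by rbd's exclusive-lock mechanism (matching tag and cookie
 * prefix, exclusive type).  Returns -EBUSY if it was taken by some
 * external mechanism.
 */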
3255 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3256                                struct ceph_locker **lockers, u32 *num_lockers)
3257 {
3258         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3259         u8 lock_type;
3260         char *lock_tag;
3261         int ret;
3262
3263         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3264
3265         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3266                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3267                                  &lock_type, &lock_tag, lockers, num_lockers);
3268         if (ret)
3269                 return ret;
3270
3271         if (*num_lockers == 0) {
3272                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3273                 goto out;
3274         }
3275
3276         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3277                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3278                          lock_tag);
3279                 ret = -EBUSY;
3280                 goto out;
3281         }
3282
3283         if (lock_type == CEPH_CLS_LOCK_SHARED) {
3284                 rbd_warn(rbd_dev, "shared lock type detected");
3285                 ret = -EBUSY;
3286                 goto out;
3287         }
3288
3289         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3290                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
3291                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3292                          (*lockers)[0].id.cookie);
3293                 ret = -EBUSY;
3294                 goto out;
3295         }
3296
3297 out:
3298         kfree(lock_tag);
3299         return ret;
3300 }
3301
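/*
 * Check whether the given locker still has a watch on the header
 * object.  Returns 1 (and records it as the owner) if a matching
 * watcher is found, 0 if not (the locker is presumed dead), or a
 * negative error code.
 */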
3302 static int find_watcher(struct rbd_device *rbd_dev,
3303                         const struct ceph_locker *locker)
3304 {
3305         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3306         struct ceph_watch_item *watchers;
3307         u32 num_watchers;
3308         u64 cookie;
3309         int i;
3310         int ret;
3311
3312         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3313                                       &rbd_dev->header_oloc, &watchers,
3314                                       &num_watchers);
3315         if (ret)
3316                 return ret;
3317
3318         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3319         for (i = 0; i < num_watchers; i++) {
3320                 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3321                             sizeof(locker->info.addr)) &&
3322                     watchers[i].cookie == cookie) {
3323                         struct rbd_client_id cid = {
3324                                 .gid = le64_to_cpu(watchers[i].name.num),
3325                                 .handle = cookie,
3326                         };
3327
3328                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3329                              rbd_dev, cid.gid, cid.handle);
3330                         rbd_set_owner_cid(rbd_dev, &cid);
3331                         ret = 1;
3332                         goto out;
3333                 }
3334         }
3335
3336         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3337         ret = 0;
3338 out:
3339         kfree(watchers);
3340         return ret;
3341 }
3342
3343 /*
3344  * lock_rwsem must be held for write
3345  */
3346 static int rbd_try_lock(struct rbd_device *rbd_dev)
3347 {
3348         struct ceph_client *client = rbd_dev->rbd_client->client;
3349         struct ceph_locker *lockers;
3350         u32 num_lockers;
3351         int ret;
3352
3353         for (;;) {
3354                 ret = rbd_lock(rbd_dev);
3355                 if (ret != -EBUSY)
3356                         return ret;
3357
3358                 /* determine if the current lock holder is still alive */
3359                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3360                 if (ret)
3361                         return ret;
3362
3363                 if (num_lockers == 0)
3364                         goto again;
3365
3366                 ret = find_watcher(rbd_dev, lockers);
3367                 if (ret) {
3368                         if (ret > 0)
3369                                 ret = 0; /* have to request lock */
3370                         goto out;
3371                 }
3372
3373                 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3374                          ENTITY_NAME(lockers[0].id.name));
3375
3376                 ret = ceph_monc_blacklist_add(&client->monc,
3377                                               &lockers[0].info.addr);
3378                 if (ret) {
3379                         rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3380                                  ENTITY_NAME(lockers[0].id.name), ret);
3381                         goto out;
3382                 }
3383
3384                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3385                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
3386                                           lockers[0].id.cookie,
3387                                           &lockers[0].id.name);
3388                 if (ret && ret != -ENOENT)
3389                         goto out;
3390
3391 again:
3392                 ceph_free_lockers(lockers, num_lockers);
3393         }
3394
3395 out:
3396         ceph_free_lockers(lockers, num_lockers);
3397         return ret;
3398 }
3399
3400 /*
3401  * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3402  */
3403 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3404                                                 int *pret)
3405 {
3406         enum rbd_lock_state lock_state;
3407
3408         down_read(&rbd_dev->lock_rwsem);
3409         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3410              rbd_dev->lock_state);
3411         if (__rbd_is_lock_owner(rbd_dev)) {
3412                 lock_state = rbd_dev->lock_state;
3413                 up_read(&rbd_dev->lock_rwsem);
3414                 return lock_state;
3415         }
3416
3417         up_read(&rbd_dev->lock_rwsem);
3418         down_write(&rbd_dev->lock_rwsem);
3419         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3420              rbd_dev->lock_state);
3421         if (!__rbd_is_lock_owner(rbd_dev)) {
3422                 *pret = rbd_try_lock(rbd_dev);
3423                 if (*pret)
3424                         rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3425         }
3426
3427         lock_state = rbd_dev->lock_state;
3428         up_write(&rbd_dev->lock_rwsem);
3429         return lock_state;
3430 }
3431
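/*
 * Delayed work that attempts to acquire the exclusive lock.  If the
 * lock is held by another client, ask it to release the lock and
 * reschedule as needed.
 */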
3432 static void rbd_acquire_lock(struct work_struct *work)
3433 {
3434         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3435                                             struct rbd_device, lock_dwork);
3436         enum rbd_lock_state lock_state;
3437         int ret = 0;
3438
3439         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3440 again:
3441         lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3442         if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3443                 if (lock_state == RBD_LOCK_STATE_LOCKED)
3444                         wake_requests(rbd_dev, true);
3445                 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3446                      rbd_dev, lock_state, ret);
3447                 return;
3448         }
3449
3450         ret = rbd_request_lock(rbd_dev);
3451         if (ret == -ETIMEDOUT) {
3452                 goto again; /* treat this as a dead client */
3453         } else if (ret == -EROFS) {
3454                 rbd_warn(rbd_dev, "peer will not release lock");
3455                 /*
3456                  * If this is rbd_add_acquire_lock(), we want to fail
3457                  * immediately -- reuse BLACKLISTED flag.  Otherwise we
3458                  * want to block.
3459                  */
3460                 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3461                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3462                         /* wake "rbd map --exclusive" process */
3463                         wake_requests(rbd_dev, false);
3464                 }
3465         } else if (ret < 0) {
3466                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3467                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3468                                  RBD_RETRY_DELAY);
3469         } else {
3470                 /*
3471                  * lock owner acked, but resend if we don't see them
3472                  * release the lock
3473                  */
3474                 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3475                      rbd_dev);
3476                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3477                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3478         }
3479 }
3480
3481 /*
3482  * lock_rwsem must be held for write
3483  */
3484 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3485 {
3486         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3487              rbd_dev->lock_state);
3488         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3489                 return false;
3490
3491         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3492         downgrade_write(&rbd_dev->lock_rwsem);
3493         /*
3494          * Ensure that all in-flight IO is flushed.
3495          *
3496          * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3497          * may be shared with other devices.
3498          */
3499         ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3500         up_read(&rbd_dev->lock_rwsem);
3501
3502         down_write(&rbd_dev->lock_rwsem);
3503         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3504              rbd_dev->lock_state);
3505         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3506                 return false;
3507
3508         rbd_unlock(rbd_dev);
3509         /*
3510          * Give others a chance to grab the lock - we would re-acquire
3511          * almost immediately if we got new IO during ceph_osdc_sync()
3512          * otherwise.  We need to ack our own notifications, so this
3513          * lock_dwork will be requeued from rbd_wait_state_locked()
3514          * after wake_requests() in rbd_handle_released_lock().
3515          */
3516         cancel_delayed_work(&rbd_dev->lock_dwork);
3517         return true;
3518 }
3519
3520 static void rbd_release_lock_work(struct work_struct *work)
3521 {
3522         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3523                                                   unlock_work);
3524
3525         down_write(&rbd_dev->lock_rwsem);
3526         rbd_release_lock(rbd_dev);
3527         up_write(&rbd_dev->lock_rwsem);
3528 }
3529
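/*
 * Handle an ACQUIRED_LOCK notification: record the sender as the new
 * lock owner and wake waiters if we don't own the lock ourselves.
 */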
3530 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3531                                      void **p)
3532 {
3533         struct rbd_client_id cid = { 0 };
3534
3535         if (struct_v >= 2) {
3536                 cid.gid = ceph_decode_64(p);
3537                 cid.handle = ceph_decode_64(p);
3538         }
3539
3540         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3541              cid.handle);
3542         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3543                 down_write(&rbd_dev->lock_rwsem);
3544                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3545                         /*
3546                          * we already know that the remote client is
3547                          * the owner
3548                          */
3549                         up_write(&rbd_dev->lock_rwsem);
3550                         return;
3551                 }
3552
3553                 rbd_set_owner_cid(rbd_dev, &cid);
3554                 downgrade_write(&rbd_dev->lock_rwsem);
3555         } else {
3556                 down_read(&rbd_dev->lock_rwsem);
3557         }
3558
3559         if (!__rbd_is_lock_owner(rbd_dev))
3560                 wake_requests(rbd_dev, false);
3561         up_read(&rbd_dev->lock_rwsem);
3562 }
3563
3564 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3565                                      void **p)
3566 {
3567         struct rbd_client_id cid = { 0 };
3568
3569         if (struct_v >= 2) {
3570                 cid.gid = ceph_decode_64(p);
3571                 cid.handle = ceph_decode_64(p);
3572         }
3573
3574         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3575              cid.handle);
3576         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3577                 down_write(&rbd_dev->lock_rwsem);
3578                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3579                         dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3580                              __func__, rbd_dev, cid.gid, cid.handle,
3581                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3582                         up_write(&rbd_dev->lock_rwsem);
3583                         return;
3584                 }
3585
3586                 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3587                 downgrade_write(&rbd_dev->lock_rwsem);
3588         } else {
3589                 down_read(&rbd_dev->lock_rwsem);
3590         }
3591
3592         if (!__rbd_is_lock_owner(rbd_dev))
3593                 wake_requests(rbd_dev, false);
3594         up_read(&rbd_dev->lock_rwsem);
3595 }
3596
3597 /*
3598  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3599  * ResponseMessage is needed.
3600  */
3601 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3602                                    void **p)
3603 {
3604         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3605         struct rbd_client_id cid = { 0 };
3606         int result = 1;
3607
3608         if (struct_v >= 2) {
3609                 cid.gid = ceph_decode_64(p);
3610                 cid.handle = ceph_decode_64(p);
3611         }
3612
3613         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3614              cid.handle);
3615         if (rbd_cid_equal(&cid, &my_cid))
3616                 return result;
3617
3618         down_read(&rbd_dev->lock_rwsem);
3619         if (__rbd_is_lock_owner(rbd_dev)) {
3620                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3621                     rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3622                         goto out_unlock;
3623
3624                 /*
3625                  * encode ResponseMessage(0) so the peer can detect
3626                  * a missing owner
3627                  */
3628                 result = 0;
3629
3630                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3631                         if (!rbd_dev->opts->exclusive) {
3632                                 dout("%s rbd_dev %p queueing unlock_work\n",
3633                                      __func__, rbd_dev);
3634                                 queue_work(rbd_dev->task_wq,
3635                                            &rbd_dev->unlock_work);
3636                         } else {
3637                                 /* refuse to release the lock */
3638                                 result = -EROFS;
3639                         }
3640                 }
3641         }
3642
3643 out_unlock:
3644         up_read(&rbd_dev->lock_rwsem);
3645         return result;
3646 }
3647
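/*
 * Acknowledge a notify, optionally encoding a ResponseMessage that
 * carries *result in the ack payload.
 */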
3648 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3649                                      u64 notify_id, u64 cookie, s32 *result)
3650 {
3651         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3652         int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3653         char buf[buf_size];
3654         int ret;
3655
3656         if (result) {
3657                 void *p = buf;
3658
3659                 /* encode ResponseMessage */
3660                 ceph_start_encoding(&p, 1, 1,
3661                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
3662                 ceph_encode_32(&p, *result);
3663         } else {
3664                 buf_size = 0;
3665         }
3666
3667         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3668                                    &rbd_dev->header_oloc, notify_id, cookie,
3669                                    buf, buf_size);
3670         if (ret)
3671                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3672 }
3673
3674 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3675                                    u64 cookie)
3676 {
3677         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3678         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3679 }
3680
3681 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3682                                           u64 notify_id, u64 cookie, s32 result)
3683 {
3684         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3685         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3686 }
3687
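/*
 * watch/notify callback for the header object: decode the
 * NotifyMessage and dispatch on notify_op.  A zero-length payload is
 * treated as a legacy header-update notification.
 */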
3688 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3689                          u64 notifier_id, void *data, size_t data_len)
3690 {
3691         struct rbd_device *rbd_dev = arg;
3692         void *p = data;
3693         void *const end = p + data_len;
3694         u8 struct_v = 0;
3695         u32 len;
3696         u32 notify_op;
3697         int ret;
3698
3699         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3700              __func__, rbd_dev, cookie, notify_id, data_len);
3701         if (data_len) {
3702                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3703                                           &struct_v, &len);
3704                 if (ret) {
3705                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3706                                  ret);
3707                         return;
3708                 }
3709
3710                 notify_op = ceph_decode_32(&p);
3711         } else {
3712                 /* legacy notification for header updates */
3713                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3714                 len = 0;
3715         }
3716
3717         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3718         switch (notify_op) {
3719         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3720                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3721                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3722                 break;
3723         case RBD_NOTIFY_OP_RELEASED_LOCK:
3724                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3725                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3726                 break;
3727         case RBD_NOTIFY_OP_REQUEST_LOCK:
3728                 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3729                 if (ret <= 0)
3730                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3731                                                       cookie, ret);
3732                 else
3733                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3734                 break;
3735         case RBD_NOTIFY_OP_HEADER_UPDATE:
3736                 ret = rbd_dev_refresh(rbd_dev);
3737                 if (ret)
3738                         rbd_warn(rbd_dev, "refresh failed: %d", ret);
3739
3740                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3741                 break;
3742         default:
3743                 if (rbd_is_lock_owner(rbd_dev))
3744                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3745                                                       cookie, -EOPNOTSUPP);
3746                 else
3747                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3748                 break;
3749         }
3750 }
3751
3752 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3753
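/*
 * Watch error callback: forget the current lock owner and, if the
 * watch was registered, tear it down and schedule re-registration.
 */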
3754 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3755 {
3756         struct rbd_device *rbd_dev = arg;
3757
3758         rbd_warn(rbd_dev, "encountered watch error: %d", err);
3759
3760         down_write(&rbd_dev->lock_rwsem);
3761         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3762         up_write(&rbd_dev->lock_rwsem);
3763
3764         mutex_lock(&rbd_dev->watch_mutex);
3765         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3766                 __rbd_unregister_watch(rbd_dev);
3767                 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3768
3769                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3770         }
3771         mutex_unlock(&rbd_dev->watch_mutex);
3772 }
3773
3774 /*
3775  * watch_mutex must be locked
3776  */
3777 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3778 {
3779         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3780         struct ceph_osd_linger_request *handle;
3781
3782         rbd_assert(!rbd_dev->watch_handle);
3783         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3784
3785         handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3786                                  &rbd_dev->header_oloc, rbd_watch_cb,
3787                                  rbd_watch_errcb, rbd_dev);
3788         if (IS_ERR(handle))
3789                 return PTR_ERR(handle);
3790
3791         rbd_dev->watch_handle = handle;
3792         return 0;
3793 }
3794
3795 /*
3796  * watch_mutex must be locked
3797  */
3798 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3799 {
3800         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3801         int ret;
3802
3803         rbd_assert(rbd_dev->watch_handle);
3804         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3805
3806         ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3807         if (ret)
3808                 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3809
3810         rbd_dev->watch_handle = NULL;
3811 }
3812
3813 static int rbd_register_watch(struct rbd_device *rbd_dev)
3814 {
3815         int ret;
3816
3817         mutex_lock(&rbd_dev->watch_mutex);
3818         rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3819         ret = __rbd_register_watch(rbd_dev);
3820         if (ret)
3821                 goto out;
3822
3823         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3824         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3825
3826 out:
3827         mutex_unlock(&rbd_dev->watch_mutex);
3828         return ret;
3829 }
3830
3831 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3832 {
3833         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3834
3835         cancel_work_sync(&rbd_dev->acquired_lock_work);
3836         cancel_work_sync(&rbd_dev->released_lock_work);
3837         cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3838         cancel_work_sync(&rbd_dev->unlock_work);
3839 }
3840
3841 /*
3842  * header_rwsem must not be held to avoid a deadlock with
3843  * rbd_dev_refresh() when flushing notifies.
3844  */
3845 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3846 {
3847         WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3848         cancel_tasks_sync(rbd_dev);
3849
3850         mutex_lock(&rbd_dev->watch_mutex);
3851         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3852                 __rbd_unregister_watch(rbd_dev);
3853         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3854         mutex_unlock(&rbd_dev->watch_mutex);
3855
3856         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3857         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3858 }
3859
3860 /*
3861  * lock_rwsem must be held for write
3862  */
3863 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3864 {
3865         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3866         char cookie[32];
3867         int ret;
3868
3869         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3870
3871         format_lock_cookie(rbd_dev, cookie);
3872         ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3873                                   &rbd_dev->header_oloc, RBD_LOCK_NAME,
3874                                   CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3875                                   RBD_LOCK_TAG, cookie);
3876         if (ret) {
3877                 if (ret != -EOPNOTSUPP)
3878                         rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3879                                  ret);
3880
3881                 /*
3882                  * Lock cookie cannot be updated on older OSDs, so do
3883                  * a manual release and queue an acquire.
3884                  */
3885                 if (rbd_release_lock(rbd_dev))
3886                         queue_delayed_work(rbd_dev->task_wq,
3887                                            &rbd_dev->lock_dwork, 0);
3888         } else {
3889                 __rbd_lock(rbd_dev, cookie);
3890         }
3891 }
3892
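/*
 * Delayed work that re-establishes the watch after an error, reclaims
 * the exclusive lock if we held it and refreshes the header.
 */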
3893 static void rbd_reregister_watch(struct work_struct *work)
3894 {
3895         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3896                                             struct rbd_device, watch_dwork);
3897         int ret;
3898
3899         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3900
3901         mutex_lock(&rbd_dev->watch_mutex);
3902         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3903                 mutex_unlock(&rbd_dev->watch_mutex);
3904                 return;
3905         }
3906
3907         ret = __rbd_register_watch(rbd_dev);
3908         if (ret) {
3909                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3910                 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3911                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3912                         wake_requests(rbd_dev, true);
3913                 } else {
3914                         queue_delayed_work(rbd_dev->task_wq,
3915                                            &rbd_dev->watch_dwork,
3916                                            RBD_RETRY_DELAY);
3917                 }
3918                 mutex_unlock(&rbd_dev->watch_mutex);
3919                 return;
3920         }
3921
3922         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3923         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3924         mutex_unlock(&rbd_dev->watch_mutex);
3925
3926         down_write(&rbd_dev->lock_rwsem);
3927         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3928                 rbd_reacquire_lock(rbd_dev);
3929         up_write(&rbd_dev->lock_rwsem);
3930
3931         ret = rbd_dev_refresh(rbd_dev);
3932         if (ret)
3933                 rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
3934 }
3935
3936 /*
3937  * Synchronous osd object method call.  Returns the number of bytes
3938  * returned in the inbound (reply) buffer, or a negative error code.
3939  */
3940 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3941                              struct ceph_object_id *oid,
3942                              struct ceph_object_locator *oloc,
3943                              const char *method_name,
3944                              const void *outbound,
3945                              size_t outbound_size,
3946                              void *inbound,
3947                              size_t inbound_size)
3948 {
3949         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3950         struct page *req_page = NULL;
3951         struct page *reply_page;
3952         int ret;
3953
3954         /*
3955          * Method calls are ultimately read operations.  The result
3956          * should be placed into the inbound buffer provided.  They
3957          * also supply outbound data--parameters for the object
3958          * method.  Currently if this is present it will be a
3959          * snapshot id.
3960          */
3961         if (outbound) {
3962                 if (outbound_size > PAGE_SIZE)
3963                         return -E2BIG;
3964
3965                 req_page = alloc_page(GFP_KERNEL);
3966                 if (!req_page)
3967                         return -ENOMEM;
3968
3969                 memcpy(page_address(req_page), outbound, outbound_size);
3970         }
3971
3972         reply_page = alloc_page(GFP_KERNEL);
3973         if (!reply_page) {
3974                 if (req_page)
3975                         __free_page(req_page);
3976                 return -ENOMEM;
3977         }
3978
3979         ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3980                              CEPH_OSD_FLAG_READ, req_page, outbound_size,
3981                              reply_page, &inbound_size);
3982         if (!ret) {
3983                 memcpy(inbound, page_address(reply_page), inbound_size);
3984                 ret = inbound_size;
3985         }
3986
3987         if (req_page)
3988                 __free_page(req_page);
3989         __free_page(reply_page);
3990         return ret;
3991 }
3992
3993 /*
3994  * lock_rwsem must be held for read
3995  */
3996 static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3997 {
3998         DEFINE_WAIT(wait);
3999
4000         do {
4001                 /*
4002                  * Note the use of mod_delayed_work() in rbd_acquire_lock()
4003                  * and cancel_delayed_work() in wake_requests().
4004                  */
4005                 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4006                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4007                 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4008                                           TASK_UNINTERRUPTIBLE);
4009                 up_read(&rbd_dev->lock_rwsem);
4010                 schedule();
4011                 down_read(&rbd_dev->lock_rwsem);
4012         } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4013                  !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4014
4015         finish_wait(&rbd_dev->lock_waitq, &wait);
4016 }
4017
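/*
 * Per-request work function for the blk-mq path: validate the
 * request, take the exclusive lock if the mapping requires it, then
 * build and submit the image request.
 */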
4018 static void rbd_queue_workfn(struct work_struct *work)
4019 {
4020         struct request *rq = blk_mq_rq_from_pdu(work);
4021         struct rbd_device *rbd_dev = rq->q->queuedata;
4022         struct rbd_img_request *img_request;
4023         struct ceph_snap_context *snapc = NULL;
4024         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4025         u64 length = blk_rq_bytes(rq);
4026         enum obj_operation_type op_type;
4027         u64 mapping_size;
4028         bool must_be_locked;
4029         int result;
4030
4031         switch (req_op(rq)) {
4032         case REQ_OP_DISCARD:
4033         case REQ_OP_WRITE_ZEROES:
4034                 op_type = OBJ_OP_DISCARD;
4035                 break;
4036         case REQ_OP_WRITE:
4037                 op_type = OBJ_OP_WRITE;
4038                 break;
4039         case REQ_OP_READ:
4040                 op_type = OBJ_OP_READ;
4041                 break;
4042         default:
4043                 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4044                 result = -EIO;
4045                 goto err;
4046         }
4047
4048         /* Ignore/skip any zero-length requests */
4049
4050         if (!length) {
4051                 dout("%s: zero-length request\n", __func__);
4052                 result = 0;
4053                 goto err_rq;
4054         }
4055
4056         /* Only reads are allowed to a read-only device */
4057
4058         if (op_type != OBJ_OP_READ) {
4059                 if (rbd_dev->mapping.read_only) {
4060                         result = -EROFS;
4061                         goto err_rq;
4062                 }
4063                 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4064         }
4065
4066         /*
4067          * Quit early if the mapped snapshot no longer exists.  It's
4068          * still possible the snapshot will have disappeared by the
4069          * time our request arrives at the osd, but there's no sense in
4070          * sending it if we already know.
4071          */
4072         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4073                 dout("request for non-existent snapshot");
4074                 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4075                 result = -ENXIO;
4076                 goto err_rq;
4077         }
4078
4079         if (offset && length > U64_MAX - offset + 1) {
4080                 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4081                          length);
4082                 result = -EINVAL;
4083                 goto err_rq;    /* Shouldn't happen */
4084         }
4085
4086         blk_mq_start_request(rq);
4087
4088         down_read(&rbd_dev->header_rwsem);
4089         mapping_size = rbd_dev->mapping.size;
4090         if (op_type != OBJ_OP_READ) {
4091                 snapc = rbd_dev->header.snapc;
4092                 ceph_get_snap_context(snapc);
4093         }
4094         up_read(&rbd_dev->header_rwsem);
4095
4096         if (offset + length > mapping_size) {
4097                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4098                          length, mapping_size);
4099                 result = -EIO;
4100                 goto err_rq;
4101         }
4102
4103         must_be_locked =
4104             (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
4105             (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
4106         if (must_be_locked) {
4107                 down_read(&rbd_dev->lock_rwsem);
4108                 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4109                     !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4110                         if (rbd_dev->opts->exclusive) {
4111                                 rbd_warn(rbd_dev, "exclusive lock required");
4112                                 result = -EROFS;
4113                                 goto err_unlock;
4114                         }
4115                         rbd_wait_state_locked(rbd_dev);
4116                 }
4117                 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4118                         result = -EBLACKLISTED;
4119                         goto err_unlock;
4120                 }
4121         }
4122
4123         img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4124                                              snapc);
4125         if (!img_request) {
4126                 result = -ENOMEM;
4127                 goto err_unlock;
4128         }
4129         img_request->rq = rq;
4130         snapc = NULL; /* img_request consumes a ref */
4131
4132         if (op_type == OBJ_OP_DISCARD)
4133                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4134                                               NULL);
4135         else
4136                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4137                                               rq->bio);
4138         if (result)
4139                 goto err_img_request;
4140
4141         result = rbd_img_request_submit(img_request);
4142         if (result)
4143                 goto err_img_request;
4144
4145         if (must_be_locked)
4146                 up_read(&rbd_dev->lock_rwsem);
4147         return;
4148
4149 err_img_request:
4150         rbd_img_request_put(img_request);
4151 err_unlock:
4152         if (must_be_locked)
4153                 up_read(&rbd_dev->lock_rwsem);
4154 err_rq:
4155         if (result)
4156                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4157                          obj_op_name(op_type), length, offset, result);
4158         ceph_put_snap_context(snapc);
4159 err:
4160         blk_mq_end_request(rq, errno_to_blk_status(result));
4161 }
4162
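/* blk-mq ->queue_rq(): defer the request to rbd_wq (rbd_queue_workfn()). */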
4163 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4164                 const struct blk_mq_queue_data *bd)
4165 {
4166         struct request *rq = bd->rq;
4167         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4168
4169         queue_work(rbd_wq, work);
4170         return BLK_STS_OK;
4171 }
4172
4173 static void rbd_free_disk(struct rbd_device *rbd_dev)
4174 {
4175         blk_cleanup_queue(rbd_dev->disk->queue);
4176         blk_mq_free_tag_set(&rbd_dev->tag_set);
4177         put_disk(rbd_dev->disk);
4178         rbd_dev->disk = NULL;
4179 }
4180
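/*
 * Synchronously read up to buf_len bytes from the start of the given
 * object into buf.  Returns the number of bytes read or a negative
 * error code.
 */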
4181 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4182                              struct ceph_object_id *oid,
4183                              struct ceph_object_locator *oloc,
4184                              void *buf, int buf_len)
4185
4186 {
4187         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4188         struct ceph_osd_request *req;
4189         struct page **pages;
4190         int num_pages = calc_pages_for(0, buf_len);
4191         int ret;
4192
4193         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4194         if (!req)
4195                 return -ENOMEM;
4196
4197         ceph_oid_copy(&req->r_base_oid, oid);
4198         ceph_oloc_copy(&req->r_base_oloc, oloc);
4199         req->r_flags = CEPH_OSD_FLAG_READ;
4200
4201         ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4202         if (ret)
4203                 goto out_req;
4204
4205         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4206         if (IS_ERR(pages)) {
4207                 ret = PTR_ERR(pages);
4208                 goto out_req;
4209         }
4210
4211         osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4212         osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4213                                          true);
4214
4215         ceph_osdc_start_request(osdc, req, false);
4216         ret = ceph_osdc_wait_request(osdc, req);
4217         if (ret >= 0)
4218                 ceph_copy_from_page_vector(pages, buf, 0, ret);
4219
4220 out_req:
4221         ceph_osdc_put_request(req);
4222         return ret;
4223 }
4224
4225 /*
4226  * Read the complete header for the given rbd device.  On successful
4227  * return, the rbd_dev->header field will contain up-to-date
4228  * information about the image.
4229  */
4230 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4231 {
4232         struct rbd_image_header_ondisk *ondisk = NULL;
4233         u32 snap_count = 0;
4234         u64 names_size = 0;
4235         u32 want_count;
4236         int ret;
4237
4238         /*
4239          * The complete header will include an array of its 64-bit
4240          * snapshot ids, followed by the names of those snapshots as
4241          * a contiguous block of NUL-terminated strings.  Note that
4242          * the number of snapshots could change by the time we read
4243          * it in, in which case we re-read it.
4244          */
4245         do {
4246                 size_t size;
4247
4248                 kfree(ondisk);
4249
4250                 size = sizeof (*ondisk);
4251                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4252                 size += names_size;
4253                 ondisk = kmalloc(size, GFP_KERNEL);
4254                 if (!ondisk)
4255                         return -ENOMEM;
4256
4257                 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4258                                         &rbd_dev->header_oloc, ondisk, size);
4259                 if (ret < 0)
4260                         goto out;
4261                 if ((size_t)ret < size) {
4262                         ret = -ENXIO;
4263                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4264                                 size, ret);
4265                         goto out;
4266                 }
4267                 if (!rbd_dev_ondisk_valid(ondisk)) {
4268                         ret = -ENXIO;
4269                         rbd_warn(rbd_dev, "invalid header");
4270                         goto out;
4271                 }
4272
4273                 names_size = le64_to_cpu(ondisk->snap_names_len);
4274                 want_count = snap_count;
4275                 snap_count = le32_to_cpu(ondisk->snap_count);
4276         } while (snap_count != want_count);
4277
4278         ret = rbd_header_from_disk(rbd_dev, ondisk);
4279 out:
4280         kfree(ondisk);
4281
4282         return ret;
4283 }
4284
4285 /*
4286  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4287  * has disappeared from the (just updated) snapshot context.
4288  */
4289 static void rbd_exists_validate(struct rbd_device *rbd_dev)
4290 {
4291         u64 snap_id;
4292
4293         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4294                 return;
4295
4296         snap_id = rbd_dev->spec->snap_id;
4297         if (snap_id == CEPH_NOSNAP)
4298                 return;
4299
4300         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4301                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4302 }
4303
4304 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4305 {
4306         sector_t size;
4307
4308         /*
4309          * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4310          * try to update its size.  If REMOVING is set, updating size
4311          * is just useless work since the device can't be opened.
4312          */
4313         if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4314             !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4315                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4316                 dout("setting size to %llu sectors", (unsigned long long)size);
4317                 set_capacity(rbd_dev->disk, size);
4318                 revalidate_disk(rbd_dev->disk);
4319         }
4320 }
4321
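/*
 * Re-read the image header (and parent info if layered), update the
 * mapping size and EXISTS flag, and resize the block device if the
 * size changed.
 */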
4322 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4323 {
4324         u64 mapping_size;
4325         int ret;
4326
4327         down_write(&rbd_dev->header_rwsem);
4328         mapping_size = rbd_dev->mapping.size;
4329
4330         ret = rbd_dev_header_info(rbd_dev);
4331         if (ret)
4332                 goto out;
4333
4334         /*
4335          * If there is a parent, see if it has disappeared due to the
4336          * mapped image getting flattened.
4337          */
4338         if (rbd_dev->parent) {
4339                 ret = rbd_dev_v2_parent_info(rbd_dev);
4340                 if (ret)
4341                         goto out;
4342         }
4343
4344         if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4345                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4346         } else {
4347                 /* validate mapped snapshot's EXISTS flag */
4348                 rbd_exists_validate(rbd_dev);
4349         }
4350
4351 out:
4352         up_write(&rbd_dev->header_rwsem);
4353         if (!ret && mapping_size != rbd_dev->mapping.size)
4354                 rbd_dev_update_size(rbd_dev);
4355
4356         return ret;
4357 }
4358
4359 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
4360                 unsigned int hctx_idx, unsigned int numa_node)
4361 {
4362         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4363
4364         INIT_WORK(work, rbd_queue_workfn);
4365         return 0;
4366 }
4367
4368 static const struct blk_mq_ops rbd_mq_ops = {
4369         .queue_rq       = rbd_queue_rq,
4370         .init_request   = rbd_init_request,
4371 };
4372
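/*
 * Allocate the gendisk and blk-mq queue for this device and derive
 * the queue limits (I/O sizes, discard granularity) from the object
 * size.
 */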
4373 static int rbd_init_disk(struct rbd_device *rbd_dev)
4374 {
4375         struct gendisk *disk;
4376         struct request_queue *q;
4377         u64 segment_size;
4378         int err;
4379
4380         /* create gendisk info */
4381         disk = alloc_disk(single_major ?
4382                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4383                           RBD_MINORS_PER_MAJOR);
4384         if (!disk)
4385                 return -ENOMEM;
4386
4387         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4388                  rbd_dev->dev_id);
4389         disk->major = rbd_dev->major;
4390         disk->first_minor = rbd_dev->minor;
4391         if (single_major)
4392                 disk->flags |= GENHD_FL_EXT_DEVT;
4393         disk->fops = &rbd_bd_ops;
4394         disk->private_data = rbd_dev;
4395
4396         memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4397         rbd_dev->tag_set.ops = &rbd_mq_ops;
4398         rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4399         rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4400         rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4401         rbd_dev->tag_set.nr_hw_queues = 1;
4402         rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4403
4404         err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4405         if (err)
4406                 goto out_disk;
4407
4408         q = blk_mq_init_queue(&rbd_dev->tag_set);
4409         if (IS_ERR(q)) {
4410                 err = PTR_ERR(q);
4411                 goto out_tag_set;
4412         }
4413
4414         queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4415         /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4416
4417         /* set io sizes to object size */
4418         segment_size = rbd_obj_bytes(&rbd_dev->header);
4419         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4420         q->limits.max_sectors = queue_max_hw_sectors(q);
4421         blk_queue_max_segments(q, USHRT_MAX);
4422         blk_queue_max_segment_size(q, segment_size);
4423         blk_queue_io_min(q, segment_size);
4424         blk_queue_io_opt(q, segment_size);
4425
4426         /* enable discard support */
4427         queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4428         q->limits.discard_granularity = segment_size;
4429         q->limits.discard_alignment = segment_size;
4430         blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
4431         blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
4432
4433         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4434                 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
4435
4436         /*
4437          * disk_release() expects a queue ref from add_disk() and will
4438          * put it.  Hold an extra ref until add_disk() is called.
4439          */
4440         WARN_ON(!blk_get_queue(q));
4441         disk->queue = q;
4442         q->queuedata = rbd_dev;
4443
4444         rbd_dev->disk = disk;
4445
4446         return 0;
4447 out_tag_set:
4448         blk_mq_free_tag_set(&rbd_dev->tag_set);
4449 out_disk:
4450         put_disk(disk);
4451         return err;
4452 }
4453
4454 /*
4455   sysfs
4456 */
4457
4458 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4459 {
4460         return container_of(dev, struct rbd_device, dev);
4461 }
4462
4463 static ssize_t rbd_size_show(struct device *dev,
4464                              struct device_attribute *attr, char *buf)
4465 {
4466         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4467
4468         return sprintf(buf, "%llu\n",
4469                 (unsigned long long)rbd_dev->mapping.size);
4470 }
4471
4472 /*
4473  * Note this shows the features for whatever's mapped, which is not
4474  * necessarily the base image.
4475  */
4476 static ssize_t rbd_features_show(struct device *dev,
4477                              struct device_attribute *attr, char *buf)
4478 {
4479         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4480
4481         return sprintf(buf, "0x%016llx\n",
4482                         (unsigned long long)rbd_dev->mapping.features);
4483 }
4484
4485 static ssize_t rbd_major_show(struct device *dev,
4486                               struct device_attribute *attr, char *buf)
4487 {
4488         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4489
4490         if (rbd_dev->major)
4491                 return sprintf(buf, "%d\n", rbd_dev->major);
4492
4493         return sprintf(buf, "(none)\n");
4494 }
4495
4496 static ssize_t rbd_minor_show(struct device *dev,
4497                               struct device_attribute *attr, char *buf)
4498 {
4499         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4500
4501         return sprintf(buf, "%d\n", rbd_dev->minor);
4502 }
4503
4504 static ssize_t rbd_client_addr_show(struct device *dev,
4505                                     struct device_attribute *attr, char *buf)
4506 {
4507         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4508         struct ceph_entity_addr *client_addr =
4509             ceph_client_addr(rbd_dev->rbd_client->client);
4510
4511         return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4512                        le32_to_cpu(client_addr->nonce));
4513 }
4514
4515 static ssize_t rbd_client_id_show(struct device *dev,
4516                                   struct device_attribute *attr, char *buf)
4517 {
4518         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4519
4520         return sprintf(buf, "client%lld\n",
4521                        ceph_client_gid(rbd_dev->rbd_client->client));
4522 }
4523
4524 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4525                                      struct device_attribute *attr, char *buf)
4526 {
4527         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4528
4529         return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4530 }
4531
4532 static ssize_t rbd_config_info_show(struct device *dev,
4533                                     struct device_attribute *attr, char *buf)
4534 {
4535         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4536
4537         if (!capable(CAP_SYS_ADMIN))
4538                 return -EPERM;
4539
4540         return sprintf(buf, "%s\n", rbd_dev->config_info);
4541 }
4542
4543 static ssize_t rbd_pool_show(struct device *dev,
4544                              struct device_attribute *attr, char *buf)
4545 {
4546         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4547
4548         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4549 }
4550
4551 static ssize_t rbd_pool_id_show(struct device *dev,
4552                              struct device_attribute *attr, char *buf)
4553 {
4554         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4555
4556         return sprintf(buf, "%llu\n",
4557                         (unsigned long long) rbd_dev->spec->pool_id);
4558 }
4559
4560 static ssize_t rbd_name_show(struct device *dev,
4561                              struct device_attribute *attr, char *buf)
4562 {
4563         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4564
4565         if (rbd_dev->spec->image_name)
4566                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4567
4568         return sprintf(buf, "(unknown)\n");
4569 }
4570
4571 static ssize_t rbd_image_id_show(struct device *dev,
4572                              struct device_attribute *attr, char *buf)
4573 {
4574         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4575
4576         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4577 }
4578
4579 /*
4580  * Shows the name of the currently-mapped snapshot (or
4581  * RBD_SNAP_HEAD_NAME for the base image).
4582  */
4583 static ssize_t rbd_snap_show(struct device *dev,
4584                              struct device_attribute *attr,
4585                              char *buf)
4586 {
4587         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4588
4589         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4590 }
4591
4592 static ssize_t rbd_snap_id_show(struct device *dev,
4593                                 struct device_attribute *attr, char *buf)
4594 {
4595         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4596
4597         return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4598 }
4599
4600 /*
4601  * For a v2 image, shows the chain of parent images, separated by empty
4602  * lines.  For v1 images or if there is no parent, shows "(no parent
4603  * image)".
4604  */
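     /*
      * Each entry in the chain is printed as follows (the values below
      * are purely illustrative):
      *
      *   pool_id 2
      *   pool_name rbd
      *   image_id 1028e64c6b4567
      *   image_name parent-image
      *   snap_id 4
      *   snap_name base
      *   overlap 10737418240
      */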
4605 static ssize_t rbd_parent_show(struct device *dev,
4606                                struct device_attribute *attr,
4607                                char *buf)
4608 {
4609         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4610         ssize_t count = 0;
4611
4612         if (!rbd_dev->parent)
4613                 return sprintf(buf, "(no parent image)\n");
4614
4615         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4616                 struct rbd_spec *spec = rbd_dev->parent_spec;
4617
4618                 count += sprintf(&buf[count], "%s"
4619                             "pool_id %llu\npool_name %s\n"
4620                             "image_id %s\nimage_name %s\n"
4621                             "snap_id %llu\nsnap_name %s\n"
4622                             "overlap %llu\n",
4623                             !count ? "" : "\n", /* first? */
4624                             spec->pool_id, spec->pool_name,
4625                             spec->image_id, spec->image_name ?: "(unknown)",
4626                             spec->snap_id, spec->snap_name,
4627                             rbd_dev->parent_overlap);
4628         }
4629
4630         return count;
4631 }
4632
4633 static ssize_t rbd_image_refresh(struct device *dev,
4634                                  struct device_attribute *attr,
4635                                  const char *buf,
4636                                  size_t size)
4637 {
4638         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4639         int ret;
4640
4641         if (!capable(CAP_SYS_ADMIN))
4642                 return -EPERM;
4643
4644         ret = rbd_dev_refresh(rbd_dev);
4645         if (ret)
4646                 return ret;
4647
4648         return size;
4649 }
4650
4651 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4652 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4653 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4654 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4655 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4656 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4657 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4658 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4659 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4660 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4661 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4662 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4663 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4664 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4665 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4666 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4667
4668 static struct attribute *rbd_attrs[] = {
4669         &dev_attr_size.attr,
4670         &dev_attr_features.attr,
4671         &dev_attr_major.attr,
4672         &dev_attr_minor.attr,
4673         &dev_attr_client_addr.attr,
4674         &dev_attr_client_id.attr,
4675         &dev_attr_cluster_fsid.attr,
4676         &dev_attr_config_info.attr,
4677         &dev_attr_pool.attr,
4678         &dev_attr_pool_id.attr,
4679         &dev_attr_name.attr,
4680         &dev_attr_image_id.attr,
4681         &dev_attr_current_snap.attr,
4682         &dev_attr_snap_id.attr,
4683         &dev_attr_parent.attr,
4684         &dev_attr_refresh.attr,
4685         NULL
4686 };
4687
4688 static struct attribute_group rbd_attr_group = {
4689         .attrs = rbd_attrs,
4690 };
4691
4692 static const struct attribute_group *rbd_attr_groups[] = {
4693         &rbd_attr_group,
4694         NULL
4695 };
4696
4697 static void rbd_dev_release(struct device *dev);
4698
4699 static const struct device_type rbd_device_type = {
4700         .name           = "rbd",
4701         .groups         = rbd_attr_groups,
4702         .release        = rbd_dev_release,
4703 };
4704
4705 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4706 {
4707         kref_get(&spec->kref);
4708
4709         return spec;
4710 }
4711
4712 static void rbd_spec_free(struct kref *kref);
4713 static void rbd_spec_put(struct rbd_spec *spec)
4714 {
4715         if (spec)
4716                 kref_put(&spec->kref, rbd_spec_free);
4717 }
4718
4719 static struct rbd_spec *rbd_spec_alloc(void)
4720 {
4721         struct rbd_spec *spec;
4722
4723         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4724         if (!spec)
4725                 return NULL;
4726
4727         spec->pool_id = CEPH_NOPOOL;
4728         spec->snap_id = CEPH_NOSNAP;
4729         kref_init(&spec->kref);
4730
4731         return spec;
4732 }
4733
4734 static void rbd_spec_free(struct kref *kref)
4735 {
4736         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4737
4738         kfree(spec->pool_name);
4739         kfree(spec->image_id);
4740         kfree(spec->image_name);
4741         kfree(spec->snap_name);
4742         kfree(spec);
4743 }
4744
4745 static void rbd_dev_free(struct rbd_device *rbd_dev)
4746 {
4747         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4748         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4749
4750         ceph_oid_destroy(&rbd_dev->header_oid);
4751         ceph_oloc_destroy(&rbd_dev->header_oloc);
4752         kfree(rbd_dev->config_info);
4753
4754         rbd_put_client(rbd_dev->rbd_client);
4755         rbd_spec_put(rbd_dev->spec);
4756         kfree(rbd_dev->opts);
4757         kfree(rbd_dev);
4758 }
4759
4760 static void rbd_dev_release(struct device *dev)
4761 {
4762         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4763         bool need_put = !!rbd_dev->opts;
4764
4765         if (need_put) {
4766                 destroy_workqueue(rbd_dev->task_wq);
4767                 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4768         }
4769
4770         rbd_dev_free(rbd_dev);
4771
4772         /*
4773          * This is racy, but way better than dropping the module ref
4774          * outside of the release callback.  The race window is pretty
4775          * small, so doing something similar to dm (dm-builtin.c) is overkill.
4776          */
4777         if (need_put)
4778                 module_put(THIS_MODULE);
4779 }
4780
4781 static struct rbd_device *__rbd_dev_create(struct rbd_spec *spec)
4782 {
4783         struct rbd_device *rbd_dev;
4784
4785         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4786         if (!rbd_dev)
4787                 return NULL;
4788
4789         spin_lock_init(&rbd_dev->lock);
4790         INIT_LIST_HEAD(&rbd_dev->node);
4791         init_rwsem(&rbd_dev->header_rwsem);
4792
4793         rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4794         ceph_oid_init(&rbd_dev->header_oid);
4795         rbd_dev->header_oloc.pool = spec->pool_id;
4796
4797         mutex_init(&rbd_dev->watch_mutex);
4798         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4799         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4800
4801         init_rwsem(&rbd_dev->lock_rwsem);
4802         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4803         INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4804         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4805         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4806         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4807         init_waitqueue_head(&rbd_dev->lock_waitq);
4808
4809         rbd_dev->dev.bus = &rbd_bus_type;
4810         rbd_dev->dev.type = &rbd_device_type;
4811         rbd_dev->dev.parent = &rbd_root_dev;
4812         device_initialize(&rbd_dev->dev);
4813
4814         return rbd_dev;
4815 }
4816
4817 /*
4818  * Create a mapping rbd_dev.
4819  */
4820 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4821                                          struct rbd_spec *spec,
4822                                          struct rbd_options *opts)
4823 {
4824         struct rbd_device *rbd_dev;
4825
4826         rbd_dev = __rbd_dev_create(spec);
4827         if (!rbd_dev)
4828                 return NULL;
4829
4830         /* get an id and fill in device name */
4831         rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4832                                          minor_to_rbd_dev_id(1 << MINORBITS),
4833                                          GFP_KERNEL);
4834         if (rbd_dev->dev_id < 0)
4835                 goto fail_rbd_dev;
4836
4837         sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4838         rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4839                                                    rbd_dev->name);
4840         if (!rbd_dev->task_wq)
4841                 goto fail_dev_id;
4842
4843         /* we have a ref from do_rbd_add() */
4844         __module_get(THIS_MODULE);
4845
4846         rbd_dev->rbd_client = rbdc;
4847         rbd_dev->spec = spec;
4848         rbd_dev->opts = opts;
4849
4850         dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4851         return rbd_dev;
4852
4853 fail_dev_id:
4854         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4855 fail_rbd_dev:
4856         rbd_dev_free(rbd_dev);
4857         return NULL;
4858 }
4859
4860 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4861 {
4862         if (rbd_dev)
4863                 put_device(&rbd_dev->dev);
4864 }
4865
4866 /*
4867  * Get the size and object order for an image snapshot, or, if
4868  * snap_id is CEPH_NOSNAP, get this information for the base
4869  * image.
4870  */
4871 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4872                                 u8 *order, u64 *snap_size)
4873 {
4874         __le64 snapid = cpu_to_le64(snap_id);
4875         int ret;
4876         struct {
4877                 u8 order;
4878                 __le64 size;
4879         } __attribute__ ((packed)) size_buf = { 0 };
4880
4881         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4882                                   &rbd_dev->header_oloc, "get_size",
4883                                   &snapid, sizeof(snapid),
4884                                   &size_buf, sizeof(size_buf));
4885         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4886         if (ret < 0)
4887                 return ret;
4888         if (ret < sizeof (size_buf))
4889                 return -ERANGE;
4890
4891         if (order) {
4892                 *order = size_buf.order;
4893                 dout("  order %u", (unsigned int)*order);
4894         }
4895         *snap_size = le64_to_cpu(size_buf.size);
4896
4897         dout("  snap_id 0x%016llx snap_size = %llu\n",
4898                 (unsigned long long)snap_id,
4899                 (unsigned long long)*snap_size);
4900
4901         return 0;
4902 }
4903
4904 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4905 {
4906         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4907                                         &rbd_dev->header.obj_order,
4908                                         &rbd_dev->header.image_size);
4909 }
4910
4911 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4912 {
4913         void *reply_buf;
4914         int ret;
4915         void *p;
4916
4917         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4918         if (!reply_buf)
4919                 return -ENOMEM;
4920
4921         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4922                                   &rbd_dev->header_oloc, "get_object_prefix",
4923                                   NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4924         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4925         if (ret < 0)
4926                 goto out;
4927
4928         p = reply_buf;
4929         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4930                                                 p + ret, NULL, GFP_NOIO);
4931         ret = 0;
4932
4933         if (IS_ERR(rbd_dev->header.object_prefix)) {
4934                 ret = PTR_ERR(rbd_dev->header.object_prefix);
4935                 rbd_dev->header.object_prefix = NULL;
4936         } else {
4937                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
4938         }
4939 out:
4940         kfree(reply_buf);
4941
4942         return ret;
4943 }
4944
4945 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4946                 u64 *snap_features)
4947 {
4948         __le64 snapid = cpu_to_le64(snap_id);
4949         struct {
4950                 __le64 features;
4951                 __le64 incompat;
4952         } __attribute__ ((packed)) features_buf = { 0 };
4953         u64 unsup;
4954         int ret;
4955
4956         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4957                                   &rbd_dev->header_oloc, "get_features",
4958                                   &snapid, sizeof(snapid),
4959                                   &features_buf, sizeof(features_buf));
4960         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4961         if (ret < 0)
4962                 return ret;
4963         if (ret < sizeof (features_buf))
4964                 return -ERANGE;
4965
4966         unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4967         if (unsup) {
4968                 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4969                          unsup);
4970                 return -ENXIO;
4971         }
4972
4973         *snap_features = le64_to_cpu(features_buf.features);
4974
4975         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4976                 (unsigned long long)snap_id,
4977                 (unsigned long long)*snap_features,
4978                 (unsigned long long)le64_to_cpu(features_buf.incompat));
4979
4980         return 0;
4981 }
4982
4983 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4984 {
4985         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4986                                                 &rbd_dev->header.features);
4987 }
4988
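     /*
      * Query the "get_parent" class method on the header object and decode
      * its reply, which is laid out as:
      *
      *   le64   pool_id     (CEPH_NOPOOL if there is no parent)
      *   string image_id    (le32 length followed by the id)
      *   le64   snap_id
      *   le64   overlap     (bytes of the parent visible through the clone)
      */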
4989 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4990 {
4991         struct rbd_spec *parent_spec;
4992         size_t size;
4993         void *reply_buf = NULL;
4994         __le64 snapid;
4995         void *p;
4996         void *end;
4997         u64 pool_id;
4998         char *image_id;
4999         u64 snap_id;
5000         u64 overlap;
5001         int ret;
5002
5003         parent_spec = rbd_spec_alloc();
5004         if (!parent_spec)
5005                 return -ENOMEM;
5006
5007         size = sizeof (__le64) +                                /* pool_id */
5008                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
5009                 sizeof (__le64) +                               /* snap_id */
5010                 sizeof (__le64);                                /* overlap */
5011         reply_buf = kmalloc(size, GFP_KERNEL);
5012         if (!reply_buf) {
5013                 ret = -ENOMEM;
5014                 goto out_err;
5015         }
5016
5017         snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5018         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5019                                   &rbd_dev->header_oloc, "get_parent",
5020                                   &snapid, sizeof(snapid), reply_buf, size);
5021         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5022         if (ret < 0)
5023                 goto out_err;
5024
5025         p = reply_buf;
5026         end = reply_buf + ret;
5027         ret = -ERANGE;
5028         ceph_decode_64_safe(&p, end, pool_id, out_err);
5029         if (pool_id == CEPH_NOPOOL) {
5030                 /*
5031                  * Either the parent never existed, or we have a
5032                  * record of it but the image got flattened so it no
5033                  * longer has a parent.  When the parent of a
5034                  * layered image disappears we immediately set the
5035                  * overlap to 0.  The effect of this is that all new
5036                  * requests will be treated as if the image had no
5037                  * parent.
5038                  */
5039                 if (rbd_dev->parent_overlap) {
5040                         rbd_dev->parent_overlap = 0;
5041                         rbd_dev_parent_put(rbd_dev);
5042                         pr_info("%s: clone image has been flattened\n",
5043                                 rbd_dev->disk->disk_name);
5044                 }
5045
5046                 goto out;       /* No parent?  No problem. */
5047         }
5048
5049         /* The ceph file layout needs to fit pool id in 32 bits */
5050
5051         ret = -EIO;
5052         if (pool_id > (u64)U32_MAX) {
5053                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5054                         (unsigned long long)pool_id, U32_MAX);
5055                 goto out_err;
5056         }
5057
5058         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5059         if (IS_ERR(image_id)) {
5060                 ret = PTR_ERR(image_id);
5061                 goto out_err;
5062         }
5063         ceph_decode_64_safe(&p, end, snap_id, out_err);
5064         ceph_decode_64_safe(&p, end, overlap, out_err);
5065
5066         /*
5067          * The parent won't change (except when the clone is
5068          * flattened, which is handled above).  So we only need to
5069          * record the parent spec if we have not already done so.
5070          */
5071         if (!rbd_dev->parent_spec) {
5072                 parent_spec->pool_id = pool_id;
5073                 parent_spec->image_id = image_id;
5074                 parent_spec->snap_id = snap_id;
5075                 rbd_dev->parent_spec = parent_spec;
5076                 parent_spec = NULL;     /* rbd_dev now owns this */
5077         } else {
5078                 kfree(image_id);
5079         }
5080
5081         /*
5082          * We always update the parent overlap.  If it's zero we issue
5083          * a warning, as we will proceed as if there was no parent.
5084          */
5085         if (!overlap) {
5086                 if (parent_spec) {
5087                         /* refresh, careful to warn just once */
5088                         if (rbd_dev->parent_overlap)
5089                                 rbd_warn(rbd_dev,
5090                                     "clone now standalone (overlap became 0)");
5091                 } else {
5092                         /* initial probe */
5093                         rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5094                 }
5095         }
5096         rbd_dev->parent_overlap = overlap;
5097
5098 out:
5099         ret = 0;
5100 out_err:
5101         kfree(reply_buf);
5102         rbd_spec_put(parent_spec);
5103
5104         return ret;
5105 }
5106
5107 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5108 {
5109         struct {
5110                 __le64 stripe_unit;
5111                 __le64 stripe_count;
5112         } __attribute__ ((packed)) striping_info_buf = { 0 };
5113         size_t size = sizeof (striping_info_buf);
5114         void *p;
5115         u64 obj_size;
5116         u64 stripe_unit;
5117         u64 stripe_count;
5118         int ret;
5119
5120         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5121                                 &rbd_dev->header_oloc, "get_stripe_unit_count",
5122                                 NULL, 0, &striping_info_buf, size);
5123         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5124         if (ret < 0)
5125                 return ret;
5126         if (ret < size)
5127                 return -ERANGE;
5128
5129         /*
5130          * We don't actually support the "fancy striping" feature
5131          * (STRIPINGV2) yet, but if the striping sizes are the
5132          * defaults the behavior is the same as before.  So find
5133          * out, and only fail if the image has non-default values.
5134          */
5135         ret = -EINVAL;
5136         obj_size = rbd_obj_bytes(&rbd_dev->header);
5137         p = &striping_info_buf;
5138         stripe_unit = ceph_decode_64(&p);
5139         if (stripe_unit != obj_size) {
5140                 rbd_warn(rbd_dev, "unsupported stripe unit "
5141                                 "(got %llu want %llu)",
5142                                 stripe_unit, obj_size);
5143                 return -EINVAL;
5144         }
5145         stripe_count = ceph_decode_64(&p);
5146         if (stripe_count != 1) {
5147                 rbd_warn(rbd_dev, "unsupported stripe count "
5148                                 "(got %llu want 1)", stripe_count);
5149                 return -EINVAL;
5150         }
5151         rbd_dev->header.stripe_unit = stripe_unit;
5152         rbd_dev->header.stripe_count = stripe_count;
5153
5154         return 0;
5155 }
5156
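     /*
      * Fetch the id of the separate data pool via the "get_data_pool"
      * class method.  This is only called when the image has the
      * RBD_FEATURE_DATA_POOL feature bit set (see
      * rbd_dev_v2_header_onetime()).
      */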
5157 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5158 {
5159         __le64 data_pool_id;
5160         int ret;
5161
5162         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5163                                   &rbd_dev->header_oloc, "get_data_pool",
5164                                   NULL, 0, &data_pool_id, sizeof(data_pool_id));
5165         if (ret < 0)
5166                 return ret;
5167         if (ret < sizeof(data_pool_id))
5168                 return -EBADMSG;
5169
5170         rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5171         WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5172         return 0;
5173 }
5174
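     /*
      * Look up the image name for rbd_dev's image id by calling the
      * "dir_get_name" class method on the rbd directory object
      * (RBD_DIRECTORY).  Returns a freshly allocated name, or NULL on
      * any failure -- callers treat a missing name as non-fatal.
      */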
5175 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5176 {
5177         CEPH_DEFINE_OID_ONSTACK(oid);
5178         size_t image_id_size;
5179         char *image_id;
5180         void *p;
5181         void *end;
5182         size_t size;
5183         void *reply_buf = NULL;
5184         size_t len = 0;
5185         char *image_name = NULL;
5186         int ret;
5187
5188         rbd_assert(!rbd_dev->spec->image_name);
5189
5190         len = strlen(rbd_dev->spec->image_id);
5191         image_id_size = sizeof (__le32) + len;
5192         image_id = kmalloc(image_id_size, GFP_KERNEL);
5193         if (!image_id)
5194                 return NULL;
5195
5196         p = image_id;
5197         end = image_id + image_id_size;
5198         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5199
5200         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5201         reply_buf = kmalloc(size, GFP_KERNEL);
5202         if (!reply_buf)
5203                 goto out;
5204
5205         ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5206         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5207                                   "dir_get_name", image_id, image_id_size,
5208                                   reply_buf, size);
5209         if (ret < 0)
5210                 goto out;
5211         p = reply_buf;
5212         end = reply_buf + ret;
5213
5214         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5215         if (IS_ERR(image_name))
5216                 image_name = NULL;
5217         else
5218                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5219 out:
5220         kfree(reply_buf);
5221         kfree(image_id);
5222
5223         return image_name;
5224 }
5225
5226 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5227 {
5228         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5229         const char *snap_name;
5230         u32 which = 0;
5231
5232         /* Skip over names until we find the one we are looking for */
5233
5234         snap_name = rbd_dev->header.snap_names;
5235         while (which < snapc->num_snaps) {
5236                 if (!strcmp(name, snap_name))
5237                         return snapc->snaps[which];
5238                 snap_name += strlen(snap_name) + 1;
5239                 which++;
5240         }
5241         return CEPH_NOSNAP;
5242 }
5243
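     /*
      * Format 2 snapshot names are not kept in the in-core header, so walk
      * the snapshot context and fetch each snapshot's name from the OSD
      * until a match is found.  Snapshots that have disappeared in the
      * meantime (-ENOENT) are simply skipped.
      */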
5244 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5245 {
5246         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5247         u32 which;
5248         bool found = false;
5249         u64 snap_id;
5250
5251         for (which = 0; !found && which < snapc->num_snaps; which++) {
5252                 const char *snap_name;
5253
5254                 snap_id = snapc->snaps[which];
5255                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5256                 if (IS_ERR(snap_name)) {
5257                         /* ignore no-longer existing snapshots */
5258                         if (PTR_ERR(snap_name) == -ENOENT)
5259                                 continue;
5260                         else
5261                                 break;
5262                 }
5263                 found = !strcmp(name, snap_name);
5264                 kfree(snap_name);
5265         }
5266         return found ? snap_id : CEPH_NOSNAP;
5267 }
5268
5269 /*
5270  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5271  * no snapshot by that name is found, or if an error occurs.
5272  */
5273 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5274 {
5275         if (rbd_dev->image_format == 1)
5276                 return rbd_v1_snap_id_by_name(rbd_dev, name);
5277
5278         return rbd_v2_snap_id_by_name(rbd_dev, name);
5279 }
5280
5281 /*
5282  * An image being mapped will have everything but the snap id.
5283  */
5284 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5285 {
5286         struct rbd_spec *spec = rbd_dev->spec;
5287
5288         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5289         rbd_assert(spec->image_id && spec->image_name);
5290         rbd_assert(spec->snap_name);
5291
5292         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5293                 u64 snap_id;
5294
5295                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5296                 if (snap_id == CEPH_NOSNAP)
5297                         return -ENOENT;
5298
5299                 spec->snap_id = snap_id;
5300         } else {
5301                 spec->snap_id = CEPH_NOSNAP;
5302         }
5303
5304         return 0;
5305 }
5306
5307 /*
5308  * A parent image will have all ids but none of the names.
5309  *
5310  * All names in an rbd spec are dynamically allocated.  It's OK if we
5311  * can't figure out the name for an image id.
5312  */
5313 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5314 {
5315         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5316         struct rbd_spec *spec = rbd_dev->spec;
5317         const char *pool_name;
5318         const char *image_name;
5319         const char *snap_name;
5320         int ret;
5321
5322         rbd_assert(spec->pool_id != CEPH_NOPOOL);
5323         rbd_assert(spec->image_id);
5324         rbd_assert(spec->snap_id != CEPH_NOSNAP);
5325
5326         /* Get the pool name; we have to make our own copy of this */
5327
5328         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5329         if (!pool_name) {
5330                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5331                 return -EIO;
5332         }
5333         pool_name = kstrdup(pool_name, GFP_KERNEL);
5334         if (!pool_name)
5335                 return -ENOMEM;
5336
5337         /* Fetch the image name; tolerate failure here */
5338
5339         image_name = rbd_dev_image_name(rbd_dev);
5340         if (!image_name)
5341                 rbd_warn(rbd_dev, "unable to get image name");
5342
5343         /* Fetch the snapshot name */
5344
5345         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5346         if (IS_ERR(snap_name)) {
5347                 ret = PTR_ERR(snap_name);
5348                 goto out_err;
5349         }
5350
5351         spec->pool_name = pool_name;
5352         spec->image_name = image_name;
5353         spec->snap_name = snap_name;
5354
5355         return 0;
5356
5357 out_err:
5358         kfree(image_name);
5359         kfree(pool_name);
5360         return ret;
5361 }
5362
5363 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5364 {
5365         size_t size;
5366         int ret;
5367         void *reply_buf;
5368         void *p;
5369         void *end;
5370         u64 seq;
5371         u32 snap_count;
5372         struct ceph_snap_context *snapc;
5373         u32 i;
5374
5375         /*
5376          * We'll need room for the seq value (maximum snapshot id),
5377          * snapshot count, and array of that many snapshot ids.
5378          * For now we have a fixed upper limit on the number we're
5379          * prepared to receive.
5380          */
5381         size = sizeof (__le64) + sizeof (__le32) +
5382                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
5383         reply_buf = kzalloc(size, GFP_KERNEL);
5384         if (!reply_buf)
5385                 return -ENOMEM;
5386
5387         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5388                                   &rbd_dev->header_oloc, "get_snapcontext",
5389                                   NULL, 0, reply_buf, size);
5390         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5391         if (ret < 0)
5392                 goto out;
5393
5394         p = reply_buf;
5395         end = reply_buf + ret;
5396         ret = -ERANGE;
5397         ceph_decode_64_safe(&p, end, seq, out);
5398         ceph_decode_32_safe(&p, end, snap_count, out);
5399
5400         /*
5401          * Make sure the reported number of snapshot ids wouldn't go
5402          * beyond the end of our buffer.  But before checking that,
5403          * make sure the computed size of the snapshot context we
5404          * allocate is representable in a size_t.
5405          */
5406         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5407                                  / sizeof (u64)) {
5408                 ret = -EINVAL;
5409                 goto out;
5410         }
5411         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5412                 goto out;
5413         ret = 0;
5414
5415         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5416         if (!snapc) {
5417                 ret = -ENOMEM;
5418                 goto out;
5419         }
5420         snapc->seq = seq;
5421         for (i = 0; i < snap_count; i++)
5422                 snapc->snaps[i] = ceph_decode_64(&p);
5423
5424         ceph_put_snap_context(rbd_dev->header.snapc);
5425         rbd_dev->header.snapc = snapc;
5426
5427         dout("  snap context seq = %llu, snap_count = %u\n",
5428                 (unsigned long long)seq, (unsigned int)snap_count);
5429 out:
5430         kfree(reply_buf);
5431
5432         return ret;
5433 }
5434
5435 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5436                                         u64 snap_id)
5437 {
5438         size_t size;
5439         void *reply_buf;
5440         __le64 snapid;
5441         int ret;
5442         void *p;
5443         void *end;
5444         char *snap_name;
5445
5446         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5447         reply_buf = kmalloc(size, GFP_KERNEL);
5448         if (!reply_buf)
5449                 return ERR_PTR(-ENOMEM);
5450
5451         snapid = cpu_to_le64(snap_id);
5452         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5453                                   &rbd_dev->header_oloc, "get_snapshot_name",
5454                                   &snapid, sizeof(snapid), reply_buf, size);
5455         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5456         if (ret < 0) {
5457                 snap_name = ERR_PTR(ret);
5458                 goto out;
5459         }
5460
5461         p = reply_buf;
5462         end = reply_buf + ret;
5463         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5464         if (IS_ERR(snap_name))
5465                 goto out;
5466
5467         dout("  snap_id 0x%016llx snap_name = %s\n",
5468                 (unsigned long long)snap_id, snap_name);
5469 out:
5470         kfree(reply_buf);
5471
5472         return snap_name;
5473 }
5474
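     /*
      * Refresh the format 2 header: always re-read the image size, fetch
      * the one-time fields (object prefix, features, striping and data
      * pool info) on the first call, then re-read the snapshot context.
      */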
5475 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5476 {
5477         bool first_time = rbd_dev->header.object_prefix == NULL;
5478         int ret;
5479
5480         ret = rbd_dev_v2_image_size(rbd_dev);
5481         if (ret)
5482                 return ret;
5483
5484         if (first_time) {
5485                 ret = rbd_dev_v2_header_onetime(rbd_dev);
5486                 if (ret)
5487                         return ret;
5488         }
5489
5490         ret = rbd_dev_v2_snap_context(rbd_dev);
5491         if (ret && first_time) {
5492                 kfree(rbd_dev->header.object_prefix);
5493                 rbd_dev->header.object_prefix = NULL;
5494         }
5495
5496         return ret;
5497 }
5498
5499 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5500 {
5501         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5502
5503         if (rbd_dev->image_format == 1)
5504                 return rbd_dev_v1_header_info(rbd_dev);
5505
5506         return rbd_dev_v2_header_info(rbd_dev);
5507 }
5508
5509 /*
5510  * Skips over white space at *buf, and updates *buf to point to the
5511  * first found non-space character (if any). Returns the length of
5512  * the token (string of non-white space characters) found.  Note
5513  * that *buf must be terminated with '\0'.
5514  */
5515 static inline size_t next_token(const char **buf)
5516 {
5517         /*
5518          * These are the characters that produce nonzero for
5519          * isspace() in the "C" and "POSIX" locales.
5520          */
5521         const char *spaces = " \f\n\r\t\v";
5522
5523         *buf += strspn(*buf, spaces);   /* Find start of token */
5524
5525         return strcspn(*buf, spaces);   /* Return token length */
5526 }
5527
5528 /*
5529  * Finds the next token in *buf, dynamically allocates a buffer big
5530  * enough to hold a copy of it, and copies the token into the new
5531  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
5532  * that a duplicate buffer is created even for a zero-length token.
5533  *
5534  * Returns a pointer to the newly-allocated duplicate, or a null
5535  * pointer if memory for the duplicate was not available.  If
5536  * the lenp argument is a non-null pointer, the length of the token
5537  * (not including the '\0') is returned in *lenp.
5538  *
5539  * If successful, the *buf pointer will be updated to point beyond
5540  * the end of the found token.
5541  *
5542  * Note: uses GFP_KERNEL for allocation.
5543  */
5544 static inline char *dup_token(const char **buf, size_t *lenp)
5545 {
5546         char *dup;
5547         size_t len;
5548
5549         len = next_token(buf);
5550         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5551         if (!dup)
5552                 return NULL;
5553         *(dup + len) = '\0';
5554         *buf += len;
5555
5556         if (lenp)
5557                 *lenp = len;
5558
5559         return dup;
5560 }
5561
5562 /*
5563  * Parse the options provided for an "rbd add" (i.e., rbd image
5564  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
5565  * and the data written is passed here via a NUL-terminated buffer.
5566  * Returns 0 if successful or an error code otherwise.
5567  *
5568  * The information extracted from these options is recorded in
5569  * the other parameters which return dynamically-allocated
5570  * structures:
5571  *  ceph_opts
5572  *      The address of a pointer that will refer to a ceph options
5573  *      structure.  Caller must release the returned pointer using
5574  *      ceph_destroy_options() when it is no longer needed.
5575  *  rbd_opts
5576  *      Address of an rbd options pointer.  Fully initialized by
5577  *      this function; caller must release with kfree().
5578  *  spec
5579  *      Address of an rbd image specification pointer.  Fully
5580  *      initialized by this function based on parsed options.
5581  *      Caller must release with rbd_spec_put().
5582  *
5583  * The options passed take this form:
5584  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5585  * where:
5586  *  <mon_addrs>
5587  *      A comma-separated list of one or more monitor addresses.
5588  *      A monitor address is an ip address, optionally followed
5589  *      by a port number (separated by a colon).
5590  *        I.e.:  ip1[:port1][,ip2[:port2]...]
5591  *  <options>
5592  *      A comma-separated list of ceph and/or rbd options.
5593  *  <pool_name>
5594  *      The name of the rados pool containing the rbd image.
5595  *  <image_name>
5596  *      The name of the image in that pool to map.
5597  *  <snap_name>
5598  *      An optional snapshot name.  If provided, the mapping will
5599  *      present data from the image at the time that snapshot was
5600  *      created.  The image head is used if no snapshot name is
5601  *      provided.  Snapshot mappings are always read-only.
5602  */
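     /*
      * For example, a write like the following (all values purely
      * illustrative) maps snapshot "snap1" of image "foo" in pool "rbd":
      *
      *   $ echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo snap1" \
      *         > /sys/bus/rbd/add
      */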
5603 static int rbd_add_parse_args(const char *buf,
5604                                 struct ceph_options **ceph_opts,
5605                                 struct rbd_options **opts,
5606                                 struct rbd_spec **rbd_spec)
5607 {
5608         size_t len;
5609         char *options;
5610         const char *mon_addrs;
5611         char *snap_name;
5612         size_t mon_addrs_size;
5613         struct rbd_spec *spec = NULL;
5614         struct rbd_options *rbd_opts = NULL;
5615         struct ceph_options *copts;
5616         int ret;
5617
5618         /* The first four tokens are required */
5619
5620         len = next_token(&buf);
5621         if (!len) {
5622                 rbd_warn(NULL, "no monitor address(es) provided");
5623                 return -EINVAL;
5624         }
5625         mon_addrs = buf;
5626         mon_addrs_size = len + 1;
5627         buf += len;
5628
5629         ret = -EINVAL;
5630         options = dup_token(&buf, NULL);
5631         if (!options)
5632                 return -ENOMEM;
5633         if (!*options) {
5634                 rbd_warn(NULL, "no options provided");
5635                 goto out_err;
5636         }
5637
5638         spec = rbd_spec_alloc();
5639         if (!spec)
5640                 goto out_mem;
5641
5642         spec->pool_name = dup_token(&buf, NULL);
5643         if (!spec->pool_name)
5644                 goto out_mem;
5645         if (!*spec->pool_name) {
5646                 rbd_warn(NULL, "no pool name provided");
5647                 goto out_err;
5648         }
5649
5650         spec->image_name = dup_token(&buf, NULL);
5651         if (!spec->image_name)
5652                 goto out_mem;
5653         if (!*spec->image_name) {
5654                 rbd_warn(NULL, "no image name provided");
5655                 goto out_err;
5656         }
5657
5658         /*
5659          * Snapshot name is optional; default is to use "-"
5660          * (indicating the head/no snapshot).
5661          */
5662         len = next_token(&buf);
5663         if (!len) {
5664                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5665                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5666         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5667                 ret = -ENAMETOOLONG;
5668                 goto out_err;
5669         }
5670         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5671         if (!snap_name)
5672                 goto out_mem;
5673         *(snap_name + len) = '\0';
5674         spec->snap_name = snap_name;
5675
5676         /* Initialize all rbd options to the defaults */
5677
5678         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5679         if (!rbd_opts)
5680                 goto out_mem;
5681
5682         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5683         rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5684         rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5685         rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
5686
5687         copts = ceph_parse_options(options, mon_addrs,
5688                                         mon_addrs + mon_addrs_size - 1,
5689                                         parse_rbd_opts_token, rbd_opts);
5690         if (IS_ERR(copts)) {
5691                 ret = PTR_ERR(copts);
5692                 goto out_err;
5693         }
5694         kfree(options);
5695
5696         *ceph_opts = copts;
5697         *opts = rbd_opts;
5698         *rbd_spec = spec;
5699
5700         return 0;
5701 out_mem:
5702         ret = -ENOMEM;
5703 out_err:
5704         kfree(rbd_opts);
5705         rbd_spec_put(spec);
5706         kfree(options);
5707
5708         return ret;
5709 }
5710
5711 /*
5712  * Return pool id (>= 0) or a negative error code.
5713  */
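     /*
      * If the pool name isn't found in our current osdmap, ask the
      * monitors for the newest osdmap epoch, wait for that map and retry
      * the lookup once before giving up with -ENOENT.
      */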
5714 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5715 {
5716         struct ceph_options *opts = rbdc->client->options;
5717         u64 newest_epoch;
5718         int tries = 0;
5719         int ret;
5720
5721 again:
5722         ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5723         if (ret == -ENOENT && tries++ < 1) {
5724                 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5725                                             &newest_epoch);
5726                 if (ret < 0)
5727                         return ret;
5728
5729                 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5730                         ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5731                         (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5732                                                      newest_epoch,
5733                                                      opts->mount_timeout);
5734                         goto again;
5735                 } else {
5736                         /* the osdmap we have is new enough */
5737                         return -ENOENT;
5738                 }
5739         }
5740
5741         return ret;
5742 }
5743
5744 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5745 {
5746         down_write(&rbd_dev->lock_rwsem);
5747         if (__rbd_is_lock_owner(rbd_dev))
5748                 rbd_unlock(rbd_dev);
5749         up_write(&rbd_dev->lock_rwsem);
5750 }
5751
5752 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5753 {
5754         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5755                 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5756                 return -EINVAL;
5757         }
5758
5759         /* FIXME: "rbd map --exclusive" should be interruptible */
5760         down_read(&rbd_dev->lock_rwsem);
5761         rbd_wait_state_locked(rbd_dev);
5762         up_read(&rbd_dev->lock_rwsem);
5763         if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5764                 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5765                 return -EROFS;
5766         }
5767
5768         return 0;
5769 }
5770
5771 /*
5772  * An rbd format 2 image has a unique identifier, distinct from the
5773  * name given to it by the user.  Internally, that identifier is
5774  * what's used to specify the names of objects related to the image.
5775  *
5776  * A special "rbd id" object is used to map an rbd image name to its
5777  * id.  If that object doesn't exist, then there is no v2 rbd image
5778  * with the supplied name.
5779  *
5780  * This function will record the given rbd_dev's image_id field if
5781  * it can be determined, and in that case will return 0.  If any
5782  * errors occur a negative errno will be returned and the rbd_dev's
5783  * image_id field will be unchanged (and should be NULL).
5784  */
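     /*
      * The id object's name is RBD_ID_PREFIX followed by the image name
      * (see the ceph_oid_aprintf() call below).  If that object doesn't
      * exist (-ENOENT) the image is treated as format 1 and image_id is
      * recorded as an empty string.
      */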
5785 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5786 {
5787         int ret;
5788         size_t size;
5789         CEPH_DEFINE_OID_ONSTACK(oid);
5790         void *response;
5791         char *image_id;
5792
5793         /*
5794          * When probing a parent image, the image id is already
5795          * known (and the image name likely is not).  There's no
5796          * need to fetch the image id again in this case.  We
5797          * do still need to set the image format though.
5798          */
5799         if (rbd_dev->spec->image_id) {
5800                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5801
5802                 return 0;
5803         }
5804
5805         /*
5806          * First, see if the format 2 image id file exists, and if
5807          * so, get the image's persistent id from it.
5808          */
5809         ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5810                                rbd_dev->spec->image_name);
5811         if (ret)
5812                 return ret;
5813
5814         dout("rbd id object name is %s\n", oid.name);
5815
5816         /* Response will be an encoded string, which includes a length */
5817
5818         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5819         response = kzalloc(size, GFP_NOIO);
5820         if (!response) {
5821                 ret = -ENOMEM;
5822                 goto out;
5823         }
5824
5825         /* If it doesn't exist we'll assume it's a format 1 image */
5826
5827         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5828                                   "get_id", NULL, 0,
5829                                   response, RBD_IMAGE_ID_LEN_MAX);
5830         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5831         if (ret == -ENOENT) {
5832                 image_id = kstrdup("", GFP_KERNEL);
5833                 ret = image_id ? 0 : -ENOMEM;
5834                 if (!ret)
5835                         rbd_dev->image_format = 1;
5836         } else if (ret >= 0) {
5837                 void *p = response;
5838
5839                 image_id = ceph_extract_encoded_string(&p, p + ret,
5840                                                 NULL, GFP_NOIO);
5841                 ret = PTR_ERR_OR_ZERO(image_id);
5842                 if (!ret)
5843                         rbd_dev->image_format = 2;
5844         }
5845
5846         if (!ret) {
5847                 rbd_dev->spec->image_id = image_id;
5848                 dout("image_id is %s\n", image_id);
5849         }
5850 out:
5851         kfree(response);
5852         ceph_oid_destroy(&oid);
5853         return ret;
5854 }
5855
5856 /*
5857  * Undo whatever state changes are made by a v1 or v2 header info
5858  * call.
5859  */
5860 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5861 {
5862         struct rbd_image_header *header;
5863
5864         rbd_dev_parent_put(rbd_dev);
5865
5866         /* Free dynamic fields from the header, then zero it out */
5867
5868         header = &rbd_dev->header;
5869         ceph_put_snap_context(header->snapc);
5870         kfree(header->snap_sizes);
5871         kfree(header->snap_names);
5872         kfree(header->object_prefix);
5873         memset(header, 0, sizeof (*header));
5874 }
5875
5876 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5877 {
5878         int ret;
5879
5880         ret = rbd_dev_v2_object_prefix(rbd_dev);
5881         if (ret)
5882                 goto out_err;
5883
5884         /*
5885          * Get and check the features for the image.  Currently the
5886          * features are assumed to never change.
5887          */
5888         ret = rbd_dev_v2_features(rbd_dev);
5889         if (ret)
5890                 goto out_err;
5891
5892         /* If the image supports fancy striping, get its parameters */
5893
5894         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5895                 ret = rbd_dev_v2_striping_info(rbd_dev);
5896                 if (ret < 0)
5897                         goto out_err;
5898         }
5899
5900         if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5901                 ret = rbd_dev_v2_data_pool(rbd_dev);
5902                 if (ret)
5903                         goto out_err;
5904         }
5905
5906         rbd_init_layout(rbd_dev);
5907         return 0;
5908
5909 out_err:
5910         rbd_dev->header.features = 0;
5911         kfree(rbd_dev->header.object_prefix);
5912         rbd_dev->header.object_prefix = NULL;
5913         return ret;
5914 }
5915
5916 /*
5917  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5918  * rbd_dev_image_probe() recursion depth, which means it's also the
5919  * length of the already discovered part of the parent chain.
5920  */
5921 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5922 {
5923         struct rbd_device *parent = NULL;
5924         int ret;
5925
5926         if (!rbd_dev->parent_spec)
5927                 return 0;
5928
5929         if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5930                 pr_info("parent chain is too long (%d)\n", depth);
5931                 ret = -EINVAL;
5932                 goto out_err;
5933         }
5934
5935         parent = __rbd_dev_create(rbd_dev->parent_spec);
5936         if (!parent) {
5937                 ret = -ENOMEM;
5938                 goto out_err;
5939         }
5940
5941         /*
5942          * Images related by parent/child relationships always share
5943          * rbd_client and spec/parent_spec, so bump their refcounts.
5944          */
5945         parent->rbd_client = __rbd_get_client(rbd_dev->rbd_client);
5946         parent->spec = rbd_spec_get(rbd_dev->parent_spec);
5947
5948         ret = rbd_dev_image_probe(parent, depth);
5949         if (ret < 0)
5950                 goto out_err;
5951
5952         rbd_dev->parent = parent;
5953         atomic_set(&rbd_dev->parent_ref, 1);
5954         return 0;
5955
5956 out_err:
5957         rbd_dev_unparent(rbd_dev);
5958         rbd_dev_destroy(parent);
5959         return ret;
5960 }
5961
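/*
 * Undo rbd_dev_device_setup(): drop the mapping, free the disk and,
 * unless single_major is in use, release the block device major that
 * was registered for this device.
 */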
5962 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5963 {
5964         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5965         rbd_dev_mapping_clear(rbd_dev);
5966         rbd_free_disk(rbd_dev);
5967         if (!single_major)
5968                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5969 }
5970
5971 /*
5972  * rbd_dev->header_rwsem must be locked for write and will be unlocked
5973  * upon return.
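 *
 * Registers a block device major/minor, sets up the gendisk and the
 * device mapping, and names the struct device after the rbd device id.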
5974  */
5975 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5976 {
5977         int ret;
5978
5979         /* Record our major and minor device numbers. */
5980
5981         if (!single_major) {
5982                 ret = register_blkdev(0, rbd_dev->name);
5983                 if (ret < 0)
5984                         goto err_out_unlock;
5985
5986                 rbd_dev->major = ret;
5987                 rbd_dev->minor = 0;
5988         } else {
5989                 rbd_dev->major = rbd_major;
5990                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5991         }
5992
5993         /* Set up the blkdev mapping. */
5994
5995         ret = rbd_init_disk(rbd_dev);
5996         if (ret)
5997                 goto err_out_blkdev;
5998
5999         ret = rbd_dev_mapping_set(rbd_dev);
6000         if (ret)
6001                 goto err_out_disk;
6002
6003         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6004         set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
6005
6006         ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6007         if (ret)
6008                 goto err_out_mapping;
6009
6010         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6011         up_write(&rbd_dev->header_rwsem);
6012         return 0;
6013
6014 err_out_mapping:
6015         rbd_dev_mapping_clear(rbd_dev);
6016 err_out_disk:
6017         rbd_free_disk(rbd_dev);
6018 err_out_blkdev:
6019         if (!single_major)
6020                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6021 err_out_unlock:
6022         up_write(&rbd_dev->header_rwsem);
6023         return ret;
6024 }
6025
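/*
 * Build the name of the header object for this image: the image name
 * plus RBD_SUFFIX for a format 1 image, or RBD_HEADER_PREFIX followed
 * by the image id for format 2.
 */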
6026 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6027 {
6028         struct rbd_spec *spec = rbd_dev->spec;
6029         int ret;
6030
6031         /* Record the header object name for this rbd image. */
6032
6033         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6034         if (rbd_dev->image_format == 1)
6035                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6036                                        spec->image_name, RBD_SUFFIX);
6037         else
6038                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6039                                        RBD_HEADER_PREFIX, spec->image_id);
6040
6041         return ret;
6042 }
6043
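/*
 * Undo rbd_dev_image_probe(): tear down the header watch (set up only
 * when the image is mapped, i.e. rbd_dev->opts is non-NULL), release
 * header state and forget the image id and format.
 */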
6044 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6045 {
6046         if (rbd_dev->opts)
6047                 rbd_unregister_watch(rbd_dev);
6048
6049         rbd_dev_unprobe(rbd_dev);
6050         rbd_dev->image_format = 0;
6051         kfree(rbd_dev->spec->image_id);
6052         rbd_dev->spec->image_id = NULL;
6053 }
6054
6055 /*
6056  * Probe for the existence of the header object for the given rbd
6057  * device.  If this image is the one being mapped (i.e., not a
6058  * parent), initiate a watch on its header object before using that
6059  * object to get detailed information about the rbd image.
6060  *
6061  * On success, returns with header_rwsem held for write if called
6062  * with @depth == 0.
6063  */
6064 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6065 {
6066         int ret;
6067
6068         /*
6069          * Get the id from the image id object.  Unless there's an
6070          * error, rbd_dev->spec->image_id will be filled in with
6071          * a dynamically-allocated string, and rbd_dev->image_format
6072          * will be set to either 1 or 2.
6073          */
6074         ret = rbd_dev_image_id(rbd_dev);
6075         if (ret)
6076                 return ret;
6077
6078         ret = rbd_dev_header_name(rbd_dev);
6079         if (ret)
6080                 goto err_out_format;
6081
6082         if (!depth) {
6083                 ret = rbd_register_watch(rbd_dev);
6084                 if (ret) {
6085                         if (ret == -ENOENT)
6086                                 pr_info("image %s/%s does not exist\n",
6087                                         rbd_dev->spec->pool_name,
6088                                         rbd_dev->spec->image_name);
6089                         goto err_out_format;
6090                 }
6091         }
6092
6093         if (!depth)
6094                 down_write(&rbd_dev->header_rwsem);
6095
6096         ret = rbd_dev_header_info(rbd_dev);
6097         if (ret)
6098                 goto err_out_probe;
6099
6100         /*
6101          * If this image is the one being mapped, we have pool name and
6102          * id, image name and id, and snap name - need to fill snap id.
6103          * Otherwise this is a parent image, identified by pool, image
6104          * and snap ids - need to fill in names for those ids.
6105          */
6106         if (!depth)
6107                 ret = rbd_spec_fill_snap_id(rbd_dev);
6108         else
6109                 ret = rbd_spec_fill_names(rbd_dev);
6110         if (ret) {
6111                 if (ret == -ENOENT)
6112                         pr_info("snap %s/%s@%s does not exist\n",
6113                                 rbd_dev->spec->pool_name,
6114                                 rbd_dev->spec->image_name,
6115                                 rbd_dev->spec->snap_name);
6116                 goto err_out_probe;
6117         }
6118
6119         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6120                 ret = rbd_dev_v2_parent_info(rbd_dev);
6121                 if (ret)
6122                         goto err_out_probe;
6123
6124                 /*
6125                  * Need to warn users if this image is the one being
6126                  * mapped and has a parent.
6127                  */
6128                 if (!depth && rbd_dev->parent_spec)
6129                         rbd_warn(rbd_dev,
6130                                  "WARNING: kernel layering is EXPERIMENTAL!");
6131         }
6132
6133         ret = rbd_dev_probe_parent(rbd_dev, depth);
6134         if (ret)
6135                 goto err_out_probe;
6136
6137         dout("discovered format %u image, header name is %s\n",
6138                 rbd_dev->image_format, rbd_dev->header_oid.name);
6139         return 0;
6140
6141 err_out_probe:
6142         if (!depth) {
6143                 up_write(&rbd_dev->header_rwsem);
6144                 rbd_unregister_watch(rbd_dev);
6145         }
6146         rbd_dev_unprobe(rbd_dev);
6147 err_out_format:
6148         rbd_dev->image_format = 0;
6149         kfree(rbd_dev->spec->image_id);
6150         rbd_dev->spec->image_id = NULL;
6151         return ret;
6152 }
6153
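/*
 * Handle a write to /sys/bus/rbd/add (or add_single_major).  The buffer
 * roughly follows the format documented in
 * Documentation/ABI/testing/sysfs-bus-rbd:
 *
 *   <mon-addrs> <options> <pool-name> <image-name> [<snap-name>]
 *
 * For example (illustrative only):
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo" > /sys/bus/rbd/add
 *
 * Parsing is done by rbd_add_parse_args(); on success the new device id
 * is what shows up under /sys/bus/rbd/devices/.
 */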
6154 static ssize_t do_rbd_add(struct bus_type *bus,
6155                           const char *buf,
6156                           size_t count)
6157 {
6158         struct rbd_device *rbd_dev = NULL;
6159         struct ceph_options *ceph_opts = NULL;
6160         struct rbd_options *rbd_opts = NULL;
6161         struct rbd_spec *spec = NULL;
6162         struct rbd_client *rbdc;
6163         bool read_only;
6164         int rc;
6165
6166         if (!capable(CAP_SYS_ADMIN))
6167                 return -EPERM;
6168
6169         if (!try_module_get(THIS_MODULE))
6170                 return -ENODEV;
6171
6172         /* parse add command */
6173         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6174         if (rc < 0)
6175                 goto out;
6176
6177         rbdc = rbd_get_client(ceph_opts);
6178         if (IS_ERR(rbdc)) {
6179                 rc = PTR_ERR(rbdc);
6180                 goto err_out_args;
6181         }
6182
6183         /* pick the pool */
6184         rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
6185         if (rc < 0) {
6186                 if (rc == -ENOENT)
6187                         pr_info("pool %s does not exist\n", spec->pool_name);
6188                 goto err_out_client;
6189         }
6190         spec->pool_id = (u64)rc;
6191
6192         rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
6193         if (!rbd_dev) {
6194                 rc = -ENOMEM;
6195                 goto err_out_client;
6196         }
6197         rbdc = NULL;            /* rbd_dev now owns this */
6198         spec = NULL;            /* rbd_dev now owns this */
6199         rbd_opts = NULL;        /* rbd_dev now owns this */
6200
6201         rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6202         if (!rbd_dev->config_info) {
6203                 rc = -ENOMEM;
6204                 goto err_out_rbd_dev;
6205         }
6206
6207         rc = rbd_dev_image_probe(rbd_dev, 0);
6208         if (rc < 0)
6209                 goto err_out_rbd_dev;
6210
6211         /* If we are mapping a snapshot it must be marked read-only */
6212
6213         read_only = rbd_dev->opts->read_only;
6214         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6215                 read_only = true;
6216         rbd_dev->mapping.read_only = read_only;
6217
6218         rc = rbd_dev_device_setup(rbd_dev);
6219         if (rc)
6220                 goto err_out_image_probe;
6221
6222         if (rbd_dev->opts->exclusive) {
6223                 rc = rbd_add_acquire_lock(rbd_dev);
6224                 if (rc)
6225                         goto err_out_device_setup;
6226         }
6227
6228         /* Everything's ready.  Announce the disk to the world. */
6229
6230         rc = device_add(&rbd_dev->dev);
6231         if (rc)
6232                 goto err_out_image_lock;
6233
6234         add_disk(rbd_dev->disk);
6235         /* see rbd_init_disk() */
6236         blk_put_queue(rbd_dev->disk->queue);
6237
6238         spin_lock(&rbd_dev_list_lock);
6239         list_add_tail(&rbd_dev->node, &rbd_dev_list);
6240         spin_unlock(&rbd_dev_list_lock);
6241
6242         pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6243                 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6244                 rbd_dev->header.features);
6245         rc = count;
6246 out:
6247         module_put(THIS_MODULE);
6248         return rc;
6249
6250 err_out_image_lock:
6251         rbd_dev_image_unlock(rbd_dev);
6252 err_out_device_setup:
6253         rbd_dev_device_release(rbd_dev);
6254 err_out_image_probe:
6255         rbd_dev_image_release(rbd_dev);
6256 err_out_rbd_dev:
6257         rbd_dev_destroy(rbd_dev);
6258 err_out_client:
6259         rbd_put_client(rbdc);
6260 err_out_args:
6261         rbd_spec_put(spec);
6262         kfree(rbd_opts);
6263         goto out;
6264 }
6265
6266 static ssize_t rbd_add(struct bus_type *bus,
6267                        const char *buf,
6268                        size_t count)
6269 {
6270         if (single_major)
6271                 return -EINVAL;
6272
6273         return do_rbd_add(bus, buf, count);
6274 }
6275
6276 static ssize_t rbd_add_single_major(struct bus_type *bus,
6277                                     const char *buf,
6278                                     size_t count)
6279 {
6280         return do_rbd_add(bus, buf, count);
6281 }
6282
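/*
 * Tear the parent chain down bottom-up: repeatedly walk to the deepest
 * ancestor (the one with no parent of its own), release it and detach
 * it from its child.
 */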
6283 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6284 {
6285         while (rbd_dev->parent) {
6286                 struct rbd_device *first = rbd_dev;
6287                 struct rbd_device *second = first->parent;
6288                 struct rbd_device *third;
6289
6290                 /*
6291                  * Walk down to the deepest parent (the one with no
6292                  * grandparent) and remove it.
6293                  */
6294                 while (second && (third = second->parent)) {
6295                         first = second;
6296                         second = third;
6297                 }
6298                 rbd_assert(second);
6299                 rbd_dev_image_release(second);
6300                 rbd_dev_destroy(second);
6301                 first->parent = NULL;
6302                 first->parent_overlap = 0;
6303
6304                 rbd_assert(first->parent_spec);
6305                 rbd_spec_put(first->parent_spec);
6306                 first->parent_spec = NULL;
6307         }
6308 }
6309
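/*
 * Handle a write to /sys/bus/rbd/remove (or remove_single_major).  The
 * buffer is "<dev-id> [force]", for example (illustrative only):
 *
 *   $ echo "0 force" > /sys/bus/rbd/remove
 *
 * Without "force" the unmap fails with -EBUSY while the device is held
 * open; with it, the request queue is marked dying and the unmap
 * proceeds anyway.
 */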
6310 static ssize_t do_rbd_remove(struct bus_type *bus,
6311                              const char *buf,
6312                              size_t count)
6313 {
6314         struct rbd_device *rbd_dev = NULL;
6315         struct list_head *tmp;
6316         int dev_id;
6317         char opt_buf[6];
6318         bool force = false;
6319         int ret;
6320
6321         if (!capable(CAP_SYS_ADMIN))
6322                 return -EPERM;
6323
6324         dev_id = -1;
6325         opt_buf[0] = '\0';
6326         sscanf(buf, "%d %5s", &dev_id, opt_buf);
6327         if (dev_id < 0) {
6328                 pr_err("dev_id out of range\n");
6329                 return -EINVAL;
6330         }
6331         if (opt_buf[0] != '\0') {
6332                 if (!strcmp(opt_buf, "force")) {
6333                         force = true;
6334                 } else {
6335                         pr_err("bad remove option at '%s'\n", opt_buf);
6336                         return -EINVAL;
6337                 }
6338         }
6339
6340         ret = -ENOENT;
6341         spin_lock(&rbd_dev_list_lock);
6342         list_for_each(tmp, &rbd_dev_list) {
6343                 rbd_dev = list_entry(tmp, struct rbd_device, node);
6344                 if (rbd_dev->dev_id == dev_id) {
6345                         ret = 0;
6346                         break;
6347                 }
6348         }
6349         if (!ret) {
6350                 spin_lock_irq(&rbd_dev->lock);
6351                 if (rbd_dev->open_count && !force)
6352                         ret = -EBUSY;
6353                 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6354                                           &rbd_dev->flags))
6355                         ret = -EINPROGRESS;
6356                 spin_unlock_irq(&rbd_dev->lock);
6357         }
6358         spin_unlock(&rbd_dev_list_lock);
6359         if (ret)
6360                 return ret;
6361
6362         if (force) {
6363                 /*
6364                  * Prevent new IO from being queued and wait for existing
6365                  * IO to complete/fail.
6366                  */
6367                 blk_mq_freeze_queue(rbd_dev->disk->queue);
6368                 blk_set_queue_dying(rbd_dev->disk->queue);
6369         }
6370
6371         del_gendisk(rbd_dev->disk);
6372         spin_lock(&rbd_dev_list_lock);
6373         list_del_init(&rbd_dev->node);
6374         spin_unlock(&rbd_dev_list_lock);
6375         device_del(&rbd_dev->dev);
6376
6377         rbd_dev_image_unlock(rbd_dev);
6378         rbd_dev_device_release(rbd_dev);
6379         rbd_dev_image_release(rbd_dev);
6380         rbd_dev_destroy(rbd_dev);
6381         return count;
6382 }
6383
6384 static ssize_t rbd_remove(struct bus_type *bus,
6385                           const char *buf,
6386                           size_t count)
6387 {
6388         if (single_major)
6389                 return -EINVAL;
6390
6391         return do_rbd_remove(bus, buf, count);
6392 }
6393
6394 static ssize_t rbd_remove_single_major(struct bus_type *bus,
6395                                        const char *buf,
6396                                        size_t count)
6397 {
6398         return do_rbd_remove(bus, buf, count);
6399 }
6400
6401 /*
6402  * Create the control files in sysfs under
6403  * /sys/bus/rbd/...
6404  */
6405 static int rbd_sysfs_init(void)
6406 {
6407         int ret;
6408
6409         ret = device_register(&rbd_root_dev);
6410         if (ret < 0)
6411                 return ret;
6412
6413         ret = bus_register(&rbd_bus_type);
6414         if (ret < 0)
6415                 device_unregister(&rbd_root_dev);
6416
6417         return ret;
6418 }
6419
6420 static void rbd_sysfs_cleanup(void)
6421 {
6422         bus_unregister(&rbd_bus_type);
6423         device_unregister(&rbd_root_dev);
6424 }
6425
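/* Set up the slab caches and bio set shared by all rbd devices. */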
6426 static int rbd_slab_init(void)
6427 {
6428         rbd_assert(!rbd_img_request_cache);
6429         rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
6430         if (!rbd_img_request_cache)
6431                 return -ENOMEM;
6432
6433         rbd_assert(!rbd_obj_request_cache);
6434         rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
6435         if (!rbd_obj_request_cache)
6436                 goto out_err;
6437
6438         rbd_assert(!rbd_bio_clone);
6439         rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0);
6440         if (!rbd_bio_clone)
6441                 goto out_err_clone;
6442
6443         return 0;
6444
6445 out_err_clone:
6446         kmem_cache_destroy(rbd_obj_request_cache);
6447         rbd_obj_request_cache = NULL;
6448 out_err:
6449         kmem_cache_destroy(rbd_img_request_cache);
6450         rbd_img_request_cache = NULL;
6451         return -ENOMEM;
6452 }
6453
6454 static void rbd_slab_exit(void)
6455 {
6456         rbd_assert(rbd_obj_request_cache);
6457         kmem_cache_destroy(rbd_obj_request_cache);
6458         rbd_obj_request_cache = NULL;
6459
6460         rbd_assert(rbd_img_request_cache);
6461         kmem_cache_destroy(rbd_img_request_cache);
6462         rbd_img_request_cache = NULL;
6463
6464         rbd_assert(rbd_bio_clone);
6465         bioset_free(rbd_bio_clone);
6466         rbd_bio_clone = NULL;
6467 }
6468
6469 static int __init rbd_init(void)
6470 {
6471         int rc;
6472
6473         if (!libceph_compatible(NULL)) {
6474                 rbd_warn(NULL, "libceph incompatibility (quitting)");
6475                 return -EINVAL;
6476         }
6477
6478         rc = rbd_slab_init();
6479         if (rc)
6480                 return rc;
6481
6482         /*
6483          * The number of active work items is limited by the number of
6484          * rbd devices * queue depth, so leave @max_active at default.
6485          */
6486         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6487         if (!rbd_wq) {
6488                 rc = -ENOMEM;
6489                 goto err_out_slab;
6490         }
6491
6492         if (single_major) {
6493                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6494                 if (rbd_major < 0) {
6495                         rc = rbd_major;
6496                         goto err_out_wq;
6497                 }
6498         }
6499
6500         rc = rbd_sysfs_init();
6501         if (rc)
6502                 goto err_out_blkdev;
6503
6504         if (single_major)
6505                 pr_info("loaded (major %d)\n", rbd_major);
6506         else
6507                 pr_info("loaded\n");
6508
6509         return 0;
6510
6511 err_out_blkdev:
6512         if (single_major)
6513                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6514 err_out_wq:
6515         destroy_workqueue(rbd_wq);
6516 err_out_slab:
6517         rbd_slab_exit();
6518         return rc;
6519 }
6520
6521 static void __exit rbd_exit(void)
6522 {
6523         ida_destroy(&rbd_dev_id_ida);
6524         rbd_sysfs_cleanup();
6525         if (single_major)
6526                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6527         destroy_workqueue(rbd_wq);
6528         rbd_slab_exit();
6529 }
6530
6531 module_init(rbd_init);
6532 module_exit(rbd_exit);
6533
6534 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
6535 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6536 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
6537 /* following authorship retained from original osdblk.c */
6538 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6539
6540 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
6541 MODULE_LICENSE("GPL");