/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */
/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}
/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}
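/*
 * Illustrative example (not part of the driver): together these two
 * helpers implement a saturating reference counter.  A hypothetical
 * caller would pair them the way parent references are handled below:
 *
 *	if (atomic_inc_return_safe(&ref) > 0) {
 *		... use the referenced object ...
 *		atomic_dec_return_safe(&ref);
 *	}
 *
 * Once the counter has been pinned at 0 or has overflowed past
 * INT_MAX, both helpers return -EINVAL and the caller must treat the
 * reference as unavailable.
 */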
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
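/*
 * Illustrative arithmetic behind the 510 limit: the encoded snapshot
 * context an OSD request carries is an 8-byte seq, a 4-byte count and
 * one 8-byte id per snapshot, so 8 + 4 + 510 * 8 = 4092 bytes, which
 * just fits in a single 4 KiB page.
 */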
#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)
/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
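/*
 * Illustrative example: a format 2 image created with
 * "rbd create --image-feature layering,exclusive-lock" reports a
 * feature mask of 0x5 (LAYERING | EXCLUSIVE_LOCK).  That is a subset
 * of RBD_FEATURES_SUPPORTED, so this driver can map it; an image with
 * a feature bit outside the mask is refused at probe time.
 */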
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
	OBJ_OP_WRITE,
	OBJ_OP_READ,
	OBJ_OP_DISCARD,
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};
struct rbd_obj_request {
	u64			object_no;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};
enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
};
struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};
/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64                     size;
	u64                     features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};
/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};
static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static struct bio_set		*rbd_bio_clone;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;
/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
static int rbd_img_request_submit(struct rbd_img_request *img_request);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
static void rbd_spec_put(struct rbd_spec *spec);
static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
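/*
 * Illustrative example: with RBD_SINGLE_MAJOR_PART_SHIFT == 4, each
 * device reserves 16 minors for its partitions, so dev_id 3 maps to
 * minor 48 (3 << 4), and any of minors 48..63 map back to dev_id 3
 * (48 >> 4 == 3).
 */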
static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}
static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
{
	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}
static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};
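/*
 * Illustrative userspace usage (values are examples only; the exact
 * syntax is documented in Documentation/ABI/testing/sysfs-bus-rbd):
 *
 *   # echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo -" \
 *   #       > /sys/bus/rbd/add
 *   # echo 0 > /sys/bus/rbd/remove
 *
 * When the module is loaded with single_major=Y, devices are added and
 * removed through add_single_major/remove_single_major instead, and
 * rbd_bus_is_visible() above exposes those attributes.
 */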
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}
static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}
static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ret = 0;
	int val;
	bool ro;
	bool ro_changed = false;

	/* get_user() may sleep, so call it before taking rbd_dev->lock */
	if (get_user(val, (int __user *)(arg)))
		return -EFAULT;

	ro = val ? true : false;
	/* Snapshots do not allow writes */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	/* prevent others from opening this device */
	if (rbd_dev->open_count > 1) {
		ret = -EBUSY;
		goto out;
	}

	if (rbd_dev->mapping.read_only != ro) {
		rbd_dev->mapping.read_only = ro;
		ro_changed = true;
	}

out:
	spin_unlock_irq(&rbd_dev->lock);
	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
	if (ret == 0 && ro_changed)
		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

	return ret;
}
static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};
/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}
static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}
/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{-1, NULL}
};

struct rbd_options {
	int	queue_depth;
	bool	read_only;
	bool	lock_on_read;
	bool	exclusive;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		rbd_opts->queue_depth = intval;
		break;
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	case Opt_lock_on_read:
		rbd_opts->lock_on_read = true;
		break;
	case Opt_exclusive:
		rbd_opts->exclusive = true;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}
static char* obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}
/*
 * Destroy ceph client.  Takes rbd_client_list_lock to remove the
 * client from the client list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1U << header->obj_order;
}
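/*
 * Illustrative example: rbd images default to an object order of 22,
 * so rbd_obj_bytes() returns 1U << 22 == 4 MiB and the image is
 * striped across 4 MiB RADOS objects.
 */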
static void rbd_init_layout(struct rbd_device *rbd_dev)
{
	if (rbd_dev->header.stripe_unit == 0 ||
	    rbd_dev->header.stripe_count == 0) {
		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
		rbd_dev->header.stripe_count = 1;
	}

	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}
/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		rbd_init_layout(rbd_dev);
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}
static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}
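/*
 * Illustrative example: for a format 1 snap_names buffer containing
 * "one\0two\0three\0", _rbd_dev_v1_snap_name(rbd_dev, 2) skips two
 * NUL-terminated names and returns a freshly allocated copy of
 * "three".
 */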
/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}
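/*
 * Illustrative example: for a snapc->snaps array {8, 5, 2} (newest
 * snapshot first), snapid_compare_reverse(&key 5, &elem 8) returns 1,
 * steering bsearch() toward the tail of the array, where it finds
 * snap id 5 at index 1.
 */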
/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}
static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}
static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}
static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = rbd_obj_bytes(&rbd_dev->header);

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = rbd_obj_bytes(&rbd_dev->header);

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
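/*
 * Worked example (illustrative): with 4 MiB objects (segment_size ==
 * 0x400000), an image byte offset of 0x500000 yields
 * rbd_segment_offset() == 0x100000, the offset within its object.  A
 * 0x400000-byte request starting there is clipped by
 * rbd_segment_length() to 0x300000 bytes, the remainder of that
 * object; the rest of the range must be issued against the next
 * object.
 */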
/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec bv;
	struct bvec_iter iter;
	unsigned long flags;
	void *buf;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, iter) {
			if (pos + bv.bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(&bv, &flags);
				memset(buf + remainder, 0,
				       bv.bv_len - remainder);
				flush_dcache_page(bv.bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv.bv_len;
		}

		chain = chain->bi_next;
	}
}
/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bio;

	bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio_advance(bio, offset);
	bio->bi_iter.bi_size = len;

	return bio;
}
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_iter.bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_iter.bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
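/*
 * Illustrative example: cloning a 12 KiB chain in two 6 KiB pieces.
 * Starting with *bio_src pointing at the first bio and *offset == 0,
 * a first call for len == 6 KiB consumes that much data and leaves
 * *bio_src/*offset pointing at the first un-cloned byte; passing them
 * unchanged to a second call yields the remaining 6 KiB.  This is how
 * consecutive object requests carve up a single block-layer bio chain
 * without copying any data.
 */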
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

	return obj_request->img_offset <
	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}
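/*
 * Illustrative example: with 4 MiB objects and a parent_overlap of
 * 6 MiB, round_up() yields 8 MiB, so objects 0 and 1 (img_offset
 * 0 and 4 MiB) are treated as possibly backed by the parent, while
 * object 2 (img_offset 8 MiB) and beyond are not.
 */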
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     kref_read(&img_request->kref));
	kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		kref_read(&img_request->kref));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}
static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);

static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
	     obj_request, obj_request->object_no, obj_request->offset,
	     obj_request->length, osd_req);
	if (obj_request_img_data_test(obj_request)) {
		WARN_ON(obj_request->callback != rbd_img_obj_callback);
		rbd_img_request_get(obj_request->img_request);
	}
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; it's not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

/*
 * Set the discard flag when the img_request is a discard request
 */
static void img_request_discard_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_DISCARD, &img_request->flags);
	smp_mb();
}

static bool img_request_discard_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static enum obj_operation_type
rbd_img_request_op_type(struct rbd_img_request *img_request)
{
	if (img_request_write_test(img_request))
		return OBJ_OP_WRITE;
	else if (img_request_discard_test(img_request))
		return OBJ_OP_DISCARD;
	else
		return OBJ_OP_READ;
}
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the entire
	 * length of the request.  A short read also implies zero-fill
	 * to the end of the request.  An error requires the whole
	 * length of the request to be reported finished with an error
	 * to the block layer.  In each case we update the xferred
	 * count to indicate the whole request was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
	}
	obj_request->xferred = length;
	obj_request_done_set(obj_request);
}
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
{
	obj_request->result = err;
	obj_request->xferred = 0;
	/*
	 * kludge - mirror rbd_obj_request_submit() to match a put in
	 * rbd_img_obj_callback()
	 */
	if (obj_request_img_data_test(obj_request)) {
		WARN_ON(obj_request->callback != rbd_img_obj_callback);
		rbd_img_request_get(obj_request->img_request);
	}
	obj_request_done_set(obj_request);
	rbd_obj_request_complete(obj_request);
}
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}
static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short discard.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	/* discarding a non-existent object is not a problem */
	if (obj_request->result == -ENOENT)
		obj_request->result = 0;
	obj_request_done_set(obj_request);
}
/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	if (obj_request_img_data_test(obj_request))
		rbd_osd_copyup_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p\n", __func__, osd_req);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to the block layer, which just supports a 32-bit
	 * length field.
	 */
	obj_request->xferred = osd_req->r_ops[0].outdata_len;
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);

	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_SETALLOCHINT:
		rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
			   osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
		/* fall through */
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_DELETE:
	case CEPH_OSD_OP_TRUNCATE:
	case CEPH_OSD_OP_ZERO:
		rbd_osd_discard_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
		rbd_osd_call_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
			 obj_request->object_no, opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	osd_req->r_snapid = obj_request->img_request->snap_id;
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	ktime_get_real_ts(&osd_req->r_mtime);
	osd_req->r_data_offset = obj_request->offset;
}
static struct ceph_osd_request *
__rbd_osd_req_create(struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     int num_ops, unsigned int flags,
		     struct rbd_obj_request *obj_request)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	const char *name_format = rbd_dev->image_format == 1 ?
				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;

	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
	if (!req)
		return NULL;

	req->r_flags = flags;
	req->r_callback = rbd_osd_req_callback;
	req->r_priv = obj_request;

	req->r_base_oloc.pool = rbd_dev->layout.pool_id;
	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
			rbd_dev->header.object_prefix, obj_request->object_no))
		goto err_req;

	if (ceph_osdc_alloc_messages(req, GFP_NOIO))
		goto err_req;

	return req;

err_req:
	ceph_osdc_put_request(req);
	return NULL;
}
/*
 * Create an osd request.  A read request has one osd op (read).
 * A write request has either one (watch) or two (hint+write) osd ops.
 * (All rbd data writes are prefixed with an allocation hint op, but
 * technically osd watch is a write request, hence this distinction.)
 */
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					enum obj_operation_type op_type,
					unsigned int num_ops,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;

	if (obj_request_img_data_test(obj_request) &&
		(op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
		struct rbd_img_request *img_request = obj_request->img_request;
		if (op_type == OBJ_OP_WRITE) {
			rbd_assert(img_request_write_test(img_request));
		} else {
			rbd_assert(img_request_discard_test(img_request));
		}
		snapc = img_request->snapc;
	}

	rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));

	return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
	    (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
	    CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
}
/*
 * Create a copyup osd request based on the information in the object
 * request supplied.  A copyup request has two or three osd ops, a
 * copyup method call, potentially a hint op, and a write or truncate
 * or zero op.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	int num_osd_ops = 3;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request) ||
			img_request_discard_test(img_request));

	if (img_request_discard_test(img_request))
		num_osd_ops = 2;

	return __rbd_osd_req_create(img_request->rbd_dev,
				    img_request->snapc, num_osd_ops,
				    CEPH_OSD_FLAG_WRITE, obj_request);
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
static struct rbd_obj_request *
rbd_obj_request_create(enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;

	rbd_assert(obj_request_type_valid(type));

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request)
		return NULL;

	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s %p\n", __func__, obj_request);
	return obj_request;
}
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		/* img_data requests don't own their page array */
		if (obj_request->pages &&
		    !obj_request_img_data_test(obj_request))
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kmem_cache_free(rbd_obj_request_cache, obj_request);
}
/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}
/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow");
}
/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter = 0;

	if (!rbd_dev->parent_spec)
		return false;

	down_read(&rbd_dev->header_rwsem);
	if (rbd_dev->parent_overlap)
		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	up_read(&rbd_dev->header_rwsem);

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow");

	return counter > 0;
}
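/*
 * Illustrative usage sketch: rbd_img_request_create() below calls
 * rbd_dev_parent_get() and only marks the image request layered when
 * a parent reference was actually obtained; rbd_img_request_destroy()
 * drops that reference via rbd_dev_parent_put(), whose final put
 * tears down the parent linkage through rbd_dev_unparent().
 */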
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					enum obj_operation_type op_type,
					struct ceph_snap_context *snapc)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
	if (!img_request)
		return NULL;

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (op_type == OBJ_OP_DISCARD) {
		img_request_discard_set(img_request);
		img_request->snapc = snapc;
	} else if (op_type == OBJ_OP_WRITE) {
		img_request_write_set(img_request);
		img_request->snapc = snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		obj_op_name(op_type), offset, length, img_request);

	return img_request;
}
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (img_request_write_test(img_request) ||
		img_request_discard_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}
static struct rbd_img_request *rbd_parent_request_create(
					struct rbd_obj_request *obj_request,
					u64 img_offset, u64 length)
{
	struct rbd_img_request *parent_request;
	struct rbd_device *rbd_dev;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;

	parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
						length, OBJ_OP_READ, NULL);
	if (!parent_request)
		return NULL;

	img_request_child_set(parent_request);
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	return parent_request;
}

static void rbd_parent_request_destroy(struct kref *kref)
{
	struct rbd_img_request *parent_request;
	struct rbd_obj_request *orig_request;

	parent_request = container_of(kref, struct rbd_img_request, kref);
	orig_request = parent_request->obj_request;

	parent_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	img_request_child_clear(parent_request);

	rbd_img_request_destroy(kref);
}
2249 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2251 struct rbd_img_request *img_request;
2252 unsigned int xferred;
2256 rbd_assert(obj_request_img_data_test(obj_request));
2257 img_request = obj_request->img_request;
2259 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2260 xferred = (unsigned int)obj_request->xferred;
2261 result = obj_request->result;
2263 struct rbd_device *rbd_dev = img_request->rbd_dev;
2264 enum obj_operation_type op_type;
2266 if (img_request_discard_test(img_request))
2267 op_type = OBJ_OP_DISCARD;
2268 else if (img_request_write_test(img_request))
2269 op_type = OBJ_OP_WRITE;
2271 op_type = OBJ_OP_READ;
2273 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2274 obj_op_name(op_type), obj_request->length,
2275 obj_request->img_offset, obj_request->offset);
2276 rbd_warn(rbd_dev, " result %d xferred %x",
2277 result, xferred);
2278 if (!img_request->result)
2279 img_request->result = result;
2281 * Need to end I/O on the entire obj_request worth of
2282 * bytes in case of error.
2284 xferred = obj_request->length;
2287 if (img_request_child_test(img_request)) {
2288 rbd_assert(img_request->obj_request != NULL);
2289 more = obj_request->which < img_request->obj_request_count - 1;
2290 } else {
2291 blk_status_t status = errno_to_blk_status(result);
2293 rbd_assert(img_request->rq != NULL);
2295 more = blk_update_request(img_request->rq, status, xferred);
2296 if (!more)
2297 __blk_mq_end_request(img_request->rq, status);
2300 return more;
2303 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2305 struct rbd_img_request *img_request;
2306 u32 which = obj_request->which;
2309 rbd_assert(obj_request_img_data_test(obj_request));
2310 img_request = obj_request->img_request;
2312 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2313 rbd_assert(img_request != NULL);
2314 rbd_assert(img_request->obj_request_count > 0);
2315 rbd_assert(which != BAD_WHICH);
2316 rbd_assert(which < img_request->obj_request_count);
2318 spin_lock_irq(&img_request->completion_lock);
2319 if (which != img_request->next_completion)
2320 goto out;
2322 for_each_obj_request_from(img_request, obj_request) {
2324 rbd_assert(which < img_request->obj_request_count);
2326 if (!obj_request_done_test(obj_request))
2327 break;
2328 more = rbd_img_obj_end_request(obj_request);
2329 which++;
2332 rbd_assert(more ^ (which == img_request->obj_request_count));
2333 img_request->next_completion = which;
2334 out:
2335 spin_unlock_irq(&img_request->completion_lock);
2336 rbd_img_request_put(img_request);
2338 if (!more)
2339 rbd_img_request_complete(img_request);
2343 * Add individual osd ops to the given ceph_osd_request and prepare
2344 * them for submission. num_ops is the current number of
2345 * osd operations already added to the object request.
2347 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2348 struct ceph_osd_request *osd_request,
2349 enum obj_operation_type op_type,
2350 unsigned int num_ops)
2352 struct rbd_img_request *img_request = obj_request->img_request;
2353 struct rbd_device *rbd_dev = img_request->rbd_dev;
2354 u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2355 u64 offset = obj_request->offset;
2356 u64 length = obj_request->length;
2357 u64 img_end;
2358 u16 opcode;
2360 if (op_type == OBJ_OP_DISCARD) {
2361 if (!offset && length == object_size &&
2362 (!img_request_layered_test(img_request) ||
2363 !obj_request_overlaps_parent(obj_request))) {
2364 opcode = CEPH_OSD_OP_DELETE;
2365 } else if (offset + length == object_size) {
2366 opcode = CEPH_OSD_OP_TRUNCATE;
2367 } else {
2368 down_read(&rbd_dev->header_rwsem);
2369 img_end = rbd_dev->header.image_size;
2370 up_read(&rbd_dev->header_rwsem);
2372 if (obj_request->img_offset + length == img_end)
2373 opcode = CEPH_OSD_OP_TRUNCATE;
2374 else
2375 opcode = CEPH_OSD_OP_ZERO;
2377 } else if (op_type == OBJ_OP_WRITE) {
2378 if (!offset && length == object_size)
2379 opcode = CEPH_OSD_OP_WRITEFULL;
2380 else
2381 opcode = CEPH_OSD_OP_WRITE;
2382 osd_req_op_alloc_hint_init(osd_request, num_ops,
2383 object_size, object_size);
2384 num_ops++;
2385 } else {
2386 opcode = CEPH_OSD_OP_READ;
2389 if (opcode == CEPH_OSD_OP_DELETE)
2390 osd_req_op_init(osd_request, num_ops, opcode, 0);
2392 osd_req_op_extent_init(osd_request, num_ops, opcode,
2393 offset, length, 0, 0);
2395 if (obj_request->type == OBJ_REQUEST_BIO)
2396 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2397 obj_request->bio_list, length);
2398 else if (obj_request->type == OBJ_REQUEST_PAGES)
2399 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2400 obj_request->pages, length,
2401 offset & ~PAGE_MASK, false, false);
2403 /* Discards are also writes */
2404 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2405 rbd_osd_req_format_write(obj_request);
2407 rbd_osd_req_format_read(obj_request);
2411 * Split up an image request into one or more object requests, each
2412 * to a different object. The "type" parameter indicates whether
2413 * "data_desc" is the pointer to the head of a list of bio
2414 * structures, or the base of a page array. In either case this
2415 * function assumes data_desc describes memory sufficient to hold
2416 * all data described by the image request.
2418 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2419 enum obj_request_type type,
2420 void *data_desc)
2422 struct rbd_device *rbd_dev = img_request->rbd_dev;
2423 struct rbd_obj_request *obj_request = NULL;
2424 struct rbd_obj_request *next_obj_request;
2425 struct bio *bio_list = NULL;
2426 unsigned int bio_offset = 0;
2427 struct page **pages = NULL;
2428 enum obj_operation_type op_type;
2432 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2433 (int)type, data_desc);
2435 img_offset = img_request->offset;
2436 resid = img_request->length;
2437 rbd_assert(resid > 0);
2438 op_type = rbd_img_request_op_type(img_request);
2440 if (type == OBJ_REQUEST_BIO) {
2441 bio_list = data_desc;
2442 rbd_assert(img_offset ==
2443 bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2444 } else if (type == OBJ_REQUEST_PAGES) {
2445 pages = data_desc;
2448 while (resid) {
2449 struct ceph_osd_request *osd_req;
2450 u64 object_no = img_offset >> rbd_dev->header.obj_order;
2451 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2452 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
2454 obj_request = rbd_obj_request_create(type);
2455 if (!obj_request)
2456 goto out_unwind;
2458 obj_request->object_no = object_no;
2459 obj_request->offset = offset;
2460 obj_request->length = length;
2463 * set obj_request->img_request before creating the
2464 * osd_request so that it gets the right snapc
2466 rbd_img_obj_request_add(img_request, obj_request);
2468 if (type == OBJ_REQUEST_BIO) {
2469 unsigned int clone_size;
2471 rbd_assert(length <= (u64)UINT_MAX);
2472 clone_size = (unsigned int)length;
2473 obj_request->bio_list =
2474 bio_chain_clone_range(&bio_list,
2475 &bio_offset,
2476 clone_size,
2477 GFP_NOIO);
2478 if (!obj_request->bio_list)
2479 goto out_unwind;
2480 } else if (type == OBJ_REQUEST_PAGES) {
2481 unsigned int page_count;
2483 obj_request->pages = pages;
2484 page_count = (u32)calc_pages_for(offset, length);
2485 obj_request->page_count = page_count;
2486 if ((offset + length) & ~PAGE_MASK)
2487 page_count--; /* more on last page */
2488 pages += page_count;
2491 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2492 (op_type == OBJ_OP_WRITE) ? 2 : 1,
2493 obj_request);
2494 if (!osd_req)
2495 goto out_unwind;
2497 obj_request->osd_req = osd_req;
2498 obj_request->callback = rbd_img_obj_callback;
2499 obj_request->img_offset = img_offset;
2501 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2503 img_offset += length;
2504 resid -= length;
2507 return 0;
2509 out_unwind:
2510 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2511 rbd_img_obj_request_del(img_request, obj_request);
2513 return -ENOMEM;
2516 static void
2517 rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2519 struct rbd_img_request *img_request;
2520 struct rbd_device *rbd_dev;
2521 struct page **pages;
2524 dout("%s: obj %p\n", __func__, obj_request);
2526 rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2527 obj_request->type == OBJ_REQUEST_NODATA);
2528 rbd_assert(obj_request_img_data_test(obj_request));
2529 img_request = obj_request->img_request;
2530 rbd_assert(img_request);
2532 rbd_dev = img_request->rbd_dev;
2533 rbd_assert(rbd_dev);
2535 pages = obj_request->copyup_pages;
2536 rbd_assert(pages != NULL);
2537 obj_request->copyup_pages = NULL;
2538 page_count = obj_request->copyup_page_count;
2539 rbd_assert(page_count);
2540 obj_request->copyup_page_count = 0;
2541 ceph_release_page_vector(pages, page_count);
2544 * We want the transfer count to reflect the size of the
2545 * original write request. There is no such thing as a
2546 * successful short write, so if the request was successful
2547 * we can just set it to the originally-requested length.
2549 if (!obj_request->result)
2550 obj_request->xferred = obj_request->length;
2552 obj_request_done_set(obj_request);
2555 static void
2556 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2558 struct rbd_obj_request *orig_request;
2559 struct ceph_osd_request *osd_req;
2560 struct rbd_device *rbd_dev;
2561 struct page **pages;
2562 enum obj_operation_type op_type;
2567 rbd_assert(img_request_child_test(img_request));
2569 /* First get what we need from the image request */
2571 pages = img_request->copyup_pages;
2572 rbd_assert(pages != NULL);
2573 img_request->copyup_pages = NULL;
2574 page_count = img_request->copyup_page_count;
2575 rbd_assert(page_count);
2576 img_request->copyup_page_count = 0;
2578 orig_request = img_request->obj_request;
2579 rbd_assert(orig_request != NULL);
2580 rbd_assert(obj_request_type_valid(orig_request->type));
2581 img_result = img_request->result;
2582 parent_length = img_request->length;
2583 rbd_assert(img_result || parent_length == img_request->xferred);
2584 rbd_img_request_put(img_request);
2586 rbd_assert(orig_request->img_request);
2587 rbd_dev = orig_request->img_request->rbd_dev;
2588 rbd_assert(rbd_dev);
2591 * If the overlap has become 0 (most likely because the
2592 * image has been flattened) we need to free the pages
2593 * and re-submit the original write request.
2595 if (!rbd_dev->parent_overlap) {
2596 ceph_release_page_vector(pages, page_count);
2597 rbd_obj_request_submit(orig_request);
2598 return;
2605 * The original osd request is of no use to us any more.
2606 * We need a new one that can hold the three ops in a copyup
2607 * request. Allocate the new copyup osd request for the
2608 * original request, and release the old one.
2610 img_result = -ENOMEM;
2611 osd_req = rbd_osd_req_create_copyup(orig_request);
2612 if (!osd_req)
2613 goto out_err;
2614 rbd_osd_req_destroy(orig_request->osd_req);
2615 orig_request->osd_req = osd_req;
2616 orig_request->copyup_pages = pages;
2617 orig_request->copyup_page_count = page_count;
2619 /* Initialize the copyup op */
2621 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2622 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2623 false, false);
2625 /* Add the other op(s) */
2627 op_type = rbd_img_request_op_type(orig_request->img_request);
2628 rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2630 /* All set, send it off. */
2632 rbd_obj_request_submit(orig_request);
2633 return;
2635 out_err:
2636 ceph_release_page_vector(pages, page_count);
2637 rbd_obj_request_error(orig_request, img_result);
2641 * Read from the parent image the range of data that covers the
2642 * entire target of the given object request. This is used for
2643 * satisfying a layered image write request when the target of an
2644 * object request from the image request does not exist.
2646 * A page array big enough to hold the returned data is allocated
2647 * and supplied to rbd_img_request_fill() as the "data descriptor."
2648 * When the read completes, this page array will be transferred to
2649 * the original object request for the copyup operation.
2651 * If an error occurs, it is recorded as the result of the original
2652 * object request in rbd_img_obj_exists_callback().
2654 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2656 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2657 struct rbd_img_request *parent_request = NULL;
2660 struct page **pages = NULL;
2664 rbd_assert(rbd_dev->parent != NULL);
2667 * Determine the byte range covered by the object in the
2668 * child image to which the original request was to be sent.
2670 img_offset = obj_request->img_offset - obj_request->offset;
2671 length = rbd_obj_bytes(&rbd_dev->header);
2674 * There is no defined parent data beyond the parent
2675 * overlap, so limit what we read at that boundary if
2676 * necessary.
2678 if (img_offset + length > rbd_dev->parent_overlap) {
2679 rbd_assert(img_offset < rbd_dev->parent_overlap);
2680 length = rbd_dev->parent_overlap - img_offset;
2684 * Allocate a page array big enough to receive the data read
2685 * from the parent.
2687 page_count = (u32)calc_pages_for(0, length);
2688 pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2689 if (IS_ERR(pages)) {
2690 result = PTR_ERR(pages);
2691 pages = NULL;
2692 goto out_err;
2695 result = -ENOMEM;
2696 parent_request = rbd_parent_request_create(obj_request,
2697 img_offset, length);
2698 if (!parent_request)
2699 goto out_err;
2701 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2702 if (result)
2703 goto out_err;
2705 parent_request->copyup_pages = pages;
2706 parent_request->copyup_page_count = page_count;
2707 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2709 result = rbd_img_request_submit(parent_request);
2710 if (!result)
2711 return 0;
2713 parent_request->copyup_pages = NULL;
2714 parent_request->copyup_page_count = 0;
2715 parent_request->obj_request = NULL;
2716 rbd_obj_request_put(obj_request);
2717 out_err:
2718 if (pages)
2719 ceph_release_page_vector(pages, page_count);
2720 if (parent_request)
2721 rbd_img_request_put(parent_request);
2722 return result;
2725 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2727 struct rbd_obj_request *orig_request;
2728 struct rbd_device *rbd_dev;
2731 rbd_assert(!obj_request_img_data_test(obj_request));
2734 * All we need from the object request is the original
2735 * request and the result of the STAT op. Grab those, then
2736 * we're done with the request.
2738 orig_request = obj_request->obj_request;
2739 obj_request->obj_request = NULL;
2740 rbd_obj_request_put(orig_request);
2741 rbd_assert(orig_request);
2742 rbd_assert(orig_request->img_request);
2744 result = obj_request->result;
2745 obj_request->result = 0;
2747 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2748 obj_request, orig_request, result,
2749 obj_request->xferred, obj_request->length);
2750 rbd_obj_request_put(obj_request);
2753 * If the overlap has become 0 (most likely because the
2754 * image has been flattened) we need to re-submit the
2755 * original request.
2757 rbd_dev = orig_request->img_request->rbd_dev;
2758 if (!rbd_dev->parent_overlap) {
2759 rbd_obj_request_submit(orig_request);
2760 return;
2764 * Our only purpose here is to determine whether the object
2765 * exists, and we don't want to treat the non-existence as
2766 * an error. If something else comes back, transfer the
2767 * error to the original request and complete it now.
2769 if (!result) {
2770 obj_request_existence_set(orig_request, true);
2771 } else if (result == -ENOENT) {
2772 obj_request_existence_set(orig_request, false);
2773 } else {
2774 goto fail_orig_request;
2778 * Resubmit the original request now that we have recorded
2779 * whether the target object exists.
2781 result = rbd_img_obj_request_submit(orig_request);
2782 if (result)
2783 goto fail_orig_request;
2785 return;
2787 fail_orig_request:
2788 rbd_obj_request_error(orig_request, result);
2791 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2793 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2794 struct rbd_obj_request *stat_request;
2795 struct page **pages;
2800 stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
2801 if (!stat_request)
2802 return -ENOMEM;
2804 stat_request->object_no = obj_request->object_no;
2806 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2807 stat_request);
2808 if (!stat_request->osd_req) {
2809 ret = -ENOMEM;
2810 goto fail_stat_request;
2814 * The response data for a STAT call consists of:
2815 *     le64 length;
2816 *     struct {
2817 *         le32 tv_sec;
2818 *         le32 tv_nsec;
2819 *     } mtime;
2821 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2822 page_count = (u32)calc_pages_for(0, size);
2823 pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2824 if (IS_ERR(pages)) {
2825 ret = PTR_ERR(pages);
2826 goto fail_stat_request;
2829 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2830 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2831 false, false);
2833 rbd_obj_request_get(obj_request);
2834 stat_request->obj_request = obj_request;
2835 stat_request->pages = pages;
2836 stat_request->page_count = page_count;
2837 stat_request->callback = rbd_img_obj_exists_callback;
2839 rbd_obj_request_submit(stat_request);
2840 return 0;
2842 fail_stat_request:
2843 rbd_obj_request_put(stat_request);
2844 return ret;
2847 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2849 struct rbd_img_request *img_request = obj_request->img_request;
2850 struct rbd_device *rbd_dev = img_request->rbd_dev;
2853 if (!img_request_write_test(img_request) &&
2854 !img_request_discard_test(img_request))
2855 return true;
2857 /* Non-layered writes */
2858 if (!img_request_layered_test(img_request))
2859 return true;
2862 * Layered writes outside of the parent overlap range don't
2863 * share any data with the parent.
2865 if (!obj_request_overlaps_parent(obj_request))
2866 return true;
2869 * Entire-object layered writes - we will overwrite whatever
2870 * parent data there is anyway.
2872 if (!obj_request->offset &&
2873 obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2874 return true;
2877 * If the object is known to already exist, its parent data has
2878 * already been copied.
2880 if (obj_request_known_test(obj_request) &&
2881 obj_request_exists_test(obj_request))
2882 return true;
2884 return false;
2887 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2889 rbd_assert(obj_request_img_data_test(obj_request));
2890 rbd_assert(obj_request_type_valid(obj_request->type));
2891 rbd_assert(obj_request->img_request);
2893 if (img_obj_request_simple(obj_request)) {
2894 rbd_obj_request_submit(obj_request);
2895 return 0;
2899 * It's a layered write. The target object might exist but
2900 * we may not know that yet. If we know it doesn't exist,
2901 * start by reading the data for the full target object from
2902 * the parent so we can use it for a copyup to the target.
2904 if (obj_request_known_test(obj_request))
2905 return rbd_img_obj_parent_read_full(obj_request);
2907 /* We don't know whether the target exists. Go find out. */
2909 return rbd_img_obj_exists_submit(obj_request);
2912 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2914 struct rbd_obj_request *obj_request;
2915 struct rbd_obj_request *next_obj_request;
2918 dout("%s: img %p\n", __func__, img_request);
2920 rbd_img_request_get(img_request);
2921 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2922 ret = rbd_img_obj_request_submit(obj_request);
2923 if (ret)
2924 goto out_put_ireq;
2927 out_put_ireq:
2928 rbd_img_request_put(img_request);
2929 return ret;
2932 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2934 struct rbd_obj_request *obj_request;
2935 struct rbd_device *rbd_dev;
2940 rbd_assert(img_request_child_test(img_request));
2942 /* First get what we need from the image request and release it */
2944 obj_request = img_request->obj_request;
2945 img_xferred = img_request->xferred;
2946 img_result = img_request->result;
2947 rbd_img_request_put(img_request);
2950 * If the overlap has become 0 (most likely because the
2951 * image has been flattened) we need to re-submit the
2952 * original request.
2954 rbd_assert(obj_request);
2955 rbd_assert(obj_request->img_request);
2956 rbd_dev = obj_request->img_request->rbd_dev;
2957 if (!rbd_dev->parent_overlap) {
2958 rbd_obj_request_submit(obj_request);
2959 return;
2962 obj_request->result = img_result;
2963 if (obj_request->result)
2964 goto out;
2967 * We need to zero anything beyond the parent overlap
2968 * boundary. Since rbd_img_obj_request_read_callback()
2969 * will zero anything beyond the end of a short read, an
2970 * easy way to do this is to pretend the data from the
2971 * parent came up short--ending at the overlap boundary.
2973 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2974 obj_end = obj_request->img_offset + obj_request->length;
2975 if (obj_end > rbd_dev->parent_overlap) {
2976 u64 xferred = 0;
2978 if (obj_request->img_offset < rbd_dev->parent_overlap)
2979 xferred = rbd_dev->parent_overlap -
2980 obj_request->img_offset;
2982 obj_request->xferred = min(img_xferred, xferred);
2983 } else {
2984 obj_request->xferred = img_xferred;
2986 out:
2987 rbd_img_obj_request_read_callback(obj_request);
2988 rbd_obj_request_complete(obj_request);
2991 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2993 struct rbd_img_request *img_request;
2996 rbd_assert(obj_request_img_data_test(obj_request));
2997 rbd_assert(obj_request->img_request != NULL);
2998 rbd_assert(obj_request->result == (s32) -ENOENT);
2999 rbd_assert(obj_request_type_valid(obj_request->type));
3001 /* rbd_read_finish(obj_request, obj_request->length); */
3002 img_request = rbd_parent_request_create(obj_request,
3003 obj_request->img_offset,
3004 obj_request->length);
3005 if (!img_request)
3006 goto out_err;
3009 if (obj_request->type == OBJ_REQUEST_BIO)
3010 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3011 obj_request->bio_list);
3013 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3014 obj_request->pages);
3015 if (result)
3016 goto out_err;
3018 img_request->callback = rbd_img_parent_read_callback;
3019 result = rbd_img_request_submit(img_request);
3020 if (result)
3021 goto out_err;
3023 return;
3024 out_err:
3025 if (img_request)
3026 rbd_img_request_put(img_request);
3027 obj_request->result = result;
3028 obj_request->xferred = 0;
3029 obj_request_done_set(obj_request);
3032 static const struct rbd_client_id rbd_empty_cid;
3034 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3035 const struct rbd_client_id *rhs)
3037 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3040 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3042 struct rbd_client_id cid;
3044 mutex_lock(&rbd_dev->watch_mutex);
3045 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3046 cid.handle = rbd_dev->watch_cookie;
3047 mutex_unlock(&rbd_dev->watch_mutex);
3049 return cid;
3052 * lock_rwsem must be held for write
3054 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3055 const struct rbd_client_id *cid)
3057 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3058 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3059 cid->gid, cid->handle);
3060 rbd_dev->owner_cid = *cid; /* struct */
3063 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3065 mutex_lock(&rbd_dev->watch_mutex);
3066 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3067 mutex_unlock(&rbd_dev->watch_mutex);
3070 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3072 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3074 strcpy(rbd_dev->lock_cookie, cookie);
3075 rbd_set_owner_cid(rbd_dev, &cid);
3076 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3080 * lock_rwsem must be held for write
3082 static int rbd_lock(struct rbd_device *rbd_dev)
3084 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3085 char cookie[32];
3086 int ret;
3088 WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3089 rbd_dev->lock_cookie[0] != '\0');
3091 format_lock_cookie(rbd_dev, cookie);
3092 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3093 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3094 RBD_LOCK_TAG, "", 0);
3095 if (ret)
3096 return ret;
3098 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3099 __rbd_lock(rbd_dev, cookie);
3100 return 0;
3104 * lock_rwsem must be held for write
3106 static void rbd_unlock(struct rbd_device *rbd_dev)
3108 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3111 WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3112 rbd_dev->lock_cookie[0] == '\0');
3114 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3115 RBD_LOCK_NAME, rbd_dev->lock_cookie);
3116 if (ret && ret != -ENOENT)
3117 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
3119 /* treat errors as the image is unlocked */
3120 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3121 rbd_dev->lock_cookie[0] = '\0';
3122 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3123 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3126 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3127 enum rbd_notify_op notify_op,
3128 struct page ***preply_pages,
3131 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3132 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3133 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3134 char buf[buf_size];
3135 void *p = buf;
3137 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3139 /* encode *LockPayload NotifyMessage (op + ClientId) */
3140 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3141 ceph_encode_32(&p, notify_op);
3142 ceph_encode_64(&p, cid.gid);
3143 ceph_encode_64(&p, cid.handle);
3145 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3146 &rbd_dev->header_oloc, buf, buf_size,
3147 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3150 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3151 enum rbd_notify_op notify_op)
3153 struct page **reply_pages;
3156 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3157 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3160 static void rbd_notify_acquired_lock(struct work_struct *work)
3162 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3163 acquired_lock_work);
3165 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3168 static void rbd_notify_released_lock(struct work_struct *work)
3170 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3171 released_lock_work);
3173 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3176 static int rbd_request_lock(struct rbd_device *rbd_dev)
3178 struct page **reply_pages;
3180 bool lock_owner_responded = false;
3183 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3185 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3186 &reply_pages, &reply_len);
3187 if (ret && ret != -ETIMEDOUT) {
3188 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3189 return ret;
3192 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3193 void *p = page_address(reply_pages[0]);
3194 void *const end = p + reply_len;
3195 u32 n;
3197 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3198 while (n--) {
3199 u8 struct_v;
3200 u32 len;
3202 ceph_decode_need(&p, end, 8 + 8, e_inval);
3203 p += 8 + 8; /* skip gid and cookie */
3205 ceph_decode_32_safe(&p, end, len, e_inval);
3206 if (!len)
3207 continue;
3209 if (lock_owner_responded) {
3211 "duplicate lock owners detected");
3212 ret = -EIO;
3213 goto out;
3216 lock_owner_responded = true;
3217 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3218 &struct_v, &len);
3219 if (ret) {
3220 rbd_warn(rbd_dev,
3221 "failed to decode ResponseMessage: %d",
3222 ret);
3223 goto e_inval;
3226 ret = ceph_decode_32(&p);
3230 if (!lock_owner_responded) {
3231 rbd_warn(rbd_dev, "no lock owners detected");
3232 ret = -ETIMEDOUT;
3235 out:
3236 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3237 return ret;
3239 e_inval:
3240 ret = -EINVAL;
3241 goto out;
3244 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3246 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3248 cancel_delayed_work(&rbd_dev->lock_dwork);
3249 if (wake_all)
3250 wake_up_all(&rbd_dev->lock_waitq);
3251 else
3252 wake_up(&rbd_dev->lock_waitq);
3255 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3256 struct ceph_locker **lockers, u32 *num_lockers)
3258 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3263 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3265 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3266 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3267 &lock_type, &lock_tag, lockers, num_lockers);
3268 if (ret)
3269 return ret;
3271 if (*num_lockers == 0) {
3272 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3273 goto out;
3276 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3277 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3278 lock_tag);
3279 ret = -EBUSY;
3280 goto out;
3283 if (lock_type == CEPH_CLS_LOCK_SHARED) {
3284 rbd_warn(rbd_dev, "shared lock type detected");
3285 ret = -EBUSY;
3286 goto out;
3289 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3290 strlen(RBD_LOCK_COOKIE_PREFIX))) {
3291 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3292 (*lockers)[0].id.cookie);
3293 ret = -EBUSY;
3294 goto out;
3297 out:
3298 kfree(lock_tag);
3299 return ret;
3302 static int find_watcher(struct rbd_device *rbd_dev,
3303 const struct ceph_locker *locker)
3305 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3306 struct ceph_watch_item *watchers;
3312 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3313 &rbd_dev->header_oloc, &watchers,
3314 &num_watchers);
3315 if (ret)
3316 return ret;
3318 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3319 for (i = 0; i < num_watchers; i++) {
3320 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3321 sizeof(locker->info.addr)) &&
3322 watchers[i].cookie == cookie) {
3323 struct rbd_client_id cid = {
3324 .gid = le64_to_cpu(watchers[i].name.num),
3325 .handle = cookie,
3326 };
3328 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3329 rbd_dev, cid.gid, cid.handle);
3330 rbd_set_owner_cid(rbd_dev, &cid);
3331 ret = 1;
3332 goto out;
3336 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3337 ret = 0;
3339 out:
3340 kfree(watchers);
3341 return ret;
3344 * lock_rwsem must be held for write
3346 static int rbd_try_lock(struct rbd_device *rbd_dev)
3348 struct ceph_client *client = rbd_dev->rbd_client->client;
3349 struct ceph_locker *lockers;
3353 for (;;) {
3354 ret = rbd_lock(rbd_dev);
3355 if (ret != -EBUSY)
3356 return ret;
3358 /* determine if the current lock holder is still alive */
3359 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3360 if (ret)
3361 return ret;
3363 if (num_lockers == 0)
3364 goto again;
3366 ret = find_watcher(rbd_dev, lockers);
3367 if (ret) {
3368 if (ret > 0)
3369 ret = 0; /* have to request lock */
3370 goto out;
3373 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3374 ENTITY_NAME(lockers[0].id.name));
3376 ret = ceph_monc_blacklist_add(&client->monc,
3377 &lockers[0].info.addr);
3378 if (ret) {
3379 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3380 ENTITY_NAME(lockers[0].id.name), ret);
3381 goto out;
3384 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3385 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3386 lockers[0].id.cookie,
3387 &lockers[0].id.name);
3388 if (ret && ret != -ENOENT)
3389 goto out;
3391 again:
3392 ceph_free_lockers(lockers, num_lockers);
3395 out:
3396 ceph_free_lockers(lockers, num_lockers);
3397 return ret;
3401 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3403 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3404 int *pret)
3406 enum rbd_lock_state lock_state;
3408 down_read(&rbd_dev->lock_rwsem);
3409 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3410 rbd_dev->lock_state);
3411 if (__rbd_is_lock_owner(rbd_dev)) {
3412 lock_state = rbd_dev->lock_state;
3413 up_read(&rbd_dev->lock_rwsem);
3414 return lock_state;
3417 up_read(&rbd_dev->lock_rwsem);
3418 down_write(&rbd_dev->lock_rwsem);
3419 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3420 rbd_dev->lock_state);
3421 if (!__rbd_is_lock_owner(rbd_dev)) {
3422 *pret = rbd_try_lock(rbd_dev);
3423 if (*pret)
3424 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3427 lock_state = rbd_dev->lock_state;
3428 up_write(&rbd_dev->lock_rwsem);
3429 return lock_state;
3432 static void rbd_acquire_lock(struct work_struct *work)
3434 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3435 struct rbd_device, lock_dwork);
3436 enum rbd_lock_state lock_state;
3439 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3440 again:
3441 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3442 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3443 if (lock_state == RBD_LOCK_STATE_LOCKED)
3444 wake_requests(rbd_dev, true);
3445 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3446 rbd_dev, lock_state, ret);
3447 return;
3450 ret = rbd_request_lock(rbd_dev);
3451 if (ret == -ETIMEDOUT) {
3452 goto again; /* treat this as a dead client */
3453 } else if (ret == -EROFS) {
3454 rbd_warn(rbd_dev, "peer will not release lock");
3456 * If this is rbd_add_acquire_lock(), we want to fail
3457 * immediately -- reuse BLACKLISTED flag. Otherwise we
3458 * want to block.
3460 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3461 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3462 /* wake "rbd map --exclusive" process */
3463 wake_requests(rbd_dev, false);
3465 } else if (ret < 0) {
3466 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3467 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3471 * lock owner acked, but resend if we don't see them
3472 * release the lock
3474 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3475 rbd_dev);
3476 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3477 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3482 * lock_rwsem must be held for write
3484 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3486 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3487 rbd_dev->lock_state);
3488 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3489 return false;
3491 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3492 downgrade_write(&rbd_dev->lock_rwsem);
3494 * Ensure that all in-flight IO is flushed.
3496 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3497 * may be shared with other devices.
3499 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3500 up_read(&rbd_dev->lock_rwsem);
3502 down_write(&rbd_dev->lock_rwsem);
3503 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3504 rbd_dev->lock_state);
3505 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3506 return false;
3508 rbd_unlock(rbd_dev);
3510 * Give others a chance to grab the lock - we would re-acquire
3511 * almost immediately if we got new IO during ceph_osdc_sync()
3512 * otherwise. We need to ack our own notifications, so this
3513 * lock_dwork will be requeued from rbd_wait_state_locked()
3514 * after wake_requests() in rbd_handle_released_lock().
3516 cancel_delayed_work(&rbd_dev->lock_dwork);
3517 return true;
3520 static void rbd_release_lock_work(struct work_struct *work)
3522 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3523 unlock_work);
3525 down_write(&rbd_dev->lock_rwsem);
3526 rbd_release_lock(rbd_dev);
3527 up_write(&rbd_dev->lock_rwsem);
3530 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3531 void **p)
3533 struct rbd_client_id cid = { 0 };
3535 if (struct_v >= 2) {
3536 cid.gid = ceph_decode_64(p);
3537 cid.handle = ceph_decode_64(p);
3540 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3541 cid.handle);
3542 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3543 down_write(&rbd_dev->lock_rwsem);
3544 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3546 * we already know that the remote client is
3547 * the owner
3549 up_write(&rbd_dev->lock_rwsem);
3550 return;
3553 rbd_set_owner_cid(rbd_dev, &cid);
3554 downgrade_write(&rbd_dev->lock_rwsem);
3555 } else {
3556 down_read(&rbd_dev->lock_rwsem);
3559 if (!__rbd_is_lock_owner(rbd_dev))
3560 wake_requests(rbd_dev, false);
3561 up_read(&rbd_dev->lock_rwsem);
3564 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3565 void **p)
3567 struct rbd_client_id cid = { 0 };
3569 if (struct_v >= 2) {
3570 cid.gid = ceph_decode_64(p);
3571 cid.handle = ceph_decode_64(p);
3574 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3575 cid.handle);
3576 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3577 down_write(&rbd_dev->lock_rwsem);
3578 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3579 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3580 __func__, rbd_dev, cid.gid, cid.handle,
3581 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3582 up_write(&rbd_dev->lock_rwsem);
3583 return;
3586 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3587 downgrade_write(&rbd_dev->lock_rwsem);
3588 } else {
3589 down_read(&rbd_dev->lock_rwsem);
3592 if (!__rbd_is_lock_owner(rbd_dev))
3593 wake_requests(rbd_dev, false);
3594 up_read(&rbd_dev->lock_rwsem);
3598 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3599 * ResponseMessage is needed.
3601 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3602 void **p)
3604 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3605 struct rbd_client_id cid = { 0 };
3608 if (struct_v >= 2) {
3609 cid.gid = ceph_decode_64(p);
3610 cid.handle = ceph_decode_64(p);
3613 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3614 cid.handle);
3615 if (rbd_cid_equal(&cid, &my_cid))
3616 return result;
3618 down_read(&rbd_dev->lock_rwsem);
3619 if (__rbd_is_lock_owner(rbd_dev)) {
3620 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3621 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3622 goto out_unlock;
3625 * encode ResponseMessage(0) so the peer can detect
3626 * a missing owner
3628 result = 0;
3630 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3631 if (!rbd_dev->opts->exclusive) {
3632 dout("%s rbd_dev %p queueing unlock_work\n",
3634 queue_work(rbd_dev->task_wq,
3635 &rbd_dev->unlock_work);
3637 /* refuse to release the lock */
3638 result = -EROFS;
3643 out_unlock:
3644 up_read(&rbd_dev->lock_rwsem);
3645 return result;
3648 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3649 u64 notify_id, u64 cookie, s32 *result)
3651 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3652 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3653 char buf[buf_size];
3654 void *p = buf;
3655 int ret;
3658 if (result) {
3659 /* encode ResponseMessage */
3660 ceph_start_encoding(&p, 1, 1,
3661 buf_size - CEPH_ENCODING_START_BLK_LEN);
3662 ceph_encode_32(&p, *result);
3663 } else {
3664 buf_size = 0;
3667 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3668 &rbd_dev->header_oloc, notify_id, cookie,
3669 buf, buf_size);
3670 if (ret)
3671 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3674 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3677 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3678 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3681 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3682 u64 notify_id, u64 cookie, s32 result)
3684 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3685 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3688 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3689 u64 notifier_id, void *data, size_t data_len)
3691 struct rbd_device *rbd_dev = arg;
3692 void *p = data;
3693 void *const end = p + data_len;
3694 u8 struct_v = 0;
3695 u32 len;
3696 u32 notify_op;
3697 int ret;
3699 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3700 __func__, rbd_dev, cookie, notify_id, data_len);
3701 if (data_len) {
3702 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3703 &struct_v, &len);
3704 if (ret) {
3705 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3706 ret);
3707 return;
3710 notify_op = ceph_decode_32(&p);
3711 } else {
3712 /* legacy notification for header updates */
3713 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3714 len = 0;
3717 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3718 switch (notify_op) {
3719 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3720 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3721 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3722 break;
3723 case RBD_NOTIFY_OP_RELEASED_LOCK:
3724 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3725 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3726 break;
3727 case RBD_NOTIFY_OP_REQUEST_LOCK:
3728 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3729 if (ret <= 0)
3730 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3731 cookie, ret);
3732 else
3733 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3734 break;
3735 case RBD_NOTIFY_OP_HEADER_UPDATE:
3736 ret = rbd_dev_refresh(rbd_dev);
3738 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3740 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3741 break;
3742 default:
3743 if (rbd_is_lock_owner(rbd_dev))
3744 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3745 cookie, -EOPNOTSUPP);
3746 else
3747 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3748 break;
3752 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3754 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3756 struct rbd_device *rbd_dev = arg;
3758 rbd_warn(rbd_dev, "encountered watch error: %d", err);
3760 down_write(&rbd_dev->lock_rwsem);
3761 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3762 up_write(&rbd_dev->lock_rwsem);
3764 mutex_lock(&rbd_dev->watch_mutex);
3765 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3766 __rbd_unregister_watch(rbd_dev);
3767 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3769 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3771 mutex_unlock(&rbd_dev->watch_mutex);
3775 * watch_mutex must be locked
3777 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3779 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3780 struct ceph_osd_linger_request *handle;
3782 rbd_assert(!rbd_dev->watch_handle);
3783 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3785 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3786 &rbd_dev->header_oloc, rbd_watch_cb,
3787 rbd_watch_errcb, rbd_dev);
3788 if (IS_ERR(handle))
3789 return PTR_ERR(handle);
3791 rbd_dev->watch_handle = handle;
3792 return 0;
3796 * watch_mutex must be locked
3798 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3800 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3803 rbd_assert(rbd_dev->watch_handle);
3804 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3806 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3808 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3810 rbd_dev->watch_handle = NULL;
3813 static int rbd_register_watch(struct rbd_device *rbd_dev)
3817 mutex_lock(&rbd_dev->watch_mutex);
3818 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3819 ret = __rbd_register_watch(rbd_dev);
3820 if (ret)
3821 goto out;
3823 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3824 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3826 out:
3827 mutex_unlock(&rbd_dev->watch_mutex);
3828 return ret;
3831 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3833 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3835 cancel_work_sync(&rbd_dev->acquired_lock_work);
3836 cancel_work_sync(&rbd_dev->released_lock_work);
3837 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3838 cancel_work_sync(&rbd_dev->unlock_work);
3842 * header_rwsem must not be held to avoid a deadlock with
3843 * rbd_dev_refresh() when flushing notifies.
3845 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3847 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3848 cancel_tasks_sync(rbd_dev);
3850 mutex_lock(&rbd_dev->watch_mutex);
3851 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3852 __rbd_unregister_watch(rbd_dev);
3853 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3854 mutex_unlock(&rbd_dev->watch_mutex);
3856 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3857 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3861 * lock_rwsem must be held for write
3863 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3865 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3866 char cookie[32];
3867 int ret;
3869 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3871 format_lock_cookie(rbd_dev, cookie);
3872 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3873 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3874 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3875 RBD_LOCK_TAG, cookie);
3876 if (ret) {
3877 if (ret != -EOPNOTSUPP)
3878 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3879 ret);
3882 * Lock cookie cannot be updated on older OSDs, so do
3883 * a manual release and queue an acquire.
3885 if (rbd_release_lock(rbd_dev))
3886 queue_delayed_work(rbd_dev->task_wq,
3887 &rbd_dev->lock_dwork, 0);
3888 } else {
3889 __rbd_lock(rbd_dev, cookie);
3893 static void rbd_reregister_watch(struct work_struct *work)
3895 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3896 struct rbd_device, watch_dwork);
3899 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3901 mutex_lock(&rbd_dev->watch_mutex);
3902 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3903 mutex_unlock(&rbd_dev->watch_mutex);
3904 return;
3907 ret = __rbd_register_watch(rbd_dev);
3908 if (ret) {
3909 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3910 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3911 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3912 wake_requests(rbd_dev, true);
3913 } else {
3914 queue_delayed_work(rbd_dev->task_wq,
3915 &rbd_dev->watch_dwork,
3916 RBD_RETRY_DELAY);
3918 mutex_unlock(&rbd_dev->watch_mutex);
3919 return;
3922 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3923 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3924 mutex_unlock(&rbd_dev->watch_mutex);
3926 down_write(&rbd_dev->lock_rwsem);
3927 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3928 rbd_reacquire_lock(rbd_dev);
3929 up_write(&rbd_dev->lock_rwsem);
3931 ret = rbd_dev_refresh(rbd_dev);
3933 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3937 * Synchronous osd object method call. Returns the number of bytes
3938 * returned in the inbound buffer, or a negative error code.
3940 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3941 struct ceph_object_id *oid,
3942 struct ceph_object_locator *oloc,
3943 const char *method_name,
3944 const void *outbound,
3945 size_t outbound_size,
3946 void *inbound,
3947 size_t inbound_size)
3949 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3950 struct page *req_page = NULL;
3951 struct page *reply_page;
3955 * Method calls are ultimately read operations. The result
3956 * should be placed into the inbound buffer provided. They
3957 * also supply outbound data--parameters for the object
3958 * method. Currently if this is present it will be a
3959 * snapshot id.
3961 if (outbound) {
3962 if (outbound_size > PAGE_SIZE)
3963 return -E2BIG;
3965 req_page = alloc_page(GFP_KERNEL);
3966 if (!req_page)
3967 return -ENOMEM;
3969 memcpy(page_address(req_page), outbound, outbound_size);
3972 reply_page = alloc_page(GFP_KERNEL);
3973 if (!reply_page) {
3974 if (req_page)
3975 __free_page(req_page);
3976 return -ENOMEM;
3979 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3980 CEPH_OSD_FLAG_READ, req_page, outbound_size,
3981 reply_page, &inbound_size);
3982 if (!ret) {
3983 memcpy(inbound, page_address(reply_page), inbound_size);
3984 ret = inbound_size;
3987 if (req_page)
3988 __free_page(req_page);
3989 __free_page(reply_page);
3990 return ret;
3994 * lock_rwsem must be held for read
3996 static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3998 DEFINE_WAIT(wait);
4000 do {
4002 * Note the use of mod_delayed_work() in rbd_acquire_lock()
4003 * and cancel_delayed_work() in wake_requests().
4005 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4006 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4007 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4008 TASK_UNINTERRUPTIBLE);
4009 up_read(&rbd_dev->lock_rwsem);
4010 schedule();
4011 down_read(&rbd_dev->lock_rwsem);
4012 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4013 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4015 finish_wait(&rbd_dev->lock_waitq, &wait);
4018 static void rbd_queue_workfn(struct work_struct *work)
4020 struct request *rq = blk_mq_rq_from_pdu(work);
4021 struct rbd_device *rbd_dev = rq->q->queuedata;
4022 struct rbd_img_request *img_request;
4023 struct ceph_snap_context *snapc = NULL;
4024 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4025 u64 length = blk_rq_bytes(rq);
4026 enum obj_operation_type op_type;
4028 bool must_be_locked;
4031 switch (req_op(rq)) {
4032 case REQ_OP_DISCARD:
4033 case REQ_OP_WRITE_ZEROES:
4034 op_type = OBJ_OP_DISCARD;
4035 break;
4036 case REQ_OP_WRITE:
4037 op_type = OBJ_OP_WRITE;
4038 break;
4039 case REQ_OP_READ:
4040 op_type = OBJ_OP_READ;
4041 break;
4042 default:
4043 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4044 result = -EIO;
4045 goto err;
4048 /* Ignore/skip any zero-length requests */
4050 if (!length) {
4051 dout("%s: zero-length request\n", __func__);
4052 result = 0;
4053 goto err_rq;
4056 /* Only reads are allowed to a read-only device */
4058 if (op_type != OBJ_OP_READ) {
4059 if (rbd_dev->mapping.read_only) {
4060 result = -EROFS;
4061 goto err_rq;
4063 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4067 * Quit early if the mapped snapshot no longer exists. It's
4068 * still possible the snapshot will have disappeared by the
4069 * time our request arrives at the osd, but there's no sense in
4070 * sending it if we already know.
4072 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4073 dout("request for non-existent snapshot");
4074 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4075 result = -ENXIO;
4076 goto err_rq;
4079 if (offset && length > U64_MAX - offset + 1) {
4080 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4081 length);
4082 result = -EINVAL;
4083 goto err_rq; /* Shouldn't happen */
4086 blk_mq_start_request(rq);
4088 down_read(&rbd_dev->header_rwsem);
4089 mapping_size = rbd_dev->mapping.size;
4090 if (op_type != OBJ_OP_READ) {
4091 snapc = rbd_dev->header.snapc;
4092 ceph_get_snap_context(snapc);
4094 up_read(&rbd_dev->header_rwsem);
4096 if (offset + length > mapping_size) {
4097 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4098 length, mapping_size);
4099 result = -EIO;
4100 goto err_rq;
4103 must_be_locked =
4104 (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
4105 (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
4106 if (must_be_locked) {
4107 down_read(&rbd_dev->lock_rwsem);
4108 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4109 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4110 if (rbd_dev->opts->exclusive) {
4111 rbd_warn(rbd_dev, "exclusive lock required");
4112 result = -EROFS;
4113 goto err_unlock;
4115 rbd_wait_state_locked(rbd_dev);
4117 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4118 result = -EBLACKLISTED;
4119 goto err_unlock;
4123 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4124 snapc);
4125 if (!img_request) {
4126 result = -ENOMEM;
4127 goto err_unlock;
4129 img_request->rq = rq;
4130 snapc = NULL; /* img_request consumes a ref */
4132 if (op_type == OBJ_OP_DISCARD)
4133 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4134 NULL);
4135 else
4136 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4137 rq->bio);
4138 if (result)
4139 goto err_img_request;
4141 result = rbd_img_request_submit(img_request);
4142 if (result)
4143 goto err_img_request;
4145 if (must_be_locked)
4146 up_read(&rbd_dev->lock_rwsem);
4147 return;
4149 err_img_request:
4150 rbd_img_request_put(img_request);
4151 err_unlock:
4152 if (must_be_locked)
4153 up_read(&rbd_dev->lock_rwsem);
4154 err_rq:
4155 if (result)
4156 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4157 obj_op_name(op_type), length, offset, result);
4158 ceph_put_snap_context(snapc);
4159 err:
4160 blk_mq_end_request(rq, errno_to_blk_status(result));
4163 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4164 const struct blk_mq_queue_data *bd)
4166 struct request *rq = bd->rq;
4167 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4169 queue_work(rbd_wq, work);
4170 return BLK_STS_OK;
4173 static void rbd_free_disk(struct rbd_device *rbd_dev)
4175 blk_cleanup_queue(rbd_dev->disk->queue);
4176 blk_mq_free_tag_set(&rbd_dev->tag_set);
4177 put_disk(rbd_dev->disk);
4178 rbd_dev->disk = NULL;
4181 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4182 struct ceph_object_id *oid,
4183 struct ceph_object_locator *oloc,
4184 void *buf, int buf_len)
4187 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4188 struct ceph_osd_request *req;
4189 struct page **pages;
4190 int num_pages = calc_pages_for(0, buf_len);
4193 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4194 if (!req)
4195 return -ENOMEM;
4197 ceph_oid_copy(&req->r_base_oid, oid);
4198 ceph_oloc_copy(&req->r_base_oloc, oloc);
4199 req->r_flags = CEPH_OSD_FLAG_READ;
4201 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4202 if (ret)
4203 goto out_req;
4205 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4206 if (IS_ERR(pages)) {
4207 ret = PTR_ERR(pages);
4208 goto out_req;
4211 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4212 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4213 true);
4215 ceph_osdc_start_request(osdc, req, false);
4216 ret = ceph_osdc_wait_request(osdc, req);
4217 if (ret >= 0)
4218 ceph_copy_from_page_vector(pages, buf, 0, ret);
4220 out_req:
4221 ceph_osdc_put_request(req);
4222 return ret;
4226 * Read the complete header for the given rbd device. On successful
4227 * return, the rbd_dev->header field will contain up-to-date
4228 * information about the image.
4230 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4232 struct rbd_image_header_ondisk *ondisk = NULL;
4233 u32 snap_count = 0;
4234 u64 names_size = 0;
4235 u32 want_count;
4236 int ret;
4239 * The complete header will include an array of its 64-bit
4240 * snapshot ids, followed by the names of those snapshots as
4241 * a contiguous block of NUL-terminated strings. Note that
4242 * the number of snapshots could change by the time we read
4243 * it in, in which case we re-read it.
4245 do {
4246 size_t size;
4248 kfree(ondisk);
4250 size = sizeof (*ondisk);
4251 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4252 size += names_size;
4253 ondisk = kmalloc(size, GFP_KERNEL);
4254 if (!ondisk)
4255 return -ENOMEM;
4257 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4258 &rbd_dev->header_oloc, ondisk, size);
4259 if (ret < 0)
4260 goto out;
4261 if ((size_t)ret < size) {
4262 ret = -ENXIO;
4263 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4264 size, ret);
4265 goto out;
4267 if (!rbd_dev_ondisk_valid(ondisk)) {
4268 ret = -ENXIO;
4269 rbd_warn(rbd_dev, "invalid header");
4270 goto out;
4273 names_size = le64_to_cpu(ondisk->snap_names_len);
4274 want_count = snap_count;
4275 snap_count = le32_to_cpu(ondisk->snap_count);
4276 } while (snap_count != want_count);
4278 ret = rbd_header_from_disk(rbd_dev, ondisk);
4279 out:
4280 kfree(ondisk);
4282 return ret;
4286 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4287 * has disappeared from the (just updated) snapshot context.
4289 static void rbd_exists_validate(struct rbd_device *rbd_dev)
4293 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4294 return;
4296 snap_id = rbd_dev->spec->snap_id;
4297 if (snap_id == CEPH_NOSNAP)
4298 return;
4300 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4301 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4304 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4309 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4310 * try to update its size. If REMOVING is set, updating size
4311 * is just useless work since the device can't be opened.
4313 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4314 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4315 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4316 dout("setting size to %llu sectors", (unsigned long long)size);
4317 set_capacity(rbd_dev->disk, size);
4318 revalidate_disk(rbd_dev->disk);
4322 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4327 down_write(&rbd_dev->header_rwsem);
4328 mapping_size = rbd_dev->mapping.size;
4330 ret = rbd_dev_header_info(rbd_dev);
4331 if (ret)
4332 goto out;
4335 * If there is a parent, see if it has disappeared due to the
4336 * mapped image getting flattened.
4338 if (rbd_dev->parent) {
4339 ret = rbd_dev_v2_parent_info(rbd_dev);
4340 if (ret)
4341 goto out;
4344 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4345 rbd_dev->mapping.size = rbd_dev->header.image_size;
4346 } else {
4347 /* validate mapped snapshot's EXISTS flag */
4348 rbd_exists_validate(rbd_dev);
4351 out:
4352 up_write(&rbd_dev->header_rwsem);
4353 if (!ret && mapping_size != rbd_dev->mapping.size)
4354 rbd_dev_update_size(rbd_dev);
4356 return ret;
4359 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
4360 unsigned int hctx_idx, unsigned int numa_node)
4362 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4364 INIT_WORK(work, rbd_queue_workfn);
4365 return 0;
4368 static const struct blk_mq_ops rbd_mq_ops = {
4369 .queue_rq = rbd_queue_rq,
4370 .init_request = rbd_init_request,
4373 static int rbd_init_disk(struct rbd_device *rbd_dev)
4375 struct gendisk *disk;
4376 struct request_queue *q;
4380 /* create gendisk info */
4381 disk = alloc_disk(single_major ?
4382 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4383 RBD_MINORS_PER_MAJOR);
4384 if (!disk)
4385 return -ENOMEM;
4387 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4388 rbd_dev->dev_id);
4389 disk->major = rbd_dev->major;
4390 disk->first_minor = rbd_dev->minor;
4391 if (single_major)
4392 disk->flags |= GENHD_FL_EXT_DEVT;
4393 disk->fops = &rbd_bd_ops;
4394 disk->private_data = rbd_dev;
4396 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4397 rbd_dev->tag_set.ops = &rbd_mq_ops;
4398 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4399 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4400 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4401 rbd_dev->tag_set.nr_hw_queues = 1;
4402 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4404 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4405 if (err)
4406 goto out_disk;
4408 q = blk_mq_init_queue(&rbd_dev->tag_set);
4409 if (IS_ERR(q)) {
4410 err = PTR_ERR(q);
4411 goto out_tag_set;
4414 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4415 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4417 /* set io sizes to object size */
4418 segment_size = rbd_obj_bytes(&rbd_dev->header);
4419 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4420 q->limits.max_sectors = queue_max_hw_sectors(q);
4421 blk_queue_max_segments(q, USHRT_MAX);
4422 blk_queue_max_segment_size(q, segment_size);
4423 blk_queue_io_min(q, segment_size);
4424 blk_queue_io_opt(q, segment_size);
4426 /* enable the discard support */
4427 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4428 q->limits.discard_granularity = segment_size;
4429 q->limits.discard_alignment = segment_size;
4430 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
4431 blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
4433 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4434 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
4437 * disk_release() expects a queue ref from add_disk() and will
4438 * put it. Hold an extra ref until add_disk() is called.
4440 WARN_ON(!blk_get_queue(q));
4442 q->queuedata = rbd_dev;
4444 rbd_dev->disk = disk;
4445 return 0;
4447 out_tag_set:
4448 blk_mq_free_tag_set(&rbd_dev->tag_set);
4449 out_disk:
4450 put_disk(disk);
4451 return err;
4458 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4460 return container_of(dev, struct rbd_device, dev);
4463 static ssize_t rbd_size_show(struct device *dev,
4464 struct device_attribute *attr, char *buf)
4466 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4468 return sprintf(buf, "%llu\n",
4469 (unsigned long long)rbd_dev->mapping.size);
4473 * Note this shows the features for whatever's mapped, which is not
4474 * necessarily the base image.
4476 static ssize_t rbd_features_show(struct device *dev,
4477 struct device_attribute *attr, char *buf)
4479 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4481 return sprintf(buf, "0x%016llx\n",
4482 (unsigned long long)rbd_dev->mapping.features);
4485 static ssize_t rbd_major_show(struct device *dev,
4486 struct device_attribute *attr, char *buf)
4488 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4490 if (rbd_dev->major)
4491 return sprintf(buf, "%d\n", rbd_dev->major);
4493 return sprintf(buf, "(none)\n");
4496 static ssize_t rbd_minor_show(struct device *dev,
4497 struct device_attribute *attr, char *buf)
4499 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4501 return sprintf(buf, "%d\n", rbd_dev->minor);
4504 static ssize_t rbd_client_addr_show(struct device *dev,
4505 struct device_attribute *attr, char *buf)
4507 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4508 struct ceph_entity_addr *client_addr =
4509 ceph_client_addr(rbd_dev->rbd_client->client);
4511 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4512 le32_to_cpu(client_addr->nonce));
4515 static ssize_t rbd_client_id_show(struct device *dev,
4516 struct device_attribute *attr, char *buf)
4518 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4520 return sprintf(buf, "client%lld\n",
4521 ceph_client_gid(rbd_dev->rbd_client->client));
4524 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4525 struct device_attribute *attr, char *buf)
4527 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4529 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4532 static ssize_t rbd_config_info_show(struct device *dev,
4533 struct device_attribute *attr, char *buf)
4535 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	if (!capable(CAP_SYS_ADMIN))
		return -EPERM;

	return sprintf(buf, "%s\n", rbd_dev->config_info);
4543 static ssize_t rbd_pool_show(struct device *dev,
4544 struct device_attribute *attr, char *buf)
4546 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4548 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4551 static ssize_t rbd_pool_id_show(struct device *dev,
4552 struct device_attribute *attr, char *buf)
4554 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4556 return sprintf(buf, "%llu\n",
4557 (unsigned long long) rbd_dev->spec->pool_id);
4560 static ssize_t rbd_name_show(struct device *dev,
4561 struct device_attribute *attr, char *buf)
4563 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4565 if (rbd_dev->spec->image_name)
4566 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4568 return sprintf(buf, "(unknown)\n");
4571 static ssize_t rbd_image_id_show(struct device *dev,
4572 struct device_attribute *attr, char *buf)
4574 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4576 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4580 * Shows the name of the currently-mapped snapshot (or
4581 * RBD_SNAP_HEAD_NAME for the base image).
4583 static ssize_t rbd_snap_show(struct device *dev,
4584 struct device_attribute *attr,
4587 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4589 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4592 static ssize_t rbd_snap_id_show(struct device *dev,
4593 struct device_attribute *attr, char *buf)
4595 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4597 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
 * For a v2 image, shows the chain of parent images, separated by empty
 * lines.  For v1 images, or if there is no parent, shows "(no parent
 * image)".
4605 static ssize_t rbd_parent_show(struct device *dev,
4606 struct device_attribute *attr,
4609 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4612 if (!rbd_dev->parent)
4613 return sprintf(buf, "(no parent image)\n");
4615 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4616 struct rbd_spec *spec = rbd_dev->parent_spec;
		count += sprintf(&buf[count], "%s"
			    "pool_id %llu\npool_name %s\n"
			    "image_id %s\nimage_name %s\n"
			    "snap_id %llu\nsnap_name %s\n"
			    "overlap %llu\n",
			    !count ? "" : "\n", /* first? */
			    spec->pool_id, spec->pool_name,
			    spec->image_id, spec->image_name ?: "(unknown)",
			    spec->snap_id, spec->snap_name,
			    rbd_dev->parent_overlap);
4633 static ssize_t rbd_image_refresh(struct device *dev,
4634 struct device_attribute *attr,
4638 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	if (!capable(CAP_SYS_ADMIN))
		return -EPERM;

	ret = rbd_dev_refresh(rbd_dev);
4651 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4652 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4653 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4654 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4655 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4656 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4657 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4658 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4659 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4660 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4661 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4662 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4663 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4664 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4665 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4666 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
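/*
 * The attributes defined above surface under /sys/bus/rbd/devices/<id>/.
 * A sketch of reading them from userspace (device id and values are
 * illustrative only):
 *
 *	$ cat /sys/bus/rbd/devices/0/pool
 *	rbd
 *	$ cat /sys/bus/rbd/devices/0/current_snap
 *	-
 */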
4668 static struct attribute *rbd_attrs[] = {
4669 &dev_attr_size.attr,
4670 &dev_attr_features.attr,
4671 &dev_attr_major.attr,
4672 &dev_attr_minor.attr,
4673 &dev_attr_client_addr.attr,
4674 &dev_attr_client_id.attr,
4675 &dev_attr_cluster_fsid.attr,
4676 &dev_attr_config_info.attr,
4677 &dev_attr_pool.attr,
4678 &dev_attr_pool_id.attr,
4679 &dev_attr_name.attr,
4680 &dev_attr_image_id.attr,
4681 &dev_attr_current_snap.attr,
4682 &dev_attr_snap_id.attr,
4683 &dev_attr_parent.attr,
4684 &dev_attr_refresh.attr,
static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
4697 static void rbd_dev_release(struct device *dev);
static const struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_dev_release,
};
static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}
4712 static void rbd_spec_free(struct kref *kref);
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}
4719 static struct rbd_spec *rbd_spec_alloc(void)
4721 struct rbd_spec *spec;
	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;

	spec->pool_id = CEPH_NOPOOL;
	spec->snap_id = CEPH_NOSNAP;
	kref_init(&spec->kref);

	return spec;
4734 static void rbd_spec_free(struct kref *kref)
4736 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4738 kfree(spec->pool_name);
4739 kfree(spec->image_id);
4740 kfree(spec->image_name);
4741 kfree(spec->snap_name);
4745 static void rbd_dev_free(struct rbd_device *rbd_dev)
4747 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4748 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4750 ceph_oid_destroy(&rbd_dev->header_oid);
4751 ceph_oloc_destroy(&rbd_dev->header_oloc);
4752 kfree(rbd_dev->config_info);
4754 rbd_put_client(rbd_dev->rbd_client);
4755 rbd_spec_put(rbd_dev->spec);
4756 kfree(rbd_dev->opts);
4760 static void rbd_dev_release(struct device *dev)
4762 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4763 bool need_put = !!rbd_dev->opts;
4766 destroy_workqueue(rbd_dev->task_wq);
4767 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4770 rbd_dev_free(rbd_dev);
	/*
	 * This is racy, but way better than putting the module_put()
	 * outside of the release callback.  The race window is pretty
	 * small, so doing something similar to dm (dm-builtin.c) is
	 * overkill.
	 */
4778 module_put(THIS_MODULE);
4781 static struct rbd_device *__rbd_dev_create(struct rbd_spec *spec)
4783 struct rbd_device *rbd_dev;
4785 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4789 spin_lock_init(&rbd_dev->lock);
4790 INIT_LIST_HEAD(&rbd_dev->node);
4791 init_rwsem(&rbd_dev->header_rwsem);
4793 rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4794 ceph_oid_init(&rbd_dev->header_oid);
4795 rbd_dev->header_oloc.pool = spec->pool_id;
4797 mutex_init(&rbd_dev->watch_mutex);
4798 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4799 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4801 init_rwsem(&rbd_dev->lock_rwsem);
4802 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4803 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4804 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4805 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4806 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4807 init_waitqueue_head(&rbd_dev->lock_waitq);
4809 rbd_dev->dev.bus = &rbd_bus_type;
4810 rbd_dev->dev.type = &rbd_device_type;
4811 rbd_dev->dev.parent = &rbd_root_dev;
4812 device_initialize(&rbd_dev->dev);
 * Create an rbd_dev for an image being mapped (parent images are
 * created directly with __rbd_dev_create()).
4820 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4821 struct rbd_spec *spec,
4822 struct rbd_options *opts)
4824 struct rbd_device *rbd_dev;
4826 rbd_dev = __rbd_dev_create(spec);
4830 /* get an id and fill in device name */
4831 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4832 minor_to_rbd_dev_id(1 << MINORBITS),
4834 if (rbd_dev->dev_id < 0)
4837 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4838 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4840 if (!rbd_dev->task_wq)
4843 /* we have a ref from do_rbd_add() */
4844 __module_get(THIS_MODULE);
4846 rbd_dev->rbd_client = rbdc;
4847 rbd_dev->spec = spec;
4848 rbd_dev->opts = opts;
4850 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4854 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4856 rbd_dev_free(rbd_dev);
4860 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4863 put_device(&rbd_dev->dev);
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
4871 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4872 u8 *order, u64 *snap_size)
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };
4881 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4882 &rbd_dev->header_oloc, "get_size",
4883 &snapid, sizeof(snapid),
4884 &size_buf, sizeof(size_buf));
4885 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < sizeof (size_buf))
		return -ERANGE;
4892 *order = size_buf.order;
4893 dout(" order %u", (unsigned int)*order);
4895 *snap_size = le64_to_cpu(size_buf.size);
4897 dout(" snap_id 0x%016llx snap_size = %llu\n",
4898 (unsigned long long)snap_id,
4899 (unsigned long long)*snap_size);
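	/*
	 * For instance (illustrative values only), a 10 GiB image with
	 * the default 4 MiB objects would report order 22 and
	 * snap_size 10737418240 here.
	 */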
4904 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4906 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4907 &rbd_dev->header.obj_order,
4908 &rbd_dev->header.image_size);
4911 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4917 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4921 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4922 &rbd_dev->header_oloc, "get_object_prefix",
4923 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4924 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4929 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4930 p + ret, NULL, GFP_NOIO);
4933 if (IS_ERR(rbd_dev->header.object_prefix)) {
4934 ret = PTR_ERR(rbd_dev->header.object_prefix);
4935 rbd_dev->header.object_prefix = NULL;
4937 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4945 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} __attribute__ ((packed)) features_buf = { 0 };
4956 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4957 &rbd_dev->header_oloc, "get_features",
4958 &snapid, sizeof(snapid),
4959 &features_buf, sizeof(features_buf));
4960 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < sizeof (features_buf))
		return -ERANGE;
4966 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4968 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4973 *snap_features = le64_to_cpu(features_buf.features);
4975 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4976 (unsigned long long)snap_id,
4977 (unsigned long long)*snap_features,
4978 (unsigned long long)le64_to_cpu(features_buf.incompat));
4983 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4985 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4986 &rbd_dev->header.features);
4989 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4991 struct rbd_spec *parent_spec;
4993 void *reply_buf = NULL;
5003 parent_spec = rbd_spec_alloc();
5007 size = sizeof (__le64) + /* pool_id */
5008 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
5009 sizeof (__le64) + /* snap_id */
5010 sizeof (__le64); /* overlap */
5011 reply_buf = kmalloc(size, GFP_KERNEL);
5017 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5018 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5019 &rbd_dev->header_oloc, "get_parent",
5020 &snapid, sizeof(snapid), reply_buf, size);
5021 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5026 end = reply_buf + ret;
5028 ceph_decode_64_safe(&p, end, pool_id, out_err);
5029 if (pool_id == CEPH_NOPOOL) {
		/*
		 * Either the parent never existed, or we have a
		 * record of it but the image got flattened so it no
		 * longer has a parent.  When the parent of a
		 * layered image disappears we immediately set the
		 * overlap to 0.  The effect of this is that all new
		 * requests will be treated as if the image had no
		 * parent.
		 */
5039 if (rbd_dev->parent_overlap) {
5040 rbd_dev->parent_overlap = 0;
5041 rbd_dev_parent_put(rbd_dev);
5042 pr_info("%s: clone image has been flattened\n",
5043 rbd_dev->disk->disk_name);
5046 goto out; /* No parent? No problem. */
5049 /* The ceph file layout needs to fit pool id in 32 bits */
5052 if (pool_id > (u64)U32_MAX) {
5053 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5054 (unsigned long long)pool_id, U32_MAX);
5058 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5059 if (IS_ERR(image_id)) {
5060 ret = PTR_ERR(image_id);
5063 ceph_decode_64_safe(&p, end, snap_id, out_err);
5064 ceph_decode_64_safe(&p, end, overlap, out_err);
	/*
	 * The parent won't change (except when the clone is
	 * flattened, which is already handled above).  So we only
	 * need to record the parent spec if we have not already
	 * done so.
	 */
5071 if (!rbd_dev->parent_spec) {
5072 parent_spec->pool_id = pool_id;
5073 parent_spec->image_id = image_id;
5074 parent_spec->snap_id = snap_id;
5075 rbd_dev->parent_spec = parent_spec;
5076 parent_spec = NULL; /* rbd_dev now owns this */
5082 * We always update the parent overlap. If it's zero we issue
5083 * a warning, as we will proceed as if there was no parent.
5087 /* refresh, careful to warn just once */
		if (rbd_dev->parent_overlap)
			rbd_warn(rbd_dev,
			    "clone now standalone (overlap became 0)");
5093 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5096 rbd_dev->parent_overlap = overlap;
5102 rbd_spec_put(parent_spec);
5107 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
	struct {
		__le64 stripe_unit;
		__le64 stripe_count;
	} __attribute__ ((packed)) striping_info_buf = { 0 };
5113 size_t size = sizeof (striping_info_buf);
5120 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5121 &rbd_dev->header_oloc, "get_stripe_unit_count",
5122 NULL, 0, &striping_info_buf, size);
5123 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5130 * We don't actually support the "fancy striping" feature
5131 * (STRIPINGV2) yet, but if the striping sizes are the
5132 * defaults the behavior is the same as before. So find
5133 * out, and only fail if the image has non-default values.
5136 obj_size = rbd_obj_bytes(&rbd_dev->header);
5137 p = &striping_info_buf;
5138 stripe_unit = ceph_decode_64(&p);
5139 if (stripe_unit != obj_size) {
5140 rbd_warn(rbd_dev, "unsupported stripe unit "
5141 "(got %llu want %llu)",
5142 stripe_unit, obj_size);
5145 stripe_count = ceph_decode_64(&p);
5146 if (stripe_count != 1) {
5147 rbd_warn(rbd_dev, "unsupported stripe count "
5148 "(got %llu want 1)", stripe_count);
5151 rbd_dev->header.stripe_unit = stripe_unit;
5152 rbd_dev->header.stripe_count = stripe_count;
5157 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5159 __le64 data_pool_id;
5162 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5163 &rbd_dev->header_oloc, "get_data_pool",
5164 NULL, 0, &data_pool_id, sizeof(data_pool_id));
	if (ret < sizeof(data_pool_id))
		return -EBADMSG;
5170 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5171 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5175 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5177 CEPH_DEFINE_OID_ONSTACK(oid);
5178 size_t image_id_size;
5183 void *reply_buf = NULL;
5185 char *image_name = NULL;
5188 rbd_assert(!rbd_dev->spec->image_name);
5190 len = strlen(rbd_dev->spec->image_id);
5191 image_id_size = sizeof (__le32) + len;
5192 image_id = kmalloc(image_id_size, GFP_KERNEL);
5197 end = image_id + image_id_size;
5198 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5200 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5201 reply_buf = kmalloc(size, GFP_KERNEL);
5205 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
				"dir_get_name", image_id, image_id_size,
				reply_buf, size);
5212 end = reply_buf + ret;
5214 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5215 if (IS_ERR(image_name))
5218 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5226 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5228 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5229 const char *snap_name;
5232 /* Skip over names until we find the one we are looking for */
5234 snap_name = rbd_dev->header.snap_names;
5235 while (which < snapc->num_snaps) {
5236 if (!strcmp(name, snap_name))
5237 return snapc->snaps[which];
5238 snap_name += strlen(snap_name) + 1;
5244 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5246 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5251 for (which = 0; !found && which < snapc->num_snaps; which++) {
5252 const char *snap_name;
5254 snap_id = snapc->snaps[which];
5255 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5256 if (IS_ERR(snap_name)) {
5257 /* ignore no-longer existing snapshots */
5258 if (PTR_ERR(snap_name) == -ENOENT)
5263 found = !strcmp(name, snap_name);
5266 return found ? snap_id : CEPH_NOSNAP;
5270 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5271 * no snapshot by that name is found, or if an error occurs.
5273 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5275 if (rbd_dev->image_format == 1)
5276 return rbd_v1_snap_id_by_name(rbd_dev, name);
5278 return rbd_v2_snap_id_by_name(rbd_dev, name);
5282 * An image being mapped will have everything but the snap id.
5284 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5286 struct rbd_spec *spec = rbd_dev->spec;
5288 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5289 rbd_assert(spec->image_id && spec->image_name);
5290 rbd_assert(spec->snap_name);
5292 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5295 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5296 if (snap_id == CEPH_NOSNAP)
5299 spec->snap_id = snap_id;
5301 spec->snap_id = CEPH_NOSNAP;
5308 * A parent image will have all ids but none of the names.
5310 * All names in an rbd spec are dynamically allocated. It's OK if we
5311 * can't figure out the name for an image id.
5313 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5315 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5316 struct rbd_spec *spec = rbd_dev->spec;
5317 const char *pool_name;
5318 const char *image_name;
5319 const char *snap_name;
5322 rbd_assert(spec->pool_id != CEPH_NOPOOL);
5323 rbd_assert(spec->image_id);
5324 rbd_assert(spec->snap_id != CEPH_NOSNAP);
5326 /* Get the pool name; we have to make our own copy of this */
5328 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5330 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5333 pool_name = kstrdup(pool_name, GFP_KERNEL);
5337 /* Fetch the image name; tolerate failure here */
5339 image_name = rbd_dev_image_name(rbd_dev);
5341 rbd_warn(rbd_dev, "unable to get image name");
5343 /* Fetch the snapshot name */
5345 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5346 if (IS_ERR(snap_name)) {
5347 ret = PTR_ERR(snap_name);
5351 spec->pool_name = pool_name;
5352 spec->image_name = image_name;
5353 spec->snap_name = snap_name;
5363 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5372 struct ceph_snap_context *snapc;
5376 * We'll need room for the seq value (maximum snapshot id),
5377 * snapshot count, and array of that many snapshot ids.
5378 * For now we have a fixed upper limit on the number we're
5379 * prepared to receive.
5381 size = sizeof (__le64) + sizeof (__le32) +
5382 RBD_MAX_SNAP_COUNT * sizeof (__le64);
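	/*
	 * With RBD_MAX_SNAP_COUNT == 510 this works out to
	 * 8 + 4 + 510 * 8 = 4092 bytes, i.e. a maximal snap context
	 * reply still fits in a single 4KB buffer.
	 */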
5383 reply_buf = kzalloc(size, GFP_KERNEL);
5387 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5388 &rbd_dev->header_oloc, "get_snapcontext",
5389 NULL, 0, reply_buf, size);
5390 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5395 end = reply_buf + ret;
5397 ceph_decode_64_safe(&p, end, seq, out);
5398 ceph_decode_32_safe(&p, end, snap_count, out);
5401 * Make sure the reported number of snapshot ids wouldn't go
5402 * beyond the end of our buffer. But before checking that,
5403 * make sure the computed size of the snapshot context we
5404 * allocate is representable in a size_t.
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context)) /
				sizeof (u64))
		goto out;
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;
5415 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5421 for (i = 0; i < snap_count; i++)
5422 snapc->snaps[i] = ceph_decode_64(&p);
5424 ceph_put_snap_context(rbd_dev->header.snapc);
5425 rbd_dev->header.snapc = snapc;
5427 dout(" snap context seq = %llu, snap_count = %u\n",
5428 (unsigned long long)seq, (unsigned int)snap_count);
5435 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5446 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5447 reply_buf = kmalloc(size, GFP_KERNEL);
5449 return ERR_PTR(-ENOMEM);
5451 snapid = cpu_to_le64(snap_id);
5452 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5453 &rbd_dev->header_oloc, "get_snapshot_name",
5454 &snapid, sizeof(snapid), reply_buf, size);
5455 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5457 snap_name = ERR_PTR(ret);
5462 end = reply_buf + ret;
5463 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5464 if (IS_ERR(snap_name))
5467 dout(" snap_id 0x%016llx snap_name = %s\n",
5468 (unsigned long long)snap_id, snap_name);
5475 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5477 bool first_time = rbd_dev->header.object_prefix == NULL;
5480 ret = rbd_dev_v2_image_size(rbd_dev);
5485 ret = rbd_dev_v2_header_onetime(rbd_dev);
5490 ret = rbd_dev_v2_snap_context(rbd_dev);
5491 if (ret && first_time) {
5492 kfree(rbd_dev->header.object_prefix);
5493 rbd_dev->header.object_prefix = NULL;
5499 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5501 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5503 if (rbd_dev->image_format == 1)
5504 return rbd_dev_v1_header_info(rbd_dev);
5506 return rbd_dev_v2_header_info(rbd_dev);
5510 * Skips over white space at *buf, and updates *buf to point to the
5511 * first found non-space character (if any). Returns the length of
5512 * the token (string of non-white space characters) found. Note
5513 * that *buf must be terminated with '\0'.
5515 static inline size_t next_token(const char **buf)
5518 * These are the characters that produce nonzero for
5519 * isspace() in the "C" and "POSIX" locales.
5521 const char *spaces = " \f\n\r\t\v";
5523 *buf += strspn(*buf, spaces); /* Find start of token */
5525 return strcspn(*buf, spaces); /* Return token length */
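/*
 * For example (illustrative): with *buf pointing at "  pool image",
 * next_token() advances *buf to "pool image" and returns 4.
 */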
5529 * Finds the next token in *buf, dynamically allocates a buffer big
5530 * enough to hold a copy of it, and copies the token into the new
5531 * buffer. The copy is guaranteed to be terminated with '\0'. Note
5532 * that a duplicate buffer is created even for a zero-length token.
5534 * Returns a pointer to the newly-allocated duplicate, or a null
5535 * pointer if memory for the duplicate was not available. If
5536 * the lenp argument is a non-null pointer, the length of the token
5537 * (not including the '\0') is returned in *lenp.
5539 * If successful, the *buf pointer will be updated to point beyond
5540 * the end of the found token.
5542 * Note: uses GFP_KERNEL for allocation.
5544 static inline char *dup_token(const char **buf, size_t *lenp)
5549 len = next_token(buf);
5550 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5553 *(dup + len) = '\0';
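	/*
	 * Continuing the example above (illustrative): dup_token() on
	 * "pool image" returns a freshly allocated "pool" and leaves
	 * *buf pointing at " image".
	 */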
5563 * Parse the options provided for an "rbd add" (i.e., rbd image
5564 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
5565 * and the data written is passed here via a NUL-terminated buffer.
5566 * Returns 0 if successful or an error code otherwise.
5568 * The information extracted from these options is recorded in
 * the other parameters, which return dynamically-allocated
 * structures:
 *  ceph_opts
 *	The address of a pointer that will refer to a ceph options
 *	structure.  Caller must release the returned pointer using
 *	ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *	Address of an rbd options pointer.  Fully initialized by
 *	this function; caller must release with kfree().
 *  rbd_spec
 *	Address of an rbd image specification pointer.  Fully
 *	initialized by this function based on parsed options.
 *	Caller must release with rbd_spec_put().
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
 * where:
 *  <mon_addrs>
 *	A comma-separated list of one or more monitor addresses.
 *	A monitor address is an ip address, optionally followed
 *	by a port number (separated by a colon).
 *	  I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *	A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *	The name of the rados pool containing the rbd image.
 *  <image_name>
 *	The name of the image in that pool to map.
 *  <snap_id>
 *	An optional snapshot id.  If provided, the mapping will
 *	present data from the image at the time that snapshot was
 *	created.  The image head is used if no snapshot id is
 *	provided.  Snapshot mappings are always read-only.
 */
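/*
 * A minimal add request might therefore look like this (monitor
 * address, key and names are placeholders, not defaults):
 *
 *	1.2.3.4:6789 name=admin,secret=<key> rbd myimage mysnap
 *
 * which maps snapshot "mysnap" of image "myimage" in pool "rbd".
 */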
5603 static int rbd_add_parse_args(const char *buf,
5604 struct ceph_options **ceph_opts,
5605 struct rbd_options **opts,
5606 struct rbd_spec **rbd_spec)
5610 const char *mon_addrs;
5612 size_t mon_addrs_size;
5613 struct rbd_spec *spec = NULL;
5614 struct rbd_options *rbd_opts = NULL;
5615 struct ceph_options *copts;
5618 /* The first four tokens are required */
5620 len = next_token(&buf);
5622 rbd_warn(NULL, "no monitor address(es) provided");
5626 mon_addrs_size = len + 1;
5630 options = dup_token(&buf, NULL);
5634 rbd_warn(NULL, "no options provided");
5638 spec = rbd_spec_alloc();
5642 spec->pool_name = dup_token(&buf, NULL);
5643 if (!spec->pool_name)
5645 if (!*spec->pool_name) {
5646 rbd_warn(NULL, "no pool name provided");
5650 spec->image_name = dup_token(&buf, NULL);
5651 if (!spec->image_name)
5653 if (!*spec->image_name) {
5654 rbd_warn(NULL, "no image name provided");
5659 * Snapshot name is optional; default is to use "-"
5660 * (indicating the head/no snapshot).
5662 len = next_token(&buf);
5664 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5665 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5666 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5667 ret = -ENAMETOOLONG;
5670 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5673 *(snap_name + len) = '\0';
5674 spec->snap_name = snap_name;
5676 /* Initialize all rbd options to the defaults */
5678 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5682 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5683 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5684 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5685 rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
5687 copts = ceph_parse_options(options, mon_addrs,
5688 mon_addrs + mon_addrs_size - 1,
5689 parse_rbd_opts_token, rbd_opts);
5690 if (IS_ERR(copts)) {
5691 ret = PTR_ERR(copts);
5712 * Return pool id (>= 0) or a negative error code.
5714 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5716 struct ceph_options *opts = rbdc->client->options;
5722 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5723 if (ret == -ENOENT && tries++ < 1) {
5724 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5729 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5730 ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5731 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5733 opts->mount_timeout);
5736 /* the osdmap we have is new enough */
5744 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5746 down_write(&rbd_dev->lock_rwsem);
5747 if (__rbd_is_lock_owner(rbd_dev))
5748 rbd_unlock(rbd_dev);
5749 up_write(&rbd_dev->lock_rwsem);
5752 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5754 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5755 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
	/* FIXME: "rbd map --exclusive" should be interruptible */
5760 down_read(&rbd_dev->lock_rwsem);
5761 rbd_wait_state_locked(rbd_dev);
5762 up_read(&rbd_dev->lock_rwsem);
5763 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5764 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5772 * An rbd format 2 image has a unique identifier, distinct from the
5773 * name given to it by the user. Internally, that identifier is
5774 * what's used to specify the names of objects related to the image.
5776 * A special "rbd id" object is used to map an rbd image name to its
5777 * id. If that object doesn't exist, then there is no v2 rbd image
5778 * with the supplied name.
5780 * This function will record the given rbd_dev's image_id field if
5781 * it can be determined, and in that case will return 0. If any
5782 * errors occur a negative errno will be returned and the rbd_dev's
5783 * image_id field will be unchanged (and should be NULL).
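/*
 * For instance, a format 2 image named "foo" gets its id from the
 * "rbd_id.foo" object (RBD_ID_PREFIX + image name, as built below,
 * assuming the usual "rbd_id." prefix); the object contains an encoded
 * string such as "1018e1f21web" (illustrative) that becomes the
 * image id.
 */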
5785 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5789 CEPH_DEFINE_OID_ONSTACK(oid);
5794 * When probing a parent image, the image id is already
5795 * known (and the image name likely is not). There's no
5796 * need to fetch the image id again in this case. We
5797 * do still need to set the image format though.
5799 if (rbd_dev->spec->image_id) {
5800 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5806 * First, see if the format 2 image id file exists, and if
5807 * so, get the image's persistent id from it.
5809 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5810 rbd_dev->spec->image_name);
5814 dout("rbd id object name is %s\n", oid.name);
5816 /* Response will be an encoded string, which includes a length */
5818 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5819 response = kzalloc(size, GFP_NOIO);
5825 /* If it doesn't exist we'll assume it's a format 1 image */
	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
				"get_id", NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX);
5830 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5831 if (ret == -ENOENT) {
5832 image_id = kstrdup("", GFP_KERNEL);
5833 ret = image_id ? 0 : -ENOMEM;
5835 rbd_dev->image_format = 1;
5836 } else if (ret >= 0) {
		image_id = ceph_extract_encoded_string(&p, p + ret,
						NULL, GFP_NOIO);
5841 ret = PTR_ERR_OR_ZERO(image_id);
5843 rbd_dev->image_format = 2;
5847 rbd_dev->spec->image_id = image_id;
5848 dout("image_id is %s\n", image_id);
5852 ceph_oid_destroy(&oid);
5857 * Undo whatever state changes are made by v1 or v2 header info
5860 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5862 struct rbd_image_header *header;
5864 rbd_dev_parent_put(rbd_dev);
5866 /* Free dynamic fields from the header, then zero it out */
5868 header = &rbd_dev->header;
5869 ceph_put_snap_context(header->snapc);
5870 kfree(header->snap_sizes);
5871 kfree(header->snap_names);
5872 kfree(header->object_prefix);
5873 memset(header, 0, sizeof (*header));
5876 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5880 ret = rbd_dev_v2_object_prefix(rbd_dev);
	/*
	 * Get and check the features for the image.  Currently the
	 * features are assumed to never change.
	 */
5888 ret = rbd_dev_v2_features(rbd_dev);
5892 /* If the image supports fancy striping, get its parameters */
5894 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5895 ret = rbd_dev_v2_striping_info(rbd_dev);
5900 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5901 ret = rbd_dev_v2_data_pool(rbd_dev);
5906 rbd_init_layout(rbd_dev);
5910 rbd_dev->header.features = 0;
5911 kfree(rbd_dev->header.object_prefix);
5912 rbd_dev->header.object_prefix = NULL;
5917 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5918 * rbd_dev_image_probe() recursion depth, which means it's also the
5919 * length of the already discovered part of the parent chain.
5921 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5923 struct rbd_device *parent = NULL;
5926 if (!rbd_dev->parent_spec)
5929 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5930 pr_info("parent chain is too long (%d)\n", depth);
5935 parent = __rbd_dev_create(rbd_dev->parent_spec);
5942 * Images related by parent/child relationships always share
5943 * rbd_client and spec/parent_spec, so bump their refcounts.
5945 parent->rbd_client = __rbd_get_client(rbd_dev->rbd_client);
5946 parent->spec = rbd_spec_get(rbd_dev->parent_spec);
5948 ret = rbd_dev_image_probe(parent, depth);
5952 rbd_dev->parent = parent;
5953 atomic_set(&rbd_dev->parent_ref, 1);
5957 rbd_dev_unparent(rbd_dev);
5958 rbd_dev_destroy(parent);
5962 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5964 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5965 rbd_dev_mapping_clear(rbd_dev);
5966 rbd_free_disk(rbd_dev);
5968 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5972 * rbd_dev->header_rwsem must be locked for write and will be unlocked
5975 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5979 /* Record our major and minor device numbers. */
5981 if (!single_major) {
5982 ret = register_blkdev(0, rbd_dev->name);
5984 goto err_out_unlock;
5986 rbd_dev->major = ret;
5989 rbd_dev->major = rbd_major;
5990 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5993 /* Set up the blkdev mapping. */
5995 ret = rbd_init_disk(rbd_dev);
5997 goto err_out_blkdev;
5999 ret = rbd_dev_mapping_set(rbd_dev);
6003 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6004 set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
6006 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6008 goto err_out_mapping;
6010 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6011 up_write(&rbd_dev->header_rwsem);
6015 rbd_dev_mapping_clear(rbd_dev);
6017 rbd_free_disk(rbd_dev);
6020 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6022 up_write(&rbd_dev->header_rwsem);
6026 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6028 struct rbd_spec *spec = rbd_dev->spec;
6031 /* Record the header object name for this rbd image. */
6033 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6034 if (rbd_dev->image_format == 1)
6035 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6036 spec->image_name, RBD_SUFFIX);
6038 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6039 RBD_HEADER_PREFIX, spec->image_id);
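/*
 * Continuing the example: image "foo" with id "1018e1f21web"
 * (illustrative) gets the header object "foo.rbd" for format 1 or
 * "rbd_header.1018e1f21web" for format 2, assuming the usual
 * RBD_SUFFIX and RBD_HEADER_PREFIX values.
 */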
6044 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6047 rbd_unregister_watch(rbd_dev);
6049 rbd_dev_unprobe(rbd_dev);
6050 rbd_dev->image_format = 0;
6051 kfree(rbd_dev->spec->image_id);
6052 rbd_dev->spec->image_id = NULL;
6056 * Probe for the existence of the header object for the given rbd
6057 * device. If this image is the one being mapped (i.e., not a
6058 * parent), initiate a watch on its header object before using that
6059 * object to get detailed information about the rbd image.
 * On success, returns with header_rwsem held for write if called
 * with @depth == 0.
6064 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6069 * Get the id from the image id object. Unless there's an
6070 * error, rbd_dev->spec->image_id will be filled in with
6071 * a dynamically-allocated string, and rbd_dev->image_format
6072 * will be set to either 1 or 2.
6074 ret = rbd_dev_image_id(rbd_dev);
6078 ret = rbd_dev_header_name(rbd_dev);
6080 goto err_out_format;
6083 ret = rbd_register_watch(rbd_dev);
6086 pr_info("image %s/%s does not exist\n",
6087 rbd_dev->spec->pool_name,
6088 rbd_dev->spec->image_name);
6089 goto err_out_format;
6094 down_write(&rbd_dev->header_rwsem);
6096 ret = rbd_dev_header_info(rbd_dev);
6101 * If this image is the one being mapped, we have pool name and
6102 * id, image name and id, and snap name - need to fill snap id.
6103 * Otherwise this is a parent image, identified by pool, image
6104 * and snap ids - need to fill in names for those ids.
6107 ret = rbd_spec_fill_snap_id(rbd_dev);
6109 ret = rbd_spec_fill_names(rbd_dev);
6112 pr_info("snap %s/%s@%s does not exist\n",
6113 rbd_dev->spec->pool_name,
6114 rbd_dev->spec->image_name,
6115 rbd_dev->spec->snap_name);
6119 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6120 ret = rbd_dev_v2_parent_info(rbd_dev);
6125 * Need to warn users if this image is the one being
6126 * mapped and has a parent.
6128 if (!depth && rbd_dev->parent_spec)
6130 "WARNING: kernel layering is EXPERIMENTAL!");
6133 ret = rbd_dev_probe_parent(rbd_dev, depth);
6137 dout("discovered format %u image, header name is %s\n",
6138 rbd_dev->image_format, rbd_dev->header_oid.name);
6143 up_write(&rbd_dev->header_rwsem);
6145 rbd_unregister_watch(rbd_dev);
6146 rbd_dev_unprobe(rbd_dev);
6148 rbd_dev->image_format = 0;
6149 kfree(rbd_dev->spec->image_id);
6150 rbd_dev->spec->image_id = NULL;
6154 static ssize_t do_rbd_add(struct bus_type *bus,
6158 struct rbd_device *rbd_dev = NULL;
6159 struct ceph_options *ceph_opts = NULL;
6160 struct rbd_options *rbd_opts = NULL;
6161 struct rbd_spec *spec = NULL;
6162 struct rbd_client *rbdc;
	if (!capable(CAP_SYS_ADMIN))
		return -EPERM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;
6172 /* parse add command */
6173 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6177 rbdc = rbd_get_client(ceph_opts);
6184 rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
6187 pr_info("pool %s does not exist\n", spec->pool_name);
6188 goto err_out_client;
6190 spec->pool_id = (u64)rc;
6192 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
6195 goto err_out_client;
6197 rbdc = NULL; /* rbd_dev now owns this */
6198 spec = NULL; /* rbd_dev now owns this */
6199 rbd_opts = NULL; /* rbd_dev now owns this */
6201 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6202 if (!rbd_dev->config_info) {
6204 goto err_out_rbd_dev;
6207 rc = rbd_dev_image_probe(rbd_dev, 0);
6209 goto err_out_rbd_dev;
6211 /* If we are mapping a snapshot it must be marked read-only */
6213 read_only = rbd_dev->opts->read_only;
6214 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6216 rbd_dev->mapping.read_only = read_only;
6218 rc = rbd_dev_device_setup(rbd_dev);
6220 goto err_out_image_probe;
6222 if (rbd_dev->opts->exclusive) {
6223 rc = rbd_add_acquire_lock(rbd_dev);
6225 goto err_out_device_setup;
6228 /* Everything's ready. Announce the disk to the world. */
6230 rc = device_add(&rbd_dev->dev);
6232 goto err_out_image_lock;
6234 add_disk(rbd_dev->disk);
6235 /* see rbd_init_disk() */
6236 blk_put_queue(rbd_dev->disk->queue);
6238 spin_lock(&rbd_dev_list_lock);
6239 list_add_tail(&rbd_dev->node, &rbd_dev_list);
6240 spin_unlock(&rbd_dev_list_lock);
6242 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6243 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6244 rbd_dev->header.features);
6247 module_put(THIS_MODULE);
6251 rbd_dev_image_unlock(rbd_dev);
6252 err_out_device_setup:
6253 rbd_dev_device_release(rbd_dev);
6254 err_out_image_probe:
6255 rbd_dev_image_release(rbd_dev);
6257 rbd_dev_destroy(rbd_dev);
6259 rbd_put_client(rbdc);
6266 static ssize_t rbd_add(struct bus_type *bus,
6273 return do_rbd_add(bus, buf, count);
6276 static ssize_t rbd_add_single_major(struct bus_type *bus,
6280 return do_rbd_add(bus, buf, count);
6283 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6285 while (rbd_dev->parent) {
6286 struct rbd_device *first = rbd_dev;
6287 struct rbd_device *second = first->parent;
6288 struct rbd_device *third;
		/*
		 * Follow to the parent with no grandparent and
		 * detach it.
		 */
6294 while (second && (third = second->parent)) {
6299 rbd_dev_image_release(second);
6300 rbd_dev_destroy(second);
6301 first->parent = NULL;
6302 first->parent_overlap = 0;
6304 rbd_assert(first->parent_spec);
6305 rbd_spec_put(first->parent_spec);
6306 first->parent_spec = NULL;
6310 static ssize_t do_rbd_remove(struct bus_type *bus,
6314 struct rbd_device *rbd_dev = NULL;
6315 struct list_head *tmp;
	if (!capable(CAP_SYS_ADMIN))
		return -EPERM;
	sscanf(buf, "%d %5s", &dev_id, opt_buf);
	if (dev_id < 0) {
		pr_err("dev_id out of range\n");
		return -EINVAL;
	}
6331 if (opt_buf[0] != '\0') {
		if (!strcmp(opt_buf, "force")) {
			force = true;
		} else {
			pr_err("bad remove option at '%s'\n", opt_buf);
			return -EINVAL;
		}
	}
6341 spin_lock(&rbd_dev_list_lock);
6342 list_for_each(tmp, &rbd_dev_list) {
6343 rbd_dev = list_entry(tmp, struct rbd_device, node);
6344 if (rbd_dev->dev_id == dev_id) {
6350 spin_lock_irq(&rbd_dev->lock);
6351 if (rbd_dev->open_count && !force)
6353 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6356 spin_unlock_irq(&rbd_dev->lock);
6358 spin_unlock(&rbd_dev_list_lock);
6364 * Prevent new IO from being queued and wait for existing
6365 * IO to complete/fail.
6367 blk_mq_freeze_queue(rbd_dev->disk->queue);
6368 blk_set_queue_dying(rbd_dev->disk->queue);
6371 del_gendisk(rbd_dev->disk);
6372 spin_lock(&rbd_dev_list_lock);
6373 list_del_init(&rbd_dev->node);
6374 spin_unlock(&rbd_dev_list_lock);
6375 device_del(&rbd_dev->dev);
6377 rbd_dev_image_unlock(rbd_dev);
6378 rbd_dev_device_release(rbd_dev);
6379 rbd_dev_image_release(rbd_dev);
6380 rbd_dev_destroy(rbd_dev);
6384 static ssize_t rbd_remove(struct bus_type *bus,
6391 return do_rbd_remove(bus, buf, count);
6394 static ssize_t rbd_remove_single_major(struct bus_type *bus,
6398 return do_rbd_remove(bus, buf, count);
6402 * create control files in sysfs
6405 static int rbd_sysfs_init(void)
6409 ret = device_register(&rbd_root_dev);
6413 ret = bus_register(&rbd_bus_type);
6415 device_unregister(&rbd_root_dev);
6420 static void rbd_sysfs_cleanup(void)
6422 bus_unregister(&rbd_bus_type);
6423 device_unregister(&rbd_root_dev);
6426 static int rbd_slab_init(void)
6428 rbd_assert(!rbd_img_request_cache);
6429 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
6430 if (!rbd_img_request_cache)
6433 rbd_assert(!rbd_obj_request_cache);
6434 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
6435 if (!rbd_obj_request_cache)
6438 rbd_assert(!rbd_bio_clone);
6439 rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0);
6446 kmem_cache_destroy(rbd_obj_request_cache);
6447 rbd_obj_request_cache = NULL;
6449 kmem_cache_destroy(rbd_img_request_cache);
6450 rbd_img_request_cache = NULL;
6454 static void rbd_slab_exit(void)
6456 rbd_assert(rbd_obj_request_cache);
6457 kmem_cache_destroy(rbd_obj_request_cache);
6458 rbd_obj_request_cache = NULL;
6460 rbd_assert(rbd_img_request_cache);
6461 kmem_cache_destroy(rbd_img_request_cache);
6462 rbd_img_request_cache = NULL;
6464 rbd_assert(rbd_bio_clone);
6465 bioset_free(rbd_bio_clone);
6466 rbd_bio_clone = NULL;
6469 static int __init rbd_init(void)
6473 if (!libceph_compatible(NULL)) {
6474 rbd_warn(NULL, "libceph incompatibility (quitting)");
6478 rc = rbd_slab_init();
6483 * The number of active work items is limited by the number of
6484 * rbd devices * queue depth, so leave @max_active at default.
6486 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6493 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6494 if (rbd_major < 0) {
6500 rc = rbd_sysfs_init();
6502 goto err_out_blkdev;
6505 pr_info("loaded (major %d)\n", rbd_major);
6507 pr_info("loaded\n");
6513 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6515 destroy_workqueue(rbd_wq);
6521 static void __exit rbd_exit(void)
6523 ida_destroy(&rbd_dev_id_ida);
6524 rbd_sysfs_cleanup();
6526 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6527 destroy_workqueue(rbd_wq);
6531 module_init(rbd_init);
6532 module_exit(rbd_exit);
6534 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
6535 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6536 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
6537 /* following authorship retained from original osdblk.c */
6538 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6540 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
6541 MODULE_LICENSE("GPL");