/*
 * drivers/block/rbd.c
 */
2 /*
3 rbd.c -- Export ceph rados objects as a Linux block device
6 based on drivers/block/osdblk.c:
8 Copyright 2009 Red Hat, Inc.
10 This program is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program; see the file COPYING. If not, write to
21 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
25 For usage instructions, please refer to:
27 Documentation/ABI/testing/sysfs-bus-rbd
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/decode.h>
36 #include <linux/parser.h>
37 #include <linux/bsearch.h>
39 #include <linux/kernel.h>
40 #include <linux/device.h>
41 #include <linux/module.h>
42 #include <linux/blk-mq.h>
43 #include <linux/fs.h>
44 #include <linux/blkdev.h>
45 #include <linux/slab.h>
46 #include <linux/idr.h>
47 #include <linux/workqueue.h>
49 #include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
63 * Increment the given counter and return its updated value.
64 * If the counter is already 0 it will not be incremented.
65 * If the counter is already at its maximum value returns
66 * -EINVAL without updating it.
68 static int atomic_inc_return_safe(atomic_t *v)
70 unsigned int counter;
72 counter = (unsigned int)__atomic_add_unless(v, 1, 0);
73 if (counter <= (unsigned int)INT_MAX)
74 return (int)counter;
76 atomic_dec(v);
78 return -EINVAL;
81 /* Decrement the counter. Return the resulting value, or -EINVAL */
82 static int atomic_dec_return_safe(atomic_t *v)
84 int counter;
86 counter = atomic_dec_return(v);
87 if (counter >= 0)
88 return counter;
90 atomic_inc(v);
92 return -EINVAL;
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_DATA_POOL)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32
144 * block device image metadata (in-memory version)
146 struct rbd_image_header {
147 /* These six fields never change for a given rbd image */
148 char *object_prefix;
149 __u8 obj_order;
150 u64 stripe_unit;
151 u64 stripe_count;
152 s64 data_pool_id;
153 u64 features; /* Might be changeable someday? */
155 /* The remaining fields need to be updated occasionally */
156 u64 image_size;
157 struct ceph_snap_context *snapc;
158 char *snap_names; /* format 1 only */
159 u64 *snap_sizes; /* format 1 only */
163 * An rbd image specification.
165 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
166 * identify an image. Each rbd_dev structure includes a pointer to
167 * an rbd_spec structure that encapsulates this identity.
169 * Each of the id's in an rbd_spec has an associated name. For a
170 * user-mapped image, the names are supplied and the id's associated
171 * with them are looked up. For a layered image, a parent image is
172 * defined by the tuple, and the names are looked up.
174 * An rbd_dev structure contains a parent_spec pointer which is
175 * non-null if the image it represents is a child in a layered
176 * image. This pointer will refer to the rbd_spec structure used
177 * by the parent rbd_dev for its own identity (i.e., the structure
178 * is shared between the parent and child).
180 * Since these structures are populated once, during the discovery
181 * phase of image construction, they are effectively immutable so
182 * we make no effort to synchronize access to them.
184 * Note that code herein does not assume the image name is known (it
185 * could be a null pointer).
187 struct rbd_spec {
188 u64 pool_id;
189 const char *pool_name;
191 const char *image_id;
192 const char *image_name;
194 u64 snap_id;
195 const char *snap_name;
197 struct kref kref;
201 * an instance of the client. multiple devices may share an rbd client.
203 struct rbd_client {
204 struct ceph_client *client;
205 struct kref kref;
206 struct list_head node;
209 struct rbd_img_request;
210 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
212 #define BAD_WHICH U32_MAX /* Good which or bad which, which? */
214 struct rbd_obj_request;
215 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
217 enum obj_request_type {
218 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
221 enum obj_operation_type {
222 OBJ_OP_WRITE,
223 OBJ_OP_READ,
224 OBJ_OP_DISCARD,
227 enum obj_req_flags {
228 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
229 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
230 OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
231 OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */
234 struct rbd_obj_request {
235 u64 object_no;
236 u64 offset; /* object start byte */
237 u64 length; /* bytes from offset */
238 unsigned long flags;
241 * An object request associated with an image will have its
242 * img_data flag set; a standalone object request will not.
244 * A standalone object request will have which == BAD_WHICH
245 * and a null obj_request pointer.
247 * An object request initiated in support of a layered image
248 * object (to check for its existence before a write) will
249 * have which == BAD_WHICH and a non-null obj_request pointer.
251 * Finally, an object request for rbd image data will have
252 * which != BAD_WHICH, and will have a non-null img_request
253 * pointer. The value of which will be in the range
254 * 0..(img_request->obj_request_count-1).
256 union {
257 struct rbd_obj_request *obj_request; /* STAT op */
258 struct {
259 struct rbd_img_request *img_request;
260 u64 img_offset;
261 /* links for img_request->obj_requests list */
262 struct list_head links;
265 u32 which; /* posn image request list */
267 enum obj_request_type type;
268 union {
269 struct bio *bio_list;
270 struct {
271 struct page **pages;
272 u32 page_count;
275 struct page **copyup_pages;
276 u32 copyup_page_count;
278 struct ceph_osd_request *osd_req;
280 u64 xferred; /* bytes transferred */
281 int result;
283 rbd_obj_callback_t callback;
284 struct completion completion;
286 struct kref kref;
289 enum img_req_flags {
290 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
291 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
292 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
293 IMG_REQ_DISCARD, /* discard: normal = 0, discard request = 1 */
296 struct rbd_img_request {
297 struct rbd_device *rbd_dev;
298 u64 offset; /* starting image byte offset */
299 u64 length; /* byte count from offset */
300 unsigned long flags;
301 union {
302 u64 snap_id; /* for reads */
303 struct ceph_snap_context *snapc; /* for writes */
305 union {
306 struct request *rq; /* block request */
307 struct rbd_obj_request *obj_request; /* obj req initiator */
309 struct page **copyup_pages;
310 u32 copyup_page_count;
311 spinlock_t completion_lock;/* protects next_completion */
312 u32 next_completion;
313 rbd_img_callback_t callback;
314 u64 xferred;/* aggregate bytes transferred */
315 int result; /* first nonzero obj_request result */
317 u32 obj_request_count;
318 struct list_head obj_requests; /* rbd_obj_request structs */
320 struct kref kref;
323 #define for_each_obj_request(ireq, oreq) \
324 list_for_each_entry(oreq, &(ireq)->obj_requests, links)
325 #define for_each_obj_request_from(ireq, oreq) \
326 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
327 #define for_each_obj_request_safe(ireq, oreq, n) \
328 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
330 enum rbd_watch_state {
331 RBD_WATCH_STATE_UNREGISTERED,
332 RBD_WATCH_STATE_REGISTERED,
333 RBD_WATCH_STATE_ERROR,
336 enum rbd_lock_state {
337 RBD_LOCK_STATE_UNLOCKED,
338 RBD_LOCK_STATE_LOCKED,
339 RBD_LOCK_STATE_RELEASING,
342 /* WatchNotify::ClientId */
343 struct rbd_client_id {
344 u64 gid;
345 u64 handle;
348 struct rbd_mapping {
349 u64 size;
350 u64 features;
351 bool read_only;
355 * a single device
357 struct rbd_device {
358 int dev_id; /* blkdev unique id */
360 int major; /* blkdev assigned major */
361 int minor;
362 struct gendisk *disk; /* blkdev's gendisk and rq */
364 u32 image_format; /* Either 1 or 2 */
365 struct rbd_client *rbd_client;
367 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
369 spinlock_t lock; /* queue, flags, open_count */
371 struct rbd_image_header header;
372 unsigned long flags; /* possibly lock protected */
373 struct rbd_spec *spec;
374 struct rbd_options *opts;
375 char *config_info; /* add{,_single_major} string */
377 struct ceph_object_id header_oid;
378 struct ceph_object_locator header_oloc;
380 struct ceph_file_layout layout; /* used for all rbd requests */
382 struct mutex watch_mutex;
383 enum rbd_watch_state watch_state;
384 struct ceph_osd_linger_request *watch_handle;
385 u64 watch_cookie;
386 struct delayed_work watch_dwork;
388 struct rw_semaphore lock_rwsem;
389 enum rbd_lock_state lock_state;
390 char lock_cookie[32];
391 struct rbd_client_id owner_cid;
392 struct work_struct acquired_lock_work;
393 struct work_struct released_lock_work;
394 struct delayed_work lock_dwork;
395 struct work_struct unlock_work;
396 wait_queue_head_t lock_waitq;
398 struct workqueue_struct *task_wq;
400 struct rbd_spec *parent_spec;
401 u64 parent_overlap;
402 atomic_t parent_ref;
403 struct rbd_device *parent;
405 /* Block layer tags. */
406 struct blk_mq_tag_set tag_set;
408 /* protects updating the header */
409 struct rw_semaphore header_rwsem;
411 struct rbd_mapping mapping;
413 struct list_head node;
415 /* sysfs related */
416 struct device dev;
417 unsigned long open_count; /* protected by lock */
/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};
432 static DEFINE_MUTEX(client_mutex); /* Serialize client creation */
434 static LIST_HEAD(rbd_dev_list); /* devices */
435 static DEFINE_SPINLOCK(rbd_dev_list_lock);
437 static LIST_HEAD(rbd_client_list); /* clients */
438 static DEFINE_SPINLOCK(rbd_client_list_lock);
440 /* Slab caches for frequently-allocated structures */
442 static struct kmem_cache *rbd_img_request_cache;
443 static struct kmem_cache *rbd_obj_request_cache;
445 static struct bio_set *rbd_bio_clone;
447 static int rbd_major;
448 static DEFINE_IDA(rbd_dev_id_ida);
450 static struct workqueue_struct *rbd_wq;
453 * Default to false for now, as single-major requires >= 0.75 version of
454 * userspace rbd utility.
456 static bool single_major = false;
457 module_param(single_major, bool, S_IRUGO);
458 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
460 static int rbd_img_request_submit(struct rbd_img_request *img_request);
462 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
463 size_t count);
464 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
465 size_t count);
466 static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
467 size_t count);
468 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
469 size_t count);
470 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
471 static void rbd_spec_put(struct rbd_spec *spec);
473 static int rbd_dev_id_to_minor(int dev_id)
475 return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
478 static int minor_to_rbd_dev_id(int minor)
480 return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
483 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
485 return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
486 rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
489 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
491 bool is_lock_owner;
493 down_read(&rbd_dev->lock_rwsem);
494 is_lock_owner = __rbd_is_lock_owner(rbd_dev);
495 up_read(&rbd_dev->lock_rwsem);
496 return is_lock_owner;
499 static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
501 return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
504 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
505 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
506 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
507 static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
508 static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);
510 static struct attribute *rbd_bus_attrs[] = {
511 &bus_attr_add.attr,
512 &bus_attr_remove.attr,
513 &bus_attr_add_single_major.attr,
514 &bus_attr_remove_single_major.attr,
515 &bus_attr_supported_features.attr,
516 NULL,
519 static umode_t rbd_bus_is_visible(struct kobject *kobj,
520 struct attribute *attr, int index)
522 if (!single_major &&
523 (attr == &bus_attr_add_single_major.attr ||
524 attr == &bus_attr_remove_single_major.attr))
525 return 0;
527 return attr->mode;
530 static const struct attribute_group rbd_bus_group = {
531 .attrs = rbd_bus_attrs,
532 .is_visible = rbd_bus_is_visible,
534 __ATTRIBUTE_GROUPS(rbd_bus);
536 static struct bus_type rbd_bus_type = {
537 .name = "rbd",
538 .bus_groups = rbd_bus_groups,
541 static void rbd_root_dev_release(struct device *dev)
545 static struct device rbd_root_dev = {
546 .init_name = "rbd",
547 .release = rbd_root_dev_release,
550 static __printf(2, 3)
551 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
553 struct va_format vaf;
554 va_list args;
556 va_start(args, fmt);
557 vaf.fmt = fmt;
558 vaf.va = &args;
560 if (!rbd_dev)
561 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
562 else if (rbd_dev->disk)
563 printk(KERN_WARNING "%s: %s: %pV\n",
564 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
565 else if (rbd_dev->spec && rbd_dev->spec->image_name)
566 printk(KERN_WARNING "%s: image %s: %pV\n",
567 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
568 else if (rbd_dev->spec && rbd_dev->spec->image_id)
569 printk(KERN_WARNING "%s: id %s: %pV\n",
570 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
571 else /* punt */
572 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
573 RBD_DRV_NAME, rbd_dev, &vaf);
574 va_end(args);
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
590 static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
591 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
592 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
593 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
595 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
596 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
597 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
598 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
599 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
600 u64 snap_id);
601 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
602 u8 *order, u64 *snap_size);
603 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
604 u64 *snap_features);
606 static int rbd_open(struct block_device *bdev, fmode_t mode)
608 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
609 bool removing = false;
611 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
612 return -EROFS;
614 spin_lock_irq(&rbd_dev->lock);
615 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
616 removing = true;
617 else
618 rbd_dev->open_count++;
619 spin_unlock_irq(&rbd_dev->lock);
620 if (removing)
621 return -ENOENT;
623 (void) get_device(&rbd_dev->dev);
625 return 0;
628 static void rbd_release(struct gendisk *disk, fmode_t mode)
630 struct rbd_device *rbd_dev = disk->private_data;
631 unsigned long open_count_before;
633 spin_lock_irq(&rbd_dev->lock);
634 open_count_before = rbd_dev->open_count--;
635 spin_unlock_irq(&rbd_dev->lock);
636 rbd_assert(open_count_before > 0);
638 put_device(&rbd_dev->dev);
641 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
643 int ret = 0;
644 int val;
645 bool ro;
646 bool ro_changed = false;
648 /* get_user() may sleep, so call it before taking rbd_dev->lock */
649 if (get_user(val, (int __user *)(arg)))
650 return -EFAULT;
652 ro = val ? true : false;
653 /* Snapshot doesn't allow to write*/
654 if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
655 return -EROFS;
657 spin_lock_irq(&rbd_dev->lock);
658 /* prevent others open this device */
659 if (rbd_dev->open_count > 1) {
660 ret = -EBUSY;
661 goto out;
664 if (rbd_dev->mapping.read_only != ro) {
665 rbd_dev->mapping.read_only = ro;
666 ro_changed = true;
669 out:
670 spin_unlock_irq(&rbd_dev->lock);
671 /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
672 if (ret == 0 && ro_changed)
673 set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
675 return ret;
678 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
679 unsigned int cmd, unsigned long arg)
681 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
682 int ret = 0;
684 switch (cmd) {
685 case BLKROSET:
686 ret = rbd_ioctl_set_ro(rbd_dev, arg);
687 break;
688 default:
689 ret = -ENOTTY;
692 return ret;
695 #ifdef CONFIG_COMPAT
696 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
697 unsigned int cmd, unsigned long arg)
699 return rbd_ioctl(bdev, mode, cmd, arg);
701 #endif /* CONFIG_COMPAT */
703 static const struct block_device_operations rbd_bd_ops = {
704 .owner = THIS_MODULE,
705 .open = rbd_open,
706 .release = rbd_release,
707 .ioctl = rbd_ioctl,
708 #ifdef CONFIG_COMPAT
709 .compat_ioctl = rbd_compat_ioctl,
710 #endif
714 * Initialize an rbd client instance. Success or not, this function
715 * consumes ceph_opts. Caller holds client_mutex.
717 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
719 struct rbd_client *rbdc;
720 int ret = -ENOMEM;
722 dout("%s:\n", __func__);
723 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
724 if (!rbdc)
725 goto out_opt;
727 kref_init(&rbdc->kref);
728 INIT_LIST_HEAD(&rbdc->node);
730 rbdc->client = ceph_create_client(ceph_opts, rbdc);
731 if (IS_ERR(rbdc->client))
732 goto out_rbdc;
733 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
735 ret = ceph_open_session(rbdc->client);
736 if (ret < 0)
737 goto out_client;
739 spin_lock(&rbd_client_list_lock);
740 list_add_tail(&rbdc->node, &rbd_client_list);
741 spin_unlock(&rbd_client_list_lock);
743 dout("%s: rbdc %p\n", __func__, rbdc);
745 return rbdc;
746 out_client:
747 ceph_destroy_client(rbdc->client);
748 out_rbdc:
749 kfree(rbdc);
750 out_opt:
751 if (ceph_opts)
752 ceph_destroy_options(ceph_opts);
753 dout("%s: error %d\n", __func__, ret);
755 return ERR_PTR(ret);
758 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
760 kref_get(&rbdc->kref);
762 return rbdc;
766 * Find a ceph client with specific addr and configuration. If
767 * found, bump its reference count.
769 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
771 struct rbd_client *client_node;
772 bool found = false;
774 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
775 return NULL;
777 spin_lock(&rbd_client_list_lock);
778 list_for_each_entry(client_node, &rbd_client_list, node) {
779 if (!ceph_compare_options(ceph_opts, client_node->client)) {
780 __rbd_get_client(client_node);
782 found = true;
783 break;
786 spin_unlock(&rbd_client_list_lock);
788 return found ? client_node : NULL;
792 * (Per device) rbd map options
794 enum {
795 Opt_queue_depth,
796 Opt_last_int,
797 /* int args above */
798 Opt_last_string,
799 /* string args above */
800 Opt_read_only,
801 Opt_read_write,
802 Opt_lock_on_read,
803 Opt_exclusive,
804 Opt_err
807 static match_table_t rbd_opts_tokens = {
808 {Opt_queue_depth, "queue_depth=%d"},
809 /* int args above */
810 /* string args above */
811 {Opt_read_only, "read_only"},
812 {Opt_read_only, "ro"}, /* Alternate spelling */
813 {Opt_read_write, "read_write"},
814 {Opt_read_write, "rw"}, /* Alternate spelling */
815 {Opt_lock_on_read, "lock_on_read"},
816 {Opt_exclusive, "exclusive"},
817 {Opt_err, NULL}
820 struct rbd_options {
821 int queue_depth;
822 bool read_only;
823 bool lock_on_read;
824 bool exclusive;
827 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
828 #define RBD_READ_ONLY_DEFAULT false
829 #define RBD_LOCK_ON_READ_DEFAULT false
830 #define RBD_EXCLUSIVE_DEFAULT false
832 static int parse_rbd_opts_token(char *c, void *private)
834 struct rbd_options *rbd_opts = private;
835 substring_t argstr[MAX_OPT_ARGS];
836 int token, intval, ret;
838 token = match_token(c, rbd_opts_tokens, argstr);
839 if (token < Opt_last_int) {
840 ret = match_int(&argstr[0], &intval);
841 if (ret < 0) {
842 pr_err("bad mount option arg (not int) at '%s'\n", c);
843 return ret;
845 dout("got int token %d val %d\n", token, intval);
846 } else if (token > Opt_last_int && token < Opt_last_string) {
847 dout("got string token %d val %s\n", token, argstr[0].from);
848 } else {
849 dout("got token %d\n", token);
852 switch (token) {
853 case Opt_queue_depth:
854 if (intval < 1) {
855 pr_err("queue_depth out of range\n");
856 return -EINVAL;
858 rbd_opts->queue_depth = intval;
859 break;
860 case Opt_read_only:
861 rbd_opts->read_only = true;
862 break;
863 case Opt_read_write:
864 rbd_opts->read_only = false;
865 break;
866 case Opt_lock_on_read:
867 rbd_opts->lock_on_read = true;
868 break;
869 case Opt_exclusive:
870 rbd_opts->exclusive = true;
871 break;
872 default:
873 /* libceph prints "bad option" msg */
874 return -EINVAL;
877 return 0;
880 static char* obj_op_name(enum obj_operation_type op_type)
882 switch (op_type) {
883 case OBJ_OP_READ:
884 return "read";
885 case OBJ_OP_WRITE:
886 return "write";
887 case OBJ_OP_DISCARD:
888 return "discard";
889 default:
890 return "???";
895 * Get a ceph client with specific addr and configuration, if one does
896 * not exist create it. Either way, ceph_opts is consumed by this
897 * function.
899 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
901 struct rbd_client *rbdc;
903 mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
904 rbdc = rbd_client_find(ceph_opts);
905 if (rbdc) /* using an existing client */
906 ceph_destroy_options(ceph_opts);
907 else
908 rbdc = rbd_client_create(ceph_opts);
909 mutex_unlock(&client_mutex);
911 return rbdc;
915 * Destroy ceph client
917 * Caller must hold rbd_client_list_lock.
919 static void rbd_client_release(struct kref *kref)
921 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
923 dout("%s: rbdc %p\n", __func__, rbdc);
924 spin_lock(&rbd_client_list_lock);
925 list_del(&rbdc->node);
926 spin_unlock(&rbd_client_list_lock);
928 ceph_destroy_client(rbdc->client);
929 kfree(rbdc);
933 * Drop reference to ceph client node. If it's not referenced anymore, release
934 * it.
936 static void rbd_put_client(struct rbd_client *rbdc)
938 if (rbdc)
939 kref_put(&rbdc->kref, rbd_client_release);
942 static bool rbd_image_format_valid(u32 image_format)
944 return image_format == 1 || image_format == 2;
947 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
949 size_t size;
950 u32 snap_count;
952 /* The header has to start with the magic rbd header text */
953 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
954 return false;
956 /* The bio layer requires at least sector-sized I/O */
958 if (ondisk->options.order < SECTOR_SHIFT)
959 return false;
961 /* If we use u64 in a few spots we may be able to loosen this */
963 if (ondisk->options.order > 8 * sizeof (int) - 1)
964 return false;
967 * The size of a snapshot header has to fit in a size_t, and
968 * that limits the number of snapshots.
970 snap_count = le32_to_cpu(ondisk->snap_count);
971 size = SIZE_MAX - sizeof (struct ceph_snap_context);
972 if (snap_count > size / sizeof (__le64))
973 return false;
976 * Not only that, but the size of the entire the snapshot
977 * header must also be representable in a size_t.
979 size -= snap_count * sizeof (__le64);
980 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
981 return false;
983 return true;
987 * returns the size of an object in the image
989 static u32 rbd_obj_bytes(struct rbd_image_header *header)
991 return 1U << header->obj_order;
994 static void rbd_init_layout(struct rbd_device *rbd_dev)
996 if (rbd_dev->header.stripe_unit == 0 ||
997 rbd_dev->header.stripe_count == 0) {
998 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
999 rbd_dev->header.stripe_count = 1;
1002 rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
1003 rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
1004 rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
1005 rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
1006 rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1007 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1011 * Fill an rbd image header with information from the given format 1
1012 * on-disk header.
1014 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1015 struct rbd_image_header_ondisk *ondisk)
1017 struct rbd_image_header *header = &rbd_dev->header;
1018 bool first_time = header->object_prefix == NULL;
1019 struct ceph_snap_context *snapc;
1020 char *object_prefix = NULL;
1021 char *snap_names = NULL;
1022 u64 *snap_sizes = NULL;
1023 u32 snap_count;
1024 int ret = -ENOMEM;
1025 u32 i;
1027 /* Allocate this now to avoid having to handle failure below */
1029 if (first_time) {
1030 object_prefix = kstrndup(ondisk->object_prefix,
1031 sizeof(ondisk->object_prefix),
1032 GFP_KERNEL);
1033 if (!object_prefix)
1034 return -ENOMEM;
1037 /* Allocate the snapshot context and fill it in */
1039 snap_count = le32_to_cpu(ondisk->snap_count);
1040 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1041 if (!snapc)
1042 goto out_err;
1043 snapc->seq = le64_to_cpu(ondisk->snap_seq);
1044 if (snap_count) {
1045 struct rbd_image_snap_ondisk *snaps;
1046 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1048 /* We'll keep a copy of the snapshot names... */
1050 if (snap_names_len > (u64)SIZE_MAX)
1051 goto out_2big;
1052 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1053 if (!snap_names)
1054 goto out_err;
1056 /* ...as well as the array of their sizes. */
1057 snap_sizes = kmalloc_array(snap_count,
1058 sizeof(*header->snap_sizes),
1059 GFP_KERNEL);
1060 if (!snap_sizes)
1061 goto out_err;
1064 * Copy the names, and fill in each snapshot's id
1065 * and size.
1067 * Note that rbd_dev_v1_header_info() guarantees the
1068 * ondisk buffer we're working with has
1069 * snap_names_len bytes beyond the end of the
1070 * snapshot id array, this memcpy() is safe.
1072 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1073 snaps = ondisk->snaps;
1074 for (i = 0; i < snap_count; i++) {
1075 snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1076 snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1080 /* We won't fail any more, fill in the header */
1082 if (first_time) {
1083 header->object_prefix = object_prefix;
1084 header->obj_order = ondisk->options.order;
1085 rbd_init_layout(rbd_dev);
1086 } else {
1087 ceph_put_snap_context(header->snapc);
1088 kfree(header->snap_names);
1089 kfree(header->snap_sizes);
1092 /* The remaining fields always get updated (when we refresh) */
1094 header->image_size = le64_to_cpu(ondisk->image_size);
1095 header->snapc = snapc;
1096 header->snap_names = snap_names;
1097 header->snap_sizes = snap_sizes;
1099 return 0;
1100 out_2big:
1101 ret = -EIO;
1102 out_err:
1103 kfree(snap_sizes);
1104 kfree(snap_names);
1105 ceph_put_snap_context(snapc);
1106 kfree(object_prefix);
1108 return ret;
1111 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1113 const char *snap_name;
1115 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1117 /* Skip over names until we find the one we are looking for */
1119 snap_name = rbd_dev->header.snap_names;
1120 while (which--)
1121 snap_name += strlen(snap_name) + 1;
1123 return kstrdup(snap_name, GFP_KERNEL);
1127 * Snapshot id comparison function for use with qsort()/bsearch().
1128 * Note that result is for snapshots in *descending* order.
1130 static int snapid_compare_reverse(const void *s1, const void *s2)
1132 u64 snap_id1 = *(u64 *)s1;
1133 u64 snap_id2 = *(u64 *)s2;
1135 if (snap_id1 < snap_id2)
1136 return 1;
1137 return snap_id1 == snap_id2 ? 0 : -1;
1141 * Search a snapshot context to see if the given snapshot id is
1142 * present.
1144 * Returns the position of the snapshot id in the array if it's found,
1145 * or BAD_SNAP_INDEX otherwise.
1147 * Note: The snapshot array is in kept sorted (by the osd) in
1148 * reverse order, highest snapshot id first.
1150 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1152 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1153 u64 *found;
1155 found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1156 sizeof (snap_id), snapid_compare_reverse);
1158 return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1161 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1162 u64 snap_id)
1164 u32 which;
1165 const char *snap_name;
1167 which = rbd_dev_snap_index(rbd_dev, snap_id);
1168 if (which == BAD_SNAP_INDEX)
1169 return ERR_PTR(-ENOENT);
1171 snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1172 return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1175 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1177 if (snap_id == CEPH_NOSNAP)
1178 return RBD_SNAP_HEAD_NAME;
1180 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1181 if (rbd_dev->image_format == 1)
1182 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1184 return rbd_dev_v2_snap_name(rbd_dev, snap_id);
/*
 * Look up the image size at the given snapshot id.  The head
 * (CEPH_NOSNAP) uses the current image size; format 1 images use the
 * cached snap_sizes array; format 2 images query the osd.
 *
 * Returns 0 on success (with *snap_size filled in), -ENOENT if the
 * snapshot id is unknown, or a negative errno from the v2 size query.
 */
static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}
/*
 * Look up the feature bits in effect at the given snapshot id.  The
 * head (CEPH_NOSNAP) uses the current header features; format 1
 * images have no per-snapshot features; format 2 images query the
 * osd.
 *
 * Returns 0 on success (with *snap_features filled in) or a negative
 * errno from the v2 feature query.
 */
static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}
1235 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1237 u64 snap_id = rbd_dev->spec->snap_id;
1238 u64 size = 0;
1239 u64 features = 0;
1240 int ret;
1242 ret = rbd_snap_size(rbd_dev, snap_id, &size);
1243 if (ret)
1244 return ret;
1245 ret = rbd_snap_features(rbd_dev, snap_id, &features);
1246 if (ret)
1247 return ret;
1249 rbd_dev->mapping.size = size;
1250 rbd_dev->mapping.features = features;
1252 return 0;
1255 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1257 rbd_dev->mapping.size = 0;
1258 rbd_dev->mapping.features = 0;
1261 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1263 u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1265 return offset & (segment_size - 1);
/*
 * Clamp @length so that the extent starting at @offset does not
 * cross an object (segment) boundary.  Returns the possibly-reduced
 * length.
 */
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
			u64 offset, u64 length)
{
	u64 segment_size = rbd_obj_bytes(&rbd_dev->header);

	/* Reduce @offset to its position within the segment */
	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
1283 * bio helpers
1286 static void bio_chain_put(struct bio *chain)
1288 struct bio *tmp;
1290 while (chain) {
1291 tmp = chain;
1292 chain = chain->bi_next;
1293 bio_put(tmp);
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec bv;
	struct bvec_iter iter;
	unsigned long flags;
	void *buf;
	int pos = 0;	/* running byte position within the chain */

	while (chain) {
		bio_for_each_segment(bv, chain, iter) {
			if (pos + bv.bv_len > start_ofs) {
				/*
				 * For the segment that straddles
				 * start_ofs, zero only its tail.
				 */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(&bv, &flags);
				memset(buf + remainder, 0,
				       bv.bv_len - remainder);
				flush_dcache_page(bv.bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv.bv_len;
		}

		chain = chain->bi_next;
	}
}
/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		/* Zero at most the remainder of the current page */
		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 *
 * Returns the clone, or NULL on allocation failure.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bio;

	bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone);
	if (!bio)
		return NULL;	/* ENOMEM */

	/* Advance past @offset, then trim the clone to @len bytes */
	bio_advance(bio, offset);
	bio->bi_iter.bi_size = len;

	return bio;
}
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_iter.bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		/* Clone as much of this bio as we still need */
		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		/* Append the clone to the chain being built */
		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_iter.bi_size) {
			/* Consumed this source bio; move to the next */
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	/* test_and_set also tells us whether the flag was already set */
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();	/* barrier before reading the flag word */
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		/* Only img_data requests have a device to name in the warning */
		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();	/* barrier before reading the flag word */
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}
/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the response from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	/* EXISTS must be set before KNOWN becomes visible */
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();	/* barrier before reading the flag word */
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();	/* barrier before reading the flag word */
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
/*
 * Return true if this object request's image offset falls within the
 * parent overlap region (rounded up to a whole object).
 */
static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

	return obj_request->img_offset <
	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}
/* Reference counting helpers for object and image requests. */

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		kref_read(&img_request->kref));
	kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		kref_read(&img_request->kref));
	/* Child (parent-image) requests use their own destructor */
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}
/*
 * Attach an object request to the end of an image request's list.
 * The image request takes over the object's original reference, and
 * obj_request->which records its position in the list.
 */
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}
/*
 * Detach an object request from its image request and drop the
 * reference the image request held.  Only the most recently added
 * object request may be removed (its which must equal the new count).
 */
static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}
1593 static bool obj_request_type_valid(enum obj_request_type type)
1595 switch (type) {
1596 case OBJ_REQUEST_NODATA:
1597 case OBJ_REQUEST_BIO:
1598 case OBJ_REQUEST_PAGES:
1599 return true;
1600 default:
1601 return false;
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);

/*
 * Hand an object request's osd request to the osd client.  For
 * image-data requests an extra image request reference is taken;
 * it is dropped in rbd_img_obj_callback().
 */
static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
	     obj_request, obj_request->object_no, obj_request->offset,
	     obj_request->length, osd_req);
	if (obj_request_img_data_test(obj_request)) {
		WARN_ON(obj_request->callback != rbd_img_obj_callback);
		rbd_img_request_get(obj_request->img_request);
	}
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}
/*
 * Finish an image request: total up the bytes transferred by its
 * object requests (on success) and invoke the completion callback,
 * or drop our reference if no callback was registered.
 */
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never change thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();	/* make the flag visible before later accesses */
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();	/* barrier before reading the flag word */
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

/*
 * Set the discard flag when the img_request is an discard request
 */
static void img_request_discard_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_DISCARD, &img_request->flags);
	smp_mb();
}

static bool img_request_discard_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
}
/*
 * CHILD marks an image request issued on behalf of a parent (layered)
 * read; LAYERED marks a request on an image that has a parent.
 * Unlike the other flags, these two can also be cleared.
 */
static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();	/* barrier before reading the flag word */
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();	/* barrier before reading the flag word */
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}
1715 static enum obj_operation_type
1716 rbd_img_request_op_type(struct rbd_img_request *img_request)
1718 if (img_request_write_test(img_request))
1719 return OBJ_OP_WRITE;
1720 else if (img_request_discard_test(img_request))
1721 return OBJ_OP_DISCARD;
1722 else
1723 return OBJ_OP_READ;
/*
 * Completion handling for a read that is part of an image request:
 * zero-fill holes and short reads, then mark the object request done
 * with the full requested length accounted for.
 */
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the entire
	 * length of the request.  A short read also implies zero-fill
	 * to the end of the request.  An error requires the whole
	 * length of the request to be reported finished with an error
	 * to the block layer.  In each case we update the xferred
	 * count to indicate the whole request was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
	} else if (xferred < length && !obj_request->result) {
		/* Short read: zero from the last byte read to the end */
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
	}
	obj_request->xferred = length;
	obj_request_done_set(obj_request);
}
/*
 * Deliver completion of an object request: invoke its callback if one
 * is registered, otherwise wake any waiter on its completion.
 */
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}
/*
 * Fail an object request with @err before it was submitted, then run
 * the normal completion path.
 */
static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
{
	obj_request->result = err;
	obj_request->xferred = 0;
	/*
	 * kludge - mirror rbd_obj_request_submit() to match a put in
	 * rbd_img_obj_callback()
	 */
	if (obj_request_img_data_test(obj_request)) {
		WARN_ON(obj_request->callback != rbd_img_obj_callback);
		rbd_img_request_get(obj_request->img_request);
	}
	obj_request_done_set(obj_request);
	rbd_obj_request_complete(obj_request);
}
/*
 * osd read completion.  A layered-image read that hit a hole
 * (-ENOENT) within the parent overlap is redirected to the parent
 * image; other image reads get hole/short-read fixups; standalone
 * reads are simply marked done.
 */
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}
/* osd write completion. */
static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}
/* osd discard completion. */
static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short discard.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	/* discarding a non-existent object is not a problem */
	if (obj_request->result == -ENOENT)
		obj_request->result = 0;
	obj_request_done_set(obj_request);
}
/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
/*
 * osd method call completion.  For image-data requests the only call
 * we issue is copyup, so route those to the copyup handler.
 */
static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	if (obj_request_img_data_test(obj_request))
		rbd_osd_copyup_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}
/*
 * Common completion callback for all rbd osd requests.  Records the
 * result and transfer count, then dispatches on the opcode of the
 * request's first op; if the op-specific handler marked the object
 * request done, its completion is delivered.
 */
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p\n", __func__, osd_req);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to the block layer, which just supports a 32-bit
	 * length field.
	 */
	obj_request->xferred = osd_req->r_ops[0].outdata_len;
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);

	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_SETALLOCHINT:
		/* The hint is always followed by the actual write op */
		rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
			   osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
		/* fall through */
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_DELETE:
	case CEPH_OSD_OP_TRUNCATE:
	case CEPH_OSD_OP_ZERO:
		rbd_osd_discard_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
		rbd_osd_call_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
			 obj_request->object_no, opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}
/* Stamp a read osd request with the image request's snapshot id. */
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	osd_req->r_snapid = obj_request->img_request->snap_id;
}
/* Stamp a write osd request with mtime and data offset. */
static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	ktime_get_real_ts(&osd_req->r_mtime);
	osd_req->r_data_offset = obj_request->offset;
}
/*
 * Allocate and initialize an osd request for @obj_request with
 * @num_ops ops, wiring up the rbd completion callback and the object
 * name built from the image's object prefix and the object number.
 * Returns the request, or NULL on failure.
 */
static struct ceph_osd_request *
__rbd_osd_req_create(struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     int num_ops, unsigned int flags,
		     struct rbd_obj_request *obj_request)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	/* Object naming differs between format 1 and format 2 images */
	const char *name_format = rbd_dev->image_format == 1 ?
				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;

	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
	if (!req)
		return NULL;

	req->r_flags = flags;
	req->r_callback = rbd_osd_req_callback;
	req->r_priv = obj_request;

	req->r_base_oloc.pool = rbd_dev->layout.pool_id;
	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
			rbd_dev->header.object_prefix, obj_request->object_no))
		goto err_req;

	if (ceph_osdc_alloc_messages(req, GFP_NOIO))
		goto err_req;

	return req;

err_req:
	ceph_osdc_put_request(req);
	return NULL;
}
/*
 * Create an osd request.  A read request has one osd op (read).
 * A write request has either one (watch) or two (hint+write) osd ops.
 * (All rbd data writes are prefixed with an allocation hint op, but
 * technically osd watch is a write request, hence this distinction.)
 */
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					enum obj_operation_type op_type,
					unsigned int num_ops,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;

	/* Writes and discards on image data need the snapshot context */
	if (obj_request_img_data_test(obj_request) &&
		(op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
		struct rbd_img_request *img_request = obj_request->img_request;
		if (op_type == OBJ_OP_WRITE) {
			rbd_assert(img_request_write_test(img_request));
		} else {
			rbd_assert(img_request_discard_test(img_request));
		}
		snapc = img_request->snapc;
	}

	rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));

	return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
	    (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
	    CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
}
/*
 * Create a copyup osd request based on the information in the object
 * request supplied.  A copyup request has two or three osd ops, a
 * copyup method call, potentially a hint op, and a write or truncate
 * or zero op.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	int num_osd_ops = 3;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request) ||
			img_request_discard_test(img_request));

	/* Discards carry no allocation hint op */
	if (img_request_discard_test(img_request))
		num_osd_ops = 2;

	return __rbd_osd_req_create(img_request->rbd_dev,
				    img_request->snapc, num_osd_ops,
				    CEPH_OSD_FLAG_WRITE, obj_request);
}
/* Drop the reference on an osd request created above. */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
/*
 * Allocate a zeroed object request of the given data type from the
 * slab cache, with its list link, completion, and refcount
 * initialized.  Returns NULL on allocation failure.
 */
static struct rbd_obj_request *
rbd_obj_request_create(enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;

	rbd_assert(obj_request_type_valid(type));

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request)
		return NULL;

	obj_request->which = BAD_WHICH;	/* not part of an image request yet */
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s %p\n", __func__, obj_request);
	return obj_request;
}
/*
 * kref release callback for object requests: tear down the osd
 * request and any attached data (bio chain or page vector), then
 * return the object request to its slab cache.
 */
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	/* Must already be detached from any image request */
	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		/* img_data requests don't own their page array */
		if (obj_request->pages &&
		    !obj_request_img_data_test(obj_request))
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kmem_cache_free(rbd_obj_request_cache, obj_request);
}
/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	/* Drop the parent device, spec, and overlap */
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}
/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow");
}
/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter = 0;

	if (!rbd_dev->parent_spec)
		return false;

	/* The overlap is stable only under header_rwsem */
	down_read(&rbd_dev->header_rwsem);
	if (rbd_dev->parent_overlap)
		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	up_read(&rbd_dev->header_rwsem);

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow");

	return counter > 0;
}
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 *
 * Returns the new image request, or NULL on allocation failure.
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					enum obj_operation_type op_type,
					struct ceph_snap_context *snapc)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
	if (!img_request)
		return NULL;

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	/* Writes and discards carry the snap context; reads the snap id */
	if (op_type == OBJ_OP_DISCARD) {
		img_request_discard_set(img_request);
		img_request->snapc = snapc;
	} else if (op_type == OBJ_OP_WRITE) {
		img_request_write_set(img_request);
		img_request->snapc = snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	/* Mark layered if the parent reference was taken */
	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		obj_op_name(op_type), offset, length, img_request);

	return img_request;
}
/*
 * kref release callback for image requests: detach and release all
 * object requests, drop the parent reference for layered requests,
 * release the snap context held by writes/discards, and free the
 * image request.
 */
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (img_request_write_test(img_request) ||
		img_request_discard_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}
/*
 * Create a child read request against the parent image covering
 * [img_offset, img_offset + length).  The child holds a reference on
 * the originating object request; it is dropped by
 * rbd_parent_request_destroy().  Returns NULL on allocation failure.
 */
static struct rbd_img_request *rbd_parent_request_create(
					struct rbd_obj_request *obj_request,
					u64 img_offset, u64 length)
{
	struct rbd_img_request *parent_request;
	struct rbd_device *rbd_dev;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;

	parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
						length, OBJ_OP_READ, NULL);
	if (!parent_request)
		return NULL;

	img_request_child_set(parent_request);
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	return parent_request;
}
/*
 * kref release callback for child (parent-image) requests: release
 * the reference on the originating object request, clear the child
 * flag, then fall through to normal image request destruction.
 */
static void rbd_parent_request_destroy(struct kref *kref)
{
	struct rbd_img_request *parent_request;
	struct rbd_obj_request *orig_request;

	parent_request = container_of(kref, struct rbd_img_request, kref);
	orig_request = parent_request->obj_request;

	parent_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	img_request_child_clear(parent_request);

	rbd_img_request_destroy(kref);
}
/*
 * Account one completed object request to its image request,
 * propagating any error and ending the corresponding portion of the
 * block-layer request (or, for child requests, tracking position in
 * the object request list).  Returns true if more object requests in
 * the image request remain outstanding.
 */
static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;
		enum obj_operation_type op_type;

		if (img_request_discard_test(img_request))
			op_type = OBJ_OP_DISCARD;
		else if (img_request_write_test(img_request))
			op_type = OBJ_OP_WRITE;
		else
			op_type = OBJ_OP_READ;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
			obj_op_name(op_type), obj_request->length,
			obj_request->img_offset, obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x",
			result, xferred);
		/* First error wins for the image request as a whole */
		if (!img_request->result)
			img_request->result = result;
		/*
		 * Need to end I/O on the entire obj_request worth of
		 * bytes in case of error.
		 */
		xferred = obj_request->length;
	}

	if (img_request_child_test(img_request)) {
		/* Child requests have no blk request to complete */
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		blk_status_t status = errno_to_blk_status(result);

		rbd_assert(img_request->rq != NULL);

		more = blk_update_request(img_request->rq, status, xferred);
		if (!more)
			__blk_mq_end_request(img_request->rq, status);
	}

	return more;
}
/*
 * Completion callback for an object request that is part of an image
 * request.  Object requests may complete in any order, but completion
 * must be accounted in order; only the request matching
 * next_completion advances it, sweeping forward over any successors
 * that are already done.  completion_lock serializes the sweep.
 */
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);
	/*
	 * NOTE(review): img_request is still dereferenced below after this
	 * put; presumably the submitter's reference keeps it alive until
	 * rbd_img_request_complete() — confirm against the get/put pairing
	 * in rbd_img_request_submit().
	 */
	rbd_img_request_put(img_request);

	if (!more)
		rbd_img_request_complete(img_request);
}
/*
 * Add individual osd ops to the given ceph_osd_request and prepare
 * them for submission. num_ops is the current number of
 * osd operations already to the object request.
 */
static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
				struct ceph_osd_request *osd_request,
				enum obj_operation_type op_type,
				unsigned int num_ops)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	u64 object_size = rbd_obj_bytes(&rbd_dev->header);
	u64 offset = obj_request->offset;
	u64 length = obj_request->length;
	u64 img_end;
	u16 opcode;

	if (op_type == OBJ_OP_DISCARD) {
		/*
		 * Whole-object discard with no parent data underneath:
		 * delete the object outright.
		 */
		if (!offset && length == object_size &&
		    (!img_request_layered_test(img_request) ||
		     !obj_request_overlaps_parent(obj_request))) {
			opcode = CEPH_OSD_OP_DELETE;
		} else if ((offset + length == object_size)) {
			/* discard reaches the end of the object */
			opcode = CEPH_OSD_OP_TRUNCATE;
		} else {
			down_read(&rbd_dev->header_rwsem);
			img_end = rbd_dev->header.image_size;
			up_read(&rbd_dev->header_rwsem);

			/* truncate if the range runs to the end of the image */
			if (obj_request->img_offset + length == img_end)
				opcode = CEPH_OSD_OP_TRUNCATE;
			else
				opcode = CEPH_OSD_OP_ZERO;
		}
	} else if (op_type == OBJ_OP_WRITE) {
		/* full-object writes can use WRITEFULL */
		if (!offset && length == object_size)
			opcode = CEPH_OSD_OP_WRITEFULL;
		else
			opcode = CEPH_OSD_OP_WRITE;
		osd_req_op_alloc_hint_init(osd_request, num_ops,
					object_size, object_size);
		num_ops++;
	} else {
		opcode = CEPH_OSD_OP_READ;
	}

	/* DELETE takes no extent; everything else is an extent op */
	if (opcode == CEPH_OSD_OP_DELETE)
		osd_req_op_init(osd_request, num_ops, opcode, 0);
	else
		osd_req_op_extent_init(osd_request, num_ops, opcode,
				       offset, length, 0, 0);

	if (obj_request->type == OBJ_REQUEST_BIO)
		osd_req_op_extent_osd_data_bio(osd_request, num_ops,
					obj_request->bio_list, length);
	else if (obj_request->type == OBJ_REQUEST_PAGES)
		osd_req_op_extent_osd_data_pages(osd_request, num_ops,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

	/* Discards are also writes */
	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
		rbd_osd_req_format_write(obj_request);
	else
		rbd_osd_req_format_read(obj_request);
}
/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array.  In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 *
 * Returns 0 on success or -ENOMEM; on failure, any object requests
 * already created are unwound from the image request.
 */
static int rbd_img_request_fill(struct rbd_img_request *img_request,
					enum obj_request_type type,
					void *data_desc)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	struct bio *bio_list = NULL;
	unsigned int bio_offset = 0;
	struct page **pages = NULL;
	enum obj_operation_type op_type;
	u64 img_offset;
	u64 resid;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);
	op_type = rbd_img_request_op_type(img_request);

	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset ==
			   bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
	} else if (type == OBJ_REQUEST_PAGES) {
		pages = data_desc;
	}

	/* one object request per backing object touched by the range */
	while (resid) {
		struct ceph_osd_request *osd_req;
		u64 object_no = img_offset >> rbd_dev->header.obj_order;
		u64 offset = rbd_segment_offset(rbd_dev, img_offset);
		u64 length = rbd_segment_length(rbd_dev, img_offset, resid);

		obj_request = rbd_obj_request_create(type);
		if (!obj_request)
			goto out_unwind;

		obj_request->object_no = object_no;
		obj_request->offset = offset;
		obj_request->length = length;

		/*
		 * set obj_request->img_request before creating the
		 * osd_request so that it gets the right snapc
		 */
		rbd_img_obj_request_add(img_request, obj_request);

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
								&bio_offset,
								clone_size,
								GFP_NOIO);
			if (!obj_request->bio_list)
				goto out_unwind;
		} else if (type == OBJ_REQUEST_PAGES) {
			unsigned int page_count;

			obj_request->pages = pages;
			page_count = (u32)calc_pages_for(offset, length);
			obj_request->page_count = page_count;
			if ((offset + length) & ~PAGE_MASK)
				page_count--;	/* more on last page */
			pages += page_count;
		}

		/* writes need a second op slot for the alloc hint */
		osd_req = rbd_osd_req_create(rbd_dev, op_type,
					(op_type == OBJ_OP_WRITE) ? 2 : 1,
					obj_request);
		if (!osd_req)
			goto out_unwind;

		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;
		obj_request->img_offset = img_offset;

		rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);

		img_offset += length;
		resid -= length;
	}

	return 0;

out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);

	return -ENOMEM;
}
/*
 * Completion callback for a copyup request: release the copyup page
 * vector, fix up the transfer count for a successful write, and mark
 * the object request done.
 */
static void
rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	struct page **pages;
	u32 page_count;

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
		obj_request->type == OBJ_REQUEST_NODATA);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);

	/* the copyup pages are no longer needed once the op completes */
	pages = obj_request->copyup_pages;
	rbd_assert(pages != NULL);
	obj_request->copyup_pages = NULL;
	page_count = obj_request->copyup_page_count;
	rbd_assert(page_count);
	obj_request->copyup_page_count = 0;
	ceph_release_page_vector(pages, page_count);

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request was successful
	 * we can just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;

	obj_request_done_set(obj_request);
}
/*
 * Called when the full-object read from the parent image finishes.
 * On success, rebuild the original object request as a copyup: a new
 * osd request carrying the "rbd" class "copyup" call with the parent
 * data, followed by the original write/discard op(s), then submit it.
 * If the parent overlap dropped to 0 meanwhile, just re-submit the
 * original request as-is.
 */
static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *orig_request;
	struct ceph_osd_request *osd_req;
	struct rbd_device *rbd_dev;
	struct page **pages;
	enum obj_operation_type op_type;
	u32 page_count;
	int img_result;
	u64 parent_length;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request */

	pages = img_request->copyup_pages;
	rbd_assert(pages != NULL);
	img_request->copyup_pages = NULL;
	page_count = img_request->copyup_page_count;
	rbd_assert(page_count);
	img_request->copyup_page_count = 0;

	orig_request = img_request->obj_request;
	rbd_assert(orig_request != NULL);
	rbd_assert(obj_request_type_valid(orig_request->type));
	img_result = img_request->result;
	parent_length = img_request->length;
	rbd_assert(img_result || parent_length == img_request->xferred);
	rbd_img_request_put(img_request);

	rbd_assert(orig_request->img_request);
	rbd_dev = orig_request->img_request->rbd_dev;
	rbd_assert(rbd_dev);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to free the pages
	 * and re-submit the original write request.
	 */
	if (!rbd_dev->parent_overlap) {
		ceph_release_page_vector(pages, page_count);
		rbd_obj_request_submit(orig_request);
		return;
	}

	if (img_result)
		goto out_err;

	/*
	 * The original osd request is of no use to use any more.
	 * We need a new one that can hold the three ops in a copyup
	 * request.  Allocate the new copyup osd request for the
	 * original request, and release the old one.
	 */
	img_result = -ENOMEM;
	osd_req = rbd_osd_req_create_copyup(orig_request);
	if (!osd_req)
		goto out_err;
	rbd_osd_req_destroy(orig_request->osd_req);
	orig_request->osd_req = osd_req;
	orig_request->copyup_pages = pages;
	orig_request->copyup_page_count = page_count;

	/* Initialize the copyup op */

	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
	osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
						false, false);

	/* Add the other op(s) */

	op_type = rbd_img_request_op_type(orig_request->img_request);
	rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);

	/* All set, send it off. */

	rbd_obj_request_submit(orig_request);
	return;

out_err:
	ceph_release_page_vector(pages, page_count);
	rbd_obj_request_error(orig_request, img_result);
}
/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request.  This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, it is recorded as the result of the original
 * object request in rbd_img_obj_exists_callback().
 */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
	struct rbd_img_request *parent_request = NULL;
	u64 img_offset;
	u64 length;
	struct page **pages = NULL;
	u32 page_count;
	int result;

	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = rbd_obj_bytes(&rbd_dev->header);

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	page_count = (u32)calc_pages_for(0, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages)) {
		result = PTR_ERR(pages);
		pages = NULL;
		goto out_err;
	}

	result = -ENOMEM;
	parent_request = rbd_parent_request_create(obj_request,
						img_offset, length);
	if (!parent_request)
		goto out_err;

	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
	if (result)
		goto out_err;

	parent_request->copyup_pages = pages;
	parent_request->copyup_page_count = page_count;
	parent_request->callback = rbd_img_obj_parent_read_full_callback;

	result = rbd_img_request_submit(parent_request);
	if (!result)
		return 0;

	/* undo the create-time linkage before dropping everything */
	parent_request->copyup_pages = NULL;
	parent_request->copyup_page_count = 0;
	parent_request->obj_request = NULL;
	rbd_obj_request_put(obj_request);
out_err:
	if (pages)
		ceph_release_page_vector(pages, page_count);
	if (parent_request)
		rbd_img_request_put(parent_request);
	return result;
}
/*
 * Completion callback for the STAT request issued by
 * rbd_img_obj_exists_submit().  Records whether the target object
 * exists and re-submits the original layered-write object request;
 * on an unexpected STAT error the original request is failed.
 */
static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *orig_request;
	struct rbd_device *rbd_dev;
	int result;

	rbd_assert(!obj_request_img_data_test(obj_request));

	/*
	 * All we need from the object request is the original
	 * request and the result of the STAT op.  Grab those, then
	 * we're done with the request.
	 */
	orig_request = obj_request->obj_request;
	obj_request->obj_request = NULL;
	/*
	 * NOTE(review): this put drops the reference taken in
	 * rbd_img_obj_exists_submit() while orig_request is still used
	 * below — presumably the image request holds another reference
	 * keeping it alive; confirm.
	 */
	rbd_obj_request_put(orig_request);
	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

	result = obj_request->result;
	obj_request->result = 0;

	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
		obj_request, orig_request, result,
		obj_request->xferred, obj_request->length);
	rbd_obj_request_put(obj_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to re-submit the
	 * original request.
	 */
	rbd_dev = orig_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		rbd_obj_request_submit(orig_request);
		return;
	}

	/*
	 * Our only purpose here is to determine whether the object
	 * exists, and we don't want to treat the non-existence as
	 * an error.  If something else comes back, transfer the
	 * error to the original request and complete it now.
	 */
	if (!result) {
		obj_request_existence_set(orig_request, true);
	} else if (result == -ENOENT) {
		obj_request_existence_set(orig_request, false);
	} else {
		goto fail_orig_request;
	}

	/*
	 * Resubmit the original request now that we have recorded
	 * whether the target object exists.
	 */
	result = rbd_img_obj_request_submit(orig_request);
	if (result)
		goto fail_orig_request;

	return;

fail_orig_request:
	rbd_obj_request_error(orig_request, result);
}
/*
 * Issue a STAT osd op against the target object of a layered write to
 * find out whether it exists.  The result is handled by
 * rbd_img_obj_exists_callback(), which holds a reference on
 * @obj_request for the duration.  Returns 0 or -ENOMEM/-errno.
 */
static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
	struct rbd_obj_request *stat_request;
	struct page **pages;
	u32 page_count;
	size_t size;
	int ret;

	stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
	if (!stat_request)
		return -ENOMEM;

	stat_request->object_no = obj_request->object_no;

	stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
						   stat_request);
	if (!stat_request->osd_req) {
		ret = -ENOMEM;
		goto fail_stat_request;
	}

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
	page_count = (u32)calc_pages_for(0, size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages)) {
		ret = PTR_ERR(pages);
		goto fail_stat_request;
	}

	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
				     false, false);

	/* reference dropped again in rbd_img_obj_exists_callback() */
	rbd_obj_request_get(obj_request);
	stat_request->obj_request = obj_request;
	stat_request->pages = pages;
	stat_request->page_count = page_count;
	stat_request->callback = rbd_img_obj_exists_callback;

	rbd_obj_request_submit(stat_request);
	return 0;

fail_stat_request:
	rbd_obj_request_put(stat_request);
	return ret;
}
2854 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2856 struct rbd_img_request *img_request = obj_request->img_request;
2857 struct rbd_device *rbd_dev = img_request->rbd_dev;
2859 /* Reads */
2860 if (!img_request_write_test(img_request) &&
2861 !img_request_discard_test(img_request))
2862 return true;
2864 /* Non-layered writes */
2865 if (!img_request_layered_test(img_request))
2866 return true;
2869 * Layered writes outside of the parent overlap range don't
2870 * share any data with the parent.
2872 if (!obj_request_overlaps_parent(obj_request))
2873 return true;
2876 * Entire-object layered writes - we will overwrite whatever
2877 * parent data there is anyway.
2879 if (!obj_request->offset &&
2880 obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2881 return true;
2884 * If the object is known to already exist, its parent data has
2885 * already been copied.
2887 if (obj_request_known_test(obj_request) &&
2888 obj_request_exists_test(obj_request))
2889 return true;
2891 return false;
/*
 * Submit one object request of an image request, routing layered
 * writes through the existence-check / parent-read-full (copyup)
 * machinery when needed.  Returns 0 on success or a negative errno.
 */
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request_type_valid(obj_request->type));
	rbd_assert(obj_request->img_request);

	if (img_obj_request_simple(obj_request)) {
		rbd_obj_request_submit(obj_request);
		return 0;
	}

	/*
	 * It's a layered write.  The target object might exist but
	 * we may not know that yet.  If we know it doesn't exist,
	 * start by reading the data for the full target object from
	 * the parent so we can use it for a copyup to the target.
	 */
	if (obj_request_known_test(obj_request))
		return rbd_img_obj_parent_read_full(obj_request);

	/* We don't know whether the target exists.  Go find out. */

	return rbd_img_obj_exists_submit(obj_request);
}
/*
 * Submit all object requests belonging to an image request.  A
 * reference is held across the loop so completions running in
 * parallel cannot tear the image request down mid-iteration.
 * Stops at the first submission failure and returns its errno.
 */
static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;
	int ret = 0;

	dout("%s: img %p\n", __func__, img_request);

	rbd_img_request_get(img_request);
	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
		ret = rbd_img_obj_request_submit(obj_request);
		if (ret)
			goto out_put_ireq;
	}

out_put_ireq:
	rbd_img_request_put(img_request);
	return ret;
}
/*
 * Completion callback for a read redirected to the parent image.
 * Transfers the parent's result to the original object request,
 * clamping the transfer count at the parent overlap boundary so data
 * beyond it reads as zeroes, then completes the object request.
 */
static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_device *rbd_dev;
	u64 obj_end;
	u64 img_xferred;
	int img_result;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request and release it */

	obj_request = img_request->obj_request;
	img_xferred = img_request->xferred;
	img_result = img_request->result;
	rbd_img_request_put(img_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to re-submit the
	 * original request.
	 */
	rbd_assert(obj_request);
	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		rbd_obj_request_submit(obj_request);
		return;
	}

	obj_request->result = img_result;
	if (obj_request->result)
		goto out;

	/*
	 * We need to zero anything beyond the parent overlap
	 * boundary.  Since rbd_img_obj_request_read_callback()
	 * will zero anything beyond the end of a short read, an
	 * easy way to do this is to pretend the data from the
	 * parent came up short--ending at the overlap boundary.
	 */
	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
	obj_end = obj_request->img_offset + obj_request->length;
	if (obj_end > rbd_dev->parent_overlap) {
		u64 xferred = 0;

		if (obj_request->img_offset < rbd_dev->parent_overlap)
			xferred = rbd_dev->parent_overlap -
					obj_request->img_offset;

		obj_request->xferred = min(img_xferred, xferred);
	} else {
		obj_request->xferred = img_xferred;
	}
out:
	rbd_img_obj_request_read_callback(obj_request);
	rbd_obj_request_complete(obj_request);
}
/*
 * A read of a layered image returned -ENOENT for the target object:
 * satisfy it from the parent image instead.  Builds and submits a
 * child image request covering the same range; on any setup failure
 * the object request is completed with the error.
 */
static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request != NULL);
	rbd_assert(obj_request->result == (s32) -ENOENT);
	rbd_assert(obj_request_type_valid(obj_request->type));

	/* rbd_read_finish(obj_request, obj_request->length); */
	img_request = rbd_parent_request_create(obj_request,
						obj_request->img_offset,
						obj_request->length);
	result = -ENOMEM;
	if (!img_request)
		goto out_err;

	/* reuse the original request's data buffer for the parent read */
	if (obj_request->type == OBJ_REQUEST_BIO)
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						obj_request->bio_list);
	else
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
						obj_request->pages);
	if (result)
		goto out_err;

	img_request->callback = rbd_img_parent_read_callback;
	result = rbd_img_request_submit(img_request);
	if (result)
		goto out_err;

	return;
out_err:
	if (img_request)
		rbd_img_request_put(img_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);
}
3039 static const struct rbd_client_id rbd_empty_cid;
3041 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3042 const struct rbd_client_id *rhs)
3044 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
/*
 * Build this client's id (global id + current watch cookie), sampling
 * both under watch_mutex so they form a consistent pair.
 */
static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
{
	struct rbd_client_id cid;

	mutex_lock(&rbd_dev->watch_mutex);
	cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
	cid.handle = rbd_dev->watch_cookie;
	mutex_unlock(&rbd_dev->watch_mutex);
	return cid;
}
/*
 * Record the client id of the current exclusive-lock owner.
 *
 * lock_rwsem must be held for write
 */
static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
			      const struct rbd_client_id *cid)
{
	dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
	     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
	     cid->gid, cid->handle);
	rbd_dev->owner_cid = *cid; /* struct */
}
/*
 * Format the exclusive-lock cookie ("<prefix> <watch_cookie>") into
 * @buf; the watch cookie is read under watch_mutex.  Caller supplies
 * a buffer large enough for the result (rbd_lock() uses char[32]).
 */
static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
{
	mutex_lock(&rbd_dev->watch_mutex);
	sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
	mutex_unlock(&rbd_dev->watch_mutex);
}
/*
 * Try to take the exclusive lock on the image header object.  On
 * success the local lock state, cookie and owner cid are updated and
 * the "acquired lock" notification work is queued.  Returns 0 or the
 * error from ceph_cls_lock() (e.g. -EBUSY if another client holds it).
 *
 * lock_rwsem must be held for write
 */
static int rbd_lock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
	char cookie[32];
	int ret;

	WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
		rbd_dev->lock_cookie[0] != '\0');

	format_lock_cookie(rbd_dev, cookie);
	ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
			    RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
			    RBD_LOCK_TAG, "", 0);
	if (ret)
		return ret;

	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
	strcpy(rbd_dev->lock_cookie, cookie);
	rbd_set_owner_cid(rbd_dev, &cid);
	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
	return 0;
}
/*
 * Release the exclusive lock on the image header object and reset the
 * local lock state; queues the "released lock" notification work.
 *
 * lock_rwsem must be held for write
 */
static void rbd_unlock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
		rbd_dev->lock_cookie[0] == '\0');

	ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
			      RBD_LOCK_NAME, rbd_dev->lock_cookie);
	if (ret && ret != -ENOENT)
		rbd_warn(rbd_dev, "failed to unlock: %d", ret);

	/* treat errors as the image is unlocked */
	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
	rbd_dev->lock_cookie[0] = '\0';
	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
	queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
}
3127 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3128 enum rbd_notify_op notify_op,
3129 struct page ***preply_pages,
3130 size_t *preply_len)
3132 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3133 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3134 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3135 char buf[buf_size];
3136 void *p = buf;
3138 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3140 /* encode *LockPayload NotifyMessage (op + ClientId) */
3141 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3142 ceph_encode_32(&p, notify_op);
3143 ceph_encode_64(&p, cid.gid);
3144 ceph_encode_64(&p, cid.handle);
3146 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3147 &rbd_dev->header_oloc, buf, buf_size,
3148 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
/*
 * Fire-and-forget variant of __rbd_notify_op_lock(): send the
 * notification and discard any acknowledgment payloads.
 */
static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
			       enum rbd_notify_op notify_op)
{
	struct page **reply_pages;
	size_t reply_len;

	__rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
}
/* Workqueue handler: tell peers we acquired the exclusive lock. */
static void rbd_notify_acquired_lock(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
						  acquired_lock_work);

	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
}
/* Workqueue handler: tell peers we released the exclusive lock. */
static void rbd_notify_released_lock(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
						  released_lock_work);

	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
}
/*
 * Ask the current lock owner to release the exclusive lock by sending
 * a REQUEST_LOCK notification and decoding the acks.  Exactly one
 * non-empty ResponseMessage is expected; its embedded result becomes
 * our return value.  Returns -ETIMEDOUT if no owner responded, -EIO
 * on duplicate owners, -EINVAL on a malformed reply.
 */
static int rbd_request_lock(struct rbd_device *rbd_dev)
{
	struct page **reply_pages;
	size_t reply_len;
	bool lock_owner_responded = false;
	int ret;

	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
				   &reply_pages, &reply_len);
	if (ret && ret != -ETIMEDOUT) {
		rbd_warn(rbd_dev, "failed to request lock: %d", ret);
		goto out;
	}

	if (reply_len > 0 && reply_len <= PAGE_SIZE) {
		void *p = page_address(reply_pages[0]);
		void *const end = p + reply_len;
		u32 n;

		ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
		while (n--) {
			u8 struct_v;
			u32 len;

			ceph_decode_need(&p, end, 8 + 8, e_inval);
			p += 8 + 8; /* skip gid and cookie */

			ceph_decode_32_safe(&p, end, len, e_inval);
			if (!len)
				continue;

			/* only one peer may claim to own the lock */
			if (lock_owner_responded) {
				rbd_warn(rbd_dev,
					 "duplicate lock owners detected");
				ret = -EIO;
				goto out;
			}

			lock_owner_responded = true;
			ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
						  &struct_v, &len);
			if (ret) {
				rbd_warn(rbd_dev,
					 "failed to decode ResponseMessage: %d",
					 ret);
				goto e_inval;
			}

			/* the owner's result code for our request */
			ret = ceph_decode_32(&p);
		}
	}

	if (!lock_owner_responded) {
		rbd_warn(rbd_dev, "no lock owners detected");
		ret = -ETIMEDOUT;
	}

out:
	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
	return ret;

e_inval:
	ret = -EINVAL;
	goto out;
}
3245 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3247 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3249 cancel_delayed_work(&rbd_dev->lock_dwork);
3250 if (wake_all)
3251 wake_up_all(&rbd_dev->lock_waitq);
3252 else
3253 wake_up(&rbd_dev->lock_waitq);
/*
 * Fetch the current locker(s) of the header object.  Rejects locks
 * not taken by rbd (wrong tag or cookie prefix) and shared locks with
 * -EBUSY.  On success *lockers/*num_lockers describe the owner(s);
 * the caller frees them with ceph_free_lockers().
 */
static int get_lock_owner_info(struct rbd_device *rbd_dev,
			       struct ceph_locker **lockers, u32 *num_lockers)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	u8 lock_type;
	char *lock_tag;
	int ret;

	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
				 &rbd_dev->header_oloc, RBD_LOCK_NAME,
				 &lock_type, &lock_tag, lockers, num_lockers);
	if (ret)
		return ret;

	if (*num_lockers == 0) {
		dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
		goto out;
	}

	if (strcmp(lock_tag, RBD_LOCK_TAG)) {
		rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
			 lock_tag);
		ret = -EBUSY;
		goto out;
	}

	if (lock_type == CEPH_CLS_LOCK_SHARED) {
		rbd_warn(rbd_dev, "shared lock type detected");
		ret = -EBUSY;
		goto out;
	}

	if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
		    strlen(RBD_LOCK_COOKIE_PREFIX))) {
		rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
			 (*lockers)[0].id.cookie);
		ret = -EBUSY;
		goto out;
	}

out:
	kfree(lock_tag);
	return ret;
}
/*
 * Check whether the given locker still has a live watch on the header
 * object (i.e. the lock owner is alive).  Returns 1 and records the
 * owner cid if a matching watcher is found, 0 if not, or a negative
 * errno.  Callers have already validated the cookie prefix in
 * get_lock_owner_info(), so the sscanf below can assume its format.
 */
static int find_watcher(struct rbd_device *rbd_dev,
			const struct ceph_locker *locker)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_watch_item *watchers;
	u32 num_watchers;
	u64 cookie;
	int i;
	int ret;

	ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
				      &rbd_dev->header_oloc, &watchers,
				      &num_watchers);
	if (ret)
		return ret;

	sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
	for (i = 0; i < num_watchers; i++) {
		/* match on both entity address and watch cookie */
		if (!memcmp(&watchers[i].addr, &locker->info.addr,
			    sizeof(locker->info.addr)) &&
		    watchers[i].cookie == cookie) {
			struct rbd_client_id cid = {
				.gid = le64_to_cpu(watchers[i].name.num),
				.handle = cookie,
			};

			dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
			     rbd_dev, cid.gid, cid.handle);
			rbd_set_owner_cid(rbd_dev, &cid);
			ret = 1;
			goto out;
		}
	}

	dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
	ret = 0;
out:
	kfree(watchers);
	return ret;
}
/*
 * Attempt to take the exclusive lock, breaking a dead peer's lock if
 * necessary.  Loops: try rbd_lock(); on -EBUSY, look up the owner.
 * If the owner still has a watch it is alive and we must request the
 * lock instead (returns 0 with *pret-style semantics handled by the
 * caller); otherwise blacklist the owner, break its lock and retry.
 *
 * lock_rwsem must be held for write
 */
static int rbd_try_lock(struct rbd_device *rbd_dev)
{
	struct ceph_client *client = rbd_dev->rbd_client->client;
	struct ceph_locker *lockers;
	u32 num_lockers;
	int ret;

	for (;;) {
		ret = rbd_lock(rbd_dev);
		if (ret != -EBUSY)
			return ret;

		/* determine if the current lock holder is still alive */
		ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
		if (ret)
			return ret;

		if (num_lockers == 0)
			goto again;

		ret = find_watcher(rbd_dev, lockers);
		if (ret) {
			if (ret > 0)
				ret = 0; /* have to request lock */
			goto out;
		}

		rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
			 ENTITY_NAME(lockers[0].id.name));

		/* keep the dead client from ever touching the image again */
		ret = ceph_monc_blacklist_add(&client->monc,
					      &lockers[0].info.addr);
		if (ret) {
			rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
				 ENTITY_NAME(lockers[0].id.name), ret);
			goto out;
		}

		ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
					  &rbd_dev->header_oloc, RBD_LOCK_NAME,
					  lockers[0].id.cookie,
					  &lockers[0].id.name);
		if (ret && ret != -ENOENT)
			goto out;

again:
		ceph_free_lockers(lockers, num_lockers);
	}

out:
	ceph_free_lockers(lockers, num_lockers);
	return ret;
}
3402  * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
/*
 * Fast path: peek at the lock state under the read lock; if we already
 * own the lock there is nothing to do.  Otherwise retry under the write
 * lock (state may have changed in the window between up_read() and
 * down_write()) and attempt the acquisition.
 */
3404 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3405 int *pret)
3407 enum rbd_lock_state lock_state;
3409 down_read(&rbd_dev->lock_rwsem);
3410 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3411 rbd_dev->lock_state);
3412 if (__rbd_is_lock_owner(rbd_dev)) {
3413 lock_state = rbd_dev->lock_state;
3414 up_read(&rbd_dev->lock_rwsem);
3415 return lock_state;
3418 up_read(&rbd_dev->lock_rwsem);
3419 down_write(&rbd_dev->lock_rwsem);
3420 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3421 rbd_dev->lock_state);
/* re-check: another thread may have become owner while unlocked */
3422 if (!__rbd_is_lock_owner(rbd_dev)) {
3423 *pret = rbd_try_lock(rbd_dev);
3424 if (*pret)
3425 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3428 lock_state = rbd_dev->lock_state;
3429 up_write(&rbd_dev->lock_rwsem);
3430 return lock_state;
/*
 * Delayed-work handler that drives exclusive lock acquisition.  If the
 * lock is busy, ask the current owner to release it and reschedule
 * ourselves as appropriate.
 */
3433 static void rbd_acquire_lock(struct work_struct *work)
3435 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3436 struct rbd_device, lock_dwork);
3437 enum rbd_lock_state lock_state;
3438 int ret;
3440 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3441 again:
3442 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3443 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3444 if (lock_state == RBD_LOCK_STATE_LOCKED)
3445 wake_requests(rbd_dev, true);
3446 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3447 rbd_dev, lock_state, ret);
3448 return;
/* lock is held by someone else - ask them to release it */
3451 ret = rbd_request_lock(rbd_dev);
3452 if (ret == -ETIMEDOUT) {
3453 goto again; /* treat this as a dead client */
3454 } else if (ret == -EROFS) {
3455 rbd_warn(rbd_dev, "peer will not release lock");
3457 * If this is rbd_add_acquire_lock(), we want to fail
3458 * immediately -- reuse BLACKLISTED flag. Otherwise we
3459 * want to block.
/* GENHD_FL_UP not yet set means we're still inside "rbd map" */
3461 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3462 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3463 /* wake "rbd map --exclusive" process */
3464 wake_requests(rbd_dev, false);
3466 } else if (ret < 0) {
3467 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3468 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3469 RBD_RETRY_DELAY);
3470 } else {
3472 * lock owner acked, but resend if we don't see them
3473 * release the lock
3475 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3476 rbd_dev);
3477 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3478 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3483  * lock_rwsem must be held for write
/*
 * Release the exclusive lock, flushing in-flight I/O first.  The rwsem
 * is downgraded to read around the (blocking) flush and re-taken for
 * write afterwards; the RELEASING state guards against races in that
 * window.  Returns true if the lock was actually released.
 */
3485 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3487 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3488 rbd_dev->lock_state);
3489 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3490 return false;
3492 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3493 downgrade_write(&rbd_dev->lock_rwsem);
3495 * Ensure that all in-flight IO is flushed.
3497 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3498 * may be shared with other devices.
3500 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3501 up_read(&rbd_dev->lock_rwsem);
3503 down_write(&rbd_dev->lock_rwsem);
3504 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3505 rbd_dev->lock_state);
/* someone else changed the state while we slept - bail out */
3506 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3507 return false;
3509 rbd_unlock(rbd_dev);
3511 * Give others a chance to grab the lock - we would re-acquire
3512 * almost immediately if we got new IO during ceph_osdc_sync()
3513 * otherwise. We need to ack our own notifications, so this
3514 * lock_dwork will be requeued from rbd_wait_state_locked()
3515 * after wake_requests() in rbd_handle_released_lock().
3517 cancel_delayed_work(&rbd_dev->lock_dwork);
3518 return true;
3521 static void rbd_release_lock_work(struct work_struct *work)
3523 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3524 unlock_work);
3526 down_write(&rbd_dev->lock_rwsem);
3527 rbd_release_lock(rbd_dev);
3528 up_write(&rbd_dev->lock_rwsem);
/*
 * Handle an ACQUIRED_LOCK notification: record the new owner's client
 * id (struct_v >= 2 carries it in the payload) and wake any waiters so
 * they can re-evaluate the lock state.
 */
3531 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3532 void **p)
3534 struct rbd_client_id cid = { 0 };
3536 if (struct_v >= 2) {
3537 cid.gid = ceph_decode_64(p);
3538 cid.handle = ceph_decode_64(p);
3541 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3542 cid.handle);
3543 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3544 down_write(&rbd_dev->lock_rwsem);
3545 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3547 * we already know that the remote client is
3548 * the owner
3550 up_write(&rbd_dev->lock_rwsem);
3551 return;
3554 rbd_set_owner_cid(rbd_dev, &cid);
/* keep the sem held (as read) across the wake below */
3555 downgrade_write(&rbd_dev->lock_rwsem);
3556 } else {
3557 down_read(&rbd_dev->lock_rwsem);
3560 if (!__rbd_is_lock_owner(rbd_dev))
3561 wake_requests(rbd_dev, false);
3562 up_read(&rbd_dev->lock_rwsem);
/*
 * Handle a RELEASED_LOCK notification: clear the recorded owner if it
 * matches the releasing client, then wake waiters so they can try to
 * acquire the lock themselves.
 */
3565 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3566 void **p)
3568 struct rbd_client_id cid = { 0 };
3570 if (struct_v >= 2) {
3571 cid.gid = ceph_decode_64(p);
3572 cid.handle = ceph_decode_64(p);
3575 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3576 cid.handle);
3577 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3578 down_write(&rbd_dev->lock_rwsem);
/* a release from someone we don't think owns the lock is ignored */
3579 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3580 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3581 __func__, rbd_dev, cid.gid, cid.handle,
3582 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3583 up_write(&rbd_dev->lock_rwsem);
3584 return;
3587 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
/* keep the sem held (as read) across the wake below */
3588 downgrade_write(&rbd_dev->lock_rwsem);
3589 } else {
3590 down_read(&rbd_dev->lock_rwsem);
3593 if (!__rbd_is_lock_owner(rbd_dev))
3594 wake_requests(rbd_dev, false);
3595 up_read(&rbd_dev->lock_rwsem);
3599  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3600  * ResponseMessage is needed.
/*
 * Handle a REQUEST_LOCK notification from a peer that wants the
 * exclusive lock.  If we own it and the mapping isn't "exclusive",
 * queue a release; if it is, refuse with -EROFS.
 */
3602 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3603 void **p)
3605 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3606 struct rbd_client_id cid = { 0 };
3607 int result = 1;
3609 if (struct_v >= 2) {
3610 cid.gid = ceph_decode_64(p);
3611 cid.handle = ceph_decode_64(p);
3614 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3615 cid.handle);
/* we see our own broadcast notifications too - ignore them */
3616 if (rbd_cid_equal(&cid, &my_cid))
3617 return result;
3619 down_read(&rbd_dev->lock_rwsem);
3620 if (__rbd_is_lock_owner(rbd_dev)) {
3621 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3622 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3623 goto out_unlock;
3626 * encode ResponseMessage(0) so the peer can detect
3627 * a missing owner
3629 result = 0;
3631 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3632 if (!rbd_dev->opts->exclusive) {
3633 dout("%s rbd_dev %p queueing unlock_work\n",
3634 __func__, rbd_dev);
3635 queue_work(rbd_dev->task_wq,
3636 &rbd_dev->unlock_work);
3637 } else {
3638 /* refuse to release the lock */
3639 result = -EROFS;
3644 out_unlock:
3645 up_read(&rbd_dev->lock_rwsem);
3646 return result;
3649 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3650 u64 notify_id, u64 cookie, s32 *result)
3652 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3653 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3654 char buf[buf_size];
3655 int ret;
3657 if (result) {
3658 void *p = buf;
3660 /* encode ResponseMessage */
3661 ceph_start_encoding(&p, 1, 1,
3662 buf_size - CEPH_ENCODING_START_BLK_LEN);
3663 ceph_encode_32(&p, *result);
3664 } else {
3665 buf_size = 0;
3668 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3669 &rbd_dev->header_oloc, notify_id, cookie,
3670 buf, buf_size);
3671 if (ret)
3672 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
/* Ack a notify without encoding a ResponseMessage payload. */
3675 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3676 u64 cookie)
3678 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3679 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
/* Ack a notify, encoding @result into the ResponseMessage payload. */
3682 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3683 u64 notify_id, u64 cookie, s32 result)
3685 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3686 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
/*
 * Watch callback: decode the NotifyMessage from the header object and
 * dispatch on the notify op.  Every path must ack the notify, with or
 * without a result payload.
 */
3689 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3690 u64 notifier_id, void *data, size_t data_len)
3692 struct rbd_device *rbd_dev = arg;
3693 void *p = data;
3694 void *const end = p + data_len;
3695 u8 struct_v = 0;
3696 u32 len;
3697 u32 notify_op;
3698 int ret;
3700 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3701 __func__, rbd_dev, cookie, notify_id, data_len);
3702 if (data_len) {
3703 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3704 &struct_v, &len);
3705 if (ret) {
3706 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3707 ret);
3708 return;
3711 notify_op = ceph_decode_32(&p);
3712 } else {
3713 /* legacy notification for header updates */
3714 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3715 len = 0;
3718 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3719 switch (notify_op) {
3720 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3721 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3722 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3723 break;
3724 case RBD_NOTIFY_OP_RELEASED_LOCK:
3725 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3726 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3727 break;
3728 case RBD_NOTIFY_OP_REQUEST_LOCK:
3729 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
/* <= 0 means "encode a ResponseMessage"; 1 means plain ack */
3730 if (ret <= 0)
3731 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3732 cookie, ret);
3733 else
3734 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3735 break;
3736 case RBD_NOTIFY_OP_HEADER_UPDATE:
3737 ret = rbd_dev_refresh(rbd_dev);
3738 if (ret)
3739 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3741 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3742 break;
3743 default:
/* unknown op: only the lock owner needs to report unsupported */
3744 if (rbd_is_lock_owner(rbd_dev))
3745 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3746 cookie, -EOPNOTSUPP);
3747 else
3748 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3749 break;
3753 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
/*
 * Watch error callback: the watch on the header object failed.  Forget
 * the recorded lock owner and schedule an immediate re-registration.
 */
3755 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3757 struct rbd_device *rbd_dev = arg;
3759 rbd_warn(rbd_dev, "encountered watch error: %d", err);
3761 down_write(&rbd_dev->lock_rwsem);
3762 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3763 up_write(&rbd_dev->lock_rwsem);
3765 mutex_lock(&rbd_dev->watch_mutex);
3766 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3767 __rbd_unregister_watch(rbd_dev);
3768 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
/* rbd_reregister_watch() runs from watch_dwork */
3770 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3772 mutex_unlock(&rbd_dev->watch_mutex);
3776 * watch_mutex must be locked
3778 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3780 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3781 struct ceph_osd_linger_request *handle;
3783 rbd_assert(!rbd_dev->watch_handle);
3784 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3786 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3787 &rbd_dev->header_oloc, rbd_watch_cb,
3788 rbd_watch_errcb, rbd_dev);
3789 if (IS_ERR(handle))
3790 return PTR_ERR(handle);
3792 rbd_dev->watch_handle = handle;
3793 return 0;
3797 * watch_mutex must be locked
3799 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3801 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3802 int ret;
3804 rbd_assert(rbd_dev->watch_handle);
3805 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3807 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3808 if (ret)
3809 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3811 rbd_dev->watch_handle = NULL;
/*
 * Register the header watch and move watch_state to REGISTERED,
 * recording the linger id as the lock cookie seed.
 */
3814 static int rbd_register_watch(struct rbd_device *rbd_dev)
3816 int ret;
3818 mutex_lock(&rbd_dev->watch_mutex);
3819 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3820 ret = __rbd_register_watch(rbd_dev);
3821 if (ret)
3822 goto out;
3824 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3825 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3827 out:
3828 mutex_unlock(&rbd_dev->watch_mutex);
3829 return ret;
/*
 * Synchronously cancel all task_wq work items.  watch_dwork goes first
 * since it can requeue lock work; lock_dwork before unlock_work so a
 * pending acquire can't race a pending release.
 */
3832 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3834 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3836 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3837 cancel_work_sync(&rbd_dev->acquired_lock_work);
3838 cancel_work_sync(&rbd_dev->released_lock_work);
3839 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3840 cancel_work_sync(&rbd_dev->unlock_work);
/*
 * Quiesce background work, drop the header watch and flush any
 * in-flight notify acks so nothing references rbd_dev afterwards.
 */
3843 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
/* no one should be waiting on the lock at teardown time */
3845 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3846 cancel_tasks_sync(rbd_dev);
3848 mutex_lock(&rbd_dev->watch_mutex);
3849 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3850 __rbd_unregister_watch(rbd_dev);
3851 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3852 mutex_unlock(&rbd_dev->watch_mutex);
3854 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3858  * lock_rwsem must be held for write
/*
 * After a watch re-registration the lock cookie (derived from the
 * linger id) has changed; update it on the OSD in place.  Older OSDs
 * can't update a cookie, so fall back to release + re-acquire.
 */
3860 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3862 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3863 char cookie[32];
3864 int ret;
3866 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3868 format_lock_cookie(rbd_dev, cookie);
3869 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3870 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3871 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3872 RBD_LOCK_TAG, cookie);
3873 if (ret) {
/* -EOPNOTSUPP is the expected "old OSD" case - don't warn for it */
3874 if (ret != -EOPNOTSUPP)
3875 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3876 ret);
3879 * Lock cookie cannot be updated on older OSDs, so do
3880 * a manual release and queue an acquire.
3882 if (rbd_release_lock(rbd_dev))
3883 queue_delayed_work(rbd_dev->task_wq,
3884 &rbd_dev->lock_dwork, 0);
3885 } else {
3886 strcpy(rbd_dev->lock_cookie, cookie);
3890 static void rbd_reregister_watch(struct work_struct *work)
3892 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3893 struct rbd_device, watch_dwork);
3894 int ret;
3896 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3898 mutex_lock(&rbd_dev->watch_mutex);
3899 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3900 mutex_unlock(&rbd_dev->watch_mutex);
3901 return;
3904 ret = __rbd_register_watch(rbd_dev);
3905 if (ret) {
3906 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3907 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3908 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3909 wake_requests(rbd_dev, true);
3910 } else {
3911 queue_delayed_work(rbd_dev->task_wq,
3912 &rbd_dev->watch_dwork,
3913 RBD_RETRY_DELAY);
3915 mutex_unlock(&rbd_dev->watch_mutex);
3916 return;
3919 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3920 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3921 mutex_unlock(&rbd_dev->watch_mutex);
3923 down_write(&rbd_dev->lock_rwsem);
3924 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3925 rbd_reacquire_lock(rbd_dev);
3926 up_write(&rbd_dev->lock_rwsem);
3928 ret = rbd_dev_refresh(rbd_dev);
3929 if (ret)
3930 rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
3934  * Synchronous osd object method call. Returns the number of bytes
3935  * returned in the outbound buffer, or a negative error code.
3937 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3938 struct ceph_object_id *oid,
3939 struct ceph_object_locator *oloc,
3940 const char *method_name,
3941 const void *outbound,
3942 size_t outbound_size,
3943 void *inbound,
3944 size_t inbound_size)
3946 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3947 struct page *req_page = NULL;
3948 struct page *reply_page;
3949 int ret;
3952 * Method calls are ultimately read operations. The result
3953 * should placed into the inbound buffer provided. They
3954 * also supply outbound data--parameters for the object
3955 * method. Currently if this is present it will be a
3956 * snapshot id.
3958 if (outbound) {
/* request payload is limited to a single page by ceph_osdc_call() */
3959 if (outbound_size > PAGE_SIZE)
3960 return -E2BIG;
3962 req_page = alloc_page(GFP_KERNEL);
3963 if (!req_page)
3964 return -ENOMEM;
3966 memcpy(page_address(req_page), outbound, outbound_size);
3969 reply_page = alloc_page(GFP_KERNEL);
3970 if (!reply_page) {
3971 if (req_page)
3972 __free_page(req_page);
3973 return -ENOMEM;
3976 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3977 CEPH_OSD_FLAG_READ, req_page, outbound_size,
3978 reply_page, &inbound_size);
3979 if (!ret) {
/* inbound_size was updated to the actual reply length */
3980 memcpy(inbound, page_address(reply_page), inbound_size);
3981 ret = inbound_size;
3984 if (req_page)
3985 __free_page(req_page);
3986 __free_page(reply_page);
3987 return ret;
3991  * lock_rwsem must be held for read
/*
 * Block until this client owns the exclusive lock or the device is
 * blacklisted.  The rwsem is dropped while sleeping and re-taken
 * before each re-check; lock_dwork is (re)queued on every iteration to
 * keep the acquisition attempt alive.
 */
3993 static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3995 DEFINE_WAIT(wait);
3997 do {
3999 * Note the use of mod_delayed_work() in rbd_acquire_lock()
4000 * and cancel_delayed_work() in wake_requests().
4002 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4003 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4004 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4005 TASK_UNINTERRUPTIBLE);
4006 up_read(&rbd_dev->lock_rwsem);
4007 schedule();
4008 down_read(&rbd_dev->lock_rwsem);
4009 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4010 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4012 finish_wait(&rbd_dev->lock_waitq, &wait);
/*
 * Per-request worker: validate the block request, take the exclusive
 * lock if the image requires it, build an image request and submit it.
 * All failure paths funnel through the err_* labels so the lock,
 * snap context and blk-mq request are unwound in the right order.
 */
4015 static void rbd_queue_workfn(struct work_struct *work)
4017 struct request *rq = blk_mq_rq_from_pdu(work);
4018 struct rbd_device *rbd_dev = rq->q->queuedata;
4019 struct rbd_img_request *img_request;
4020 struct ceph_snap_context *snapc = NULL;
4021 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4022 u64 length = blk_rq_bytes(rq);
4023 enum obj_operation_type op_type;
4024 u64 mapping_size;
4025 bool must_be_locked;
4026 int result;
4028 switch (req_op(rq)) {
4029 case REQ_OP_DISCARD:
4030 case REQ_OP_WRITE_ZEROES:
4031 op_type = OBJ_OP_DISCARD;
4032 break;
4033 case REQ_OP_WRITE:
4034 op_type = OBJ_OP_WRITE;
4035 break;
4036 case REQ_OP_READ:
4037 op_type = OBJ_OP_READ;
4038 break;
4039 default:
4040 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4041 result = -EIO;
4042 goto err;
4045 /* Ignore/skip any zero-length requests */
4047 if (!length) {
4048 dout("%s: zero-length request\n", __func__);
4049 result = 0;
4050 goto err_rq;
4053 /* Only reads are allowed to a read-only device */
4055 if (op_type != OBJ_OP_READ) {
4056 if (rbd_dev->mapping.read_only) {
4057 result = -EROFS;
4058 goto err_rq;
4060 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4064 * Quit early if the mapped snapshot no longer exists. It's
4065 * still possible the snapshot will have disappeared by the
4066 * time our request arrives at the osd, but there's no sense in
4067 * sending it if we already know.
4069 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4070 dout("request for non-existent snapshot");
4071 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4072 result = -ENXIO;
4073 goto err_rq;
/* guard against u64 wrap-around of offset + length */
4076 if (offset && length > U64_MAX - offset + 1) {
4077 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4078 length);
4079 result = -EINVAL;
4080 goto err_rq; /* Shouldn't happen */
4083 blk_mq_start_request(rq);
/* snapshot the mapping size and write snap context atomically */
4085 down_read(&rbd_dev->header_rwsem);
4086 mapping_size = rbd_dev->mapping.size;
4087 if (op_type != OBJ_OP_READ) {
4088 snapc = rbd_dev->header.snapc;
4089 ceph_get_snap_context(snapc);
4091 up_read(&rbd_dev->header_rwsem);
4093 if (offset + length > mapping_size) {
4094 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4095 length, mapping_size);
4096 result = -EIO;
4097 goto err_rq;
4100 must_be_locked =
4101 (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
4102 (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
4103 if (must_be_locked) {
4104 down_read(&rbd_dev->lock_rwsem);
4105 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4106 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4107 if (rbd_dev->opts->exclusive) {
4108 rbd_warn(rbd_dev, "exclusive lock required");
4109 result = -EROFS;
4110 goto err_unlock;
/* blocks (dropping lock_rwsem) until locked or blacklisted */
4112 rbd_wait_state_locked(rbd_dev);
4114 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4115 result = -EBLACKLISTED;
4116 goto err_unlock;
4120 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4121 snapc);
4122 if (!img_request) {
4123 result = -ENOMEM;
4124 goto err_unlock;
4126 img_request->rq = rq;
4127 snapc = NULL; /* img_request consumes a ref */
4129 if (op_type == OBJ_OP_DISCARD)
4130 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4131 NULL);
4132 else
4133 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4134 rq->bio);
4135 if (result)
4136 goto err_img_request;
4138 result = rbd_img_request_submit(img_request);
4139 if (result)
4140 goto err_img_request;
4142 if (must_be_locked)
4143 up_read(&rbd_dev->lock_rwsem);
4144 return;
4146 err_img_request:
4147 rbd_img_request_put(img_request);
4148 err_unlock:
4149 if (must_be_locked)
4150 up_read(&rbd_dev->lock_rwsem);
4151 err_rq:
4152 if (result)
4153 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4154 obj_op_name(op_type), length, offset, result);
4155 ceph_put_snap_context(snapc);
4156 err:
4157 blk_mq_end_request(rq, errno_to_blk_status(result));
4160 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4161 const struct blk_mq_queue_data *bd)
4163 struct request *rq = bd->rq;
4164 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4166 queue_work(rbd_wq, work);
4167 return BLK_STS_OK;
/*
 * Tear down the gendisk: queue first, then the tag set it was built
 * from, then drop the disk reference.  Order matters.
 */
4170 static void rbd_free_disk(struct rbd_device *rbd_dev)
4172 blk_cleanup_queue(rbd_dev->disk->queue);
4173 blk_mq_free_tag_set(&rbd_dev->tag_set);
4174 put_disk(rbd_dev->disk);
4175 rbd_dev->disk = NULL;
/*
 * Synchronously read up to @buf_len bytes from the start of an object
 * into @buf.  Returns the number of bytes read or a negative error.
 */
4178 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4179 struct ceph_object_id *oid,
4180 struct ceph_object_locator *oloc,
4181 void *buf, int buf_len)
4184 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4185 struct ceph_osd_request *req;
4186 struct page **pages;
4187 int num_pages = calc_pages_for(0, buf_len);
4188 int ret;
4190 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4191 if (!req)
4192 return -ENOMEM;
4194 ceph_oid_copy(&req->r_base_oid, oid);
4195 ceph_oloc_copy(&req->r_base_oloc, oloc);
4196 req->r_flags = CEPH_OSD_FLAG_READ;
4198 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4199 if (ret)
4200 goto out_req;
4202 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4203 if (IS_ERR(pages)) {
4204 ret = PTR_ERR(pages);
4205 goto out_req;
/* last arg: the request owns the pages and will free them */
4208 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4209 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4210 true);
4212 ceph_osdc_start_request(osdc, req, false);
4213 ret = ceph_osdc_wait_request(osdc, req);
/* a short read is possible - copy only what the OSD returned */
4214 if (ret >= 0)
4215 ceph_copy_from_page_vector(pages, buf, 0, ret);
4217 out_req:
4218 ceph_osdc_put_request(req);
4219 return ret;
4223  * Read the complete header for the given rbd device. On successful
4224  * return, the rbd_dev->header field will contain up-to-date
4225  * information about the image.
4227 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4229 struct rbd_image_header_ondisk *ondisk = NULL;
4230 u32 snap_count = 0;
4231 u64 names_size = 0;
4232 u32 want_count;
4233 int ret;
4236 * The complete header will include an array of its 64-bit
4237 * snapshot ids, followed by the names of those snapshots as
4238 * a contiguous block of NUL-terminated strings. Note that
4239 * the number of snapshots could change by the time we read
4240 * it in, in which case we re-read it.
4242 do {
4243 size_t size;
/* free the previous (too small) attempt before reallocating */
4245 kfree(ondisk);
4247 size = sizeof (*ondisk);
4248 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4249 size += names_size;
4250 ondisk = kmalloc(size, GFP_KERNEL);
4251 if (!ondisk)
4252 return -ENOMEM;
4254 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4255 &rbd_dev->header_oloc, ondisk, size);
4256 if (ret < 0)
4257 goto out;
4258 if ((size_t)ret < size) {
4259 ret = -ENXIO;
4260 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4261 size, ret);
4262 goto out;
4264 if (!rbd_dev_ondisk_valid(ondisk)) {
4265 ret = -ENXIO;
4266 rbd_warn(rbd_dev, "invalid header");
4267 goto out;
/* loop until the snapshot count is stable across two reads */
4270 names_size = le64_to_cpu(ondisk->snap_names_len);
4271 want_count = snap_count;
4272 snap_count = le32_to_cpu(ondisk->snap_count);
4273 } while (snap_count != want_count);
4275 ret = rbd_header_from_disk(rbd_dev, ondisk);
4276 out:
4277 kfree(ondisk);
4279 return ret;
4283 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4284 * has disappeared from the (just updated) snapshot context.
4286 static void rbd_exists_validate(struct rbd_device *rbd_dev)
4288 u64 snap_id;
4290 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4291 return;
4293 snap_id = rbd_dev->spec->snap_id;
4294 if (snap_id == CEPH_NOSNAP)
4295 return;
4297 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4298 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4301 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4303 sector_t size;
4306 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4307 * try to update its size. If REMOVING is set, updating size
4308 * is just useless work since the device can't be opened.
4310 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4311 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4312 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4313 dout("setting size to %llu sectors", (unsigned long long)size);
4314 set_capacity(rbd_dev->disk, size);
4315 revalidate_disk(rbd_dev->disk);
/*
 * Re-read the image header (and parent info, if any) under
 * header_rwsem, then propagate a size change to the gendisk.
 */
4319 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4321 u64 mapping_size;
4322 int ret;
4324 down_write(&rbd_dev->header_rwsem);
4325 mapping_size = rbd_dev->mapping.size;
4327 ret = rbd_dev_header_info(rbd_dev);
4328 if (ret)
4329 goto out;
4332 * If there is a parent, see if it has disappeared due to the
4333 * mapped image getting flattened.
4335 if (rbd_dev->parent) {
4336 ret = rbd_dev_v2_parent_info(rbd_dev);
4337 if (ret)
4338 goto out;
4341 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4342 rbd_dev->mapping.size = rbd_dev->header.image_size;
4343 } else {
4344 /* validate mapped snapshot's EXISTS flag */
4345 rbd_exists_validate(rbd_dev);
4348 out:
4349 up_write(&rbd_dev->header_rwsem);
/* resize the gendisk outside of header_rwsem */
4350 if (!ret && mapping_size != rbd_dev->mapping.size)
4351 rbd_dev_update_size(rbd_dev);
4353 return ret;
4356 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
4357 unsigned int hctx_idx, unsigned int numa_node)
4359 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4361 INIT_WORK(work, rbd_queue_workfn);
4362 return 0;
/* blk-mq operations: queue to workqueue, per-request work item setup */
4365 static const struct blk_mq_ops rbd_mq_ops = {
4366 .queue_rq = rbd_queue_rq,
4367 .init_request = rbd_init_request,
/*
 * Allocate and configure the gendisk, blk-mq tag set and request
 * queue for this device.  add_disk() happens later, elsewhere.
 */
4370 static int rbd_init_disk(struct rbd_device *rbd_dev)
4372 struct gendisk *disk;
4373 struct request_queue *q;
4374 u64 segment_size;
4375 int err;
4377 /* create gendisk info */
4378 disk = alloc_disk(single_major ?
4379 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4380 RBD_MINORS_PER_MAJOR);
4381 if (!disk)
4382 return -ENOMEM;
4384 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4385 rbd_dev->dev_id);
4386 disk->major = rbd_dev->major;
4387 disk->first_minor = rbd_dev->minor;
4388 if (single_major)
4389 disk->flags |= GENHD_FL_EXT_DEVT;
4390 disk->fops = &rbd_bd_ops;
4391 disk->private_data = rbd_dev;
4393 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4394 rbd_dev->tag_set.ops = &rbd_mq_ops;
4395 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4396 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4397 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4398 rbd_dev->tag_set.nr_hw_queues = 1;
/* pdu carries the per-request work item, see rbd_init_request() */
4399 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4401 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4402 if (err)
4403 goto out_disk;
4405 q = blk_mq_init_queue(&rbd_dev->tag_set);
4406 if (IS_ERR(q)) {
4407 err = PTR_ERR(q);
4408 goto out_tag_set;
4411 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4412 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4414 /* set io sizes to object size */
4415 segment_size = rbd_obj_bytes(&rbd_dev->header);
4416 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4417 q->limits.max_sectors = queue_max_hw_sectors(q);
4418 blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
4419 blk_queue_max_segment_size(q, segment_size);
4420 blk_queue_io_min(q, segment_size);
4421 blk_queue_io_opt(q, segment_size);
4423 /* enable the discard support */
4424 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4425 q->limits.discard_granularity = segment_size;
4426 q->limits.discard_alignment = segment_size;
4427 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
4428 blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
/* with CRCs enabled, pages must not change while in flight */
4430 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4431 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
4434 * disk_release() expects a queue ref from add_disk() and will
4435 * put it. Hold an extra ref until add_disk() is called.
4437 WARN_ON(!blk_get_queue(q));
4438 disk->queue = q;
4439 q->queuedata = rbd_dev;
4441 rbd_dev->disk = disk;
4443 return 0;
4444 out_tag_set:
4445 blk_mq_free_tag_set(&rbd_dev->tag_set);
4446 out_disk:
4447 put_disk(disk);
4448 return err;
4452 sysfs
/* Map a sysfs struct device back to its containing rbd_device. */
4455 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4457 return container_of(dev, struct rbd_device, dev);
/* sysfs: mapped image size in bytes */
4460 static ssize_t rbd_size_show(struct device *dev,
4461 struct device_attribute *attr, char *buf)
4463 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4465 return sprintf(buf, "%llu\n",
4466 (unsigned long long)rbd_dev->mapping.size);
4470  * Note this shows the features for whatever's mapped, which is not
4471  * necessarily the base image.
/* sysfs: feature bitmask of the mapped image/snapshot, in hex */
4473 static ssize_t rbd_features_show(struct device *dev,
4474 struct device_attribute *attr, char *buf)
4476 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4478 return sprintf(buf, "0x%016llx\n",
4479 (unsigned long long)rbd_dev->mapping.features);
/* sysfs: block device major number, or "(none)" if not yet assigned */
4482 static ssize_t rbd_major_show(struct device *dev,
4483 struct device_attribute *attr, char *buf)
4485 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4487 if (rbd_dev->major)
4488 return sprintf(buf, "%d\n", rbd_dev->major);
4490 return sprintf(buf, "(none)\n");
/* sysfs: block device minor number */
4493 static ssize_t rbd_minor_show(struct device *dev,
4494 struct device_attribute *attr, char *buf)
4496 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4498 return sprintf(buf, "%d\n", rbd_dev->minor);
/* sysfs: ceph client entity address as "ip:port/nonce" */
4501 static ssize_t rbd_client_addr_show(struct device *dev,
4502 struct device_attribute *attr, char *buf)
4504 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4505 struct ceph_entity_addr *client_addr =
4506 ceph_client_addr(rbd_dev->rbd_client->client);
4508 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4509 le32_to_cpu(client_addr->nonce));
/* sysfs: ceph global client id, e.g. "client1234" */
4512 static ssize_t rbd_client_id_show(struct device *dev,
4513 struct device_attribute *attr, char *buf)
4515 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4517 return sprintf(buf, "client%lld\n",
4518 ceph_client_gid(rbd_dev->rbd_client->client));
/* sysfs: ceph cluster fsid (UUID) */
4521 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4522 struct device_attribute *attr, char *buf)
4524 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4526 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
/* sysfs: the original "rbd map" configuration string */
4529 static ssize_t rbd_config_info_show(struct device *dev,
4530 struct device_attribute *attr, char *buf)
4532 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4534 return sprintf(buf, "%s\n", rbd_dev->config_info);
4537 static ssize_t rbd_pool_show(struct device *dev,
4538 struct device_attribute *attr, char *buf)
4540 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4542 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4545 static ssize_t rbd_pool_id_show(struct device *dev,
4546 struct device_attribute *attr, char *buf)
4548 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4550 return sprintf(buf, "%llu\n",
4551 (unsigned long long) rbd_dev->spec->pool_id);
4554 static ssize_t rbd_name_show(struct device *dev,
4555 struct device_attribute *attr, char *buf)
4557 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4559 if (rbd_dev->spec->image_name)
4560 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4562 return sprintf(buf, "(unknown)\n");
4565 static ssize_t rbd_image_id_show(struct device *dev,
4566 struct device_attribute *attr, char *buf)
4568 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4570 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4574 * Shows the name of the currently-mapped snapshot (or
4575 * RBD_SNAP_HEAD_NAME for the base image).
4577 static ssize_t rbd_snap_show(struct device *dev,
4578 struct device_attribute *attr,
4579 char *buf)
4581 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4583 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4586 static ssize_t rbd_snap_id_show(struct device *dev,
4587 struct device_attribute *attr, char *buf)
4589 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4591 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4595 * For a v2 image, shows the chain of parent images, separated by empty
4596 * lines. For v1 images or if there is no parent, shows "(no parent
4597 * image)".
4599 static ssize_t rbd_parent_show(struct device *dev,
4600 struct device_attribute *attr,
4601 char *buf)
4603 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4604 ssize_t count = 0;
4606 if (!rbd_dev->parent)
4607 return sprintf(buf, "(no parent image)\n");
4609 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4610 struct rbd_spec *spec = rbd_dev->parent_spec;
4612 count += sprintf(&buf[count], "%s"
4613 "pool_id %llu\npool_name %s\n"
4614 "image_id %s\nimage_name %s\n"
4615 "snap_id %llu\nsnap_name %s\n"
4616 "overlap %llu\n",
4617 !count ? "" : "\n", /* first? */
4618 spec->pool_id, spec->pool_name,
4619 spec->image_id, spec->image_name ?: "(unknown)",
4620 spec->snap_id, spec->snap_name,
4621 rbd_dev->parent_overlap);
4624 return count;
4627 static ssize_t rbd_image_refresh(struct device *dev,
4628 struct device_attribute *attr,
4629 const char *buf,
4630 size_t size)
4632 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4633 int ret;
4635 ret = rbd_dev_refresh(rbd_dev);
4636 if (ret)
4637 return ret;
4639 return size;
4642 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4643 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4644 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4645 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4646 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4647 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4648 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4649 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4650 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4651 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4652 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4653 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4654 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4655 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4656 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4657 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4659 static struct attribute *rbd_attrs[] = {
4660 &dev_attr_size.attr,
4661 &dev_attr_features.attr,
4662 &dev_attr_major.attr,
4663 &dev_attr_minor.attr,
4664 &dev_attr_client_addr.attr,
4665 &dev_attr_client_id.attr,
4666 &dev_attr_cluster_fsid.attr,
4667 &dev_attr_config_info.attr,
4668 &dev_attr_pool.attr,
4669 &dev_attr_pool_id.attr,
4670 &dev_attr_name.attr,
4671 &dev_attr_image_id.attr,
4672 &dev_attr_current_snap.attr,
4673 &dev_attr_snap_id.attr,
4674 &dev_attr_parent.attr,
4675 &dev_attr_refresh.attr,
4676 NULL
4679 static struct attribute_group rbd_attr_group = {
4680 .attrs = rbd_attrs,
4683 static const struct attribute_group *rbd_attr_groups[] = {
4684 &rbd_attr_group,
4685 NULL
4688 static void rbd_dev_release(struct device *dev);
4690 static const struct device_type rbd_device_type = {
4691 .name = "rbd",
4692 .groups = rbd_attr_groups,
4693 .release = rbd_dev_release,
4696 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4698 kref_get(&spec->kref);
4700 return spec;
4703 static void rbd_spec_free(struct kref *kref);
4704 static void rbd_spec_put(struct rbd_spec *spec)
4706 if (spec)
4707 kref_put(&spec->kref, rbd_spec_free);
4710 static struct rbd_spec *rbd_spec_alloc(void)
4712 struct rbd_spec *spec;
4714 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4715 if (!spec)
4716 return NULL;
4718 spec->pool_id = CEPH_NOPOOL;
4719 spec->snap_id = CEPH_NOSNAP;
4720 kref_init(&spec->kref);
4722 return spec;
4725 static void rbd_spec_free(struct kref *kref)
4727 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4729 kfree(spec->pool_name);
4730 kfree(spec->image_id);
4731 kfree(spec->image_name);
4732 kfree(spec->snap_name);
4733 kfree(spec);
4736 static void rbd_dev_free(struct rbd_device *rbd_dev)
4738 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4739 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4741 ceph_oid_destroy(&rbd_dev->header_oid);
4742 ceph_oloc_destroy(&rbd_dev->header_oloc);
4743 kfree(rbd_dev->config_info);
4745 rbd_put_client(rbd_dev->rbd_client);
4746 rbd_spec_put(rbd_dev->spec);
4747 kfree(rbd_dev->opts);
4748 kfree(rbd_dev);
4751 static void rbd_dev_release(struct device *dev)
4753 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4754 bool need_put = !!rbd_dev->opts;
4756 if (need_put) {
4757 destroy_workqueue(rbd_dev->task_wq);
4758 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4761 rbd_dev_free(rbd_dev);
4764 * This is racy, but way better than putting module outside of
4765 * the release callback. The race window is pretty small, so
4766 * doing something similar to dm (dm-builtin.c) is overkill.
4768 if (need_put)
4769 module_put(THIS_MODULE);
4772 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4773 struct rbd_spec *spec)
4775 struct rbd_device *rbd_dev;
4777 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4778 if (!rbd_dev)
4779 return NULL;
4781 spin_lock_init(&rbd_dev->lock);
4782 INIT_LIST_HEAD(&rbd_dev->node);
4783 init_rwsem(&rbd_dev->header_rwsem);
4785 rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4786 ceph_oid_init(&rbd_dev->header_oid);
4787 rbd_dev->header_oloc.pool = spec->pool_id;
4789 mutex_init(&rbd_dev->watch_mutex);
4790 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4791 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4793 init_rwsem(&rbd_dev->lock_rwsem);
4794 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4795 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4796 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4797 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4798 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4799 init_waitqueue_head(&rbd_dev->lock_waitq);
4801 rbd_dev->dev.bus = &rbd_bus_type;
4802 rbd_dev->dev.type = &rbd_device_type;
4803 rbd_dev->dev.parent = &rbd_root_dev;
4804 device_initialize(&rbd_dev->dev);
4806 rbd_dev->rbd_client = rbdc;
4807 rbd_dev->spec = spec;
4809 return rbd_dev;
4813 * Create a mapping rbd_dev.
4815 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4816 struct rbd_spec *spec,
4817 struct rbd_options *opts)
4819 struct rbd_device *rbd_dev;
4821 rbd_dev = __rbd_dev_create(rbdc, spec);
4822 if (!rbd_dev)
4823 return NULL;
4825 rbd_dev->opts = opts;
4827 /* get an id and fill in device name */
4828 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4829 minor_to_rbd_dev_id(1 << MINORBITS),
4830 GFP_KERNEL);
4831 if (rbd_dev->dev_id < 0)
4832 goto fail_rbd_dev;
4834 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4835 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4836 rbd_dev->name);
4837 if (!rbd_dev->task_wq)
4838 goto fail_dev_id;
4840 /* we have a ref from do_rbd_add() */
4841 __module_get(THIS_MODULE);
4843 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4844 return rbd_dev;
4846 fail_dev_id:
4847 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4848 fail_rbd_dev:
4849 rbd_dev_free(rbd_dev);
4850 return NULL;
4853 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4855 if (rbd_dev)
4856 put_device(&rbd_dev->dev);
4860 * Get the size and object order for an image snapshot, or if
4861 * snap_id is CEPH_NOSNAP, gets this information for the base
4862 * image.
4864 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4865 u8 *order, u64 *snap_size)
4867 __le64 snapid = cpu_to_le64(snap_id);
4868 int ret;
4869 struct {
4870 u8 order;
4871 __le64 size;
4872 } __attribute__ ((packed)) size_buf = { 0 };
4874 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4875 &rbd_dev->header_oloc, "get_size",
4876 &snapid, sizeof(snapid),
4877 &size_buf, sizeof(size_buf));
4878 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4879 if (ret < 0)
4880 return ret;
4881 if (ret < sizeof (size_buf))
4882 return -ERANGE;
4884 if (order) {
4885 *order = size_buf.order;
4886 dout(" order %u", (unsigned int)*order);
4888 *snap_size = le64_to_cpu(size_buf.size);
4890 dout(" snap_id 0x%016llx snap_size = %llu\n",
4891 (unsigned long long)snap_id,
4892 (unsigned long long)*snap_size);
4894 return 0;
4897 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4899 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4900 &rbd_dev->header.obj_order,
4901 &rbd_dev->header.image_size);
4904 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4906 void *reply_buf;
4907 int ret;
4908 void *p;
4910 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4911 if (!reply_buf)
4912 return -ENOMEM;
4914 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4915 &rbd_dev->header_oloc, "get_object_prefix",
4916 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4917 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4918 if (ret < 0)
4919 goto out;
4921 p = reply_buf;
4922 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4923 p + ret, NULL, GFP_NOIO);
4924 ret = 0;
4926 if (IS_ERR(rbd_dev->header.object_prefix)) {
4927 ret = PTR_ERR(rbd_dev->header.object_prefix);
4928 rbd_dev->header.object_prefix = NULL;
4929 } else {
4930 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4932 out:
4933 kfree(reply_buf);
4935 return ret;
4938 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4939 u64 *snap_features)
4941 __le64 snapid = cpu_to_le64(snap_id);
4942 struct {
4943 __le64 features;
4944 __le64 incompat;
4945 } __attribute__ ((packed)) features_buf = { 0 };
4946 u64 unsup;
4947 int ret;
4949 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4950 &rbd_dev->header_oloc, "get_features",
4951 &snapid, sizeof(snapid),
4952 &features_buf, sizeof(features_buf));
4953 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4954 if (ret < 0)
4955 return ret;
4956 if (ret < sizeof (features_buf))
4957 return -ERANGE;
4959 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4960 if (unsup) {
4961 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4962 unsup);
4963 return -ENXIO;
4966 *snap_features = le64_to_cpu(features_buf.features);
4968 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4969 (unsigned long long)snap_id,
4970 (unsigned long long)*snap_features,
4971 (unsigned long long)le64_to_cpu(features_buf.incompat));
4973 return 0;
4976 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4978 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4979 &rbd_dev->header.features);
4982 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4984 struct rbd_spec *parent_spec;
4985 size_t size;
4986 void *reply_buf = NULL;
4987 __le64 snapid;
4988 void *p;
4989 void *end;
4990 u64 pool_id;
4991 char *image_id;
4992 u64 snap_id;
4993 u64 overlap;
4994 int ret;
4996 parent_spec = rbd_spec_alloc();
4997 if (!parent_spec)
4998 return -ENOMEM;
5000 size = sizeof (__le64) + /* pool_id */
5001 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
5002 sizeof (__le64) + /* snap_id */
5003 sizeof (__le64); /* overlap */
5004 reply_buf = kmalloc(size, GFP_KERNEL);
5005 if (!reply_buf) {
5006 ret = -ENOMEM;
5007 goto out_err;
5010 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5011 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5012 &rbd_dev->header_oloc, "get_parent",
5013 &snapid, sizeof(snapid), reply_buf, size);
5014 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5015 if (ret < 0)
5016 goto out_err;
5018 p = reply_buf;
5019 end = reply_buf + ret;
5020 ret = -ERANGE;
5021 ceph_decode_64_safe(&p, end, pool_id, out_err);
5022 if (pool_id == CEPH_NOPOOL) {
5024 * Either the parent never existed, or we have
5025 * record of it but the image got flattened so it no
5026 * longer has a parent. When the parent of a
5027 * layered image disappears we immediately set the
5028 * overlap to 0. The effect of this is that all new
5029 * requests will be treated as if the image had no
5030 * parent.
5032 if (rbd_dev->parent_overlap) {
5033 rbd_dev->parent_overlap = 0;
5034 rbd_dev_parent_put(rbd_dev);
5035 pr_info("%s: clone image has been flattened\n",
5036 rbd_dev->disk->disk_name);
5039 goto out; /* No parent? No problem. */
5042 /* The ceph file layout needs to fit pool id in 32 bits */
5044 ret = -EIO;
5045 if (pool_id > (u64)U32_MAX) {
5046 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5047 (unsigned long long)pool_id, U32_MAX);
5048 goto out_err;
5051 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5052 if (IS_ERR(image_id)) {
5053 ret = PTR_ERR(image_id);
5054 goto out_err;
5056 ceph_decode_64_safe(&p, end, snap_id, out_err);
5057 ceph_decode_64_safe(&p, end, overlap, out_err);
5060 * The parent won't change (except when the clone is
5061 * flattened, already handled that). So we only need to
5062 * record the parent spec we have not already done so.
5064 if (!rbd_dev->parent_spec) {
5065 parent_spec->pool_id = pool_id;
5066 parent_spec->image_id = image_id;
5067 parent_spec->snap_id = snap_id;
5068 rbd_dev->parent_spec = parent_spec;
5069 parent_spec = NULL; /* rbd_dev now owns this */
5070 } else {
5071 kfree(image_id);
5075 * We always update the parent overlap. If it's zero we issue
5076 * a warning, as we will proceed as if there was no parent.
5078 if (!overlap) {
5079 if (parent_spec) {
5080 /* refresh, careful to warn just once */
5081 if (rbd_dev->parent_overlap)
5082 rbd_warn(rbd_dev,
5083 "clone now standalone (overlap became 0)");
5084 } else {
5085 /* initial probe */
5086 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5089 rbd_dev->parent_overlap = overlap;
5091 out:
5092 ret = 0;
5093 out_err:
5094 kfree(reply_buf);
5095 rbd_spec_put(parent_spec);
5097 return ret;
5100 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5102 struct {
5103 __le64 stripe_unit;
5104 __le64 stripe_count;
5105 } __attribute__ ((packed)) striping_info_buf = { 0 };
5106 size_t size = sizeof (striping_info_buf);
5107 void *p;
5108 u64 obj_size;
5109 u64 stripe_unit;
5110 u64 stripe_count;
5111 int ret;
5113 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5114 &rbd_dev->header_oloc, "get_stripe_unit_count",
5115 NULL, 0, &striping_info_buf, size);
5116 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5117 if (ret < 0)
5118 return ret;
5119 if (ret < size)
5120 return -ERANGE;
5123 * We don't actually support the "fancy striping" feature
5124 * (STRIPINGV2) yet, but if the striping sizes are the
5125 * defaults the behavior is the same as before. So find
5126 * out, and only fail if the image has non-default values.
5128 ret = -EINVAL;
5129 obj_size = rbd_obj_bytes(&rbd_dev->header);
5130 p = &striping_info_buf;
5131 stripe_unit = ceph_decode_64(&p);
5132 if (stripe_unit != obj_size) {
5133 rbd_warn(rbd_dev, "unsupported stripe unit "
5134 "(got %llu want %llu)",
5135 stripe_unit, obj_size);
5136 return -EINVAL;
5138 stripe_count = ceph_decode_64(&p);
5139 if (stripe_count != 1) {
5140 rbd_warn(rbd_dev, "unsupported stripe count "
5141 "(got %llu want 1)", stripe_count);
5142 return -EINVAL;
5144 rbd_dev->header.stripe_unit = stripe_unit;
5145 rbd_dev->header.stripe_count = stripe_count;
5147 return 0;
5150 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5152 __le64 data_pool_id;
5153 int ret;
5155 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5156 &rbd_dev->header_oloc, "get_data_pool",
5157 NULL, 0, &data_pool_id, sizeof(data_pool_id));
5158 if (ret < 0)
5159 return ret;
5160 if (ret < sizeof(data_pool_id))
5161 return -EBADMSG;
5163 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5164 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5165 return 0;
5168 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5170 CEPH_DEFINE_OID_ONSTACK(oid);
5171 size_t image_id_size;
5172 char *image_id;
5173 void *p;
5174 void *end;
5175 size_t size;
5176 void *reply_buf = NULL;
5177 size_t len = 0;
5178 char *image_name = NULL;
5179 int ret;
5181 rbd_assert(!rbd_dev->spec->image_name);
5183 len = strlen(rbd_dev->spec->image_id);
5184 image_id_size = sizeof (__le32) + len;
5185 image_id = kmalloc(image_id_size, GFP_KERNEL);
5186 if (!image_id)
5187 return NULL;
5189 p = image_id;
5190 end = image_id + image_id_size;
5191 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5193 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5194 reply_buf = kmalloc(size, GFP_KERNEL);
5195 if (!reply_buf)
5196 goto out;
5198 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5199 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5200 "dir_get_name", image_id, image_id_size,
5201 reply_buf, size);
5202 if (ret < 0)
5203 goto out;
5204 p = reply_buf;
5205 end = reply_buf + ret;
5207 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5208 if (IS_ERR(image_name))
5209 image_name = NULL;
5210 else
5211 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5212 out:
5213 kfree(reply_buf);
5214 kfree(image_id);
5216 return image_name;
5219 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5221 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5222 const char *snap_name;
5223 u32 which = 0;
5225 /* Skip over names until we find the one we are looking for */
5227 snap_name = rbd_dev->header.snap_names;
5228 while (which < snapc->num_snaps) {
5229 if (!strcmp(name, snap_name))
5230 return snapc->snaps[which];
5231 snap_name += strlen(snap_name) + 1;
5232 which++;
5234 return CEPH_NOSNAP;
5237 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5239 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5240 u32 which;
5241 bool found = false;
5242 u64 snap_id;
5244 for (which = 0; !found && which < snapc->num_snaps; which++) {
5245 const char *snap_name;
5247 snap_id = snapc->snaps[which];
5248 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5249 if (IS_ERR(snap_name)) {
5250 /* ignore no-longer existing snapshots */
5251 if (PTR_ERR(snap_name) == -ENOENT)
5252 continue;
5253 else
5254 break;
5256 found = !strcmp(name, snap_name);
5257 kfree(snap_name);
5259 return found ? snap_id : CEPH_NOSNAP;
5263 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5264 * no snapshot by that name is found, or if an error occurs.
5266 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5268 if (rbd_dev->image_format == 1)
5269 return rbd_v1_snap_id_by_name(rbd_dev, name);
5271 return rbd_v2_snap_id_by_name(rbd_dev, name);
5275 * An image being mapped will have everything but the snap id.
5277 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5279 struct rbd_spec *spec = rbd_dev->spec;
5281 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5282 rbd_assert(spec->image_id && spec->image_name);
5283 rbd_assert(spec->snap_name);
5285 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5286 u64 snap_id;
5288 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5289 if (snap_id == CEPH_NOSNAP)
5290 return -ENOENT;
5292 spec->snap_id = snap_id;
5293 } else {
5294 spec->snap_id = CEPH_NOSNAP;
5297 return 0;
5301 * A parent image will have all ids but none of the names.
5303 * All names in an rbd spec are dynamically allocated. It's OK if we
5304 * can't figure out the name for an image id.
5306 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5308 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5309 struct rbd_spec *spec = rbd_dev->spec;
5310 const char *pool_name;
5311 const char *image_name;
5312 const char *snap_name;
5313 int ret;
5315 rbd_assert(spec->pool_id != CEPH_NOPOOL);
5316 rbd_assert(spec->image_id);
5317 rbd_assert(spec->snap_id != CEPH_NOSNAP);
5319 /* Get the pool name; we have to make our own copy of this */
5321 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5322 if (!pool_name) {
5323 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5324 return -EIO;
5326 pool_name = kstrdup(pool_name, GFP_KERNEL);
5327 if (!pool_name)
5328 return -ENOMEM;
5330 /* Fetch the image name; tolerate failure here */
5332 image_name = rbd_dev_image_name(rbd_dev);
5333 if (!image_name)
5334 rbd_warn(rbd_dev, "unable to get image name");
5336 /* Fetch the snapshot name */
5338 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5339 if (IS_ERR(snap_name)) {
5340 ret = PTR_ERR(snap_name);
5341 goto out_err;
5344 spec->pool_name = pool_name;
5345 spec->image_name = image_name;
5346 spec->snap_name = snap_name;
5348 return 0;
5350 out_err:
5351 kfree(image_name);
5352 kfree(pool_name);
5353 return ret;
5356 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5358 size_t size;
5359 int ret;
5360 void *reply_buf;
5361 void *p;
5362 void *end;
5363 u64 seq;
5364 u32 snap_count;
5365 struct ceph_snap_context *snapc;
5366 u32 i;
5369 * We'll need room for the seq value (maximum snapshot id),
5370 * snapshot count, and array of that many snapshot ids.
5371 * For now we have a fixed upper limit on the number we're
5372 * prepared to receive.
5374 size = sizeof (__le64) + sizeof (__le32) +
5375 RBD_MAX_SNAP_COUNT * sizeof (__le64);
5376 reply_buf = kzalloc(size, GFP_KERNEL);
5377 if (!reply_buf)
5378 return -ENOMEM;
5380 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5381 &rbd_dev->header_oloc, "get_snapcontext",
5382 NULL, 0, reply_buf, size);
5383 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5384 if (ret < 0)
5385 goto out;
5387 p = reply_buf;
5388 end = reply_buf + ret;
5389 ret = -ERANGE;
5390 ceph_decode_64_safe(&p, end, seq, out);
5391 ceph_decode_32_safe(&p, end, snap_count, out);
5394 * Make sure the reported number of snapshot ids wouldn't go
5395 * beyond the end of our buffer. But before checking that,
5396 * make sure the computed size of the snapshot context we
5397 * allocate is representable in a size_t.
5399 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5400 / sizeof (u64)) {
5401 ret = -EINVAL;
5402 goto out;
5404 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5405 goto out;
5406 ret = 0;
5408 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5409 if (!snapc) {
5410 ret = -ENOMEM;
5411 goto out;
5413 snapc->seq = seq;
5414 for (i = 0; i < snap_count; i++)
5415 snapc->snaps[i] = ceph_decode_64(&p);
5417 ceph_put_snap_context(rbd_dev->header.snapc);
5418 rbd_dev->header.snapc = snapc;
5420 dout(" snap context seq = %llu, snap_count = %u\n",
5421 (unsigned long long)seq, (unsigned int)snap_count);
5422 out:
5423 kfree(reply_buf);
5425 return ret;
5428 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5429 u64 snap_id)
5431 size_t size;
5432 void *reply_buf;
5433 __le64 snapid;
5434 int ret;
5435 void *p;
5436 void *end;
5437 char *snap_name;
5439 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5440 reply_buf = kmalloc(size, GFP_KERNEL);
5441 if (!reply_buf)
5442 return ERR_PTR(-ENOMEM);
5444 snapid = cpu_to_le64(snap_id);
5445 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5446 &rbd_dev->header_oloc, "get_snapshot_name",
5447 &snapid, sizeof(snapid), reply_buf, size);
5448 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5449 if (ret < 0) {
5450 snap_name = ERR_PTR(ret);
5451 goto out;
5454 p = reply_buf;
5455 end = reply_buf + ret;
5456 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5457 if (IS_ERR(snap_name))
5458 goto out;
5460 dout(" snap_id 0x%016llx snap_name = %s\n",
5461 (unsigned long long)snap_id, snap_name);
5462 out:
5463 kfree(reply_buf);
5465 return snap_name;
5468 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5470 bool first_time = rbd_dev->header.object_prefix == NULL;
5471 int ret;
5473 ret = rbd_dev_v2_image_size(rbd_dev);
5474 if (ret)
5475 return ret;
5477 if (first_time) {
5478 ret = rbd_dev_v2_header_onetime(rbd_dev);
5479 if (ret)
5480 return ret;
5483 ret = rbd_dev_v2_snap_context(rbd_dev);
5484 if (ret && first_time) {
5485 kfree(rbd_dev->header.object_prefix);
5486 rbd_dev->header.object_prefix = NULL;
5489 return ret;
5492 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5494 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5496 if (rbd_dev->image_format == 1)
5497 return rbd_dev_v1_header_info(rbd_dev);
5499 return rbd_dev_v2_header_info(rbd_dev);
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/* Characters that produce nonzero for isspace() in the
	 * "C" and "POSIX" locales. */
	static const char whitespace[] = " \f\n\r\t\v";
	size_t token_len;

	*buf += strspn(*buf, whitespace);	/* find start of token */
	token_len = strcspn(*buf, whitespace);	/* token length */

	return token_len;
}
5522 * Finds the next token in *buf, dynamically allocates a buffer big
5523 * enough to hold a copy of it, and copies the token into the new
5524 * buffer. The copy is guaranteed to be terminated with '\0'. Note
5525 * that a duplicate buffer is created even for a zero-length token.
5527 * Returns a pointer to the newly-allocated duplicate, or a null
5528 * pointer if memory for the duplicate was not available. If
5529 * the lenp argument is a non-null pointer, the length of the token
5530 * (not including the '\0') is returned in *lenp.
5532 * If successful, the *buf pointer will be updated to point beyond
5533 * the end of the found token.
5535 * Note: uses GFP_KERNEL for allocation.
5537 static inline char *dup_token(const char **buf, size_t *lenp)
5539 char *dup;
5540 size_t len;
5542 len = next_token(buf);
5543 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5544 if (!dup)
5545 return NULL;
5546 *(dup + len) = '\0';
5547 *buf += len;
5549 if (lenp)
5550 *lenp = len;
5552 return dup;
5556 * Parse the options provided for an "rbd add" (i.e., rbd image
5557 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
5558 * and the data written is passed here via a NUL-terminated buffer.
5559 * Returns 0 if successful or an error code otherwise.
5561 * The information extracted from these options is recorded in
5562 * the other parameters which return dynamically-allocated
5563 * structures:
5564 * ceph_opts
5565 * The address of a pointer that will refer to a ceph options
5566 * structure. Caller must release the returned pointer using
5567 * ceph_destroy_options() when it is no longer needed.
5568 * rbd_opts
5569 * Address of an rbd options pointer. Fully initialized by
5570 * this function; caller must release with kfree().
5571 * spec
5572 * Address of an rbd image specification pointer. Fully
5573 * initialized by this function based on parsed options.
5574 * Caller must release with rbd_spec_put().
5576 * The options passed take this form:
5577 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
5578 * where:
5579 * <mon_addrs>
5580 * A comma-separated list of one or more monitor addresses.
5581 * A monitor address is an ip address, optionally followed
5582 * by a port number (separated by a colon).
5583 * I.e.: ip1[:port1][,ip2[:port2]...]
5584 * <options>
5585 * A comma-separated list of ceph and/or rbd options.
5586 * <pool_name>
5587 * The name of the rados pool containing the rbd image.
5588 * <image_name>
5589 * The name of the image in that pool to map.
5590 * <snap_id>
5591 * An optional snapshot id. If provided, the mapping will
5592 * present data from the image at the time that snapshot was
5593 * created. The image head is used if no snapshot id is
5594 * provided. Snapshot mappings are always read-only.
5596 static int rbd_add_parse_args(const char *buf,
5597 struct ceph_options **ceph_opts,
5598 struct rbd_options **opts,
5599 struct rbd_spec **rbd_spec)
5601 size_t len;
5602 char *options;
5603 const char *mon_addrs;
5604 char *snap_name;
5605 size_t mon_addrs_size;
5606 struct rbd_spec *spec = NULL;
5607 struct rbd_options *rbd_opts = NULL;
5608 struct ceph_options *copts;
5609 int ret;
5611 /* The first four tokens are required */
5613 len = next_token(&buf);
5614 if (!len) {
5615 rbd_warn(NULL, "no monitor address(es) provided");
5616 return -EINVAL;
5618 mon_addrs = buf;
5619 mon_addrs_size = len + 1;
5620 buf += len;
5622 ret = -EINVAL;
5623 options = dup_token(&buf, NULL);
5624 if (!options)
5625 return -ENOMEM;
5626 if (!*options) {
5627 rbd_warn(NULL, "no options provided");
5628 goto out_err;
5631 spec = rbd_spec_alloc();
5632 if (!spec)
5633 goto out_mem;
5635 spec->pool_name = dup_token(&buf, NULL);
5636 if (!spec->pool_name)
5637 goto out_mem;
5638 if (!*spec->pool_name) {
5639 rbd_warn(NULL, "no pool name provided");
5640 goto out_err;
5643 spec->image_name = dup_token(&buf, NULL);
5644 if (!spec->image_name)
5645 goto out_mem;
5646 if (!*spec->image_name) {
5647 rbd_warn(NULL, "no image name provided");
5648 goto out_err;
5652 * Snapshot name is optional; default is to use "-"
5653 * (indicating the head/no snapshot).
5655 len = next_token(&buf);
5656 if (!len) {
5657 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5658 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5659 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5660 ret = -ENAMETOOLONG;
5661 goto out_err;
5663 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5664 if (!snap_name)
5665 goto out_mem;
5666 *(snap_name + len) = '\0';
5667 spec->snap_name = snap_name;
5669 /* Initialize all rbd options to the defaults */
5671 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5672 if (!rbd_opts)
5673 goto out_mem;
5675 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5676 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5677 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5678 rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
5680 copts = ceph_parse_options(options, mon_addrs,
5681 mon_addrs + mon_addrs_size - 1,
5682 parse_rbd_opts_token, rbd_opts);
5683 if (IS_ERR(copts)) {
5684 ret = PTR_ERR(copts);
5685 goto out_err;
5687 kfree(options);
5689 *ceph_opts = copts;
5690 *opts = rbd_opts;
5691 *rbd_spec = spec;
5693 return 0;
5694 out_mem:
5695 ret = -ENOMEM;
5696 out_err:
5697 kfree(rbd_opts);
5698 rbd_spec_put(spec);
5699 kfree(options);
5701 return ret;
5705 * Return pool id (>= 0) or a negative error code.
5707 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5709 struct ceph_options *opts = rbdc->client->options;
5710 u64 newest_epoch;
5711 int tries = 0;
5712 int ret;
5714 again:
5715 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5716 if (ret == -ENOENT && tries++ < 1) {
5717 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5718 &newest_epoch);
5719 if (ret < 0)
5720 return ret;
5722 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5723 ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5724 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5725 newest_epoch,
5726 opts->mount_timeout);
5727 goto again;
5728 } else {
5729 /* the osdmap we have is new enough */
5730 return -ENOENT;
5734 return ret;
5737 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5739 down_write(&rbd_dev->lock_rwsem);
5740 if (__rbd_is_lock_owner(rbd_dev))
5741 rbd_unlock(rbd_dev);
5742 up_write(&rbd_dev->lock_rwsem);
5745 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5747 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5748 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5749 return -EINVAL;
5752 /* FIXME: "rbd map --exclusive" should be in interruptible */
5753 down_read(&rbd_dev->lock_rwsem);
5754 rbd_wait_state_locked(rbd_dev);
5755 up_read(&rbd_dev->lock_rwsem);
5756 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5757 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5758 return -EROFS;
5761 return 0;
5765 * An rbd format 2 image has a unique identifier, distinct from the
5766 * name given to it by the user. Internally, that identifier is
5767 * what's used to specify the names of objects related to the image.
5769 * A special "rbd id" object is used to map an rbd image name to its
5770 * id. If that object doesn't exist, then there is no v2 rbd image
5771 * with the supplied name.
5773 * This function will record the given rbd_dev's image_id field if
5774 * it can be determined, and in that case will return 0. If any
5775 * errors occur a negative errno will be returned and the rbd_dev's
5776 * image_id field will be unchanged (and should be NULL).
5778 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5780 int ret;
5781 size_t size;
5782 CEPH_DEFINE_OID_ONSTACK(oid);
5783 void *response;
5784 char *image_id;
5787 * When probing a parent image, the image id is already
5788 * known (and the image name likely is not). There's no
5789 * need to fetch the image id again in this case. We
5790 * do still need to set the image format though.
5792 if (rbd_dev->spec->image_id) {
5793 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5795 return 0;
5799 * First, see if the format 2 image id file exists, and if
5800 * so, get the image's persistent id from it.
5802 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5803 rbd_dev->spec->image_name);
5804 if (ret)
5805 return ret;
5807 dout("rbd id object name is %s\n", oid.name);
5809 /* Response will be an encoded string, which includes a length */
5811 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5812 response = kzalloc(size, GFP_NOIO);
5813 if (!response) {
5814 ret = -ENOMEM;
5815 goto out;
5818 /* If it doesn't exist we'll assume it's a format 1 image */
5820 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5821 "get_id", NULL, 0,
5822 response, RBD_IMAGE_ID_LEN_MAX);
5823 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5824 if (ret == -ENOENT) {
5825 image_id = kstrdup("", GFP_KERNEL);
5826 ret = image_id ? 0 : -ENOMEM;
5827 if (!ret)
5828 rbd_dev->image_format = 1;
5829 } else if (ret >= 0) {
5830 void *p = response;
5832 image_id = ceph_extract_encoded_string(&p, p + ret,
5833 NULL, GFP_NOIO);
5834 ret = PTR_ERR_OR_ZERO(image_id);
5835 if (!ret)
5836 rbd_dev->image_format = 2;
5839 if (!ret) {
5840 rbd_dev->spec->image_id = image_id;
5841 dout("image_id is %s\n", image_id);
5843 out:
5844 kfree(response);
5845 ceph_oid_destroy(&oid);
5846 return ret;
5850 * Undo whatever state changes are made by v1 or v2 header info
5851 * call.
5853 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5855 struct rbd_image_header *header;
5857 rbd_dev_parent_put(rbd_dev);
5859 /* Free dynamic fields from the header, then zero it out */
5861 header = &rbd_dev->header;
5862 ceph_put_snap_context(header->snapc);
5863 kfree(header->snap_sizes);
5864 kfree(header->snap_names);
5865 kfree(header->object_prefix);
5866 memset(header, 0, sizeof (*header));
5869 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5871 int ret;
5873 ret = rbd_dev_v2_object_prefix(rbd_dev);
5874 if (ret)
5875 goto out_err;
5878 * Get the and check features for the image. Currently the
5879 * features are assumed to never change.
5881 ret = rbd_dev_v2_features(rbd_dev);
5882 if (ret)
5883 goto out_err;
5885 /* If the image supports fancy striping, get its parameters */
5887 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5888 ret = rbd_dev_v2_striping_info(rbd_dev);
5889 if (ret < 0)
5890 goto out_err;
5893 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5894 ret = rbd_dev_v2_data_pool(rbd_dev);
5895 if (ret)
5896 goto out_err;
5899 rbd_init_layout(rbd_dev);
5900 return 0;
5902 out_err:
5903 rbd_dev->header.features = 0;
5904 kfree(rbd_dev->header.object_prefix);
5905 rbd_dev->header.object_prefix = NULL;
5906 return ret;
5910 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5911 * rbd_dev_image_probe() recursion depth, which means it's also the
5912 * length of the already discovered part of the parent chain.
5914 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5916 struct rbd_device *parent = NULL;
5917 int ret;
5919 if (!rbd_dev->parent_spec)
5920 return 0;
5922 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5923 pr_info("parent chain is too long (%d)\n", depth);
5924 ret = -EINVAL;
5925 goto out_err;
5928 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5929 if (!parent) {
5930 ret = -ENOMEM;
5931 goto out_err;
5935 * Images related by parent/child relationships always share
5936 * rbd_client and spec/parent_spec, so bump their refcounts.
5938 __rbd_get_client(rbd_dev->rbd_client);
5939 rbd_spec_get(rbd_dev->parent_spec);
5941 ret = rbd_dev_image_probe(parent, depth);
5942 if (ret < 0)
5943 goto out_err;
5945 rbd_dev->parent = parent;
5946 atomic_set(&rbd_dev->parent_ref, 1);
5947 return 0;
5949 out_err:
5950 rbd_dev_unparent(rbd_dev);
5951 rbd_dev_destroy(parent);
5952 return ret;
5955 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5957 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5958 rbd_dev_mapping_clear(rbd_dev);
5959 rbd_free_disk(rbd_dev);
5960 if (!single_major)
5961 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5965 * rbd_dev->header_rwsem must be locked for write and will be unlocked
5966 * upon return.
5968 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5970 int ret;
5972 /* Record our major and minor device numbers. */
5974 if (!single_major) {
5975 ret = register_blkdev(0, rbd_dev->name);
5976 if (ret < 0)
5977 goto err_out_unlock;
5979 rbd_dev->major = ret;
5980 rbd_dev->minor = 0;
5981 } else {
5982 rbd_dev->major = rbd_major;
5983 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5986 /* Set up the blkdev mapping. */
5988 ret = rbd_init_disk(rbd_dev);
5989 if (ret)
5990 goto err_out_blkdev;
5992 ret = rbd_dev_mapping_set(rbd_dev);
5993 if (ret)
5994 goto err_out_disk;
5996 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5997 set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
5999 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6000 if (ret)
6001 goto err_out_mapping;
6003 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6004 up_write(&rbd_dev->header_rwsem);
6005 return 0;
6007 err_out_mapping:
6008 rbd_dev_mapping_clear(rbd_dev);
6009 err_out_disk:
6010 rbd_free_disk(rbd_dev);
6011 err_out_blkdev:
6012 if (!single_major)
6013 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6014 err_out_unlock:
6015 up_write(&rbd_dev->header_rwsem);
6016 return ret;
6019 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6021 struct rbd_spec *spec = rbd_dev->spec;
6022 int ret;
6024 /* Record the header object name for this rbd image. */
6026 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6027 if (rbd_dev->image_format == 1)
6028 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6029 spec->image_name, RBD_SUFFIX);
6030 else
6031 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6032 RBD_HEADER_PREFIX, spec->image_id);
6034 return ret;
6037 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6039 rbd_dev_unprobe(rbd_dev);
6040 if (rbd_dev->opts)
6041 rbd_unregister_watch(rbd_dev);
6042 rbd_dev->image_format = 0;
6043 kfree(rbd_dev->spec->image_id);
6044 rbd_dev->spec->image_id = NULL;
6048 * Probe for the existence of the header object for the given rbd
6049 * device. If this image is the one being mapped (i.e., not a
6050 * parent), initiate a watch on its header object before using that
6051 * object to get detailed information about the rbd image.
6053 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6055 int ret;
6058 * Get the id from the image id object. Unless there's an
6059 * error, rbd_dev->spec->image_id will be filled in with
6060 * a dynamically-allocated string, and rbd_dev->image_format
6061 * will be set to either 1 or 2.
6063 ret = rbd_dev_image_id(rbd_dev);
6064 if (ret)
6065 return ret;
6067 ret = rbd_dev_header_name(rbd_dev);
6068 if (ret)
6069 goto err_out_format;
6071 if (!depth) {
6072 ret = rbd_register_watch(rbd_dev);
6073 if (ret) {
6074 if (ret == -ENOENT)
6075 pr_info("image %s/%s does not exist\n",
6076 rbd_dev->spec->pool_name,
6077 rbd_dev->spec->image_name);
6078 goto err_out_format;
6082 ret = rbd_dev_header_info(rbd_dev);
6083 if (ret)
6084 goto err_out_watch;
6087 * If this image is the one being mapped, we have pool name and
6088 * id, image name and id, and snap name - need to fill snap id.
6089 * Otherwise this is a parent image, identified by pool, image
6090 * and snap ids - need to fill in names for those ids.
6092 if (!depth)
6093 ret = rbd_spec_fill_snap_id(rbd_dev);
6094 else
6095 ret = rbd_spec_fill_names(rbd_dev);
6096 if (ret) {
6097 if (ret == -ENOENT)
6098 pr_info("snap %s/%s@%s does not exist\n",
6099 rbd_dev->spec->pool_name,
6100 rbd_dev->spec->image_name,
6101 rbd_dev->spec->snap_name);
6102 goto err_out_probe;
6105 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6106 ret = rbd_dev_v2_parent_info(rbd_dev);
6107 if (ret)
6108 goto err_out_probe;
6111 * Need to warn users if this image is the one being
6112 * mapped and has a parent.
6114 if (!depth && rbd_dev->parent_spec)
6115 rbd_warn(rbd_dev,
6116 "WARNING: kernel layering is EXPERIMENTAL!");
6119 ret = rbd_dev_probe_parent(rbd_dev, depth);
6120 if (ret)
6121 goto err_out_probe;
6123 dout("discovered format %u image, header name is %s\n",
6124 rbd_dev->image_format, rbd_dev->header_oid.name);
6125 return 0;
6127 err_out_probe:
6128 rbd_dev_unprobe(rbd_dev);
6129 err_out_watch:
6130 if (!depth)
6131 rbd_unregister_watch(rbd_dev);
6132 err_out_format:
6133 rbd_dev->image_format = 0;
6134 kfree(rbd_dev->spec->image_id);
6135 rbd_dev->spec->image_id = NULL;
6136 return ret;
6139 static ssize_t do_rbd_add(struct bus_type *bus,
6140 const char *buf,
6141 size_t count)
6143 struct rbd_device *rbd_dev = NULL;
6144 struct ceph_options *ceph_opts = NULL;
6145 struct rbd_options *rbd_opts = NULL;
6146 struct rbd_spec *spec = NULL;
6147 struct rbd_client *rbdc;
6148 bool read_only;
6149 int rc;
6151 if (!try_module_get(THIS_MODULE))
6152 return -ENODEV;
6154 /* parse add command */
6155 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6156 if (rc < 0)
6157 goto out;
6159 rbdc = rbd_get_client(ceph_opts);
6160 if (IS_ERR(rbdc)) {
6161 rc = PTR_ERR(rbdc);
6162 goto err_out_args;
6165 /* pick the pool */
6166 rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
6167 if (rc < 0) {
6168 if (rc == -ENOENT)
6169 pr_info("pool %s does not exist\n", spec->pool_name);
6170 goto err_out_client;
6172 spec->pool_id = (u64)rc;
6174 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
6175 if (!rbd_dev) {
6176 rc = -ENOMEM;
6177 goto err_out_client;
6179 rbdc = NULL; /* rbd_dev now owns this */
6180 spec = NULL; /* rbd_dev now owns this */
6181 rbd_opts = NULL; /* rbd_dev now owns this */
6183 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6184 if (!rbd_dev->config_info) {
6185 rc = -ENOMEM;
6186 goto err_out_rbd_dev;
6189 down_write(&rbd_dev->header_rwsem);
6190 rc = rbd_dev_image_probe(rbd_dev, 0);
6191 if (rc < 0) {
6192 up_write(&rbd_dev->header_rwsem);
6193 goto err_out_rbd_dev;
6196 /* If we are mapping a snapshot it must be marked read-only */
6198 read_only = rbd_dev->opts->read_only;
6199 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6200 read_only = true;
6201 rbd_dev->mapping.read_only = read_only;
6203 rc = rbd_dev_device_setup(rbd_dev);
6204 if (rc)
6205 goto err_out_image_probe;
6207 if (rbd_dev->opts->exclusive) {
6208 rc = rbd_add_acquire_lock(rbd_dev);
6209 if (rc)
6210 goto err_out_device_setup;
6213 /* Everything's ready. Announce the disk to the world. */
6215 rc = device_add(&rbd_dev->dev);
6216 if (rc)
6217 goto err_out_image_lock;
6219 add_disk(rbd_dev->disk);
6220 /* see rbd_init_disk() */
6221 blk_put_queue(rbd_dev->disk->queue);
6223 spin_lock(&rbd_dev_list_lock);
6224 list_add_tail(&rbd_dev->node, &rbd_dev_list);
6225 spin_unlock(&rbd_dev_list_lock);
6227 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6228 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6229 rbd_dev->header.features);
6230 rc = count;
6231 out:
6232 module_put(THIS_MODULE);
6233 return rc;
6235 err_out_image_lock:
6236 rbd_dev_image_unlock(rbd_dev);
6237 err_out_device_setup:
6238 rbd_dev_device_release(rbd_dev);
6239 err_out_image_probe:
6240 rbd_dev_image_release(rbd_dev);
6241 err_out_rbd_dev:
6242 rbd_dev_destroy(rbd_dev);
6243 err_out_client:
6244 rbd_put_client(rbdc);
6245 err_out_args:
6246 rbd_spec_put(spec);
6247 kfree(rbd_opts);
6248 goto out;
6251 static ssize_t rbd_add(struct bus_type *bus,
6252 const char *buf,
6253 size_t count)
6255 if (single_major)
6256 return -EINVAL;
6258 return do_rbd_add(bus, buf, count);
6261 static ssize_t rbd_add_single_major(struct bus_type *bus,
6262 const char *buf,
6263 size_t count)
6265 return do_rbd_add(bus, buf, count);
6268 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6270 while (rbd_dev->parent) {
6271 struct rbd_device *first = rbd_dev;
6272 struct rbd_device *second = first->parent;
6273 struct rbd_device *third;
6276 * Follow to the parent with no grandparent and
6277 * remove it.
6279 while (second && (third = second->parent)) {
6280 first = second;
6281 second = third;
6283 rbd_assert(second);
6284 rbd_dev_image_release(second);
6285 rbd_dev_destroy(second);
6286 first->parent = NULL;
6287 first->parent_overlap = 0;
6289 rbd_assert(first->parent_spec);
6290 rbd_spec_put(first->parent_spec);
6291 first->parent_spec = NULL;
6295 static ssize_t do_rbd_remove(struct bus_type *bus,
6296 const char *buf,
6297 size_t count)
6299 struct rbd_device *rbd_dev = NULL;
6300 struct list_head *tmp;
6301 int dev_id;
6302 char opt_buf[6];
6303 bool already = false;
6304 bool force = false;
6305 int ret;
6307 dev_id = -1;
6308 opt_buf[0] = '\0';
6309 sscanf(buf, "%d %5s", &dev_id, opt_buf);
6310 if (dev_id < 0) {
6311 pr_err("dev_id out of range\n");
6312 return -EINVAL;
6314 if (opt_buf[0] != '\0') {
6315 if (!strcmp(opt_buf, "force")) {
6316 force = true;
6317 } else {
6318 pr_err("bad remove option at '%s'\n", opt_buf);
6319 return -EINVAL;
6323 ret = -ENOENT;
6324 spin_lock(&rbd_dev_list_lock);
6325 list_for_each(tmp, &rbd_dev_list) {
6326 rbd_dev = list_entry(tmp, struct rbd_device, node);
6327 if (rbd_dev->dev_id == dev_id) {
6328 ret = 0;
6329 break;
6332 if (!ret) {
6333 spin_lock_irq(&rbd_dev->lock);
6334 if (rbd_dev->open_count && !force)
6335 ret = -EBUSY;
6336 else
6337 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6338 &rbd_dev->flags);
6339 spin_unlock_irq(&rbd_dev->lock);
6341 spin_unlock(&rbd_dev_list_lock);
6342 if (ret < 0 || already)
6343 return ret;
6345 if (force) {
6347 * Prevent new IO from being queued and wait for existing
6348 * IO to complete/fail.
6350 blk_mq_freeze_queue(rbd_dev->disk->queue);
6351 blk_set_queue_dying(rbd_dev->disk->queue);
6354 del_gendisk(rbd_dev->disk);
6355 spin_lock(&rbd_dev_list_lock);
6356 list_del_init(&rbd_dev->node);
6357 spin_unlock(&rbd_dev_list_lock);
6358 device_del(&rbd_dev->dev);
6360 rbd_dev_image_unlock(rbd_dev);
6361 rbd_dev_device_release(rbd_dev);
6362 rbd_dev_image_release(rbd_dev);
6363 rbd_dev_destroy(rbd_dev);
6364 return count;
6367 static ssize_t rbd_remove(struct bus_type *bus,
6368 const char *buf,
6369 size_t count)
6371 if (single_major)
6372 return -EINVAL;
6374 return do_rbd_remove(bus, buf, count);
6377 static ssize_t rbd_remove_single_major(struct bus_type *bus,
6378 const char *buf,
6379 size_t count)
6381 return do_rbd_remove(bus, buf, count);
6385 * create control files in sysfs
6386 * /sys/bus/rbd/...
6388 static int rbd_sysfs_init(void)
6390 int ret;
6392 ret = device_register(&rbd_root_dev);
6393 if (ret < 0)
6394 return ret;
6396 ret = bus_register(&rbd_bus_type);
6397 if (ret < 0)
6398 device_unregister(&rbd_root_dev);
6400 return ret;
6403 static void rbd_sysfs_cleanup(void)
6405 bus_unregister(&rbd_bus_type);
6406 device_unregister(&rbd_root_dev);
6409 static int rbd_slab_init(void)
6411 rbd_assert(!rbd_img_request_cache);
6412 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
6413 if (!rbd_img_request_cache)
6414 return -ENOMEM;
6416 rbd_assert(!rbd_obj_request_cache);
6417 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
6418 if (!rbd_obj_request_cache)
6419 goto out_err;
6421 rbd_assert(!rbd_bio_clone);
6422 rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0);
6423 if (!rbd_bio_clone)
6424 goto out_err_clone;
6426 return 0;
6428 out_err_clone:
6429 kmem_cache_destroy(rbd_obj_request_cache);
6430 rbd_obj_request_cache = NULL;
6431 out_err:
6432 kmem_cache_destroy(rbd_img_request_cache);
6433 rbd_img_request_cache = NULL;
6434 return -ENOMEM;
6437 static void rbd_slab_exit(void)
6439 rbd_assert(rbd_obj_request_cache);
6440 kmem_cache_destroy(rbd_obj_request_cache);
6441 rbd_obj_request_cache = NULL;
6443 rbd_assert(rbd_img_request_cache);
6444 kmem_cache_destroy(rbd_img_request_cache);
6445 rbd_img_request_cache = NULL;
6447 rbd_assert(rbd_bio_clone);
6448 bioset_free(rbd_bio_clone);
6449 rbd_bio_clone = NULL;
6452 static int __init rbd_init(void)
6454 int rc;
6456 if (!libceph_compatible(NULL)) {
6457 rbd_warn(NULL, "libceph incompatibility (quitting)");
6458 return -EINVAL;
6461 rc = rbd_slab_init();
6462 if (rc)
6463 return rc;
6466 * The number of active work items is limited by the number of
6467 * rbd devices * queue depth, so leave @max_active at default.
6469 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6470 if (!rbd_wq) {
6471 rc = -ENOMEM;
6472 goto err_out_slab;
6475 if (single_major) {
6476 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6477 if (rbd_major < 0) {
6478 rc = rbd_major;
6479 goto err_out_wq;
6483 rc = rbd_sysfs_init();
6484 if (rc)
6485 goto err_out_blkdev;
6487 if (single_major)
6488 pr_info("loaded (major %d)\n", rbd_major);
6489 else
6490 pr_info("loaded\n");
6492 return 0;
6494 err_out_blkdev:
6495 if (single_major)
6496 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6497 err_out_wq:
6498 destroy_workqueue(rbd_wq);
6499 err_out_slab:
6500 rbd_slab_exit();
6501 return rc;
6504 static void __exit rbd_exit(void)
6506 ida_destroy(&rbd_dev_id_ida);
6507 rbd_sysfs_cleanup();
6508 if (single_major)
6509 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6510 destroy_workqueue(rbd_wq);
6511 rbd_slab_exit();
6514 module_init(rbd_init);
6515 module_exit(rbd_exit);
6517 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
6518 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6519 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
6520 /* following authorship retained from original osdblk.c */
6521 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6523 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
6524 MODULE_LICENSE("GPL");