Linux 4.9.243
[linux/fpc-iii.git] / drivers / block / rbd.c
blobc16be18fddef180b8ed8f9ce597b06579e05fa37
2 /*
3 rbd.c -- Export ceph rados objects as a Linux block device
6 based on drivers/block/osdblk.c:
8 Copyright 2009 Red Hat, Inc.
10 This program is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program; see the file COPYING. If not, write to
21 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
25 For usage instructions, please refer to:
27 Documentation/ABI/testing/sysfs-bus-rbd
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/decode.h>
36 #include <linux/parser.h>
37 #include <linux/bsearch.h>
39 #include <linux/kernel.h>
40 #include <linux/device.h>
41 #include <linux/module.h>
42 #include <linux/blk-mq.h>
43 #include <linux/fs.h>
44 #include <linux/blkdev.h>
45 #include <linux/slab.h>
46 #include <linux/idr.h>
47 #include <linux/workqueue.h>
49 #include "rbd_types.h"
51 #define RBD_DEBUG /* Activate rbd_assert() calls */
54 * Increment the given counter and return its updated value.
55 * If the counter is already 0 it will not be incremented.
56 * If the counter is already at its maximum value returns
57 * -EINVAL without updating it.
59 static int atomic_inc_return_safe(atomic_t *v)
61 unsigned int counter;
63 counter = (unsigned int)__atomic_add_unless(v, 1, 0);
64 if (counter <= (unsigned int)INT_MAX)
65 return (int)counter;
67 atomic_dec(v);
69 return -EINVAL;
72 /* Decrement the counter. Return the resulting value, or -EINVAL */
73 static int atomic_dec_return_safe(atomic_t *v)
75 int counter;
77 counter = atomic_dec_return(v);
78 if (counter >= 0)
79 return counter;
81 atomic_inc(v);
83 return -EINVAL;
86 #define RBD_DRV_NAME "rbd"
88 #define RBD_MINORS_PER_MAJOR 256
89 #define RBD_SINGLE_MAJOR_PART_SHIFT 4
91 #define RBD_MAX_PARENT_CHAIN_LEN 16
93 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
94 #define RBD_MAX_SNAP_NAME_LEN \
95 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
97 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
99 #define RBD_SNAP_HEAD_NAME "-"
101 #define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */
103 /* This allows a single page to hold an image name sent by OSD */
104 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
105 #define RBD_IMAGE_ID_LEN_MAX 64
107 #define RBD_OBJ_PREFIX_LEN_MAX 64
109 #define RBD_NOTIFY_TIMEOUT 5 /* seconds */
110 #define RBD_RETRY_DELAY msecs_to_jiffies(1000)
112 /* Feature bits */
114 #define RBD_FEATURE_LAYERING (1<<0)
115 #define RBD_FEATURE_STRIPINGV2 (1<<1)
116 #define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2)
117 #define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \
118 RBD_FEATURE_STRIPINGV2 | \
119 RBD_FEATURE_EXCLUSIVE_LOCK)
121 /* Features supported by this (client software) implementation. */
123 #define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)
126 * An RBD device name will be "rbd#", where the "rbd" comes from
127 * RBD_DRV_NAME above, and # is a unique integer identifier.
129 #define DEV_NAME_LEN 32
132 * block device image metadata (in-memory version)
134 struct rbd_image_header {
135 /* These six fields never change for a given rbd image */
136 char *object_prefix;
137 __u8 obj_order;
138 __u8 crypt_type;
139 __u8 comp_type;
140 u64 stripe_unit;
141 u64 stripe_count;
142 u64 features; /* Might be changeable someday? */
144 /* The remaining fields need to be updated occasionally */
145 u64 image_size;
146 struct ceph_snap_context *snapc;
147 char *snap_names; /* format 1 only */
148 u64 *snap_sizes; /* format 1 only */
152 * An rbd image specification.
154 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
155 * identify an image. Each rbd_dev structure includes a pointer to
156 * an rbd_spec structure that encapsulates this identity.
158 * Each of the id's in an rbd_spec has an associated name. For a
159 * user-mapped image, the names are supplied and the id's associated
160 * with them are looked up. For a layered image, a parent image is
161 * defined by the tuple, and the names are looked up.
163 * An rbd_dev structure contains a parent_spec pointer which is
164 * non-null if the image it represents is a child in a layered
165 * image. This pointer will refer to the rbd_spec structure used
166 * by the parent rbd_dev for its own identity (i.e., the structure
167 * is shared between the parent and child).
169 * Since these structures are populated once, during the discovery
170 * phase of image construction, they are effectively immutable so
171 * we make no effort to synchronize access to them.
173 * Note that code herein does not assume the image name is known (it
174 * could be a null pointer).
176 struct rbd_spec {
177 u64 pool_id;
178 const char *pool_name;
180 const char *image_id;
181 const char *image_name;
183 u64 snap_id;
184 const char *snap_name;
186 struct kref kref;
190 * an instance of the client. multiple devices may share an rbd client.
192 struct rbd_client {
193 struct ceph_client *client;
194 struct kref kref;
195 struct list_head node;
198 struct rbd_img_request;
199 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
201 #define BAD_WHICH U32_MAX /* Good which or bad which, which? */
203 struct rbd_obj_request;
204 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
206 enum obj_request_type {
207 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
210 enum obj_operation_type {
211 OBJ_OP_WRITE,
212 OBJ_OP_READ,
213 OBJ_OP_DISCARD,
216 enum obj_req_flags {
217 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
218 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
219 OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
220 OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */
223 struct rbd_obj_request {
224 const char *object_name;
225 u64 offset; /* object start byte */
226 u64 length; /* bytes from offset */
227 unsigned long flags;
230 * An object request associated with an image will have its
231 * img_data flag set; a standalone object request will not.
233 * A standalone object request will have which == BAD_WHICH
234 * and a null obj_request pointer.
236 * An object request initiated in support of a layered image
237 * object (to check for its existence before a write) will
238 * have which == BAD_WHICH and a non-null obj_request pointer.
240 * Finally, an object request for rbd image data will have
241 * which != BAD_WHICH, and will have a non-null img_request
242 * pointer. The value of which will be in the range
243 * 0..(img_request->obj_request_count-1).
245 union {
246 struct rbd_obj_request *obj_request; /* STAT op */
247 struct {
248 struct rbd_img_request *img_request;
249 u64 img_offset;
250 /* links for img_request->obj_requests list */
251 struct list_head links;
254 u32 which; /* posn image request list */
256 enum obj_request_type type;
257 union {
258 struct bio *bio_list;
259 struct {
260 struct page **pages;
261 u32 page_count;
264 struct page **copyup_pages;
265 u32 copyup_page_count;
267 struct ceph_osd_request *osd_req;
269 u64 xferred; /* bytes transferred */
270 int result;
272 rbd_obj_callback_t callback;
273 struct completion completion;
275 struct kref kref;
278 enum img_req_flags {
279 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
280 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
281 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
282 IMG_REQ_DISCARD, /* discard: normal = 0, discard request = 1 */
285 struct rbd_img_request {
286 struct rbd_device *rbd_dev;
287 u64 offset; /* starting image byte offset */
288 u64 length; /* byte count from offset */
289 unsigned long flags;
290 union {
291 u64 snap_id; /* for reads */
292 struct ceph_snap_context *snapc; /* for writes */
294 union {
295 struct request *rq; /* block request */
296 struct rbd_obj_request *obj_request; /* obj req initiator */
298 struct page **copyup_pages;
299 u32 copyup_page_count;
300 spinlock_t completion_lock;/* protects next_completion */
301 u32 next_completion;
302 rbd_img_callback_t callback;
303 u64 xferred;/* aggregate bytes transferred */
304 int result; /* first nonzero obj_request result */
306 u32 obj_request_count;
307 struct list_head obj_requests; /* rbd_obj_request structs */
309 struct kref kref;
312 #define for_each_obj_request(ireq, oreq) \
313 list_for_each_entry(oreq, &(ireq)->obj_requests, links)
314 #define for_each_obj_request_from(ireq, oreq) \
315 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
316 #define for_each_obj_request_safe(ireq, oreq, n) \
317 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
319 enum rbd_watch_state {
320 RBD_WATCH_STATE_UNREGISTERED,
321 RBD_WATCH_STATE_REGISTERED,
322 RBD_WATCH_STATE_ERROR,
325 enum rbd_lock_state {
326 RBD_LOCK_STATE_UNLOCKED,
327 RBD_LOCK_STATE_LOCKED,
328 RBD_LOCK_STATE_RELEASING,
331 /* WatchNotify::ClientId */
332 struct rbd_client_id {
333 u64 gid;
334 u64 handle;
337 struct rbd_mapping {
338 u64 size;
339 u64 features;
340 bool read_only;
344 * a single device
346 struct rbd_device {
347 int dev_id; /* blkdev unique id */
349 int major; /* blkdev assigned major */
350 int minor;
351 struct gendisk *disk; /* blkdev's gendisk and rq */
353 u32 image_format; /* Either 1 or 2 */
354 struct rbd_client *rbd_client;
356 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
358 spinlock_t lock; /* queue, flags, open_count */
360 struct rbd_image_header header;
361 unsigned long flags; /* possibly lock protected */
362 struct rbd_spec *spec;
363 struct rbd_options *opts;
364 char *config_info; /* add{,_single_major} string */
366 struct ceph_object_id header_oid;
367 struct ceph_object_locator header_oloc;
369 struct ceph_file_layout layout; /* used for all rbd requests */
371 struct mutex watch_mutex;
372 enum rbd_watch_state watch_state;
373 struct ceph_osd_linger_request *watch_handle;
374 u64 watch_cookie;
375 struct delayed_work watch_dwork;
377 struct rw_semaphore lock_rwsem;
378 enum rbd_lock_state lock_state;
379 struct rbd_client_id owner_cid;
380 struct work_struct acquired_lock_work;
381 struct work_struct released_lock_work;
382 struct delayed_work lock_dwork;
383 struct work_struct unlock_work;
384 wait_queue_head_t lock_waitq;
386 struct workqueue_struct *task_wq;
388 struct rbd_spec *parent_spec;
389 u64 parent_overlap;
390 atomic_t parent_ref;
391 struct rbd_device *parent;
393 /* Block layer tags. */
394 struct blk_mq_tag_set tag_set;
396 /* protects updating the header */
397 struct rw_semaphore header_rwsem;
399 struct rbd_mapping mapping;
401 struct list_head node;
403 /* sysfs related */
404 struct device dev;
405 unsigned long open_count; /* protected by lock */
409 * Flag bits for rbd_dev->flags:
410 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
411 * by rbd_dev->lock
412 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
414 enum rbd_dev_flags {
415 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
416 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
417 RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
420 static DEFINE_MUTEX(client_mutex); /* Serialize client creation */
422 static LIST_HEAD(rbd_dev_list); /* devices */
423 static DEFINE_SPINLOCK(rbd_dev_list_lock);
425 static LIST_HEAD(rbd_client_list); /* clients */
426 static DEFINE_SPINLOCK(rbd_client_list_lock);
428 /* Slab caches for frequently-allocated structures */
430 static struct kmem_cache *rbd_img_request_cache;
431 static struct kmem_cache *rbd_obj_request_cache;
432 static struct kmem_cache *rbd_segment_name_cache;
434 static int rbd_major;
435 static DEFINE_IDA(rbd_dev_id_ida);
437 static struct workqueue_struct *rbd_wq;
440 * Default to false for now, as single-major requires >= 0.75 version of
441 * userspace rbd utility.
443 static bool single_major = false;
444 module_param(single_major, bool, S_IRUGO);
445 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
447 static int rbd_img_request_submit(struct rbd_img_request *img_request);
449 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
450 size_t count);
451 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
452 size_t count);
453 static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
454 size_t count);
455 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
456 size_t count);
457 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
458 static void rbd_spec_put(struct rbd_spec *spec);
460 static int rbd_dev_id_to_minor(int dev_id)
462 return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
465 static int minor_to_rbd_dev_id(int minor)
467 return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
470 static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
472 return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
473 rbd_dev->spec->snap_id == CEPH_NOSNAP &&
474 !rbd_dev->mapping.read_only;
477 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
479 return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
480 rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
483 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
485 bool is_lock_owner;
487 down_read(&rbd_dev->lock_rwsem);
488 is_lock_owner = __rbd_is_lock_owner(rbd_dev);
489 up_read(&rbd_dev->lock_rwsem);
490 return is_lock_owner;
493 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
494 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
495 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
496 static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
498 static struct attribute *rbd_bus_attrs[] = {
499 &bus_attr_add.attr,
500 &bus_attr_remove.attr,
501 &bus_attr_add_single_major.attr,
502 &bus_attr_remove_single_major.attr,
503 NULL,
506 static umode_t rbd_bus_is_visible(struct kobject *kobj,
507 struct attribute *attr, int index)
509 if (!single_major &&
510 (attr == &bus_attr_add_single_major.attr ||
511 attr == &bus_attr_remove_single_major.attr))
512 return 0;
514 return attr->mode;
517 static const struct attribute_group rbd_bus_group = {
518 .attrs = rbd_bus_attrs,
519 .is_visible = rbd_bus_is_visible,
521 __ATTRIBUTE_GROUPS(rbd_bus);
523 static struct bus_type rbd_bus_type = {
524 .name = "rbd",
525 .bus_groups = rbd_bus_groups,
528 static void rbd_root_dev_release(struct device *dev)
532 static struct device rbd_root_dev = {
533 .init_name = "rbd",
534 .release = rbd_root_dev_release,
537 static __printf(2, 3)
538 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
540 struct va_format vaf;
541 va_list args;
543 va_start(args, fmt);
544 vaf.fmt = fmt;
545 vaf.va = &args;
547 if (!rbd_dev)
548 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
549 else if (rbd_dev->disk)
550 printk(KERN_WARNING "%s: %s: %pV\n",
551 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
552 else if (rbd_dev->spec && rbd_dev->spec->image_name)
553 printk(KERN_WARNING "%s: image %s: %pV\n",
554 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
555 else if (rbd_dev->spec && rbd_dev->spec->image_id)
556 printk(KERN_WARNING "%s: id %s: %pV\n",
557 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
558 else /* punt */
559 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
560 RBD_DRV_NAME, rbd_dev, &vaf);
561 va_end(args);
564 #ifdef RBD_DEBUG
565 #define rbd_assert(expr) \
566 if (unlikely(!(expr))) { \
567 printk(KERN_ERR "\nAssertion failure in %s() " \
568 "at line %d:\n\n" \
569 "\trbd_assert(%s);\n\n", \
570 __func__, __LINE__, #expr); \
571 BUG(); \
573 #else /* !RBD_DEBUG */
574 # define rbd_assert(expr) ((void) 0)
575 #endif /* !RBD_DEBUG */
577 static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
578 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
579 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
580 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
582 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
583 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
584 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
585 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
586 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
587 u64 snap_id);
588 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
589 u8 *order, u64 *snap_size);
590 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
591 u64 *snap_features);
593 static int rbd_open(struct block_device *bdev, fmode_t mode)
595 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
596 bool removing = false;
598 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
599 return -EROFS;
601 spin_lock_irq(&rbd_dev->lock);
602 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
603 removing = true;
604 else
605 rbd_dev->open_count++;
606 spin_unlock_irq(&rbd_dev->lock);
607 if (removing)
608 return -ENOENT;
610 (void) get_device(&rbd_dev->dev);
612 return 0;
615 static void rbd_release(struct gendisk *disk, fmode_t mode)
617 struct rbd_device *rbd_dev = disk->private_data;
618 unsigned long open_count_before;
620 spin_lock_irq(&rbd_dev->lock);
621 open_count_before = rbd_dev->open_count--;
622 spin_unlock_irq(&rbd_dev->lock);
623 rbd_assert(open_count_before > 0);
625 put_device(&rbd_dev->dev);
628 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
630 int ret = 0;
631 int val;
632 bool ro;
633 bool ro_changed = false;
635 /* get_user() may sleep, so call it before taking rbd_dev->lock */
636 if (get_user(val, (int __user *)(arg)))
637 return -EFAULT;
639 ro = val ? true : false;
640 /* Snapshot doesn't allow to write*/
641 if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
642 return -EROFS;
644 spin_lock_irq(&rbd_dev->lock);
645 /* prevent others open this device */
646 if (rbd_dev->open_count > 1) {
647 ret = -EBUSY;
648 goto out;
651 if (rbd_dev->mapping.read_only != ro) {
652 rbd_dev->mapping.read_only = ro;
653 ro_changed = true;
656 out:
657 spin_unlock_irq(&rbd_dev->lock);
658 /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
659 if (ret == 0 && ro_changed)
660 set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
662 return ret;
665 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
666 unsigned int cmd, unsigned long arg)
668 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
669 int ret = 0;
671 switch (cmd) {
672 case BLKROSET:
673 ret = rbd_ioctl_set_ro(rbd_dev, arg);
674 break;
675 default:
676 ret = -ENOTTY;
679 return ret;
682 #ifdef CONFIG_COMPAT
683 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
684 unsigned int cmd, unsigned long arg)
686 return rbd_ioctl(bdev, mode, cmd, arg);
688 #endif /* CONFIG_COMPAT */
690 static const struct block_device_operations rbd_bd_ops = {
691 .owner = THIS_MODULE,
692 .open = rbd_open,
693 .release = rbd_release,
694 .ioctl = rbd_ioctl,
695 #ifdef CONFIG_COMPAT
696 .compat_ioctl = rbd_compat_ioctl,
697 #endif
701 * Initialize an rbd client instance. Success or not, this function
702 * consumes ceph_opts. Caller holds client_mutex.
704 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
706 struct rbd_client *rbdc;
707 int ret = -ENOMEM;
709 dout("%s:\n", __func__);
710 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
711 if (!rbdc)
712 goto out_opt;
714 kref_init(&rbdc->kref);
715 INIT_LIST_HEAD(&rbdc->node);
717 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
718 if (IS_ERR(rbdc->client))
719 goto out_rbdc;
720 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
722 ret = ceph_open_session(rbdc->client);
723 if (ret < 0)
724 goto out_client;
726 spin_lock(&rbd_client_list_lock);
727 list_add_tail(&rbdc->node, &rbd_client_list);
728 spin_unlock(&rbd_client_list_lock);
730 dout("%s: rbdc %p\n", __func__, rbdc);
732 return rbdc;
733 out_client:
734 ceph_destroy_client(rbdc->client);
735 out_rbdc:
736 kfree(rbdc);
737 out_opt:
738 if (ceph_opts)
739 ceph_destroy_options(ceph_opts);
740 dout("%s: error %d\n", __func__, ret);
742 return ERR_PTR(ret);
745 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
747 kref_get(&rbdc->kref);
749 return rbdc;
753 * Find a ceph client with specific addr and configuration. If
754 * found, bump its reference count.
756 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
758 struct rbd_client *client_node;
759 bool found = false;
761 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
762 return NULL;
764 spin_lock(&rbd_client_list_lock);
765 list_for_each_entry(client_node, &rbd_client_list, node) {
766 if (!ceph_compare_options(ceph_opts, client_node->client)) {
767 __rbd_get_client(client_node);
769 found = true;
770 break;
773 spin_unlock(&rbd_client_list_lock);
775 return found ? client_node : NULL;
779 * (Per device) rbd map options
781 enum {
782 Opt_queue_depth,
783 Opt_last_int,
784 /* int args above */
785 Opt_last_string,
786 /* string args above */
787 Opt_read_only,
788 Opt_read_write,
789 Opt_lock_on_read,
790 Opt_err
793 static match_table_t rbd_opts_tokens = {
794 {Opt_queue_depth, "queue_depth=%d"},
795 /* int args above */
796 /* string args above */
797 {Opt_read_only, "read_only"},
798 {Opt_read_only, "ro"}, /* Alternate spelling */
799 {Opt_read_write, "read_write"},
800 {Opt_read_write, "rw"}, /* Alternate spelling */
801 {Opt_lock_on_read, "lock_on_read"},
802 {Opt_err, NULL}
805 struct rbd_options {
806 int queue_depth;
807 bool read_only;
808 bool lock_on_read;
811 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
812 #define RBD_READ_ONLY_DEFAULT false
813 #define RBD_LOCK_ON_READ_DEFAULT false
815 static int parse_rbd_opts_token(char *c, void *private)
817 struct rbd_options *rbd_opts = private;
818 substring_t argstr[MAX_OPT_ARGS];
819 int token, intval, ret;
821 token = match_token(c, rbd_opts_tokens, argstr);
822 if (token < Opt_last_int) {
823 ret = match_int(&argstr[0], &intval);
824 if (ret < 0) {
825 pr_err("bad mount option arg (not int) at '%s'\n", c);
826 return ret;
828 dout("got int token %d val %d\n", token, intval);
829 } else if (token > Opt_last_int && token < Opt_last_string) {
830 dout("got string token %d val %s\n", token, argstr[0].from);
831 } else {
832 dout("got token %d\n", token);
835 switch (token) {
836 case Opt_queue_depth:
837 if (intval < 1) {
838 pr_err("queue_depth out of range\n");
839 return -EINVAL;
841 rbd_opts->queue_depth = intval;
842 break;
843 case Opt_read_only:
844 rbd_opts->read_only = true;
845 break;
846 case Opt_read_write:
847 rbd_opts->read_only = false;
848 break;
849 case Opt_lock_on_read:
850 rbd_opts->lock_on_read = true;
851 break;
852 default:
853 /* libceph prints "bad option" msg */
854 return -EINVAL;
857 return 0;
860 static char* obj_op_name(enum obj_operation_type op_type)
862 switch (op_type) {
863 case OBJ_OP_READ:
864 return "read";
865 case OBJ_OP_WRITE:
866 return "write";
867 case OBJ_OP_DISCARD:
868 return "discard";
869 default:
870 return "???";
875 * Get a ceph client with specific addr and configuration, if one does
876 * not exist create it. Either way, ceph_opts is consumed by this
877 * function.
879 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
881 struct rbd_client *rbdc;
883 mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
884 rbdc = rbd_client_find(ceph_opts);
885 if (rbdc) /* using an existing client */
886 ceph_destroy_options(ceph_opts);
887 else
888 rbdc = rbd_client_create(ceph_opts);
889 mutex_unlock(&client_mutex);
891 return rbdc;
895 * Destroy ceph client
897 * Caller must hold rbd_client_list_lock.
899 static void rbd_client_release(struct kref *kref)
901 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
903 dout("%s: rbdc %p\n", __func__, rbdc);
904 spin_lock(&rbd_client_list_lock);
905 list_del(&rbdc->node);
906 spin_unlock(&rbd_client_list_lock);
908 ceph_destroy_client(rbdc->client);
909 kfree(rbdc);
913 * Drop reference to ceph client node. If it's not referenced anymore, release
914 * it.
916 static void rbd_put_client(struct rbd_client *rbdc)
918 if (rbdc)
919 kref_put(&rbdc->kref, rbd_client_release);
922 static bool rbd_image_format_valid(u32 image_format)
924 return image_format == 1 || image_format == 2;
927 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
929 size_t size;
930 u32 snap_count;
932 /* The header has to start with the magic rbd header text */
933 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
934 return false;
936 /* The bio layer requires at least sector-sized I/O */
938 if (ondisk->options.order < SECTOR_SHIFT)
939 return false;
941 /* If we use u64 in a few spots we may be able to loosen this */
943 if (ondisk->options.order > 8 * sizeof (int) - 1)
944 return false;
947 * The size of a snapshot header has to fit in a size_t, and
948 * that limits the number of snapshots.
950 snap_count = le32_to_cpu(ondisk->snap_count);
951 size = SIZE_MAX - sizeof (struct ceph_snap_context);
952 if (snap_count > size / sizeof (__le64))
953 return false;
956 * Not only that, but the size of the entire the snapshot
957 * header must also be representable in a size_t.
959 size -= snap_count * sizeof (__le64);
960 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
961 return false;
963 return true;
967 * Fill an rbd image header with information from the given format 1
968 * on-disk header.
970 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
971 struct rbd_image_header_ondisk *ondisk)
973 struct rbd_image_header *header = &rbd_dev->header;
974 bool first_time = header->object_prefix == NULL;
975 struct ceph_snap_context *snapc;
976 char *object_prefix = NULL;
977 char *snap_names = NULL;
978 u64 *snap_sizes = NULL;
979 u32 snap_count;
980 int ret = -ENOMEM;
981 u32 i;
983 /* Allocate this now to avoid having to handle failure below */
985 if (first_time) {
986 size_t len;
988 len = strnlen(ondisk->object_prefix,
989 sizeof (ondisk->object_prefix));
990 object_prefix = kmalloc(len + 1, GFP_KERNEL);
991 if (!object_prefix)
992 return -ENOMEM;
993 memcpy(object_prefix, ondisk->object_prefix, len);
994 object_prefix[len] = '\0';
997 /* Allocate the snapshot context and fill it in */
999 snap_count = le32_to_cpu(ondisk->snap_count);
1000 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1001 if (!snapc)
1002 goto out_err;
1003 snapc->seq = le64_to_cpu(ondisk->snap_seq);
1004 if (snap_count) {
1005 struct rbd_image_snap_ondisk *snaps;
1006 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1008 /* We'll keep a copy of the snapshot names... */
1010 if (snap_names_len > (u64)SIZE_MAX)
1011 goto out_2big;
1012 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1013 if (!snap_names)
1014 goto out_err;
1016 /* ...as well as the array of their sizes. */
1017 snap_sizes = kmalloc_array(snap_count,
1018 sizeof(*header->snap_sizes),
1019 GFP_KERNEL);
1020 if (!snap_sizes)
1021 goto out_err;
1024 * Copy the names, and fill in each snapshot's id
1025 * and size.
1027 * Note that rbd_dev_v1_header_info() guarantees the
1028 * ondisk buffer we're working with has
1029 * snap_names_len bytes beyond the end of the
1030 * snapshot id array, this memcpy() is safe.
1032 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1033 snaps = ondisk->snaps;
1034 for (i = 0; i < snap_count; i++) {
1035 snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1036 snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1040 /* We won't fail any more, fill in the header */
1042 if (first_time) {
1043 header->object_prefix = object_prefix;
1044 header->obj_order = ondisk->options.order;
1045 header->crypt_type = ondisk->options.crypt_type;
1046 header->comp_type = ondisk->options.comp_type;
1047 /* The rest aren't used for format 1 images */
1048 header->stripe_unit = 0;
1049 header->stripe_count = 0;
1050 header->features = 0;
1051 } else {
1052 ceph_put_snap_context(header->snapc);
1053 kfree(header->snap_names);
1054 kfree(header->snap_sizes);
1057 /* The remaining fields always get updated (when we refresh) */
1059 header->image_size = le64_to_cpu(ondisk->image_size);
1060 header->snapc = snapc;
1061 header->snap_names = snap_names;
1062 header->snap_sizes = snap_sizes;
1064 return 0;
1065 out_2big:
1066 ret = -EIO;
1067 out_err:
1068 kfree(snap_sizes);
1069 kfree(snap_names);
1070 ceph_put_snap_context(snapc);
1071 kfree(object_prefix);
1073 return ret;
1076 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1078 const char *snap_name;
1080 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1082 /* Skip over names until we find the one we are looking for */
1084 snap_name = rbd_dev->header.snap_names;
1085 while (which--)
1086 snap_name += strlen(snap_name) + 1;
1088 return kstrdup(snap_name, GFP_KERNEL);
1092 * Snapshot id comparison function for use with qsort()/bsearch().
1093 * Note that result is for snapshots in *descending* order.
1095 static int snapid_compare_reverse(const void *s1, const void *s2)
1097 u64 snap_id1 = *(u64 *)s1;
1098 u64 snap_id2 = *(u64 *)s2;
1100 if (snap_id1 < snap_id2)
1101 return 1;
1102 return snap_id1 == snap_id2 ? 0 : -1;
1106 * Search a snapshot context to see if the given snapshot id is
1107 * present.
1109 * Returns the position of the snapshot id in the array if it's found,
1110 * or BAD_SNAP_INDEX otherwise.
1112 * Note: The snapshot array is in kept sorted (by the osd) in
1113 * reverse order, highest snapshot id first.
1115 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1117 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1118 u64 *found;
1120 found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1121 sizeof (snap_id), snapid_compare_reverse);
1123 return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1126 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1127 u64 snap_id)
1129 u32 which;
1130 const char *snap_name;
1132 which = rbd_dev_snap_index(rbd_dev, snap_id);
1133 if (which == BAD_SNAP_INDEX)
1134 return ERR_PTR(-ENOENT);
1136 snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1137 return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1140 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1142 if (snap_id == CEPH_NOSNAP)
1143 return RBD_SNAP_HEAD_NAME;
1145 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1146 if (rbd_dev->image_format == 1)
1147 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1149 return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1152 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1153 u64 *snap_size)
1155 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1156 if (snap_id == CEPH_NOSNAP) {
1157 *snap_size = rbd_dev->header.image_size;
1158 } else if (rbd_dev->image_format == 1) {
1159 u32 which;
1161 which = rbd_dev_snap_index(rbd_dev, snap_id);
1162 if (which == BAD_SNAP_INDEX)
1163 return -ENOENT;
1165 *snap_size = rbd_dev->header.snap_sizes[which];
1166 } else {
1167 u64 size = 0;
1168 int ret;
1170 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1171 if (ret)
1172 return ret;
1174 *snap_size = size;
1176 return 0;
1179 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1180 u64 *snap_features)
1182 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1183 if (snap_id == CEPH_NOSNAP) {
1184 *snap_features = rbd_dev->header.features;
1185 } else if (rbd_dev->image_format == 1) {
1186 *snap_features = 0; /* No features for format 1 */
1187 } else {
1188 u64 features = 0;
1189 int ret;
1191 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1192 if (ret)
1193 return ret;
1195 *snap_features = features;
1197 return 0;
1200 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1202 u64 snap_id = rbd_dev->spec->snap_id;
1203 u64 size = 0;
1204 u64 features = 0;
1205 int ret;
1207 ret = rbd_snap_size(rbd_dev, snap_id, &size);
1208 if (ret)
1209 return ret;
1210 ret = rbd_snap_features(rbd_dev, snap_id, &features);
1211 if (ret)
1212 return ret;
1214 rbd_dev->mapping.size = size;
1215 rbd_dev->mapping.features = features;
1217 return 0;
1220 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1222 rbd_dev->mapping.size = 0;
1223 rbd_dev->mapping.features = 0;
1226 static void rbd_segment_name_free(const char *name)
1228 /* The explicit cast here is needed to drop the const qualifier */
1230 kmem_cache_free(rbd_segment_name_cache, (void *)name);
1233 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
1235 char *name;
1236 u64 segment;
1237 int ret;
1238 char *name_format;
1240 name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
1241 if (!name)
1242 return NULL;
1243 segment = offset >> rbd_dev->header.obj_order;
1244 name_format = "%s.%012llx";
1245 if (rbd_dev->image_format == 2)
1246 name_format = "%s.%016llx";
1247 ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
1248 rbd_dev->header.object_prefix, segment);
1249 if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
1250 pr_err("error formatting segment name for #%llu (%d)\n",
1251 segment, ret);
1252 rbd_segment_name_free(name);
1253 name = NULL;
1256 return name;
1259 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1261 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1263 return offset & (segment_size - 1);
1266 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1267 u64 offset, u64 length)
1269 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1271 offset &= segment_size - 1;
1273 rbd_assert(length <= U64_MAX - offset);
1274 if (offset + length > segment_size)
1275 length = segment_size - offset;
1277 return length;
1281 * returns the size of an object in the image
1283 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1285 return 1 << header->obj_order;
1289 * bio helpers
1292 static void bio_chain_put(struct bio *chain)
1294 struct bio *tmp;
1296 while (chain) {
1297 tmp = chain;
1298 chain = chain->bi_next;
1299 bio_put(tmp);
1304 * zeros a bio chain, starting at specific offset
1306 static void zero_bio_chain(struct bio *chain, int start_ofs)
1308 struct bio_vec bv;
1309 struct bvec_iter iter;
1310 unsigned long flags;
1311 void *buf;
1312 int pos = 0;
1314 while (chain) {
1315 bio_for_each_segment(bv, chain, iter) {
1316 if (pos + bv.bv_len > start_ofs) {
1317 int remainder = max(start_ofs - pos, 0);
1318 buf = bvec_kmap_irq(&bv, &flags);
1319 memset(buf + remainder, 0,
1320 bv.bv_len - remainder);
1321 flush_dcache_page(bv.bv_page);
1322 bvec_kunmap_irq(buf, &flags);
1324 pos += bv.bv_len;
1327 chain = chain->bi_next;
1332 * similar to zero_bio_chain(), zeros data defined by a page array,
1333 * starting at the given byte offset from the start of the array and
1334 * continuing up to the given end offset. The pages array is
1335 * assumed to be big enough to hold all bytes up to the end.
1337 static void zero_pages(struct page **pages, u64 offset, u64 end)
1339 struct page **page = &pages[offset >> PAGE_SHIFT];
1341 rbd_assert(end > offset);
1342 rbd_assert(end - offset <= (u64)SIZE_MAX);
1343 while (offset < end) {
1344 size_t page_offset;
1345 size_t length;
1346 unsigned long flags;
1347 void *kaddr;
1349 page_offset = offset & ~PAGE_MASK;
1350 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1351 local_irq_save(flags);
1352 kaddr = kmap_atomic(*page);
1353 memset(kaddr + page_offset, 0, length);
1354 flush_dcache_page(*page);
1355 kunmap_atomic(kaddr);
1356 local_irq_restore(flags);
1358 offset += length;
1359 page++;
1364 * Clone a portion of a bio, starting at the given byte offset
1365 * and continuing for the number of bytes indicated.
1367 static struct bio *bio_clone_range(struct bio *bio_src,
1368 unsigned int offset,
1369 unsigned int len,
1370 gfp_t gfpmask)
1372 struct bio *bio;
1374 bio = bio_clone(bio_src, gfpmask);
1375 if (!bio)
1376 return NULL; /* ENOMEM */
1378 bio_advance(bio, offset);
1379 bio->bi_iter.bi_size = len;
1381 return bio;
1385 * Clone a portion of a bio chain, starting at the given byte offset
1386 * into the first bio in the source chain and continuing for the
1387 * number of bytes indicated. The result is another bio chain of
1388 * exactly the given length, or a null pointer on error.
1390 * The bio_src and offset parameters are both in-out. On entry they
1391 * refer to the first source bio and the offset into that bio where
1392 * the start of data to be cloned is located.
1394 * On return, bio_src is updated to refer to the bio in the source
1395 * chain that contains first un-cloned byte, and *offset will
1396 * contain the offset of that byte within that bio.
1398 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1399 unsigned int *offset,
1400 unsigned int len,
1401 gfp_t gfpmask)
1403 struct bio *bi = *bio_src;
1404 unsigned int off = *offset;
1405 struct bio *chain = NULL;
1406 struct bio **end;
1408 /* Build up a chain of clone bios up to the limit */
1410 if (!bi || off >= bi->bi_iter.bi_size || !len)
1411 return NULL; /* Nothing to clone */
1413 end = &chain;
1414 while (len) {
1415 unsigned int bi_size;
1416 struct bio *bio;
1418 if (!bi) {
1419 rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1420 goto out_err; /* EINVAL; ran out of bio's */
1422 bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1423 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1424 if (!bio)
1425 goto out_err; /* ENOMEM */
1427 *end = bio;
1428 end = &bio->bi_next;
1430 off += bi_size;
1431 if (off == bi->bi_iter.bi_size) {
1432 bi = bi->bi_next;
1433 off = 0;
1435 len -= bi_size;
1437 *bio_src = bi;
1438 *offset = off;
1440 return chain;
1441 out_err:
1442 bio_chain_put(chain);
1444 return NULL;
1448 * The default/initial value for all object request flags is 0. For
1449 * each flag, once its value is set to 1 it is never reset to 0
1450 * again.
1452 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1454 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1455 struct rbd_device *rbd_dev;
1457 rbd_dev = obj_request->img_request->rbd_dev;
1458 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1459 obj_request);
1463 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1465 smp_mb();
1466 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1469 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1471 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1472 struct rbd_device *rbd_dev = NULL;
1474 if (obj_request_img_data_test(obj_request))
1475 rbd_dev = obj_request->img_request->rbd_dev;
1476 rbd_warn(rbd_dev, "obj_request %p already marked done",
1477 obj_request);
1481 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1483 smp_mb();
1484 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1488 * This sets the KNOWN flag after (possibly) setting the EXISTS
1489 * flag. The latter is set based on the "exists" value provided.
1491 * Note that for our purposes once an object exists it never goes
1492 * away again. It's possible that the response from two existence
1493 * checks are separated by the creation of the target object, and
1494 * the first ("doesn't exist") response arrives *after* the second
1495 * ("does exist"). In that case we ignore the second one.
1497 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1498 bool exists)
1500 if (exists)
1501 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1502 set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1503 smp_mb();
1506 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1508 smp_mb();
1509 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1512 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1514 smp_mb();
1515 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1518 static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1520 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1522 return obj_request->img_offset <
1523 round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1526 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1528 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1529 atomic_read(&obj_request->kref.refcount));
1530 kref_get(&obj_request->kref);
1533 static void rbd_obj_request_destroy(struct kref *kref);
1534 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1536 rbd_assert(obj_request != NULL);
1537 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1538 atomic_read(&obj_request->kref.refcount));
1539 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1542 static void rbd_img_request_get(struct rbd_img_request *img_request)
1544 dout("%s: img %p (was %d)\n", __func__, img_request,
1545 atomic_read(&img_request->kref.refcount));
1546 kref_get(&img_request->kref);
1549 static bool img_request_child_test(struct rbd_img_request *img_request);
1550 static void rbd_parent_request_destroy(struct kref *kref);
1551 static void rbd_img_request_destroy(struct kref *kref);
1552 static void rbd_img_request_put(struct rbd_img_request *img_request)
1554 rbd_assert(img_request != NULL);
1555 dout("%s: img %p (was %d)\n", __func__, img_request,
1556 atomic_read(&img_request->kref.refcount));
1557 if (img_request_child_test(img_request))
1558 kref_put(&img_request->kref, rbd_parent_request_destroy);
1559 else
1560 kref_put(&img_request->kref, rbd_img_request_destroy);
1563 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1564 struct rbd_obj_request *obj_request)
1566 rbd_assert(obj_request->img_request == NULL);
1568 /* Image request now owns object's original reference */
1569 obj_request->img_request = img_request;
1570 obj_request->which = img_request->obj_request_count;
1571 rbd_assert(!obj_request_img_data_test(obj_request));
1572 obj_request_img_data_set(obj_request);
1573 rbd_assert(obj_request->which != BAD_WHICH);
1574 img_request->obj_request_count++;
1575 list_add_tail(&obj_request->links, &img_request->obj_requests);
1576 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1577 obj_request->which);
1580 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1581 struct rbd_obj_request *obj_request)
1583 rbd_assert(obj_request->which != BAD_WHICH);
1585 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1586 obj_request->which);
1587 list_del(&obj_request->links);
1588 rbd_assert(img_request->obj_request_count > 0);
1589 img_request->obj_request_count--;
1590 rbd_assert(obj_request->which == img_request->obj_request_count);
1591 obj_request->which = BAD_WHICH;
1592 rbd_assert(obj_request_img_data_test(obj_request));
1593 rbd_assert(obj_request->img_request == img_request);
1594 obj_request->img_request = NULL;
1595 obj_request->callback = NULL;
1596 rbd_obj_request_put(obj_request);
1599 static bool obj_request_type_valid(enum obj_request_type type)
1601 switch (type) {
1602 case OBJ_REQUEST_NODATA:
1603 case OBJ_REQUEST_BIO:
1604 case OBJ_REQUEST_PAGES:
1605 return true;
1606 default:
1607 return false;
1611 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
1613 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1615 struct ceph_osd_request *osd_req = obj_request->osd_req;
1617 dout("%s %p osd_req %p\n", __func__, obj_request, osd_req);
1618 if (obj_request_img_data_test(obj_request)) {
1619 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1620 rbd_img_request_get(obj_request->img_request);
1622 ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1625 static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
1627 dout("%s %p\n", __func__, obj_request);
1628 ceph_osdc_cancel_request(obj_request->osd_req);
1632 * Wait for an object request to complete. If interrupted, cancel the
1633 * underlying osd request.
1635 * @timeout: in jiffies, 0 means "wait forever"
1637 static int __rbd_obj_request_wait(struct rbd_obj_request *obj_request,
1638 unsigned long timeout)
1640 long ret;
1642 dout("%s %p\n", __func__, obj_request);
1643 ret = wait_for_completion_interruptible_timeout(
1644 &obj_request->completion,
1645 ceph_timeout_jiffies(timeout));
1646 if (ret <= 0) {
1647 if (ret == 0)
1648 ret = -ETIMEDOUT;
1649 rbd_obj_request_end(obj_request);
1650 } else {
1651 ret = 0;
1654 dout("%s %p ret %d\n", __func__, obj_request, (int)ret);
1655 return ret;
1658 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1660 return __rbd_obj_request_wait(obj_request, 0);
1663 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1666 dout("%s: img %p\n", __func__, img_request);
1669 * If no error occurred, compute the aggregate transfer
1670 * count for the image request. We could instead use
1671 * atomic64_cmpxchg() to update it as each object request
1672 * completes; not clear which way is better off hand.
1674 if (!img_request->result) {
1675 struct rbd_obj_request *obj_request;
1676 u64 xferred = 0;
1678 for_each_obj_request(img_request, obj_request)
1679 xferred += obj_request->xferred;
1680 img_request->xferred = xferred;
1683 if (img_request->callback)
1684 img_request->callback(img_request);
1685 else
1686 rbd_img_request_put(img_request);
1690 * The default/initial value for all image request flags is 0. Each
1691 * is conditionally set to 1 at image request initialization time
1692 * and currently never change thereafter.
1694 static void img_request_write_set(struct rbd_img_request *img_request)
1696 set_bit(IMG_REQ_WRITE, &img_request->flags);
1697 smp_mb();
1700 static bool img_request_write_test(struct rbd_img_request *img_request)
1702 smp_mb();
1703 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1707 * Set the discard flag when the img_request is an discard request
1709 static void img_request_discard_set(struct rbd_img_request *img_request)
1711 set_bit(IMG_REQ_DISCARD, &img_request->flags);
1712 smp_mb();
1715 static bool img_request_discard_test(struct rbd_img_request *img_request)
1717 smp_mb();
1718 return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1721 static void img_request_child_set(struct rbd_img_request *img_request)
1723 set_bit(IMG_REQ_CHILD, &img_request->flags);
1724 smp_mb();
1727 static void img_request_child_clear(struct rbd_img_request *img_request)
1729 clear_bit(IMG_REQ_CHILD, &img_request->flags);
1730 smp_mb();
1733 static bool img_request_child_test(struct rbd_img_request *img_request)
1735 smp_mb();
1736 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1739 static void img_request_layered_set(struct rbd_img_request *img_request)
1741 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1742 smp_mb();
1745 static void img_request_layered_clear(struct rbd_img_request *img_request)
1747 clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1748 smp_mb();
1751 static bool img_request_layered_test(struct rbd_img_request *img_request)
1753 smp_mb();
1754 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1757 static enum obj_operation_type
1758 rbd_img_request_op_type(struct rbd_img_request *img_request)
1760 if (img_request_write_test(img_request))
1761 return OBJ_OP_WRITE;
1762 else if (img_request_discard_test(img_request))
1763 return OBJ_OP_DISCARD;
1764 else
1765 return OBJ_OP_READ;
1768 static void
1769 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1771 u64 xferred = obj_request->xferred;
1772 u64 length = obj_request->length;
1774 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1775 obj_request, obj_request->img_request, obj_request->result,
1776 xferred, length);
1778 * ENOENT means a hole in the image. We zero-fill the entire
1779 * length of the request. A short read also implies zero-fill
1780 * to the end of the request. An error requires the whole
1781 * length of the request to be reported finished with an error
1782 * to the block layer. In each case we update the xferred
1783 * count to indicate the whole request was satisfied.
1785 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1786 if (obj_request->result == -ENOENT) {
1787 if (obj_request->type == OBJ_REQUEST_BIO)
1788 zero_bio_chain(obj_request->bio_list, 0);
1789 else
1790 zero_pages(obj_request->pages, 0, length);
1791 obj_request->result = 0;
1792 } else if (xferred < length && !obj_request->result) {
1793 if (obj_request->type == OBJ_REQUEST_BIO)
1794 zero_bio_chain(obj_request->bio_list, xferred);
1795 else
1796 zero_pages(obj_request->pages, xferred, length);
1798 obj_request->xferred = length;
1799 obj_request_done_set(obj_request);
1802 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1804 dout("%s: obj %p cb %p\n", __func__, obj_request,
1805 obj_request->callback);
1806 if (obj_request->callback)
1807 obj_request->callback(obj_request);
1808 else
1809 complete_all(&obj_request->completion);
1812 static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1814 obj_request->result = err;
1815 obj_request->xferred = 0;
1817 * kludge - mirror rbd_obj_request_submit() to match a put in
1818 * rbd_img_obj_callback()
1820 if (obj_request_img_data_test(obj_request)) {
1821 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1822 rbd_img_request_get(obj_request->img_request);
1824 obj_request_done_set(obj_request);
1825 rbd_obj_request_complete(obj_request);
1828 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1830 struct rbd_img_request *img_request = NULL;
1831 struct rbd_device *rbd_dev = NULL;
1832 bool layered = false;
1834 if (obj_request_img_data_test(obj_request)) {
1835 img_request = obj_request->img_request;
1836 layered = img_request && img_request_layered_test(img_request);
1837 rbd_dev = img_request->rbd_dev;
1840 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1841 obj_request, img_request, obj_request->result,
1842 obj_request->xferred, obj_request->length);
1843 if (layered && obj_request->result == -ENOENT &&
1844 obj_request->img_offset < rbd_dev->parent_overlap)
1845 rbd_img_parent_read(obj_request);
1846 else if (img_request)
1847 rbd_img_obj_request_read_callback(obj_request);
1848 else
1849 obj_request_done_set(obj_request);
1852 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1854 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1855 obj_request->result, obj_request->length);
1857 * There is no such thing as a successful short write. Set
1858 * it to our originally-requested length.
1860 obj_request->xferred = obj_request->length;
1861 obj_request_done_set(obj_request);
1864 static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1866 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1867 obj_request->result, obj_request->length);
1869 * There is no such thing as a successful short discard. Set
1870 * it to our originally-requested length.
1872 obj_request->xferred = obj_request->length;
1873 /* discarding a non-existent object is not a problem */
1874 if (obj_request->result == -ENOENT)
1875 obj_request->result = 0;
1876 obj_request_done_set(obj_request);
1880 * For a simple stat call there's nothing to do. We'll do more if
1881 * this is part of a write sequence for a layered image.
1883 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1885 dout("%s: obj %p\n", __func__, obj_request);
1886 obj_request_done_set(obj_request);
1889 static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1891 dout("%s: obj %p\n", __func__, obj_request);
1893 if (obj_request_img_data_test(obj_request))
1894 rbd_osd_copyup_callback(obj_request);
1895 else
1896 obj_request_done_set(obj_request);
1899 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1901 struct rbd_obj_request *obj_request = osd_req->r_priv;
1902 u16 opcode;
1904 dout("%s: osd_req %p\n", __func__, osd_req);
1905 rbd_assert(osd_req == obj_request->osd_req);
1906 if (obj_request_img_data_test(obj_request)) {
1907 rbd_assert(obj_request->img_request);
1908 rbd_assert(obj_request->which != BAD_WHICH);
1909 } else {
1910 rbd_assert(obj_request->which == BAD_WHICH);
1913 if (osd_req->r_result < 0)
1914 obj_request->result = osd_req->r_result;
1917 * We support a 64-bit length, but ultimately it has to be
1918 * passed to the block layer, which just supports a 32-bit
1919 * length field.
1921 obj_request->xferred = osd_req->r_ops[0].outdata_len;
1922 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1924 opcode = osd_req->r_ops[0].op;
1925 switch (opcode) {
1926 case CEPH_OSD_OP_READ:
1927 rbd_osd_read_callback(obj_request);
1928 break;
1929 case CEPH_OSD_OP_SETALLOCHINT:
1930 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1931 osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
1932 /* fall through */
1933 case CEPH_OSD_OP_WRITE:
1934 case CEPH_OSD_OP_WRITEFULL:
1935 rbd_osd_write_callback(obj_request);
1936 break;
1937 case CEPH_OSD_OP_STAT:
1938 rbd_osd_stat_callback(obj_request);
1939 break;
1940 case CEPH_OSD_OP_DELETE:
1941 case CEPH_OSD_OP_TRUNCATE:
1942 case CEPH_OSD_OP_ZERO:
1943 rbd_osd_discard_callback(obj_request);
1944 break;
1945 case CEPH_OSD_OP_CALL:
1946 rbd_osd_call_callback(obj_request);
1947 break;
1948 default:
1949 rbd_warn(NULL, "%s: unsupported op %hu",
1950 obj_request->object_name, (unsigned short) opcode);
1951 break;
1954 if (obj_request_done_test(obj_request))
1955 rbd_obj_request_complete(obj_request);
1958 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1960 struct ceph_osd_request *osd_req = obj_request->osd_req;
1962 rbd_assert(obj_request_img_data_test(obj_request));
1963 osd_req->r_snapid = obj_request->img_request->snap_id;
1966 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1968 struct ceph_osd_request *osd_req = obj_request->osd_req;
1970 osd_req->r_mtime = CURRENT_TIME;
1971 osd_req->r_data_offset = obj_request->offset;
1975 * Create an osd request. A read request has one osd op (read).
1976 * A write request has either one (watch) or two (hint+write) osd ops.
1977 * (All rbd data writes are prefixed with an allocation hint op, but
1978 * technically osd watch is a write request, hence this distinction.)
1980 static struct ceph_osd_request *rbd_osd_req_create(
1981 struct rbd_device *rbd_dev,
1982 enum obj_operation_type op_type,
1983 unsigned int num_ops,
1984 struct rbd_obj_request *obj_request)
1986 struct ceph_snap_context *snapc = NULL;
1987 struct ceph_osd_client *osdc;
1988 struct ceph_osd_request *osd_req;
1990 if (obj_request_img_data_test(obj_request) &&
1991 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1992 struct rbd_img_request *img_request = obj_request->img_request;
1993 if (op_type == OBJ_OP_WRITE) {
1994 rbd_assert(img_request_write_test(img_request));
1995 } else {
1996 rbd_assert(img_request_discard_test(img_request));
1998 snapc = img_request->snapc;
2001 rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
2003 /* Allocate and initialize the request, for the num_ops ops */
2005 osdc = &rbd_dev->rbd_client->client->osdc;
2006 osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
2007 GFP_NOIO);
2008 if (!osd_req)
2009 goto fail;
2011 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2012 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
2013 else
2014 osd_req->r_flags = CEPH_OSD_FLAG_READ;
2016 osd_req->r_callback = rbd_osd_req_callback;
2017 osd_req->r_priv = obj_request;
2019 osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
2020 if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
2021 obj_request->object_name))
2022 goto fail;
2024 if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
2025 goto fail;
2027 return osd_req;
2029 fail:
2030 ceph_osdc_put_request(osd_req);
2031 return NULL;
2035 * Create a copyup osd request based on the information in the object
2036 * request supplied. A copyup request has two or three osd ops, a
2037 * copyup method call, potentially a hint op, and a write or truncate
2038 * or zero op.
2040 static struct ceph_osd_request *
2041 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
2043 struct rbd_img_request *img_request;
2044 struct ceph_snap_context *snapc;
2045 struct rbd_device *rbd_dev;
2046 struct ceph_osd_client *osdc;
2047 struct ceph_osd_request *osd_req;
2048 int num_osd_ops = 3;
2050 rbd_assert(obj_request_img_data_test(obj_request));
2051 img_request = obj_request->img_request;
2052 rbd_assert(img_request);
2053 rbd_assert(img_request_write_test(img_request) ||
2054 img_request_discard_test(img_request));
2056 if (img_request_discard_test(img_request))
2057 num_osd_ops = 2;
2059 /* Allocate and initialize the request, for all the ops */
2061 snapc = img_request->snapc;
2062 rbd_dev = img_request->rbd_dev;
2063 osdc = &rbd_dev->rbd_client->client->osdc;
2064 osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
2065 false, GFP_NOIO);
2066 if (!osd_req)
2067 goto fail;
2069 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
2070 osd_req->r_callback = rbd_osd_req_callback;
2071 osd_req->r_priv = obj_request;
2073 osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
2074 if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
2075 obj_request->object_name))
2076 goto fail;
2078 if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
2079 goto fail;
2081 return osd_req;
2083 fail:
2084 ceph_osdc_put_request(osd_req);
2085 return NULL;
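/*
 * The request allocated here is filled in by
 * rbd_img_obj_parent_read_full_callback() below: op 0 is a CALL to the
 * "rbd" class "copyup" method carrying the parent data, followed by the
 * usual hint+write ops for writes or a single truncate/zero op for
 * discards -- hence num_osd_ops of 3 or 2 above.
 */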
2089 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2091 ceph_osdc_put_request(osd_req);
2094 /* object_name is assumed to be a non-null pointer and NUL-terminated */
2096 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
2097 u64 offset, u64 length,
2098 enum obj_request_type type)
2100 struct rbd_obj_request *obj_request;
2101 size_t size;
2102 char *name;
2104 rbd_assert(obj_request_type_valid(type));
2106 size = strlen(object_name) + 1;
2107 name = kmalloc(size, GFP_NOIO);
2108 if (!name)
2109 return NULL;
2111 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
2112 if (!obj_request) {
2113 kfree(name);
2114 return NULL;
2117 obj_request->object_name = memcpy(name, object_name, size);
2118 obj_request->offset = offset;
2119 obj_request->length = length;
2120 obj_request->flags = 0;
2121 obj_request->which = BAD_WHICH;
2122 obj_request->type = type;
2123 INIT_LIST_HEAD(&obj_request->links);
2124 init_completion(&obj_request->completion);
2125 kref_init(&obj_request->kref);
2127 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
2128 offset, length, (int)type, obj_request);
2130 return obj_request;
2133 static void rbd_obj_request_destroy(struct kref *kref)
2135 struct rbd_obj_request *obj_request;
2137 obj_request = container_of(kref, struct rbd_obj_request, kref);
2139 dout("%s: obj %p\n", __func__, obj_request);
2141 rbd_assert(obj_request->img_request == NULL);
2142 rbd_assert(obj_request->which == BAD_WHICH);
2144 if (obj_request->osd_req)
2145 rbd_osd_req_destroy(obj_request->osd_req);
2147 rbd_assert(obj_request_type_valid(obj_request->type));
2148 switch (obj_request->type) {
2149 case OBJ_REQUEST_NODATA:
2150 break; /* Nothing to do */
2151 case OBJ_REQUEST_BIO:
2152 if (obj_request->bio_list)
2153 bio_chain_put(obj_request->bio_list);
2154 break;
2155 case OBJ_REQUEST_PAGES:
2156 /* img_data requests don't own their page array */
2157 if (obj_request->pages &&
2158 !obj_request_img_data_test(obj_request))
2159 ceph_release_page_vector(obj_request->pages,
2160 obj_request->page_count);
2161 break;
2164 kfree(obj_request->object_name);
2165 obj_request->object_name = NULL;
2166 kmem_cache_free(rbd_obj_request_cache, obj_request);
2169 /* It's OK to call this for a device with no parent */
2171 static void rbd_spec_put(struct rbd_spec *spec);
2172 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2174 rbd_dev_remove_parent(rbd_dev);
2175 rbd_spec_put(rbd_dev->parent_spec);
2176 rbd_dev->parent_spec = NULL;
2177 rbd_dev->parent_overlap = 0;
2181 * Parent image reference counting is used to determine when an
2182 * image's parent fields can be safely torn down--after there are no
2183 * more in-flight requests to the parent image. When the last
2184 * reference is dropped, cleaning them up is safe.
2186 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2188 int counter;
2190 if (!rbd_dev->parent_spec)
2191 return;
2193 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2194 if (counter > 0)
2195 return;
2197 /* Last reference; clean up parent data structures */
2199 if (!counter)
2200 rbd_dev_unparent(rbd_dev);
2201 else
2202 rbd_warn(rbd_dev, "parent reference underflow");
2206 * If an image has a non-zero parent overlap, get a reference to its
2207 * parent.
2209 * Returns true if the rbd device has a parent with a non-zero
2210 * overlap and a reference for it was successfully taken, or
2211 * false otherwise.
2213 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2215 int counter = 0;
2217 if (!rbd_dev->parent_spec)
2218 return false;
2220 down_read(&rbd_dev->header_rwsem);
2221 if (rbd_dev->parent_overlap)
2222 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2223 up_read(&rbd_dev->header_rwsem);
2225 if (counter < 0)
2226 rbd_warn(rbd_dev, "parent reference overflow");
2228 return counter > 0;
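/*
 * rbd_dev_parent_get() pairs with rbd_dev_parent_put(): the reference is
 * taken in rbd_img_request_create() when a layered image request is set
 * up and dropped in rbd_img_request_destroy(), keeping the parent spec
 * and overlap valid while layered I/O is in flight.
 */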
2232 * Caller is responsible for filling in the list of object requests
2233 * that comprises the image request, and the Linux request pointer
2234 * (if there is one).
2236 static struct rbd_img_request *rbd_img_request_create(
2237 struct rbd_device *rbd_dev,
2238 u64 offset, u64 length,
2239 enum obj_operation_type op_type,
2240 struct ceph_snap_context *snapc)
2242 struct rbd_img_request *img_request;
2244 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2245 if (!img_request)
2246 return NULL;
2248 img_request->rq = NULL;
2249 img_request->rbd_dev = rbd_dev;
2250 img_request->offset = offset;
2251 img_request->length = length;
2252 img_request->flags = 0;
2253 if (op_type == OBJ_OP_DISCARD) {
2254 img_request_discard_set(img_request);
2255 img_request->snapc = snapc;
2256 } else if (op_type == OBJ_OP_WRITE) {
2257 img_request_write_set(img_request);
2258 img_request->snapc = snapc;
2259 } else {
2260 img_request->snap_id = rbd_dev->spec->snap_id;
2262 if (rbd_dev_parent_get(rbd_dev))
2263 img_request_layered_set(img_request);
2264 spin_lock_init(&img_request->completion_lock);
2265 img_request->next_completion = 0;
2266 img_request->callback = NULL;
2267 img_request->result = 0;
2268 img_request->obj_request_count = 0;
2269 INIT_LIST_HEAD(&img_request->obj_requests);
2270 kref_init(&img_request->kref);
2272 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2273 obj_op_name(op_type), offset, length, img_request);
2275 return img_request;
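/*
 * A rough sketch of how an image request is driven elsewhere in this
 * file (compare rbd_img_parent_read() below, which goes through
 * rbd_parent_request_create() instead):
 *
 *	img_request = rbd_img_request_create(rbd_dev, offset, length,
 *					     op_type, snapc);
 *	ret = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, bio_list);
 *	if (!ret)
 *		ret = rbd_img_request_submit(img_request);
 *	if (ret)
 *		rbd_img_request_put(img_request);
 */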
2278 static void rbd_img_request_destroy(struct kref *kref)
2280 struct rbd_img_request *img_request;
2281 struct rbd_obj_request *obj_request;
2282 struct rbd_obj_request *next_obj_request;
2284 img_request = container_of(kref, struct rbd_img_request, kref);
2286 dout("%s: img %p\n", __func__, img_request);
2288 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2289 rbd_img_obj_request_del(img_request, obj_request);
2290 rbd_assert(img_request->obj_request_count == 0);
2292 if (img_request_layered_test(img_request)) {
2293 img_request_layered_clear(img_request);
2294 rbd_dev_parent_put(img_request->rbd_dev);
2297 if (img_request_write_test(img_request) ||
2298 img_request_discard_test(img_request))
2299 ceph_put_snap_context(img_request->snapc);
2301 kmem_cache_free(rbd_img_request_cache, img_request);
2304 static struct rbd_img_request *rbd_parent_request_create(
2305 struct rbd_obj_request *obj_request,
2306 u64 img_offset, u64 length)
2308 struct rbd_img_request *parent_request;
2309 struct rbd_device *rbd_dev;
2311 rbd_assert(obj_request->img_request);
2312 rbd_dev = obj_request->img_request->rbd_dev;
2314 parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2315 length, OBJ_OP_READ, NULL);
2316 if (!parent_request)
2317 return NULL;
2319 img_request_child_set(parent_request);
2320 rbd_obj_request_get(obj_request);
2321 parent_request->obj_request = obj_request;
2323 return parent_request;
2326 static void rbd_parent_request_destroy(struct kref *kref)
2328 struct rbd_img_request *parent_request;
2329 struct rbd_obj_request *orig_request;
2331 parent_request = container_of(kref, struct rbd_img_request, kref);
2332 orig_request = parent_request->obj_request;
2334 parent_request->obj_request = NULL;
2335 rbd_obj_request_put(orig_request);
2336 img_request_child_clear(parent_request);
2338 rbd_img_request_destroy(kref);
2341 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2343 struct rbd_img_request *img_request;
2344 unsigned int xferred;
2345 int result;
2346 bool more;
2348 rbd_assert(obj_request_img_data_test(obj_request));
2349 img_request = obj_request->img_request;
2351 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2352 xferred = (unsigned int)obj_request->xferred;
2353 result = obj_request->result;
2354 if (result) {
2355 struct rbd_device *rbd_dev = img_request->rbd_dev;
2356 enum obj_operation_type op_type;
2358 if (img_request_discard_test(img_request))
2359 op_type = OBJ_OP_DISCARD;
2360 else if (img_request_write_test(img_request))
2361 op_type = OBJ_OP_WRITE;
2362 else
2363 op_type = OBJ_OP_READ;
2365 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2366 obj_op_name(op_type), obj_request->length,
2367 obj_request->img_offset, obj_request->offset);
2368 rbd_warn(rbd_dev, " result %d xferred %x",
2369 result, xferred);
2370 if (!img_request->result)
2371 img_request->result = result;
2373 * Need to end I/O on the entire obj_request worth of
2374 * bytes in case of error.
2376 xferred = obj_request->length;
2379 if (img_request_child_test(img_request)) {
2380 rbd_assert(img_request->obj_request != NULL);
2381 more = obj_request->which < img_request->obj_request_count - 1;
2382 } else {
2383 rbd_assert(img_request->rq != NULL);
2385 more = blk_update_request(img_request->rq, result, xferred);
2386 if (!more)
2387 __blk_mq_end_request(img_request->rq, result);
2390 return more;
2393 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2395 struct rbd_img_request *img_request;
2396 u32 which = obj_request->which;
2397 bool more = true;
2399 rbd_assert(obj_request_img_data_test(obj_request));
2400 img_request = obj_request->img_request;
2402 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2403 rbd_assert(img_request != NULL);
2404 rbd_assert(img_request->obj_request_count > 0);
2405 rbd_assert(which != BAD_WHICH);
2406 rbd_assert(which < img_request->obj_request_count);
2408 spin_lock_irq(&img_request->completion_lock);
2409 if (which != img_request->next_completion)
2410 goto out;
2412 for_each_obj_request_from(img_request, obj_request) {
2413 rbd_assert(more);
2414 rbd_assert(which < img_request->obj_request_count);
2416 if (!obj_request_done_test(obj_request))
2417 break;
2418 more = rbd_img_obj_end_request(obj_request);
2419 which++;
2422 rbd_assert(more ^ (which == img_request->obj_request_count));
2423 img_request->next_completion = which;
2424 out:
2425 spin_unlock_irq(&img_request->completion_lock);
2426 rbd_img_request_put(img_request);
2428 if (!more)
2429 rbd_img_request_complete(img_request);
2433 * Add individual osd ops to the given ceph_osd_request and prepare
2434 * them for submission. num_ops is the current number of
2435 * osd operations already added to the object request.
2437 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2438 struct ceph_osd_request *osd_request,
2439 enum obj_operation_type op_type,
2440 unsigned int num_ops)
2442 struct rbd_img_request *img_request = obj_request->img_request;
2443 struct rbd_device *rbd_dev = img_request->rbd_dev;
2444 u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2445 u64 offset = obj_request->offset;
2446 u64 length = obj_request->length;
2447 u64 img_end;
2448 u16 opcode;
2450 if (op_type == OBJ_OP_DISCARD) {
2451 if (!offset && length == object_size &&
2452 (!img_request_layered_test(img_request) ||
2453 !obj_request_overlaps_parent(obj_request))) {
2454 opcode = CEPH_OSD_OP_DELETE;
2455 } else if ((offset + length == object_size)) {
2456 opcode = CEPH_OSD_OP_TRUNCATE;
2457 } else {
2458 down_read(&rbd_dev->header_rwsem);
2459 img_end = rbd_dev->header.image_size;
2460 up_read(&rbd_dev->header_rwsem);
2462 if (obj_request->img_offset + length == img_end)
2463 opcode = CEPH_OSD_OP_TRUNCATE;
2464 else
2465 opcode = CEPH_OSD_OP_ZERO;
2467 } else if (op_type == OBJ_OP_WRITE) {
2468 if (!offset && length == object_size)
2469 opcode = CEPH_OSD_OP_WRITEFULL;
2470 else
2471 opcode = CEPH_OSD_OP_WRITE;
2472 osd_req_op_alloc_hint_init(osd_request, num_ops,
2473 object_size, object_size);
2474 num_ops++;
2475 } else {
2476 opcode = CEPH_OSD_OP_READ;
2479 if (opcode == CEPH_OSD_OP_DELETE)
2480 osd_req_op_init(osd_request, num_ops, opcode, 0);
2481 else
2482 osd_req_op_extent_init(osd_request, num_ops, opcode,
2483 offset, length, 0, 0);
2485 if (obj_request->type == OBJ_REQUEST_BIO)
2486 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2487 obj_request->bio_list, length);
2488 else if (obj_request->type == OBJ_REQUEST_PAGES)
2489 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2490 obj_request->pages, length,
2491 offset & ~PAGE_MASK, false, false);
2493 /* Discards are also writes */
2494 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2495 rbd_osd_req_format_write(obj_request);
2496 else
2497 rbd_osd_req_format_read(obj_request);
2501 * Split up an image request into one or more object requests, each
2502 * to a different object. The "type" parameter indicates whether
2503 * "data_desc" is the pointer to the head of a list of bio
2504 * structures, or the base of a page array. In either case this
2505 * function assumes data_desc describes memory sufficient to hold
2506 * all data described by the image request.
2508 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2509 enum obj_request_type type,
2510 void *data_desc)
2512 struct rbd_device *rbd_dev = img_request->rbd_dev;
2513 struct rbd_obj_request *obj_request = NULL;
2514 struct rbd_obj_request *next_obj_request;
2515 struct bio *bio_list = NULL;
2516 unsigned int bio_offset = 0;
2517 struct page **pages = NULL;
2518 enum obj_operation_type op_type;
2519 u64 img_offset;
2520 u64 resid;
2522 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2523 (int)type, data_desc);
2525 img_offset = img_request->offset;
2526 resid = img_request->length;
2527 rbd_assert(resid > 0);
2528 op_type = rbd_img_request_op_type(img_request);
2530 if (type == OBJ_REQUEST_BIO) {
2531 bio_list = data_desc;
2532 rbd_assert(img_offset ==
2533 bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2534 } else if (type == OBJ_REQUEST_PAGES) {
2535 pages = data_desc;
2538 while (resid) {
2539 struct ceph_osd_request *osd_req;
2540 const char *object_name;
2541 u64 offset;
2542 u64 length;
2544 object_name = rbd_segment_name(rbd_dev, img_offset);
2545 if (!object_name)
2546 goto out_unwind;
2547 offset = rbd_segment_offset(rbd_dev, img_offset);
2548 length = rbd_segment_length(rbd_dev, img_offset, resid);
2549 obj_request = rbd_obj_request_create(object_name,
2550 offset, length, type);
2551 /* object request has its own copy of the object name */
2552 rbd_segment_name_free(object_name);
2553 if (!obj_request)
2554 goto out_unwind;
2557 * set obj_request->img_request before creating the
2558 * osd_request so that it gets the right snapc
2560 rbd_img_obj_request_add(img_request, obj_request);
2562 if (type == OBJ_REQUEST_BIO) {
2563 unsigned int clone_size;
2565 rbd_assert(length <= (u64)UINT_MAX);
2566 clone_size = (unsigned int)length;
2567 obj_request->bio_list =
2568 bio_chain_clone_range(&bio_list,
2569 &bio_offset,
2570 clone_size,
2571 GFP_NOIO);
2572 if (!obj_request->bio_list)
2573 goto out_unwind;
2574 } else if (type == OBJ_REQUEST_PAGES) {
2575 unsigned int page_count;
2577 obj_request->pages = pages;
2578 page_count = (u32)calc_pages_for(offset, length);
2579 obj_request->page_count = page_count;
2580 if ((offset + length) & ~PAGE_MASK)
2581 page_count--; /* more on last page */
2582 pages += page_count;
2585 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2586 (op_type == OBJ_OP_WRITE) ? 2 : 1,
2587 obj_request);
2588 if (!osd_req)
2589 goto out_unwind;
2591 obj_request->osd_req = osd_req;
2592 obj_request->callback = rbd_img_obj_callback;
2593 obj_request->img_offset = img_offset;
2595 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2597 img_offset += length;
2598 resid -= length;
2601 return 0;
2603 out_unwind:
2604 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2605 rbd_img_obj_request_del(img_request, obj_request);
2607 return -ENOMEM;
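/*
 * The loop above carves the image request into per-object pieces: each
 * iteration covers at most one rados object (one rbd_segment_length()
 * worth of data) and gets its own obj_request and osd request, with the
 * bio chain or page array advanced to match.
 */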
2610 static void
2611 rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2613 struct rbd_img_request *img_request;
2614 struct rbd_device *rbd_dev;
2615 struct page **pages;
2616 u32 page_count;
2618 dout("%s: obj %p\n", __func__, obj_request);
2620 rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2621 obj_request->type == OBJ_REQUEST_NODATA);
2622 rbd_assert(obj_request_img_data_test(obj_request));
2623 img_request = obj_request->img_request;
2624 rbd_assert(img_request);
2626 rbd_dev = img_request->rbd_dev;
2627 rbd_assert(rbd_dev);
2629 pages = obj_request->copyup_pages;
2630 rbd_assert(pages != NULL);
2631 obj_request->copyup_pages = NULL;
2632 page_count = obj_request->copyup_page_count;
2633 rbd_assert(page_count);
2634 obj_request->copyup_page_count = 0;
2635 ceph_release_page_vector(pages, page_count);
2638 * We want the transfer count to reflect the size of the
2639 * original write request. There is no such thing as a
2640 * successful short write, so if the request was successful
2641 * we can just set it to the originally-requested length.
2643 if (!obj_request->result)
2644 obj_request->xferred = obj_request->length;
2646 obj_request_done_set(obj_request);
2649 static void
2650 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2652 struct rbd_obj_request *orig_request;
2653 struct ceph_osd_request *osd_req;
2654 struct rbd_device *rbd_dev;
2655 struct page **pages;
2656 enum obj_operation_type op_type;
2657 u32 page_count;
2658 int img_result;
2659 u64 parent_length;
2661 rbd_assert(img_request_child_test(img_request));
2663 /* First get what we need from the image request */
2665 pages = img_request->copyup_pages;
2666 rbd_assert(pages != NULL);
2667 img_request->copyup_pages = NULL;
2668 page_count = img_request->copyup_page_count;
2669 rbd_assert(page_count);
2670 img_request->copyup_page_count = 0;
2672 orig_request = img_request->obj_request;
2673 rbd_assert(orig_request != NULL);
2674 rbd_assert(obj_request_type_valid(orig_request->type));
2675 img_result = img_request->result;
2676 parent_length = img_request->length;
2677 rbd_assert(img_result || parent_length == img_request->xferred);
2678 rbd_img_request_put(img_request);
2680 rbd_assert(orig_request->img_request);
2681 rbd_dev = orig_request->img_request->rbd_dev;
2682 rbd_assert(rbd_dev);
2685 * If the overlap has become 0 (most likely because the
2686 * image has been flattened) we need to free the pages
2687 * and re-submit the original write request.
2689 if (!rbd_dev->parent_overlap) {
2690 ceph_release_page_vector(pages, page_count);
2691 rbd_obj_request_submit(orig_request);
2692 return;
2695 if (img_result)
2696 goto out_err;
2699 * The original osd request is of no use to us anymore.
2700 * We need a new one that can hold the two or three ops in a copyup
2701 * request. Allocate the new copyup osd request for the
2702 * original request, and release the old one.
2704 img_result = -ENOMEM;
2705 osd_req = rbd_osd_req_create_copyup(orig_request);
2706 if (!osd_req)
2707 goto out_err;
2708 rbd_osd_req_destroy(orig_request->osd_req);
2709 orig_request->osd_req = osd_req;
2710 orig_request->copyup_pages = pages;
2711 orig_request->copyup_page_count = page_count;
2713 /* Initialize the copyup op */
2715 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2716 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2717 false, false);
2719 /* Add the other op(s) */
2721 op_type = rbd_img_request_op_type(orig_request->img_request);
2722 rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2724 /* All set, send it off. */
2726 rbd_obj_request_submit(orig_request);
2727 return;
2729 out_err:
2730 ceph_release_page_vector(pages, page_count);
2731 rbd_obj_request_error(orig_request, img_result);
2735 * Read from the parent image the range of data that covers the
2736 * entire target of the given object request. This is used for
2737 * satisfying a layered image write request when the target of an
2738 * object request from the image request does not exist.
2740 * A page array big enough to hold the returned data is allocated
2741 * and supplied to rbd_img_request_fill() as the "data descriptor."
2742 * When the read completes, this page array will be transferred to
2743 * the original object request for the copyup operation.
2745 * If an error occurs, it is recorded as the result of the original
2746 * object request in rbd_img_obj_exists_callback().
2748 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2750 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2751 struct rbd_img_request *parent_request = NULL;
2752 u64 img_offset;
2753 u64 length;
2754 struct page **pages = NULL;
2755 u32 page_count;
2756 int result;
2758 rbd_assert(rbd_dev->parent != NULL);
2761 * Determine the byte range covered by the object in the
2762 * child image to which the original request was to be sent.
2764 img_offset = obj_request->img_offset - obj_request->offset;
2765 length = (u64)1 << rbd_dev->header.obj_order;
2768 * There is no defined parent data beyond the parent
2769 * overlap, so limit what we read at that boundary if
2770 * necessary.
2772 if (img_offset + length > rbd_dev->parent_overlap) {
2773 rbd_assert(img_offset < rbd_dev->parent_overlap);
2774 length = rbd_dev->parent_overlap - img_offset;
2778 * Allocate a page array big enough to receive the data read
2779 * from the parent.
2781 page_count = (u32)calc_pages_for(0, length);
2782 pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2783 if (IS_ERR(pages)) {
2784 result = PTR_ERR(pages);
2785 pages = NULL;
2786 goto out_err;
2789 result = -ENOMEM;
2790 parent_request = rbd_parent_request_create(obj_request,
2791 img_offset, length);
2792 if (!parent_request)
2793 goto out_err;
2795 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2796 if (result)
2797 goto out_err;
2799 parent_request->copyup_pages = pages;
2800 parent_request->copyup_page_count = page_count;
2801 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2803 result = rbd_img_request_submit(parent_request);
2804 if (!result)
2805 return 0;
2807 parent_request->copyup_pages = NULL;
2808 parent_request->copyup_page_count = 0;
2809 parent_request->obj_request = NULL;
2810 rbd_obj_request_put(obj_request);
2811 out_err:
2812 if (pages)
2813 ceph_release_page_vector(pages, page_count);
2814 if (parent_request)
2815 rbd_img_request_put(parent_request);
2816 return result;
2819 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2821 struct rbd_obj_request *orig_request;
2822 struct rbd_device *rbd_dev;
2823 int result;
2825 rbd_assert(!obj_request_img_data_test(obj_request));
2828 * All we need from the object request is the original
2829 * request and the result of the STAT op. Grab those, then
2830 * we're done with the request.
2832 orig_request = obj_request->obj_request;
2833 obj_request->obj_request = NULL;
2834 rbd_obj_request_put(orig_request);
2835 rbd_assert(orig_request);
2836 rbd_assert(orig_request->img_request);
2838 result = obj_request->result;
2839 obj_request->result = 0;
2841 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2842 obj_request, orig_request, result,
2843 obj_request->xferred, obj_request->length);
2844 rbd_obj_request_put(obj_request);
2847 * If the overlap has become 0 (most likely because the
2848 * image has been flattened) we need to re-submit the
2849 * original request.
2851 rbd_dev = orig_request->img_request->rbd_dev;
2852 if (!rbd_dev->parent_overlap) {
2853 rbd_obj_request_submit(orig_request);
2854 return;
2858 * Our only purpose here is to determine whether the object
2859 * exists, and we don't want to treat the non-existence as
2860 * an error. If something else comes back, transfer the
2861 * error to the original request and complete it now.
2863 if (!result) {
2864 obj_request_existence_set(orig_request, true);
2865 } else if (result == -ENOENT) {
2866 obj_request_existence_set(orig_request, false);
2867 } else {
2868 goto fail_orig_request;
2872 * Resubmit the original request now that we have recorded
2873 * whether the target object exists.
2875 result = rbd_img_obj_request_submit(orig_request);
2876 if (result)
2877 goto fail_orig_request;
2879 return;
2881 fail_orig_request:
2882 rbd_obj_request_error(orig_request, result);
2885 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2887 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2888 struct rbd_obj_request *stat_request;
2889 struct page **pages;
2890 u32 page_count;
2891 size_t size;
2892 int ret;
2894 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2895 OBJ_REQUEST_PAGES);
2896 if (!stat_request)
2897 return -ENOMEM;
2899 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2900 stat_request);
2901 if (!stat_request->osd_req) {
2902 ret = -ENOMEM;
2903 goto fail_stat_request;
2907 * The response data for a STAT call consists of:
2908 * le64 length;
2909 * struct {
2910 * le32 tv_sec;
2911 * le32 tv_nsec;
2912 * } mtime;
2914 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2915 page_count = (u32)calc_pages_for(0, size);
2916 pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2917 if (IS_ERR(pages)) {
2918 ret = PTR_ERR(pages);
2919 goto fail_stat_request;
2922 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2923 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2924 false, false);
2926 rbd_obj_request_get(obj_request);
2927 stat_request->obj_request = obj_request;
2928 stat_request->pages = pages;
2929 stat_request->page_count = page_count;
2930 stat_request->callback = rbd_img_obj_exists_callback;
2932 rbd_obj_request_submit(stat_request);
2933 return 0;
2935 fail_stat_request:
2936 rbd_obj_request_put(stat_request);
2937 return ret;
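/*
 * A standalone, illustrative decode of the 16-byte STAT reply layout
 * documented above (little-endian u64 length, then le32 tv_sec and le32
 * tv_nsec).  This is plain userspace C for reference only -- the driver
 * itself only looks at the STAT op's result to learn whether the object
 * exists.
 */
#include <stdint.h>

static uint32_t sketch_get_le32(const uint8_t *p)
{
	return (uint32_t)p[0] | ((uint32_t)p[1] << 8) |
	       ((uint32_t)p[2] << 16) | ((uint32_t)p[3] << 24);
}

static uint64_t sketch_get_le64(const uint8_t *p)
{
	return (uint64_t)sketch_get_le32(p) |
	       ((uint64_t)sketch_get_le32(p + 4) << 32);
}

/* buf must hold the 16 reply bytes sized by the code above */
static void sketch_decode_stat_reply(const uint8_t *buf, uint64_t *length,
				     uint32_t *tv_sec, uint32_t *tv_nsec)
{
	*length = sketch_get_le64(buf);
	*tv_sec = sketch_get_le32(buf + 8);
	*tv_nsec = sketch_get_le32(buf + 12);
}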
2940 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2942 struct rbd_img_request *img_request = obj_request->img_request;
2943 struct rbd_device *rbd_dev = img_request->rbd_dev;
2945 /* Reads */
2946 if (!img_request_write_test(img_request) &&
2947 !img_request_discard_test(img_request))
2948 return true;
2950 /* Non-layered writes */
2951 if (!img_request_layered_test(img_request))
2952 return true;
2955 * Layered writes outside of the parent overlap range don't
2956 * share any data with the parent.
2958 if (!obj_request_overlaps_parent(obj_request))
2959 return true;
2962 * Entire-object layered writes - we will overwrite whatever
2963 * parent data there is anyway.
2965 if (!obj_request->offset &&
2966 obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2967 return true;
2970 * If the object is known to already exist, its parent data has
2971 * already been copied.
2973 if (obj_request_known_test(obj_request) &&
2974 obj_request_exists_test(obj_request))
2975 return true;
2977 return false;
2980 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2982 rbd_assert(obj_request_img_data_test(obj_request));
2983 rbd_assert(obj_request_type_valid(obj_request->type));
2984 rbd_assert(obj_request->img_request);
2986 if (img_obj_request_simple(obj_request)) {
2987 rbd_obj_request_submit(obj_request);
2988 return 0;
2992 * It's a layered write. The target object might exist but
2993 * we may not know that yet. If we know it doesn't exist,
2994 * start by reading the data for the full target object from
2995 * the parent so we can use it for a copyup to the target.
2997 if (obj_request_known_test(obj_request))
2998 return rbd_img_obj_parent_read_full(obj_request);
3000 /* We don't know whether the target exists. Go find out. */
3002 return rbd_img_obj_exists_submit(obj_request);
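/*
 * To summarize the policy above: reads, non-layered writes, layered
 * writes outside the parent overlap, whole-object writes, and writes to
 * objects already known to exist are submitted directly
 * (img_obj_request_simple()).  A layered write to an object known not to
 * exist goes through the parent read-full/copyup path, and if existence
 * is unknown a STAT is issued first via rbd_img_obj_exists_submit().
 */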
3005 static int rbd_img_request_submit(struct rbd_img_request *img_request)
3007 struct rbd_obj_request *obj_request;
3008 struct rbd_obj_request *next_obj_request;
3009 int ret = 0;
3011 dout("%s: img %p\n", __func__, img_request);
3013 rbd_img_request_get(img_request);
3014 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
3015 ret = rbd_img_obj_request_submit(obj_request);
3016 if (ret)
3017 goto out_put_ireq;
3020 out_put_ireq:
3021 rbd_img_request_put(img_request);
3022 return ret;
3025 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
3027 struct rbd_obj_request *obj_request;
3028 struct rbd_device *rbd_dev;
3029 u64 obj_end;
3030 u64 img_xferred;
3031 int img_result;
3033 rbd_assert(img_request_child_test(img_request));
3035 /* First get what we need from the image request and release it */
3037 obj_request = img_request->obj_request;
3038 img_xferred = img_request->xferred;
3039 img_result = img_request->result;
3040 rbd_img_request_put(img_request);
3043 * If the overlap has become 0 (most likely because the
3044 * image has been flattened) we need to re-submit the
3045 * original request.
3047 rbd_assert(obj_request);
3048 rbd_assert(obj_request->img_request);
3049 rbd_dev = obj_request->img_request->rbd_dev;
3050 if (!rbd_dev->parent_overlap) {
3051 rbd_obj_request_submit(obj_request);
3052 return;
3055 obj_request->result = img_result;
3056 if (obj_request->result)
3057 goto out;
3060 * We need to zero anything beyond the parent overlap
3061 * boundary. Since rbd_img_obj_request_read_callback()
3062 * will zero anything beyond the end of a short read, an
3063 * easy way to do this is to pretend the data from the
3064 * parent came up short--ending at the overlap boundary.
3066 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
3067 obj_end = obj_request->img_offset + obj_request->length;
3068 if (obj_end > rbd_dev->parent_overlap) {
3069 u64 xferred = 0;
3071 if (obj_request->img_offset < rbd_dev->parent_overlap)
3072 xferred = rbd_dev->parent_overlap -
3073 obj_request->img_offset;
3075 obj_request->xferred = min(img_xferred, xferred);
3076 } else {
3077 obj_request->xferred = img_xferred;
3079 out:
3080 rbd_img_obj_request_read_callback(obj_request);
3081 rbd_obj_request_complete(obj_request);
3084 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
3086 struct rbd_img_request *img_request;
3087 int result;
3089 rbd_assert(obj_request_img_data_test(obj_request));
3090 rbd_assert(obj_request->img_request != NULL);
3091 rbd_assert(obj_request->result == (s32) -ENOENT);
3092 rbd_assert(obj_request_type_valid(obj_request->type));
3094 /* rbd_read_finish(obj_request, obj_request->length); */
3095 img_request = rbd_parent_request_create(obj_request,
3096 obj_request->img_offset,
3097 obj_request->length);
3098 result = -ENOMEM;
3099 if (!img_request)
3100 goto out_err;
3102 if (obj_request->type == OBJ_REQUEST_BIO)
3103 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3104 obj_request->bio_list);
3105 else
3106 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3107 obj_request->pages);
3108 if (result)
3109 goto out_err;
3111 img_request->callback = rbd_img_parent_read_callback;
3112 result = rbd_img_request_submit(img_request);
3113 if (result)
3114 goto out_err;
3116 return;
3117 out_err:
3118 if (img_request)
3119 rbd_img_request_put(img_request);
3120 obj_request->result = result;
3121 obj_request->xferred = 0;
3122 obj_request_done_set(obj_request);
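/*
 * rbd_img_parent_read() is reached when a read from a layered image hits
 * an object that does not exist in the child (result == -ENOENT): the
 * data is fetched from the parent image instead, and anything beyond the
 * parent overlap is zero-filled via the short-read trick in
 * rbd_img_parent_read_callback() above.
 */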
3125 static const struct rbd_client_id rbd_empty_cid;
3127 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3128 const struct rbd_client_id *rhs)
3130 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3133 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3135 struct rbd_client_id cid;
3137 mutex_lock(&rbd_dev->watch_mutex);
3138 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3139 cid.handle = rbd_dev->watch_cookie;
3140 mutex_unlock(&rbd_dev->watch_mutex);
3141 return cid;
3145 * lock_rwsem must be held for write
3147 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3148 const struct rbd_client_id *cid)
3150 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3151 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3152 cid->gid, cid->handle);
3153 rbd_dev->owner_cid = *cid; /* struct */
3156 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3158 mutex_lock(&rbd_dev->watch_mutex);
3159 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3160 mutex_unlock(&rbd_dev->watch_mutex);
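/*
 * A standalone, illustrative round trip of the lock cookie format used
 * above and parsed again in find_watcher(): "<prefix> <watch cookie>".
 * The real prefix is RBD_LOCK_COOKIE_PREFIX, defined elsewhere in this
 * file; the value below is only a stand-in for the sketch.
 */
#include <stdio.h>
#include <inttypes.h>

#define SKETCH_COOKIE_PREFIX "prefix"	/* stand-in, not the driver's value */

static void sketch_format_cookie(char *buf, size_t size, uint64_t watch_cookie)
{
	/* mirrors format_lock_cookie() */
	snprintf(buf, size, "%s %" PRIu64, SKETCH_COOKIE_PREFIX, watch_cookie);
}

static int sketch_parse_cookie(const char *buf, uint64_t *watch_cookie)
{
	/* mirrors the sscanf() in find_watcher() */
	return sscanf(buf, SKETCH_COOKIE_PREFIX " %" SCNu64, watch_cookie) == 1;
}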
3164 * lock_rwsem must be held for write
3166 static int rbd_lock(struct rbd_device *rbd_dev)
3168 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3169 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3170 char cookie[32];
3171 int ret;
3173 WARN_ON(__rbd_is_lock_owner(rbd_dev));
3175 format_lock_cookie(rbd_dev, cookie);
3176 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3177 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3178 RBD_LOCK_TAG, "", 0);
3179 if (ret)
3180 return ret;
3182 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3183 rbd_set_owner_cid(rbd_dev, &cid);
3184 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3185 return 0;
3189 * lock_rwsem must be held for write
3191 static int rbd_unlock(struct rbd_device *rbd_dev)
3193 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3194 char cookie[32];
3195 int ret;
3197 WARN_ON(!__rbd_is_lock_owner(rbd_dev));
3199 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3201 format_lock_cookie(rbd_dev, cookie);
3202 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3203 RBD_LOCK_NAME, cookie);
3204 if (ret && ret != -ENOENT) {
3205 rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
3206 return ret;
3209 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3210 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3211 return 0;
3214 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3215 enum rbd_notify_op notify_op,
3216 struct page ***preply_pages,
3217 size_t *preply_len)
3219 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3220 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3221 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3222 char buf[buf_size];
3223 void *p = buf;
3225 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3227 /* encode *LockPayload NotifyMessage (op + ClientId) */
3228 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3229 ceph_encode_32(&p, notify_op);
3230 ceph_encode_64(&p, cid.gid);
3231 ceph_encode_64(&p, cid.handle);
3233 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3234 &rbd_dev->header_oloc, buf, buf_size,
3235 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
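/*
 * A standalone, illustrative encoding of the 26-byte NotifyMessage built
 * above, assuming the usual ceph start-encoding header of a version byte,
 * a compat byte and a le32 payload length (the CEPH_ENCODING_START_BLK_LEN
 * bytes counted into buf_size).  The body is le32 notify_op followed by
 * the le64 gid and le64 handle of the client id.
 */
#include <stdint.h>
#include <stddef.h>

static size_t sketch_put_le32(uint8_t *p, uint32_t v)
{
	int i;

	for (i = 0; i < 4; i++)
		p[i] = (uint8_t)(v >> (8 * i));
	return 4;
}

static size_t sketch_put_le64(uint8_t *p, uint64_t v)
{
	int i;

	for (i = 0; i < 8; i++)
		p[i] = (uint8_t)(v >> (8 * i));
	return 8;
}

/* buf must hold 26 bytes; returns the number of bytes written */
static size_t sketch_encode_notify(uint8_t *buf, uint32_t notify_op,
				   uint64_t gid, uint64_t handle)
{
	uint8_t *p = buf;

	*p++ = 2;				/* struct_v, as above */
	*p++ = 1;				/* struct_compat */
	p += sketch_put_le32(p, 4 + 8 + 8);	/* length of the body */
	p += sketch_put_le32(p, notify_op);
	p += sketch_put_le64(p, gid);
	p += sketch_put_le64(p, handle);
	return (size_t)(p - buf);
}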
3238 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3239 enum rbd_notify_op notify_op)
3241 struct page **reply_pages;
3242 size_t reply_len;
3244 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3245 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3248 static void rbd_notify_acquired_lock(struct work_struct *work)
3250 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3251 acquired_lock_work);
3253 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3256 static void rbd_notify_released_lock(struct work_struct *work)
3258 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3259 released_lock_work);
3261 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3264 static int rbd_request_lock(struct rbd_device *rbd_dev)
3266 struct page **reply_pages;
3267 size_t reply_len;
3268 bool lock_owner_responded = false;
3269 int ret;
3271 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3273 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3274 &reply_pages, &reply_len);
3275 if (ret && ret != -ETIMEDOUT) {
3276 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3277 goto out;
3280 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3281 void *p = page_address(reply_pages[0]);
3282 void *const end = p + reply_len;
3283 u32 n;
3285 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3286 while (n--) {
3287 u8 struct_v;
3288 u32 len;
3290 ceph_decode_need(&p, end, 8 + 8, e_inval);
3291 p += 8 + 8; /* skip gid and cookie */
3293 ceph_decode_32_safe(&p, end, len, e_inval);
3294 if (!len)
3295 continue;
3297 if (lock_owner_responded) {
3298 rbd_warn(rbd_dev,
3299 "duplicate lock owners detected");
3300 ret = -EIO;
3301 goto out;
3304 lock_owner_responded = true;
3305 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3306 &struct_v, &len);
3307 if (ret) {
3308 rbd_warn(rbd_dev,
3309 "failed to decode ResponseMessage: %d",
3310 ret);
3311 goto e_inval;
3314 ret = ceph_decode_32(&p);
3318 if (!lock_owner_responded) {
3319 rbd_warn(rbd_dev, "no lock owners detected");
3320 ret = -ETIMEDOUT;
3323 out:
3324 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3325 return ret;
3327 e_inval:
3328 ret = -EINVAL;
3329 goto out;
3332 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3334 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3336 cancel_delayed_work(&rbd_dev->lock_dwork);
3337 if (wake_all)
3338 wake_up_all(&rbd_dev->lock_waitq);
3339 else
3340 wake_up(&rbd_dev->lock_waitq);
3343 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3344 struct ceph_locker **lockers, u32 *num_lockers)
3346 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3347 u8 lock_type;
3348 char *lock_tag;
3349 int ret;
3351 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3353 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3354 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3355 &lock_type, &lock_tag, lockers, num_lockers);
3356 if (ret)
3357 return ret;
3359 if (*num_lockers == 0) {
3360 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3361 goto out;
3364 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3365 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3366 lock_tag);
3367 ret = -EBUSY;
3368 goto out;
3371 if (lock_type == CEPH_CLS_LOCK_SHARED) {
3372 rbd_warn(rbd_dev, "shared lock type detected");
3373 ret = -EBUSY;
3374 goto out;
3377 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3378 strlen(RBD_LOCK_COOKIE_PREFIX))) {
3379 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3380 (*lockers)[0].id.cookie);
3381 ret = -EBUSY;
3382 goto out;
3385 out:
3386 kfree(lock_tag);
3387 return ret;
3390 static int find_watcher(struct rbd_device *rbd_dev,
3391 const struct ceph_locker *locker)
3393 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3394 struct ceph_watch_item *watchers;
3395 u32 num_watchers;
3396 u64 cookie;
3397 int i;
3398 int ret;
3400 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3401 &rbd_dev->header_oloc, &watchers,
3402 &num_watchers);
3403 if (ret)
3404 return ret;
3406 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3407 for (i = 0; i < num_watchers; i++) {
3408 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3409 sizeof(locker->info.addr)) &&
3410 watchers[i].cookie == cookie) {
3411 struct rbd_client_id cid = {
3412 .gid = le64_to_cpu(watchers[i].name.num),
3413 .handle = cookie,
3416 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3417 rbd_dev, cid.gid, cid.handle);
3418 rbd_set_owner_cid(rbd_dev, &cid);
3419 ret = 1;
3420 goto out;
3424 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3425 ret = 0;
3426 out:
3427 kfree(watchers);
3428 return ret;
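/*
 * find_watcher() returns 1 (and records the owner cid) when the lock
 * holder still has a watch on the header object with a matching address
 * and cookie, i.e. it looks alive; a return of 0 means no such watcher,
 * which rbd_try_lock() below treats as a dead client, blacklisting it and
 * breaking its lock.
 */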
3432 * lock_rwsem must be held for write
3434 static int rbd_try_lock(struct rbd_device *rbd_dev)
3436 struct ceph_client *client = rbd_dev->rbd_client->client;
3437 struct ceph_locker *lockers;
3438 u32 num_lockers;
3439 int ret;
3441 for (;;) {
3442 ret = rbd_lock(rbd_dev);
3443 if (ret != -EBUSY)
3444 return ret;
3446 /* determine if the current lock holder is still alive */
3447 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3448 if (ret)
3449 return ret;
3451 if (num_lockers == 0)
3452 goto again;
3454 ret = find_watcher(rbd_dev, lockers);
3455 if (ret) {
3456 if (ret > 0)
3457 ret = 0; /* have to request lock */
3458 goto out;
3461 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3462 ENTITY_NAME(lockers[0].id.name));
3464 ret = ceph_monc_blacklist_add(&client->monc,
3465 &lockers[0].info.addr);
3466 if (ret) {
3467 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3468 ENTITY_NAME(lockers[0].id.name), ret);
3469 goto out;
3472 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3473 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3474 lockers[0].id.cookie,
3475 &lockers[0].id.name);
3476 if (ret && ret != -ENOENT)
3477 goto out;
3479 again:
3480 ceph_free_lockers(lockers, num_lockers);
3483 out:
3484 ceph_free_lockers(lockers, num_lockers);
3485 return ret;
3489 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3491 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3492 int *pret)
3494 enum rbd_lock_state lock_state;
3496 down_read(&rbd_dev->lock_rwsem);
3497 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3498 rbd_dev->lock_state);
3499 if (__rbd_is_lock_owner(rbd_dev)) {
3500 lock_state = rbd_dev->lock_state;
3501 up_read(&rbd_dev->lock_rwsem);
3502 return lock_state;
3505 up_read(&rbd_dev->lock_rwsem);
3506 down_write(&rbd_dev->lock_rwsem);
3507 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3508 rbd_dev->lock_state);
3509 if (!__rbd_is_lock_owner(rbd_dev)) {
3510 *pret = rbd_try_lock(rbd_dev);
3511 if (*pret)
3512 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3515 lock_state = rbd_dev->lock_state;
3516 up_write(&rbd_dev->lock_rwsem);
3517 return lock_state;
3520 static void rbd_acquire_lock(struct work_struct *work)
3522 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3523 struct rbd_device, lock_dwork);
3524 enum rbd_lock_state lock_state;
3525 int ret;
3527 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3528 again:
3529 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3530 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3531 if (lock_state == RBD_LOCK_STATE_LOCKED)
3532 wake_requests(rbd_dev, true);
3533 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3534 rbd_dev, lock_state, ret);
3535 return;
3538 ret = rbd_request_lock(rbd_dev);
3539 if (ret == -ETIMEDOUT) {
3540 goto again; /* treat this as a dead client */
3541 } else if (ret < 0) {
3542 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3543 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3544 RBD_RETRY_DELAY);
3545 } else {
3547 * lock owner acked, but resend if we don't see them
3548 * release the lock
3550 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3551 rbd_dev);
3552 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3553 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3558 * lock_rwsem must be held for write
3560 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3562 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3563 rbd_dev->lock_state);
3564 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3565 return false;
3567 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3568 downgrade_write(&rbd_dev->lock_rwsem);
3570 * Ensure that all in-flight IO is flushed.
3572 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3573 * may be shared with other devices.
3575 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3576 up_read(&rbd_dev->lock_rwsem);
3578 down_write(&rbd_dev->lock_rwsem);
3579 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3580 rbd_dev->lock_state);
3581 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3582 return false;
3584 if (!rbd_unlock(rbd_dev))
3586 * Give others a chance to grab the lock - we would re-acquire
3587 * almost immediately if we got new IO during ceph_osdc_sync()
3588 * otherwise. We need to ack our own notifications, so this
3589 * lock_dwork will be requeued from rbd_wait_state_locked()
3590 * after wake_requests() in rbd_handle_released_lock().
3592 cancel_delayed_work(&rbd_dev->lock_dwork);
3594 return true;
3597 static void rbd_release_lock_work(struct work_struct *work)
3599 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3600 unlock_work);
3602 down_write(&rbd_dev->lock_rwsem);
3603 rbd_release_lock(rbd_dev);
3604 up_write(&rbd_dev->lock_rwsem);
3607 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3608 void **p)
3610 struct rbd_client_id cid = { 0 };
3612 if (struct_v >= 2) {
3613 cid.gid = ceph_decode_64(p);
3614 cid.handle = ceph_decode_64(p);
3617 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3618 cid.handle);
3619 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3620 down_write(&rbd_dev->lock_rwsem);
3621 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3623 * we already know that the remote client is
3624 * the owner
3626 up_write(&rbd_dev->lock_rwsem);
3627 return;
3630 rbd_set_owner_cid(rbd_dev, &cid);
3631 downgrade_write(&rbd_dev->lock_rwsem);
3632 } else {
3633 down_read(&rbd_dev->lock_rwsem);
3636 if (!__rbd_is_lock_owner(rbd_dev))
3637 wake_requests(rbd_dev, false);
3638 up_read(&rbd_dev->lock_rwsem);
3641 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3642 void **p)
3644 struct rbd_client_id cid = { 0 };
3646 if (struct_v >= 2) {
3647 cid.gid = ceph_decode_64(p);
3648 cid.handle = ceph_decode_64(p);
3651 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3652 cid.handle);
3653 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3654 down_write(&rbd_dev->lock_rwsem);
3655 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3656 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3657 __func__, rbd_dev, cid.gid, cid.handle,
3658 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3659 up_write(&rbd_dev->lock_rwsem);
3660 return;
3663 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3664 downgrade_write(&rbd_dev->lock_rwsem);
3665 } else {
3666 down_read(&rbd_dev->lock_rwsem);
3669 if (!__rbd_is_lock_owner(rbd_dev))
3670 wake_requests(rbd_dev, false);
3671 up_read(&rbd_dev->lock_rwsem);
3674 static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3675 void **p)
3677 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3678 struct rbd_client_id cid = { 0 };
3679 bool need_to_send;
3681 if (struct_v >= 2) {
3682 cid.gid = ceph_decode_64(p);
3683 cid.handle = ceph_decode_64(p);
3686 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3687 cid.handle);
3688 if (rbd_cid_equal(&cid, &my_cid))
3689 return false;
3691 down_read(&rbd_dev->lock_rwsem);
3692 need_to_send = __rbd_is_lock_owner(rbd_dev);
3693 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3694 if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
3695 dout("%s rbd_dev %p queueing unlock_work\n", __func__,
3696 rbd_dev);
3697 queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
3700 up_read(&rbd_dev->lock_rwsem);
3701 return need_to_send;
3704 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3705 u64 notify_id, u64 cookie, s32 *result)
3707 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3708 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3709 char buf[buf_size];
3710 int ret;
3712 if (result) {
3713 void *p = buf;
3715 /* encode ResponseMessage */
3716 ceph_start_encoding(&p, 1, 1,
3717 buf_size - CEPH_ENCODING_START_BLK_LEN);
3718 ceph_encode_32(&p, *result);
3719 } else {
3720 buf_size = 0;
3723 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3724 &rbd_dev->header_oloc, notify_id, cookie,
3725 buf, buf_size);
3726 if (ret)
3727 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3730 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3731 u64 cookie)
3733 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3734 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3737 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3738 u64 notify_id, u64 cookie, s32 result)
3740 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3741 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3744 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3745 u64 notifier_id, void *data, size_t data_len)
3747 struct rbd_device *rbd_dev = arg;
3748 void *p = data;
3749 void *const end = p + data_len;
3750 u8 struct_v = 0;
3751 u32 len;
3752 u32 notify_op;
3753 int ret;
3755 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3756 __func__, rbd_dev, cookie, notify_id, data_len);
3757 if (data_len) {
3758 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3759 &struct_v, &len);
3760 if (ret) {
3761 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3762 ret);
3763 return;
3766 notify_op = ceph_decode_32(&p);
3767 } else {
3768 /* legacy notification for header updates */
3769 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3770 len = 0;
3773 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3774 switch (notify_op) {
3775 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3776 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3777 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3778 break;
3779 case RBD_NOTIFY_OP_RELEASED_LOCK:
3780 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3781 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3782 break;
3783 case RBD_NOTIFY_OP_REQUEST_LOCK:
3784 if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
3786 * send ResponseMessage(0) back so the client
3787 * can detect a missing owner
3789 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3790 cookie, 0);
3791 else
3792 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3793 break;
3794 case RBD_NOTIFY_OP_HEADER_UPDATE:
3795 ret = rbd_dev_refresh(rbd_dev);
3796 if (ret)
3797 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3799 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3800 break;
3801 default:
3802 if (rbd_is_lock_owner(rbd_dev))
3803 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3804 cookie, -EOPNOTSUPP);
3805 else
3806 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3807 break;
3811 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3813 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3815 struct rbd_device *rbd_dev = arg;
3817 rbd_warn(rbd_dev, "encountered watch error: %d", err);
3819 down_write(&rbd_dev->lock_rwsem);
3820 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3821 up_write(&rbd_dev->lock_rwsem);
3823 mutex_lock(&rbd_dev->watch_mutex);
3824 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3825 __rbd_unregister_watch(rbd_dev);
3826 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3828 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3830 mutex_unlock(&rbd_dev->watch_mutex);
3834 * watch_mutex must be locked
3836 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3838 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3839 struct ceph_osd_linger_request *handle;
3841 rbd_assert(!rbd_dev->watch_handle);
3842 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3844 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3845 &rbd_dev->header_oloc, rbd_watch_cb,
3846 rbd_watch_errcb, rbd_dev);
3847 if (IS_ERR(handle))
3848 return PTR_ERR(handle);
3850 rbd_dev->watch_handle = handle;
3851 return 0;
3855 * watch_mutex must be locked
3857 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3859 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3860 int ret;
3862 rbd_assert(rbd_dev->watch_handle);
3863 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3865 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3866 if (ret)
3867 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3869 rbd_dev->watch_handle = NULL;
3872 static int rbd_register_watch(struct rbd_device *rbd_dev)
3874 int ret;
3876 mutex_lock(&rbd_dev->watch_mutex);
3877 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3878 ret = __rbd_register_watch(rbd_dev);
3879 if (ret)
3880 goto out;
3882 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3883 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3885 out:
3886 mutex_unlock(&rbd_dev->watch_mutex);
3887 return ret;
3890 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3892 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3894 cancel_work_sync(&rbd_dev->acquired_lock_work);
3895 cancel_work_sync(&rbd_dev->released_lock_work);
3896 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3897 cancel_work_sync(&rbd_dev->unlock_work);
3900 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3902 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3903 cancel_tasks_sync(rbd_dev);
3905 mutex_lock(&rbd_dev->watch_mutex);
3906 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3907 __rbd_unregister_watch(rbd_dev);
3908 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3909 mutex_unlock(&rbd_dev->watch_mutex);
3911 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3912 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3915 static void rbd_reregister_watch(struct work_struct *work)
3917 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3918 struct rbd_device, watch_dwork);
3919 bool was_lock_owner = false;
3920 bool need_to_wake = false;
3921 int ret;
3923 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3925 down_write(&rbd_dev->lock_rwsem);
3926 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3927 was_lock_owner = rbd_release_lock(rbd_dev);
3929 mutex_lock(&rbd_dev->watch_mutex);
3930 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3931 mutex_unlock(&rbd_dev->watch_mutex);
3932 goto out;
3935 ret = __rbd_register_watch(rbd_dev);
3936 if (ret) {
3937 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3938 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3939 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3940 need_to_wake = true;
3941 } else {
3942 queue_delayed_work(rbd_dev->task_wq,
3943 &rbd_dev->watch_dwork,
3944 RBD_RETRY_DELAY);
3946 mutex_unlock(&rbd_dev->watch_mutex);
3947 goto out;
3950 need_to_wake = true;
3951 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3952 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3953 mutex_unlock(&rbd_dev->watch_mutex);
3955 ret = rbd_dev_refresh(rbd_dev);
3956 if (ret)
3957 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3959 if (was_lock_owner) {
3960 ret = rbd_try_lock(rbd_dev);
3961 if (ret)
3962 rbd_warn(rbd_dev, "reregistration lock failed: %d",
3963 ret);
3966 out:
3967 up_write(&rbd_dev->lock_rwsem);
3968 if (need_to_wake)
3969 wake_requests(rbd_dev, true);
3973 * Synchronous osd object method call. Returns the number of bytes
3974 * returned in the outbound buffer, or a negative error code.
3976 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3977 const char *object_name,
3978 const char *class_name,
3979 const char *method_name,
3980 const void *outbound,
3981 size_t outbound_size,
3982 void *inbound,
3983 size_t inbound_size)
3985 struct rbd_obj_request *obj_request;
3986 struct page **pages;
3987 u32 page_count;
3988 int ret;
3991 * Method calls are ultimately read operations. The result
3992 * should be placed into the inbound buffer provided. They
3993 * also supply outbound data--parameters for the object
3994 * method. Currently if this is present it will be a
3995 * snapshot id.
3997 page_count = (u32)calc_pages_for(0, inbound_size);
3998 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3999 if (IS_ERR(pages))
4000 return PTR_ERR(pages);
4002 ret = -ENOMEM;
4003 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
4004 OBJ_REQUEST_PAGES);
4005 if (!obj_request)
4006 goto out;
4008 obj_request->pages = pages;
4009 obj_request->page_count = page_count;
4011 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
4012 obj_request);
4013 if (!obj_request->osd_req)
4014 goto out;
4016 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
4017 class_name, method_name);
4018 if (outbound_size) {
4019 struct ceph_pagelist *pagelist;
4021 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
4022 if (!pagelist)
4023 goto out;
4025 ceph_pagelist_init(pagelist);
4026 ceph_pagelist_append(pagelist, outbound, outbound_size);
4027 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
4028 pagelist);
4030 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
4031 obj_request->pages, inbound_size,
4032 0, false, false);
4034 rbd_obj_request_submit(obj_request);
4035 ret = rbd_obj_request_wait(obj_request);
4036 if (ret)
4037 goto out;
4039 ret = obj_request->result;
4040 if (ret < 0)
4041 goto out;
4043 rbd_assert(obj_request->xferred < (u64)INT_MAX);
4044 ret = (int)obj_request->xferred;
4045 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
4046 out:
4047 if (obj_request)
4048 rbd_obj_request_put(obj_request);
4049 else
4050 ceph_release_page_vector(pages, page_count);
4052 return ret;
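/*
 * Usage sketch (mirrors the "get_size" call made in
 * _rbd_dev_v2_snap_size() further below; error handling omitted and
 * the values shown are illustrative only):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
 *				  "rbd", "get_size",
 *				  &snapid, sizeof (snapid),
 *				  &size_buf, sizeof (size_buf));
 *
 * On success ret is the number of bytes copied into size_buf.
 */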
4056 * lock_rwsem must be held for read
4058 static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
4060 DEFINE_WAIT(wait);
4062 do {
4064 * Note the use of mod_delayed_work() in rbd_acquire_lock()
4065 * and cancel_delayed_work() in wake_requests().
4067 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4068 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4069 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4070 TASK_UNINTERRUPTIBLE);
4071 up_read(&rbd_dev->lock_rwsem);
4072 schedule();
4073 down_read(&rbd_dev->lock_rwsem);
4074 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4075 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4077 finish_wait(&rbd_dev->lock_waitq, &wait);
4080 static void rbd_queue_workfn(struct work_struct *work)
4082 struct request *rq = blk_mq_rq_from_pdu(work);
4083 struct rbd_device *rbd_dev = rq->q->queuedata;
4084 struct rbd_img_request *img_request;
4085 struct ceph_snap_context *snapc = NULL;
4086 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4087 u64 length = blk_rq_bytes(rq);
4088 enum obj_operation_type op_type;
4089 u64 mapping_size;
4090 bool must_be_locked;
4091 int result;
4093 if (rq->cmd_type != REQ_TYPE_FS) {
4094 dout("%s: non-fs request type %d\n", __func__,
4095 (int) rq->cmd_type);
4096 result = -EIO;
4097 goto err;
4100 if (req_op(rq) == REQ_OP_DISCARD)
4101 op_type = OBJ_OP_DISCARD;
4102 else if (req_op(rq) == REQ_OP_WRITE)
4103 op_type = OBJ_OP_WRITE;
4104 else
4105 op_type = OBJ_OP_READ;
4107 /* Ignore/skip any zero-length requests */
4109 if (!length) {
4110 dout("%s: zero-length request\n", __func__);
4111 result = 0;
4112 goto err_rq;
4115 /* Only reads are allowed to a read-only device */
4117 if (op_type != OBJ_OP_READ) {
4118 if (rbd_dev->mapping.read_only) {
4119 result = -EROFS;
4120 goto err_rq;
4122 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4126 * Quit early if the mapped snapshot no longer exists. It's
4127 * still possible the snapshot will have disappeared by the
4128 * time our request arrives at the osd, but there's no sense in
4129 * sending it if we already know.
4131 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4132 dout("request for non-existent snapshot");
4133 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4134 result = -ENXIO;
4135 goto err_rq;
4138 if (offset && length > U64_MAX - offset + 1) {
4139 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4140 length);
4141 result = -EINVAL;
4142 goto err_rq; /* Shouldn't happen */
4145 blk_mq_start_request(rq);
4147 down_read(&rbd_dev->header_rwsem);
4148 mapping_size = rbd_dev->mapping.size;
4149 if (op_type != OBJ_OP_READ) {
4150 snapc = rbd_dev->header.snapc;
4151 ceph_get_snap_context(snapc);
4152 must_be_locked = rbd_is_lock_supported(rbd_dev);
4153 } else {
4154 must_be_locked = rbd_dev->opts->lock_on_read &&
4155 rbd_is_lock_supported(rbd_dev);
4157 up_read(&rbd_dev->header_rwsem);
4159 if (offset + length > mapping_size) {
4160 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4161 length, mapping_size);
4162 result = -EIO;
4163 goto err_rq;
4166 if (must_be_locked) {
4167 down_read(&rbd_dev->lock_rwsem);
4168 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4169 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
4170 rbd_wait_state_locked(rbd_dev);
4172 WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
4173 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4174 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4175 result = -EBLACKLISTED;
4176 goto err_unlock;
4180 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4181 snapc);
4182 if (!img_request) {
4183 result = -ENOMEM;
4184 goto err_unlock;
4186 img_request->rq = rq;
4187 snapc = NULL; /* img_request consumes a ref */
4189 if (op_type == OBJ_OP_DISCARD)
4190 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4191 NULL);
4192 else
4193 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4194 rq->bio);
4195 if (result)
4196 goto err_img_request;
4198 result = rbd_img_request_submit(img_request);
4199 if (result)
4200 goto err_img_request;
4202 if (must_be_locked)
4203 up_read(&rbd_dev->lock_rwsem);
4204 return;
4206 err_img_request:
4207 rbd_img_request_put(img_request);
4208 err_unlock:
4209 if (must_be_locked)
4210 up_read(&rbd_dev->lock_rwsem);
4211 err_rq:
4212 if (result)
4213 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4214 obj_op_name(op_type), length, offset, result);
4215 ceph_put_snap_context(snapc);
4216 err:
4217 blk_mq_end_request(rq, result);
4220 static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4221 const struct blk_mq_queue_data *bd)
4223 struct request *rq = bd->rq;
4224 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4226 queue_work(rbd_wq, work);
4227 return BLK_MQ_RQ_QUEUE_OK;
4230 static void rbd_free_disk(struct rbd_device *rbd_dev)
4232 struct gendisk *disk = rbd_dev->disk;
4234 if (!disk)
4235 return;
4237 rbd_dev->disk = NULL;
4238 if (disk->flags & GENHD_FL_UP) {
4239 del_gendisk(disk);
4240 if (disk->queue)
4241 blk_cleanup_queue(disk->queue);
4242 blk_mq_free_tag_set(&rbd_dev->tag_set);
4244 put_disk(disk);
4247 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4248 const char *object_name,
4249 u64 offset, u64 length, void *buf)
4252 struct rbd_obj_request *obj_request;
4253 struct page **pages = NULL;
4254 u32 page_count;
4255 size_t size;
4256 int ret;
4258 page_count = (u32) calc_pages_for(offset, length);
4259 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
4260 if (IS_ERR(pages))
4261 return PTR_ERR(pages);
4263 ret = -ENOMEM;
4264 obj_request = rbd_obj_request_create(object_name, offset, length,
4265 OBJ_REQUEST_PAGES);
4266 if (!obj_request)
4267 goto out;
4269 obj_request->pages = pages;
4270 obj_request->page_count = page_count;
4272 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
4273 obj_request);
4274 if (!obj_request->osd_req)
4275 goto out;
4277 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
4278 offset, length, 0, 0);
4279 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
4280 obj_request->pages,
4281 obj_request->length,
4282 obj_request->offset & ~PAGE_MASK,
4283 false, false);
4285 rbd_obj_request_submit(obj_request);
4286 ret = rbd_obj_request_wait(obj_request);
4287 if (ret)
4288 goto out;
4290 ret = obj_request->result;
4291 if (ret < 0)
4292 goto out;
4294 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
4295 size = (size_t) obj_request->xferred;
4296 ceph_copy_from_page_vector(pages, buf, 0, size);
4297 rbd_assert(size <= (size_t)INT_MAX);
4298 ret = (int)size;
4299 out:
4300 if (obj_request)
4301 rbd_obj_request_put(obj_request);
4302 else
4303 ceph_release_page_vector(pages, page_count);
4305 return ret;
4309 * Read the complete header for the given rbd device. On successful
4310 * return, the rbd_dev->header field will contain up-to-date
4311 * information about the image.
4313 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4315 struct rbd_image_header_ondisk *ondisk = NULL;
4316 u32 snap_count = 0;
4317 u64 names_size = 0;
4318 u32 want_count;
4319 int ret;
4322 * The complete header will include an array of its 64-bit
4323 * snapshot ids, followed by the names of those snapshots as
4324 * a contiguous block of NUL-terminated strings. Note that
4325 * the number of snapshots could change by the time we read
4326 * it in, in which case we re-read it.
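/*
 * For example (illustrative values only): a header with two snapshots
 * named "one" and "two" would report snap_count == 2, carry two 64-bit
 * snapshot ids, and have snap_names_len == 8 for the contiguous name
 * block "one\0two\0".
 */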
4328 do {
4329 size_t size;
4331 kfree(ondisk);
4333 size = sizeof (*ondisk);
4334 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4335 size += names_size;
4336 ondisk = kmalloc(size, GFP_KERNEL);
4337 if (!ondisk)
4338 return -ENOMEM;
4340 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_oid.name,
4341 0, size, ondisk);
4342 if (ret < 0)
4343 goto out;
4344 if ((size_t)ret < size) {
4345 ret = -ENXIO;
4346 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4347 size, ret);
4348 goto out;
4350 if (!rbd_dev_ondisk_valid(ondisk)) {
4351 ret = -ENXIO;
4352 rbd_warn(rbd_dev, "invalid header");
4353 goto out;
4356 names_size = le64_to_cpu(ondisk->snap_names_len);
4357 want_count = snap_count;
4358 snap_count = le32_to_cpu(ondisk->snap_count);
4359 } while (snap_count != want_count);
4361 ret = rbd_header_from_disk(rbd_dev, ondisk);
4362 out:
4363 kfree(ondisk);
4365 return ret;
4369 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4370 * has disappeared from the (just updated) snapshot context.
4372 static void rbd_exists_validate(struct rbd_device *rbd_dev)
4374 u64 snap_id;
4376 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4377 return;
4379 snap_id = rbd_dev->spec->snap_id;
4380 if (snap_id == CEPH_NOSNAP)
4381 return;
4383 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4384 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4387 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4389 sector_t size;
4392 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4393 * try to update its size. If REMOVING is set, updating size
4394 * is just useless work since the device can't be opened.
4396 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4397 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4398 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4399 dout("setting size to %llu sectors", (unsigned long long)size);
4400 set_capacity(rbd_dev->disk, size);
4401 revalidate_disk(rbd_dev->disk);
4405 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4407 u64 mapping_size;
4408 int ret;
4410 down_write(&rbd_dev->header_rwsem);
4411 mapping_size = rbd_dev->mapping.size;
4413 ret = rbd_dev_header_info(rbd_dev);
4414 if (ret)
4415 goto out;
4418 * If there is a parent, see if it has disappeared due to the
4419 * mapped image getting flattened.
4421 if (rbd_dev->parent) {
4422 ret = rbd_dev_v2_parent_info(rbd_dev);
4423 if (ret)
4424 goto out;
4427 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4428 rbd_dev->mapping.size = rbd_dev->header.image_size;
4429 } else {
4430 /* validate mapped snapshot's EXISTS flag */
4431 rbd_exists_validate(rbd_dev);
4434 out:
4435 up_write(&rbd_dev->header_rwsem);
4436 if (!ret && mapping_size != rbd_dev->mapping.size)
4437 rbd_dev_update_size(rbd_dev);
4439 return ret;
4442 static int rbd_init_request(void *data, struct request *rq,
4443 unsigned int hctx_idx, unsigned int request_idx,
4444 unsigned int numa_node)
4446 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4448 INIT_WORK(work, rbd_queue_workfn);
4449 return 0;
4452 static struct blk_mq_ops rbd_mq_ops = {
4453 .queue_rq = rbd_queue_rq,
4454 .init_request = rbd_init_request,
4457 static int rbd_init_disk(struct rbd_device *rbd_dev)
4459 struct gendisk *disk;
4460 struct request_queue *q;
4461 u64 segment_size;
4462 int err;
4464 /* create gendisk info */
4465 disk = alloc_disk(single_major ?
4466 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4467 RBD_MINORS_PER_MAJOR);
4468 if (!disk)
4469 return -ENOMEM;
4471 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4472 rbd_dev->dev_id);
4473 disk->major = rbd_dev->major;
4474 disk->first_minor = rbd_dev->minor;
4475 if (single_major)
4476 disk->flags |= GENHD_FL_EXT_DEVT;
4477 disk->fops = &rbd_bd_ops;
4478 disk->private_data = rbd_dev;
4480 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4481 rbd_dev->tag_set.ops = &rbd_mq_ops;
4482 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4483 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4484 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4485 rbd_dev->tag_set.nr_hw_queues = 1;
4486 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4488 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4489 if (err)
4490 goto out_disk;
4492 q = blk_mq_init_queue(&rbd_dev->tag_set);
4493 if (IS_ERR(q)) {
4494 err = PTR_ERR(q);
4495 goto out_tag_set;
4498 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4499 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4501 /* set io sizes to object size */
4502 segment_size = rbd_obj_bytes(&rbd_dev->header);
4503 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4504 q->limits.max_sectors = queue_max_hw_sectors(q);
4505 blk_queue_max_segments(q, USHRT_MAX);
4506 blk_queue_max_segment_size(q, segment_size);
4507 blk_queue_io_min(q, segment_size);
4508 blk_queue_io_opt(q, segment_size);
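/*
 * Worked example (assuming the common default object order of 22,
 * i.e. 4 MiB objects): segment_size is 4194304 bytes, so the
 * max_hw_sectors limit above is 4194304 / 512 = 8192 sectors and
 * io_min/io_opt are 4 MiB. The discard limits below use the same
 * object-sized granularity.
 */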
4510 /* enable the discard support */
4511 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4512 q->limits.discard_granularity = segment_size;
4513 q->limits.discard_alignment = segment_size;
4514 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
4515 q->limits.discard_zeroes_data = 1;
4517 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4518 q->backing_dev_info.capabilities |= BDI_CAP_STABLE_WRITES;
4520 disk->queue = q;
4522 q->queuedata = rbd_dev;
4524 rbd_dev->disk = disk;
4526 return 0;
4527 out_tag_set:
4528 blk_mq_free_tag_set(&rbd_dev->tag_set);
4529 out_disk:
4530 put_disk(disk);
4531 return err;
4535 /* sysfs */
4538 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4540 return container_of(dev, struct rbd_device, dev);
4543 static ssize_t rbd_size_show(struct device *dev,
4544 struct device_attribute *attr, char *buf)
4546 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4548 return sprintf(buf, "%llu\n",
4549 (unsigned long long)rbd_dev->mapping.size);
4553 * Note this shows the features for whatever's mapped, which is not
4554 * necessarily the base image.
4556 static ssize_t rbd_features_show(struct device *dev,
4557 struct device_attribute *attr, char *buf)
4559 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4561 return sprintf(buf, "0x%016llx\n",
4562 (unsigned long long)rbd_dev->mapping.features);
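/*
 * Example output (hypothetical mapping): "0x0000000000000005\n" would
 * mean RBD_FEATURE_LAYERING (bit 0) and RBD_FEATURE_EXCLUSIVE_LOCK
 * (bit 2) are enabled.
 */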
4565 static ssize_t rbd_major_show(struct device *dev,
4566 struct device_attribute *attr, char *buf)
4568 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4570 if (rbd_dev->major)
4571 return sprintf(buf, "%d\n", rbd_dev->major);
4573 return sprintf(buf, "(none)\n");
4576 static ssize_t rbd_minor_show(struct device *dev,
4577 struct device_attribute *attr, char *buf)
4579 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4581 return sprintf(buf, "%d\n", rbd_dev->minor);
4584 static ssize_t rbd_client_addr_show(struct device *dev,
4585 struct device_attribute *attr, char *buf)
4587 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4588 struct ceph_entity_addr *client_addr =
4589 ceph_client_addr(rbd_dev->rbd_client->client);
4591 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4592 le32_to_cpu(client_addr->nonce));
4595 static ssize_t rbd_client_id_show(struct device *dev,
4596 struct device_attribute *attr, char *buf)
4598 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4600 return sprintf(buf, "client%lld\n",
4601 ceph_client_gid(rbd_dev->rbd_client->client));
4604 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4605 struct device_attribute *attr, char *buf)
4607 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4609 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4612 static ssize_t rbd_config_info_show(struct device *dev,
4613 struct device_attribute *attr, char *buf)
4615 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4617 if (!capable(CAP_SYS_ADMIN))
4618 return -EPERM;
4620 return sprintf(buf, "%s\n", rbd_dev->config_info);
4623 static ssize_t rbd_pool_show(struct device *dev,
4624 struct device_attribute *attr, char *buf)
4626 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4628 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4631 static ssize_t rbd_pool_id_show(struct device *dev,
4632 struct device_attribute *attr, char *buf)
4634 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4636 return sprintf(buf, "%llu\n",
4637 (unsigned long long) rbd_dev->spec->pool_id);
4640 static ssize_t rbd_name_show(struct device *dev,
4641 struct device_attribute *attr, char *buf)
4643 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4645 if (rbd_dev->spec->image_name)
4646 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4648 return sprintf(buf, "(unknown)\n");
4651 static ssize_t rbd_image_id_show(struct device *dev,
4652 struct device_attribute *attr, char *buf)
4654 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4656 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4660 * Shows the name of the currently-mapped snapshot (or
4661 * RBD_SNAP_HEAD_NAME for the base image).
4663 static ssize_t rbd_snap_show(struct device *dev,
4664 struct device_attribute *attr,
4665 char *buf)
4667 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4669 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4672 static ssize_t rbd_snap_id_show(struct device *dev,
4673 struct device_attribute *attr, char *buf)
4675 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4677 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4681 * For a v2 image, shows the chain of parent images, separated by empty
4682 * lines. For v1 images or if there is no parent, shows "(no parent
4683 * image)".
4685 static ssize_t rbd_parent_show(struct device *dev,
4686 struct device_attribute *attr,
4687 char *buf)
4689 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4690 ssize_t count = 0;
4692 if (!rbd_dev->parent)
4693 return sprintf(buf, "(no parent image)\n");
4695 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4696 struct rbd_spec *spec = rbd_dev->parent_spec;
4698 count += sprintf(&buf[count], "%s"
4699 "pool_id %llu\npool_name %s\n"
4700 "image_id %s\nimage_name %s\n"
4701 "snap_id %llu\nsnap_name %s\n"
4702 "overlap %llu\n",
4703 !count ? "" : "\n", /* first? */
4704 spec->pool_id, spec->pool_name,
4705 spec->image_id, spec->image_name ?: "(unknown)",
4706 spec->snap_id, spec->snap_name,
4707 rbd_dev->parent_overlap);
4710 return count;
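/*
 * Example "parent" output for a single-level chain (all values
 * hypothetical):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1016b8b4567
 *	image_name parentimg
 *	snap_id 4
 *	snap_name base
 *	overlap 10737418240
 */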
4713 static ssize_t rbd_image_refresh(struct device *dev,
4714 struct device_attribute *attr,
4715 const char *buf,
4716 size_t size)
4718 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4719 int ret;
4721 if (!capable(CAP_SYS_ADMIN))
4722 return -EPERM;
4724 ret = rbd_dev_refresh(rbd_dev);
4725 if (ret)
4726 return ret;
4728 return size;
4731 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4732 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4733 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4734 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4735 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4736 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4737 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4738 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4739 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4740 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4741 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4742 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4743 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4744 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4745 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4746 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4748 static struct attribute *rbd_attrs[] = {
4749 &dev_attr_size.attr,
4750 &dev_attr_features.attr,
4751 &dev_attr_major.attr,
4752 &dev_attr_minor.attr,
4753 &dev_attr_client_addr.attr,
4754 &dev_attr_client_id.attr,
4755 &dev_attr_cluster_fsid.attr,
4756 &dev_attr_config_info.attr,
4757 &dev_attr_pool.attr,
4758 &dev_attr_pool_id.attr,
4759 &dev_attr_name.attr,
4760 &dev_attr_image_id.attr,
4761 &dev_attr_current_snap.attr,
4762 &dev_attr_snap_id.attr,
4763 &dev_attr_parent.attr,
4764 &dev_attr_refresh.attr,
4765 NULL
4768 static struct attribute_group rbd_attr_group = {
4769 .attrs = rbd_attrs,
4772 static const struct attribute_group *rbd_attr_groups[] = {
4773 &rbd_attr_group,
4774 NULL
4777 static void rbd_dev_release(struct device *dev);
4779 static struct device_type rbd_device_type = {
4780 .name = "rbd",
4781 .groups = rbd_attr_groups,
4782 .release = rbd_dev_release,
4785 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4787 kref_get(&spec->kref);
4789 return spec;
4792 static void rbd_spec_free(struct kref *kref);
4793 static void rbd_spec_put(struct rbd_spec *spec)
4795 if (spec)
4796 kref_put(&spec->kref, rbd_spec_free);
4799 static struct rbd_spec *rbd_spec_alloc(void)
4801 struct rbd_spec *spec;
4803 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4804 if (!spec)
4805 return NULL;
4807 spec->pool_id = CEPH_NOPOOL;
4808 spec->snap_id = CEPH_NOSNAP;
4809 kref_init(&spec->kref);
4811 return spec;
4814 static void rbd_spec_free(struct kref *kref)
4816 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4818 kfree(spec->pool_name);
4819 kfree(spec->image_id);
4820 kfree(spec->image_name);
4821 kfree(spec->snap_name);
4822 kfree(spec);
4825 static void rbd_dev_free(struct rbd_device *rbd_dev)
4827 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4828 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4830 ceph_oid_destroy(&rbd_dev->header_oid);
4831 ceph_oloc_destroy(&rbd_dev->header_oloc);
4832 kfree(rbd_dev->config_info);
4834 rbd_put_client(rbd_dev->rbd_client);
4835 rbd_spec_put(rbd_dev->spec);
4836 kfree(rbd_dev->opts);
4837 kfree(rbd_dev);
4840 static void rbd_dev_release(struct device *dev)
4842 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4843 bool need_put = !!rbd_dev->opts;
4845 if (need_put) {
4846 destroy_workqueue(rbd_dev->task_wq);
4847 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4850 rbd_dev_free(rbd_dev);
4853 * This is racy, but way better than dropping the module reference
4854 * outside of the release callback. The race window is pretty small, so
4855 * doing something similar to dm (dm-builtin.c) is overkill.
4857 if (need_put)
4858 module_put(THIS_MODULE);
4861 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4862 struct rbd_spec *spec)
4864 struct rbd_device *rbd_dev;
4866 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4867 if (!rbd_dev)
4868 return NULL;
4870 spin_lock_init(&rbd_dev->lock);
4871 INIT_LIST_HEAD(&rbd_dev->node);
4872 init_rwsem(&rbd_dev->header_rwsem);
4874 ceph_oid_init(&rbd_dev->header_oid);
4875 ceph_oloc_init(&rbd_dev->header_oloc);
4877 mutex_init(&rbd_dev->watch_mutex);
4878 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4879 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4881 init_rwsem(&rbd_dev->lock_rwsem);
4882 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4883 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4884 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4885 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4886 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4887 init_waitqueue_head(&rbd_dev->lock_waitq);
4889 rbd_dev->dev.bus = &rbd_bus_type;
4890 rbd_dev->dev.type = &rbd_device_type;
4891 rbd_dev->dev.parent = &rbd_root_dev;
4892 device_initialize(&rbd_dev->dev);
4894 rbd_dev->rbd_client = rbdc;
4895 rbd_dev->spec = spec;
4897 rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
4898 rbd_dev->layout.stripe_count = 1;
4899 rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER;
4900 rbd_dev->layout.pool_id = spec->pool_id;
4901 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
4903 return rbd_dev;
4907 * Create a mapping rbd_dev.
4909 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4910 struct rbd_spec *spec,
4911 struct rbd_options *opts)
4913 struct rbd_device *rbd_dev;
4915 rbd_dev = __rbd_dev_create(rbdc, spec);
4916 if (!rbd_dev)
4917 return NULL;
4919 rbd_dev->opts = opts;
4921 /* get an id and fill in device name */
4922 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4923 minor_to_rbd_dev_id(1 << MINORBITS),
4924 GFP_KERNEL);
4925 if (rbd_dev->dev_id < 0)
4926 goto fail_rbd_dev;
4928 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4929 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4930 rbd_dev->name);
4931 if (!rbd_dev->task_wq)
4932 goto fail_dev_id;
4934 /* we have a ref from do_rbd_add() */
4935 __module_get(THIS_MODULE);
4937 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4938 return rbd_dev;
4940 fail_dev_id:
4941 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4942 fail_rbd_dev:
4943 rbd_dev_free(rbd_dev);
4944 return NULL;
4947 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4949 if (rbd_dev)
4950 put_device(&rbd_dev->dev);
4954 * Get the size and object order for an image snapshot, or if
4955 * snap_id is CEPH_NOSNAP, gets this information for the base
4956 * image.
4958 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4959 u8 *order, u64 *snap_size)
4961 __le64 snapid = cpu_to_le64(snap_id);
4962 int ret;
4963 struct {
4964 u8 order;
4965 __le64 size;
4966 } __attribute__ ((packed)) size_buf = { 0 };
4968 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
4969 "rbd", "get_size",
4970 &snapid, sizeof (snapid),
4971 &size_buf, sizeof (size_buf));
4972 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4973 if (ret < 0)
4974 return ret;
4975 if (ret < sizeof (size_buf))
4976 return -ERANGE;
4978 if (order) {
4979 *order = size_buf.order;
4980 dout(" order %u", (unsigned int)*order);
4982 *snap_size = le64_to_cpu(size_buf.size);
4984 dout(" snap_id 0x%016llx snap_size = %llu\n",
4985 (unsigned long long)snap_id,
4986 (unsigned long long)*snap_size);
4988 return 0;
4991 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4993 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4994 &rbd_dev->header.obj_order,
4995 &rbd_dev->header.image_size);
4998 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5000 void *reply_buf;
5001 int ret;
5002 void *p;
5004 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
5005 if (!reply_buf)
5006 return -ENOMEM;
5008 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5009 "rbd", "get_object_prefix", NULL, 0,
5010 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
5011 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5012 if (ret < 0)
5013 goto out;
5015 p = reply_buf;
5016 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5017 p + ret, NULL, GFP_NOIO);
5018 ret = 0;
5020 if (IS_ERR(rbd_dev->header.object_prefix)) {
5021 ret = PTR_ERR(rbd_dev->header.object_prefix);
5022 rbd_dev->header.object_prefix = NULL;
5023 } else {
5024 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
5026 out:
5027 kfree(reply_buf);
5029 return ret;
5032 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5033 u64 *snap_features)
5035 __le64 snapid = cpu_to_le64(snap_id);
5036 struct {
5037 __le64 features;
5038 __le64 incompat;
5039 } __attribute__ ((packed)) features_buf = { 0 };
5040 u64 unsup;
5041 int ret;
5043 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5044 "rbd", "get_features",
5045 &snapid, sizeof (snapid),
5046 &features_buf, sizeof (features_buf));
5047 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5048 if (ret < 0)
5049 return ret;
5050 if (ret < sizeof (features_buf))
5051 return -ERANGE;
5053 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5054 if (unsup) {
5055 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5056 unsup);
5057 return -ENXIO;
5060 *snap_features = le64_to_cpu(features_buf.features);
5062 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5063 (unsigned long long)snap_id,
5064 (unsigned long long)*snap_features,
5065 (unsigned long long)le64_to_cpu(features_buf.incompat));
5067 return 0;
5070 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5072 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5073 &rbd_dev->header.features);
5076 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5078 struct rbd_spec *parent_spec;
5079 size_t size;
5080 void *reply_buf = NULL;
5081 __le64 snapid;
5082 void *p;
5083 void *end;
5084 u64 pool_id;
5085 char *image_id;
5086 u64 snap_id;
5087 u64 overlap;
5088 int ret;
5090 parent_spec = rbd_spec_alloc();
5091 if (!parent_spec)
5092 return -ENOMEM;
5094 size = sizeof (__le64) + /* pool_id */
5095 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
5096 sizeof (__le64) + /* snap_id */
5097 sizeof (__le64); /* overlap */
5098 reply_buf = kmalloc(size, GFP_KERNEL);
5099 if (!reply_buf) {
5100 ret = -ENOMEM;
5101 goto out_err;
5104 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5105 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5106 "rbd", "get_parent",
5107 &snapid, sizeof (snapid),
5108 reply_buf, size);
5109 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5110 if (ret < 0)
5111 goto out_err;
5113 p = reply_buf;
5114 end = reply_buf + ret;
5115 ret = -ERANGE;
5116 ceph_decode_64_safe(&p, end, pool_id, out_err);
5117 if (pool_id == CEPH_NOPOOL) {
5119 * Either the parent never existed, or we have
5120 * record of it but the image got flattened so it no
5121 * longer has a parent. When the parent of a
5122 * layered image disappears we immediately set the
5123 * overlap to 0. The effect of this is that all new
5124 * requests will be treated as if the image had no
5125 * parent.
5127 if (rbd_dev->parent_overlap) {
5128 rbd_dev->parent_overlap = 0;
5129 rbd_dev_parent_put(rbd_dev);
5130 pr_info("%s: clone image has been flattened\n",
5131 rbd_dev->disk->disk_name);
5134 goto out; /* No parent? No problem. */
5137 /* The ceph file layout needs to fit pool id in 32 bits */
5139 ret = -EIO;
5140 if (pool_id > (u64)U32_MAX) {
5141 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5142 (unsigned long long)pool_id, U32_MAX);
5143 goto out_err;
5146 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5147 if (IS_ERR(image_id)) {
5148 ret = PTR_ERR(image_id);
5149 goto out_err;
5151 ceph_decode_64_safe(&p, end, snap_id, out_err);
5152 ceph_decode_64_safe(&p, end, overlap, out_err);
5155 * The parent won't change (except when the clone is
5156 * flattened, already handled that). So we only need to
5157 * record the parent spec if we have not already done so.
5159 if (!rbd_dev->parent_spec) {
5160 parent_spec->pool_id = pool_id;
5161 parent_spec->image_id = image_id;
5162 parent_spec->snap_id = snap_id;
5163 rbd_dev->parent_spec = parent_spec;
5164 parent_spec = NULL; /* rbd_dev now owns this */
5165 } else {
5166 kfree(image_id);
5170 * We always update the parent overlap. If it's zero we issue
5171 * a warning, as we will proceed as if there was no parent.
5173 if (!overlap) {
5174 if (parent_spec) {
5175 /* refresh, careful to warn just once */
5176 if (rbd_dev->parent_overlap)
5177 rbd_warn(rbd_dev,
5178 "clone now standalone (overlap became 0)");
5179 } else {
5180 /* initial probe */
5181 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5184 rbd_dev->parent_overlap = overlap;
5186 out:
5187 ret = 0;
5188 out_err:
5189 kfree(reply_buf);
5190 rbd_spec_put(parent_spec);
5192 return ret;
5195 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5197 struct {
5198 __le64 stripe_unit;
5199 __le64 stripe_count;
5200 } __attribute__ ((packed)) striping_info_buf = { 0 };
5201 size_t size = sizeof (striping_info_buf);
5202 void *p;
5203 u64 obj_size;
5204 u64 stripe_unit;
5205 u64 stripe_count;
5206 int ret;
5208 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5209 "rbd", "get_stripe_unit_count", NULL, 0,
5210 (char *)&striping_info_buf, size);
5211 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5212 if (ret < 0)
5213 return ret;
5214 if (ret < size)
5215 return -ERANGE;
5218 * We don't actually support the "fancy striping" feature
5219 * (STRIPINGV2) yet, but if the striping sizes are the
5220 * defaults the behavior is the same as before. So find
5221 * out, and only fail if the image has non-default values.
5223 ret = -EINVAL;
5224 obj_size = (u64)1 << rbd_dev->header.obj_order;
5225 p = &striping_info_buf;
5226 stripe_unit = ceph_decode_64(&p);
5227 if (stripe_unit != obj_size) {
5228 rbd_warn(rbd_dev, "unsupported stripe unit "
5229 "(got %llu want %llu)",
5230 stripe_unit, obj_size);
5231 return -EINVAL;
5233 stripe_count = ceph_decode_64(&p);
5234 if (stripe_count != 1) {
5235 rbd_warn(rbd_dev, "unsupported stripe count "
5236 "(got %llu want 1)", stripe_count);
5237 return -EINVAL;
5239 rbd_dev->header.stripe_unit = stripe_unit;
5240 rbd_dev->header.stripe_count = stripe_count;
5242 return 0;
5245 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5247 size_t image_id_size;
5248 char *image_id;
5249 void *p;
5250 void *end;
5251 size_t size;
5252 void *reply_buf = NULL;
5253 size_t len = 0;
5254 char *image_name = NULL;
5255 int ret;
5257 rbd_assert(!rbd_dev->spec->image_name);
5259 len = strlen(rbd_dev->spec->image_id);
5260 image_id_size = sizeof (__le32) + len;
5261 image_id = kmalloc(image_id_size, GFP_KERNEL);
5262 if (!image_id)
5263 return NULL;
5265 p = image_id;
5266 end = image_id + image_id_size;
5267 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5269 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5270 reply_buf = kmalloc(size, GFP_KERNEL);
5271 if (!reply_buf)
5272 goto out;
5274 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
5275 "rbd", "dir_get_name",
5276 image_id, image_id_size,
5277 reply_buf, size);
5278 if (ret < 0)
5279 goto out;
5280 p = reply_buf;
5281 end = reply_buf + ret;
5283 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5284 if (IS_ERR(image_name))
5285 image_name = NULL;
5286 else
5287 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5288 out:
5289 kfree(reply_buf);
5290 kfree(image_id);
5292 return image_name;
5295 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5297 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5298 const char *snap_name;
5299 u32 which = 0;
5301 /* Skip over names until we find the one we are looking for */
5303 snap_name = rbd_dev->header.snap_names;
5304 while (which < snapc->num_snaps) {
5305 if (!strcmp(name, snap_name))
5306 return snapc->snaps[which];
5307 snap_name += strlen(snap_name) + 1;
5308 which++;
5310 return CEPH_NOSNAP;
5313 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5315 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5316 u32 which;
5317 bool found = false;
5318 u64 snap_id;
5320 for (which = 0; !found && which < snapc->num_snaps; which++) {
5321 const char *snap_name;
5323 snap_id = snapc->snaps[which];
5324 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5325 if (IS_ERR(snap_name)) {
5326 /* ignore no-longer existing snapshots */
5327 if (PTR_ERR(snap_name) == -ENOENT)
5328 continue;
5329 else
5330 break;
5332 found = !strcmp(name, snap_name);
5333 kfree(snap_name);
5335 return found ? snap_id : CEPH_NOSNAP;
5339 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5340 * no snapshot by that name is found, or if an error occurs.
5342 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5344 if (rbd_dev->image_format == 1)
5345 return rbd_v1_snap_id_by_name(rbd_dev, name);
5347 return rbd_v2_snap_id_by_name(rbd_dev, name);
5351 * An image being mapped will have everything but the snap id.
5353 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5355 struct rbd_spec *spec = rbd_dev->spec;
5357 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5358 rbd_assert(spec->image_id && spec->image_name);
5359 rbd_assert(spec->snap_name);
5361 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5362 u64 snap_id;
5364 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5365 if (snap_id == CEPH_NOSNAP)
5366 return -ENOENT;
5368 spec->snap_id = snap_id;
5369 } else {
5370 spec->snap_id = CEPH_NOSNAP;
5373 return 0;
5377 * A parent image will have all ids but none of the names.
5379 * All names in an rbd spec are dynamically allocated. It's OK if we
5380 * can't figure out the name for an image id.
5382 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5384 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5385 struct rbd_spec *spec = rbd_dev->spec;
5386 const char *pool_name;
5387 const char *image_name;
5388 const char *snap_name;
5389 int ret;
5391 rbd_assert(spec->pool_id != CEPH_NOPOOL);
5392 rbd_assert(spec->image_id);
5393 rbd_assert(spec->snap_id != CEPH_NOSNAP);
5395 /* Get the pool name; we have to make our own copy of this */
5397 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5398 if (!pool_name) {
5399 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5400 return -EIO;
5402 pool_name = kstrdup(pool_name, GFP_KERNEL);
5403 if (!pool_name)
5404 return -ENOMEM;
5406 /* Fetch the image name; tolerate failure here */
5408 image_name = rbd_dev_image_name(rbd_dev);
5409 if (!image_name)
5410 rbd_warn(rbd_dev, "unable to get image name");
5412 /* Fetch the snapshot name */
5414 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5415 if (IS_ERR(snap_name)) {
5416 ret = PTR_ERR(snap_name);
5417 goto out_err;
5420 spec->pool_name = pool_name;
5421 spec->image_name = image_name;
5422 spec->snap_name = snap_name;
5424 return 0;
5426 out_err:
5427 kfree(image_name);
5428 kfree(pool_name);
5429 return ret;
5432 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5434 size_t size;
5435 int ret;
5436 void *reply_buf;
5437 void *p;
5438 void *end;
5439 u64 seq;
5440 u32 snap_count;
5441 struct ceph_snap_context *snapc;
5442 u32 i;
5445 * We'll need room for the seq value (maximum snapshot id),
5446 * snapshot count, and array of that many snapshot ids.
5447 * For now we have a fixed upper limit on the number we're
5448 * prepared to receive.
5450 size = sizeof (__le64) + sizeof (__le32) +
5451 RBD_MAX_SNAP_COUNT * sizeof (__le64);
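/*
 * With RBD_MAX_SNAP_COUNT == 510 this works out to
 * 8 + 4 + 510 * 8 = 4092 bytes, so the reply buffer fits in a
 * single 4 KiB allocation.
 */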
5452 reply_buf = kzalloc(size, GFP_KERNEL);
5453 if (!reply_buf)
5454 return -ENOMEM;
5456 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5457 "rbd", "get_snapcontext", NULL, 0,
5458 reply_buf, size);
5459 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5460 if (ret < 0)
5461 goto out;
5463 p = reply_buf;
5464 end = reply_buf + ret;
5465 ret = -ERANGE;
5466 ceph_decode_64_safe(&p, end, seq, out);
5467 ceph_decode_32_safe(&p, end, snap_count, out);
5470 * Make sure the reported number of snapshot ids wouldn't go
5471 * beyond the end of our buffer. But before checking that,
5472 * make sure the computed size of the snapshot context we
5473 * allocate is representable in a size_t.
5475 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5476 / sizeof (u64)) {
5477 ret = -EINVAL;
5478 goto out;
5480 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5481 goto out;
5482 ret = 0;
5484 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5485 if (!snapc) {
5486 ret = -ENOMEM;
5487 goto out;
5489 snapc->seq = seq;
5490 for (i = 0; i < snap_count; i++)
5491 snapc->snaps[i] = ceph_decode_64(&p);
5493 ceph_put_snap_context(rbd_dev->header.snapc);
5494 rbd_dev->header.snapc = snapc;
5496 dout(" snap context seq = %llu, snap_count = %u\n",
5497 (unsigned long long)seq, (unsigned int)snap_count);
5498 out:
5499 kfree(reply_buf);
5501 return ret;
5504 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5505 u64 snap_id)
5507 size_t size;
5508 void *reply_buf;
5509 __le64 snapid;
5510 int ret;
5511 void *p;
5512 void *end;
5513 char *snap_name;
5515 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5516 reply_buf = kmalloc(size, GFP_KERNEL);
5517 if (!reply_buf)
5518 return ERR_PTR(-ENOMEM);
5520 snapid = cpu_to_le64(snap_id);
5521 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5522 "rbd", "get_snapshot_name",
5523 &snapid, sizeof (snapid),
5524 reply_buf, size);
5525 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5526 if (ret < 0) {
5527 snap_name = ERR_PTR(ret);
5528 goto out;
5531 p = reply_buf;
5532 end = reply_buf + ret;
5533 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5534 if (IS_ERR(snap_name))
5535 goto out;
5537 dout(" snap_id 0x%016llx snap_name = %s\n",
5538 (unsigned long long)snap_id, snap_name);
5539 out:
5540 kfree(reply_buf);
5542 return snap_name;
5545 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5547 bool first_time = rbd_dev->header.object_prefix == NULL;
5548 int ret;
5550 ret = rbd_dev_v2_image_size(rbd_dev);
5551 if (ret)
5552 return ret;
5554 if (first_time) {
5555 ret = rbd_dev_v2_header_onetime(rbd_dev);
5556 if (ret)
5557 return ret;
5560 ret = rbd_dev_v2_snap_context(rbd_dev);
5561 if (ret && first_time) {
5562 kfree(rbd_dev->header.object_prefix);
5563 rbd_dev->header.object_prefix = NULL;
5566 return ret;
5569 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5571 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5573 if (rbd_dev->image_format == 1)
5574 return rbd_dev_v1_header_info(rbd_dev);
5576 return rbd_dev_v2_header_info(rbd_dev);
5580 * Skips over white space at *buf, and updates *buf to point to the
5581 * first found non-space character (if any). Returns the length of
5582 * the token (string of non-white space characters) found. Note
5583 * that *buf must be terminated with '\0'.
5585 static inline size_t next_token(const char **buf)
5588 * These are the characters that produce nonzero for
5589 * isspace() in the "C" and "POSIX" locales.
5591 const char *spaces = " \f\n\r\t\v";
5593 *buf += strspn(*buf, spaces); /* Find start of token */
5595 return strcspn(*buf, spaces); /* Return token length */
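/*
 * Example (illustrative): with *buf pointing at "  pool image", a
 * next_token() call advances *buf to "pool image" and returns 4 (the
 * length of "pool"); a following dup_token() call duplicates "pool"
 * and leaves *buf pointing at " image".
 */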
5599 * Finds the next token in *buf, dynamically allocates a buffer big
5600 * enough to hold a copy of it, and copies the token into the new
5601 * buffer. The copy is guaranteed to be terminated with '\0'. Note
5602 * that a duplicate buffer is created even for a zero-length token.
5604 * Returns a pointer to the newly-allocated duplicate, or a null
5605 * pointer if memory for the duplicate was not available. If
5606 * the lenp argument is a non-null pointer, the length of the token
5607 * (not including the '\0') is returned in *lenp.
5609 * If successful, the *buf pointer will be updated to point beyond
5610 * the end of the found token.
5612 * Note: uses GFP_KERNEL for allocation.
5614 static inline char *dup_token(const char **buf, size_t *lenp)
5616 char *dup;
5617 size_t len;
5619 len = next_token(buf);
5620 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5621 if (!dup)
5622 return NULL;
5623 *(dup + len) = '\0';
5624 *buf += len;
5626 if (lenp)
5627 *lenp = len;
5629 return dup;
5633 * Parse the options provided for an "rbd add" (i.e., rbd image
5634 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
5635 * and the data written is passed here via a NUL-terminated buffer.
5636 * Returns 0 if successful or an error code otherwise.
5638 * The information extracted from these options is recorded in
5639 * the other parameters which return dynamically-allocated
5640 * structures:
5641 * ceph_opts
5642 * The address of a pointer that will refer to a ceph options
5643 * structure. Caller must release the returned pointer using
5644 * ceph_destroy_options() when it is no longer needed.
5645 * rbd_opts
5646 * Address of an rbd options pointer. Fully initialized by
5647 * this function; caller must release with kfree().
5648 * spec
5649 * Address of an rbd image specification pointer. Fully
5650 * initialized by this function based on parsed options.
5651 * Caller must release with rbd_spec_put().
5653 * The options passed take this form:
5654 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5655 * where:
5656 * <mon_addrs>
5657 * A comma-separated list of one or more monitor addresses.
5658 * A monitor address is an ip address, optionally followed
5659 * by a port number (separated by a colon).
5660 * I.e.: ip1[:port1][,ip2[:port2]...]
5661 * <options>
5662 * A comma-separated list of ceph and/or rbd options.
5663 * <pool_name>
5664 * The name of the rados pool containing the rbd image.
5665 * <image_name>
5666 * The name of the image in that pool to map.
5667 * <snap_name>
5668 * An optional snapshot name. If provided, the mapping will
5669 * present data from the image at the time that snapshot was
5670 * created. The image head is used if no snapshot name is
5671 * provided. Snapshot mappings are always read-only.
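/*
 * Illustrative add string (monitor address, user name, key and image
 * names are all hypothetical):
 *
 *	1.2.3.4:6789 name=admin,secret=AQBexamplekey rbd myimage mysnap
 *
 * which maps snapshot "mysnap" of image "myimage" in pool "rbd" via
 * the monitor at 1.2.3.4:6789.
 */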
5673 static int rbd_add_parse_args(const char *buf,
5674 struct ceph_options **ceph_opts,
5675 struct rbd_options **opts,
5676 struct rbd_spec **rbd_spec)
5678 size_t len;
5679 char *options;
5680 const char *mon_addrs;
5681 char *snap_name;
5682 size_t mon_addrs_size;
5683 struct rbd_spec *spec = NULL;
5684 struct rbd_options *rbd_opts = NULL;
5685 struct ceph_options *copts;
5686 int ret;
5688 /* The first four tokens are required */
5690 len = next_token(&buf);
5691 if (!len) {
5692 rbd_warn(NULL, "no monitor address(es) provided");
5693 return -EINVAL;
5695 mon_addrs = buf;
5696 mon_addrs_size = len + 1;
5697 buf += len;
5699 ret = -EINVAL;
5700 options = dup_token(&buf, NULL);
5701 if (!options)
5702 return -ENOMEM;
5703 if (!*options) {
5704 rbd_warn(NULL, "no options provided");
5705 goto out_err;
5708 spec = rbd_spec_alloc();
5709 if (!spec)
5710 goto out_mem;
5712 spec->pool_name = dup_token(&buf, NULL);
5713 if (!spec->pool_name)
5714 goto out_mem;
5715 if (!*spec->pool_name) {
5716 rbd_warn(NULL, "no pool name provided");
5717 goto out_err;
5720 spec->image_name = dup_token(&buf, NULL);
5721 if (!spec->image_name)
5722 goto out_mem;
5723 if (!*spec->image_name) {
5724 rbd_warn(NULL, "no image name provided");
5725 goto out_err;
5729 * Snapshot name is optional; default is to use "-"
5730 * (indicating the head/no snapshot).
5732 len = next_token(&buf);
5733 if (!len) {
5734 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5735 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5736 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5737 ret = -ENAMETOOLONG;
5738 goto out_err;
5740 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5741 if (!snap_name)
5742 goto out_mem;
5743 *(snap_name + len) = '\0';
5744 spec->snap_name = snap_name;
5746 /* Initialize all rbd options to the defaults */
5748 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5749 if (!rbd_opts)
5750 goto out_mem;
5752 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5753 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5754 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5756 copts = ceph_parse_options(options, mon_addrs,
5757 mon_addrs + mon_addrs_size - 1,
5758 parse_rbd_opts_token, rbd_opts);
5759 if (IS_ERR(copts)) {
5760 ret = PTR_ERR(copts);
5761 goto out_err;
5763 kfree(options);
5765 *ceph_opts = copts;
5766 *opts = rbd_opts;
5767 *rbd_spec = spec;
5769 return 0;
5770 out_mem:
5771 ret = -ENOMEM;
5772 out_err:
5773 kfree(rbd_opts);
5774 rbd_spec_put(spec);
5775 kfree(options);
5777 return ret;
5781 * Return pool id (>= 0) or a negative error code.
5783 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5785 struct ceph_options *opts = rbdc->client->options;
5786 u64 newest_epoch;
5787 int tries = 0;
5788 int ret;
5790 again:
5791 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5792 if (ret == -ENOENT && tries++ < 1) {
5793 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5794 &newest_epoch);
5795 if (ret < 0)
5796 return ret;
5798 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5799 ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5800 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5801 newest_epoch,
5802 opts->mount_timeout);
5803 goto again;
5804 } else {
5805 /* the osdmap we have is new enough */
5806 return -ENOENT;
5810 return ret;
5814 * An rbd format 2 image has a unique identifier, distinct from the
5815 * name given to it by the user. Internally, that identifier is
5816 * what's used to specify the names of objects related to the image.
5818 * A special "rbd id" object is used to map an rbd image name to its
5819 * id. If that object doesn't exist, then there is no v2 rbd image
5820 * with the supplied name.
5822 * This function will record the given rbd_dev's image_id field if
5823 * it can be determined, and in that case will return 0. If any
5824 * errors occur a negative errno will be returned and the rbd_dev's
5825 * image_id field will be unchanged (and should be NULL).
5827 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5829 int ret;
5830 size_t size;
5831 char *object_name;
5832 void *response;
5833 char *image_id;
5836 * When probing a parent image, the image id is already
5837 * known (and the image name likely is not). There's no
5838 * need to fetch the image id again in this case. We
5839 * do still need to set the image format though.
5841 if (rbd_dev->spec->image_id) {
5842 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5844 return 0;
5848 * First, see if the format 2 image id file exists, and if
5849 * so, get the image's persistent id from it.
5851 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
5852 object_name = kmalloc(size, GFP_NOIO);
5853 if (!object_name)
5854 return -ENOMEM;
5855 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
5856 dout("rbd id object name is %s\n", object_name);
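/*
 * E.g. for an image named "myimage" (hypothetical), the id object
 * consulted here would be "rbd_id.myimage".
 */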
5858 /* Response will be an encoded string, which includes a length */
5860 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5861 response = kzalloc(size, GFP_NOIO);
5862 if (!response) {
5863 ret = -ENOMEM;
5864 goto out;
5867 /* If it doesn't exist we'll assume it's a format 1 image */
5869 ret = rbd_obj_method_sync(rbd_dev, object_name,
5870 "rbd", "get_id", NULL, 0,
5871 response, RBD_IMAGE_ID_LEN_MAX);
5872 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5873 if (ret == -ENOENT) {
5874 image_id = kstrdup("", GFP_KERNEL);
5875 ret = image_id ? 0 : -ENOMEM;
5876 if (!ret)
5877 rbd_dev->image_format = 1;
5878 } else if (ret >= 0) {
5879 void *p = response;
5881 image_id = ceph_extract_encoded_string(&p, p + ret,
5882 NULL, GFP_NOIO);
5883 ret = PTR_ERR_OR_ZERO(image_id);
5884 if (!ret)
5885 rbd_dev->image_format = 2;
5888 if (!ret) {
5889 rbd_dev->spec->image_id = image_id;
5890 dout("image_id is %s\n", image_id);
5892 out:
5893 kfree(response);
5894 kfree(object_name);
5896 return ret;
5900 * Undo whatever state changes are made by v1 or v2 header info
5901 * call.
5903 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5905 struct rbd_image_header *header;
5907 rbd_dev_parent_put(rbd_dev);
5909 /* Free dynamic fields from the header, then zero it out */
5911 header = &rbd_dev->header;
5912 ceph_put_snap_context(header->snapc);
5913 kfree(header->snap_sizes);
5914 kfree(header->snap_names);
5915 kfree(header->object_prefix);
5916 memset(header, 0, sizeof (*header));
5919 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5921 int ret;
5923 ret = rbd_dev_v2_object_prefix(rbd_dev);
5924 if (ret)
5925 goto out_err;
5928 * Get and check the features for the image. Currently the
5929 * features are assumed to never change.
5931 ret = rbd_dev_v2_features(rbd_dev);
5932 if (ret)
5933 goto out_err;
5935 /* If the image supports fancy striping, get its parameters */
5937 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5938 ret = rbd_dev_v2_striping_info(rbd_dev);
5939 if (ret < 0)
5940 goto out_err;
5942 /* No support for crypto and compression type format 2 images */
5944 return 0;
5945 out_err:
5946 rbd_dev->header.features = 0;
5947 kfree(rbd_dev->header.object_prefix);
5948 rbd_dev->header.object_prefix = NULL;
5950 return ret;
5954 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5955 * rbd_dev_image_probe() recursion depth, which means it's also the
5956 * length of the already discovered part of the parent chain.
5958 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5960 struct rbd_device *parent = NULL;
5961 int ret;
5963 if (!rbd_dev->parent_spec)
5964 return 0;
5966 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5967 pr_info("parent chain is too long (%d)\n", depth);
5968 ret = -EINVAL;
5969 goto out_err;
5972 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5973 if (!parent) {
5974 ret = -ENOMEM;
5975 goto out_err;
5979 * Images related by parent/child relationships always share
5980 * rbd_client and spec/parent_spec, so bump their refcounts.
5982 __rbd_get_client(rbd_dev->rbd_client);
5983 rbd_spec_get(rbd_dev->parent_spec);
5985 ret = rbd_dev_image_probe(parent, depth);
5986 if (ret < 0)
5987 goto out_err;
5989 rbd_dev->parent = parent;
5990 atomic_set(&rbd_dev->parent_ref, 1);
5991 return 0;
5993 out_err:
5994 rbd_dev_unparent(rbd_dev);
5995 rbd_dev_destroy(parent);
5996 return ret;
6000 * rbd_dev->header_rwsem must be locked for write and will be unlocked
6001 * upon return.
6003 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6005 int ret;
6007 /* Record our major and minor device numbers. */
6009 if (!single_major) {
6010 ret = register_blkdev(0, rbd_dev->name);
6011 if (ret < 0)
6012 goto err_out_unlock;
6014 rbd_dev->major = ret;
6015 rbd_dev->minor = 0;
6016 } else {
6017 rbd_dev->major = rbd_major;
6018 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6021 /* Set up the blkdev mapping. */
6023 ret = rbd_init_disk(rbd_dev);
6024 if (ret)
6025 goto err_out_blkdev;
6027 ret = rbd_dev_mapping_set(rbd_dev);
6028 if (ret)
6029 goto err_out_disk;
6031 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6032 set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
6034 dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6035 ret = device_add(&rbd_dev->dev);
6036 if (ret)
6037 goto err_out_mapping;
6039 /* Everything's ready. Announce the disk to the world. */
6041 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6042 up_write(&rbd_dev->header_rwsem);
6044 spin_lock(&rbd_dev_list_lock);
6045 list_add_tail(&rbd_dev->node, &rbd_dev_list);
6046 spin_unlock(&rbd_dev_list_lock);
6048 add_disk(rbd_dev->disk);
6049 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6050 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6051 rbd_dev->header.features);
6053 return ret;
6055 err_out_mapping:
6056 rbd_dev_mapping_clear(rbd_dev);
6057 err_out_disk:
6058 rbd_free_disk(rbd_dev);
6059 err_out_blkdev:
6060 if (!single_major)
6061 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6062 err_out_unlock:
6063 up_write(&rbd_dev->header_rwsem);
6064 return ret;
6067 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6069 struct rbd_spec *spec = rbd_dev->spec;
6070 int ret;
6072 /* Record the header object name for this rbd image. */
6074 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6076 rbd_dev->header_oloc.pool = rbd_dev->layout.pool_id;
6077 if (rbd_dev->image_format == 1)
6078 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6079 spec->image_name, RBD_SUFFIX);
6080 else
6081 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6082 RBD_HEADER_PREFIX, spec->image_id);
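/*
 * E.g. (hypothetical names): a format 1 image "myimage" gets header
 * object "myimage.rbd", while a format 2 image with id "1016b8b4567"
 * gets "rbd_header.1016b8b4567".
 */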
6084 return ret;
6087 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6089 rbd_dev_unprobe(rbd_dev);
6090 rbd_dev->image_format = 0;
6091 kfree(rbd_dev->spec->image_id);
6092 rbd_dev->spec->image_id = NULL;
6094 rbd_dev_destroy(rbd_dev);
/*
 * Probe for the existence of the header object for the given rbd
 * device. If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
{
        int ret;

        /*
         * Get the id from the image id object. Unless there's an
         * error, rbd_dev->spec->image_id will be filled in with
         * a dynamically-allocated string, and rbd_dev->image_format
         * will be set to either 1 or 2.
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
                return ret;

        ret = rbd_dev_header_name(rbd_dev);
        if (ret)
                goto err_out_format;

        if (!depth) {
                ret = rbd_register_watch(rbd_dev);
                if (ret) {
                        if (ret == -ENOENT)
                                pr_info("image %s/%s does not exist\n",
                                        rbd_dev->spec->pool_name,
                                        rbd_dev->spec->image_name);
                        goto err_out_format;
                }
        }

        ret = rbd_dev_header_info(rbd_dev);
        if (ret)
                goto err_out_watch;

        /*
         * If this image is the one being mapped, we have pool name and
         * id, image name and id, and snap name - need to fill snap id.
         * Otherwise this is a parent image, identified by pool, image
         * and snap ids - need to fill in names for those ids.
         */
        if (!depth)
                ret = rbd_spec_fill_snap_id(rbd_dev);
        else
                ret = rbd_spec_fill_names(rbd_dev);
        if (ret) {
                if (ret == -ENOENT)
                        pr_info("snap %s/%s@%s does not exist\n",
                                rbd_dev->spec->pool_name,
                                rbd_dev->spec->image_name,
                                rbd_dev->spec->snap_name);
                goto err_out_probe;
        }

        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret)
                        goto err_out_probe;

                /*
                 * Need to warn users if this image is the one being
                 * mapped and has a parent.
                 */
                if (!depth && rbd_dev->parent_spec)
                        rbd_warn(rbd_dev,
                                 "WARNING: kernel layering is EXPERIMENTAL!");
        }

        ret = rbd_dev_probe_parent(rbd_dev, depth);
        if (ret)
                goto err_out_probe;

        dout("discovered format %u image, header name is %s\n",
             rbd_dev->image_format, rbd_dev->header_oid.name);
        return 0;

err_out_probe:
        rbd_dev_unprobe(rbd_dev);
err_out_watch:
        if (!depth)
                rbd_unregister_watch(rbd_dev);
err_out_format:
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;
        return ret;
}

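/*
 * Handle a write to /sys/bus/rbd/add (or add_single_major). The buffer
 * is parsed by rbd_add_parse_args(); the exact syntax is defined by the
 * sysfs-bus-rbd ABI documentation, but it is roughly (illustrative
 * example only):
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo" > /sys/bus/rbd/add
 *
 * i.e. monitor address(es), libceph/rbd options, pool name, image name
 * and an optional snapshot name.
 */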
static ssize_t do_rbd_add(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct ceph_options *ceph_opts = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        bool read_only;
        int rc;

        if (!capable(CAP_SYS_ADMIN))
                return -EPERM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
                goto out;

        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
                rc = PTR_ERR(rbdc);
                goto err_out_args;
        }

        /* pick the pool */
        rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
        if (rc < 0) {
                if (rc == -ENOENT)
                        pr_info("pool %s does not exist\n", spec->pool_name);
                goto err_out_client;
        }
        spec->pool_id = (u64)rc;

        rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
        if (!rbd_dev) {
                rc = -ENOMEM;
                goto err_out_client;
        }
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */
        rbd_opts = NULL;        /* rbd_dev now owns this */

        rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
        if (!rbd_dev->config_info) {
                rc = -ENOMEM;
                goto err_out_rbd_dev;
        }

        down_write(&rbd_dev->header_rwsem);
        rc = rbd_dev_image_probe(rbd_dev, 0);
        if (rc < 0) {
                up_write(&rbd_dev->header_rwsem);
                goto err_out_rbd_dev;
        }

        /* If we are mapping a snapshot it must be marked read-only */

        read_only = rbd_dev->opts->read_only;
        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
                read_only = true;
        rbd_dev->mapping.read_only = read_only;

        rc = rbd_dev_device_setup(rbd_dev);
        if (rc) {
                /*
                 * rbd_unregister_watch() can't be moved into
                 * rbd_dev_image_release() without refactoring, see
                 * commit 1f3ef78861ac.
                 */
                rbd_unregister_watch(rbd_dev);
                rbd_dev_image_release(rbd_dev);
                goto out;
        }

        rc = count;
out:
        module_put(THIS_MODULE);
        return rc;

err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
err_out_client:
        rbd_put_client(rbdc);
err_out_args:
        rbd_spec_put(spec);
        kfree(rbd_opts);
        goto out;
}

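/*
 * Sysfs entry points for "add": rbd_add() refuses writes when the
 * single_major module parameter is enabled, while
 * rbd_add_single_major() always forwards to do_rbd_add().
 */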
static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        if (single_major)
                return -EINVAL;

        return do_rbd_add(bus, buf, count);
}

static ssize_t rbd_add_single_major(struct bus_type *bus,
                                    const char *buf,
                                    size_t count)
{
        return do_rbd_add(bus, buf, count);
}

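/*
 * Undo rbd_dev_device_setup(): free the disk, unlink the device from
 * the global device list, remove it from sysfs, clear the mapping and,
 * unless a single shared major is in use, release the dynamically
 * allocated block major.
 */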
static void rbd_dev_device_release(struct rbd_device *rbd_dev)
{
        rbd_free_disk(rbd_dev);

        spin_lock(&rbd_dev_list_lock);
        list_del_init(&rbd_dev->node);
        spin_unlock(&rbd_dev_list_lock);

        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        device_del(&rbd_dev->dev);
        rbd_dev_mapping_clear(rbd_dev);
        if (!single_major)
                unregister_blkdev(rbd_dev->major, rbd_dev->name);
}

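/*
 * Tear down a layered image's ancestry bottom-up: repeatedly walk to
 * the deepest parent (the one with no grandparent of its own), release
 * it, and detach it from its child until no parents remain.
 */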
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
        while (rbd_dev->parent) {
                struct rbd_device *first = rbd_dev;
                struct rbd_device *second = first->parent;
                struct rbd_device *third;

                /*
                 * Follow to the parent with no grandparent and
                 * remove it.
                 */
                while (second && (third = second->parent)) {
                        first = second;
                        second = third;
                }
                rbd_assert(second);
                rbd_dev_image_release(second);
                first->parent = NULL;
                first->parent_overlap = 0;

                rbd_assert(first->parent_spec);
                rbd_spec_put(first->parent_spec);
                first->parent_spec = NULL;
        }
}

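/*
 * Handle a write to /sys/bus/rbd/remove (or remove_single_major). The
 * buffer is "<dev-id> [force]", matching the sscanf() below, e.g.
 *
 *   $ echo "0" > /sys/bus/rbd/remove
 *   $ echo "0 force" > /sys/bus/rbd/remove
 *
 * With "force" the device is unmapped even while still open: new I/O
 * is rejected and in-flight I/O is allowed to complete or fail first.
 */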
static ssize_t do_rbd_remove(struct bus_type *bus,
                             const char *buf,
                             size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct list_head *tmp;
        int dev_id;
        char opt_buf[6];
        bool force = false;
        int ret;

        if (!capable(CAP_SYS_ADMIN))
                return -EPERM;

        dev_id = -1;
        opt_buf[0] = '\0';
        sscanf(buf, "%d %5s", &dev_id, opt_buf);
        if (dev_id < 0) {
                pr_err("dev_id out of range\n");
                return -EINVAL;
        }
        if (opt_buf[0] != '\0') {
                if (!strcmp(opt_buf, "force")) {
                        force = true;
                } else {
                        pr_err("bad remove option at '%s'\n", opt_buf);
                        return -EINVAL;
                }
        }

        ret = -ENOENT;
        spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id == dev_id) {
                        ret = 0;
                        break;
                }
        }
        if (!ret) {
                spin_lock_irq(&rbd_dev->lock);
                if (rbd_dev->open_count && !force)
                        ret = -EBUSY;
                else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
                                          &rbd_dev->flags))
                        ret = -EINPROGRESS;
                spin_unlock_irq(&rbd_dev->lock);
        }
        spin_unlock(&rbd_dev_list_lock);
        if (ret)
                return ret;

        if (force) {
                /*
                 * Prevent new IO from being queued and wait for existing
                 * IO to complete/fail.
                 */
                blk_mq_freeze_queue(rbd_dev->disk->queue);
                blk_set_queue_dying(rbd_dev->disk->queue);
        }

        down_write(&rbd_dev->lock_rwsem);
        if (__rbd_is_lock_owner(rbd_dev))
                rbd_unlock(rbd_dev);
        up_write(&rbd_dev->lock_rwsem);
        rbd_unregister_watch(rbd_dev);

        /*
         * Don't free anything from rbd_dev->disk until after all
         * notifies are completely processed. Otherwise
         * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
         * in a potential use after free of rbd_dev->disk or rbd_dev.
         */
        rbd_dev_device_release(rbd_dev);
        rbd_dev_image_release(rbd_dev);

        return count;
}

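/*
 * Sysfs entry points for "remove", mirroring rbd_add() and
 * rbd_add_single_major() above.
 */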
static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        if (single_major)
                return -EINVAL;

        return do_rbd_remove(bus, buf, count);
}

static ssize_t rbd_remove_single_major(struct bus_type *bus,
                                       const char *buf,
                                       size_t count)
{
        return do_rbd_remove(bus, buf, count);
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}

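/*
 * Create the slab caches backing image requests, object requests and
 * segment (object) names; if any allocation fails, everything created
 * so far is torn down and -ENOMEM is returned.
 */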
static int rbd_slab_init(void)
{
        rbd_assert(!rbd_img_request_cache);
        rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
        if (!rbd_img_request_cache)
                return -ENOMEM;

        rbd_assert(!rbd_obj_request_cache);
        rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
        if (!rbd_obj_request_cache)
                goto out_err;

        rbd_assert(!rbd_segment_name_cache);
        rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
                                        CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
        if (rbd_segment_name_cache)
                return 0;
out_err:
        kmem_cache_destroy(rbd_obj_request_cache);
        rbd_obj_request_cache = NULL;

        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;

        return -ENOMEM;
}

static void rbd_slab_exit(void)
{
        rbd_assert(rbd_segment_name_cache);
        kmem_cache_destroy(rbd_segment_name_cache);
        rbd_segment_name_cache = NULL;

        rbd_assert(rbd_obj_request_cache);
        kmem_cache_destroy(rbd_obj_request_cache);
        rbd_obj_request_cache = NULL;

        rbd_assert(rbd_img_request_cache);
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
}

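/*
 * Module init: check libceph compatibility, create the slab caches,
 * allocate the shared workqueue, optionally register one block major
 * shared by all devices (single_major), then create the sysfs control
 * files. rbd_exit() below unwinds these steps (plus rbd_dev_id_ida) in
 * roughly reverse order.
 */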
static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");
                return -EINVAL;
        }

        rc = rbd_slab_init();
        if (rc)
                return rc;

        /*
         * The number of active work items is limited by the number of
         * rbd devices * queue depth, so leave @max_active at default.
         */
        rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
        if (!rbd_wq) {
                rc = -ENOMEM;
                goto err_out_slab;
        }

        if (single_major) {
                rbd_major = register_blkdev(0, RBD_DRV_NAME);
                if (rbd_major < 0) {
                        rc = rbd_major;
                        goto err_out_wq;
                }
        }

        rc = rbd_sysfs_init();
        if (rc)
                goto err_out_blkdev;

        if (single_major)
                pr_info("loaded (major %d)\n", rbd_major);
        else
                pr_info("loaded\n");

        return 0;

err_out_blkdev:
        if (single_major)
                unregister_blkdev(rbd_major, RBD_DRV_NAME);
err_out_wq:
        destroy_workqueue(rbd_wq);
err_out_slab:
        rbd_slab_exit();
        return rc;
}

static void __exit rbd_exit(void)
{
        ida_destroy(&rbd_dev_id_ida);
        rbd_sysfs_cleanup();
        if (single_major)
                unregister_blkdev(rbd_major, RBD_DRV_NAME);
        destroy_workqueue(rbd_wq);
        rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");