/*
 * rbd.c -- Export ceph rados objects as a Linux block device
 *
 * based on drivers/block/osdblk.c:
 *
 * Copyright 2009 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; see the file COPYING.  If not, write to
 * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 * For usage instructions, please refer to:
 *
 *                 Documentation/ABI/testing/sysfs-bus-rbd
 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);
	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);
	return -EINVAL;
}
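/*
 * Illustration (not part of rbd.c): the helpers above saturate rather
 * than wrap -- an increment is refused once the counter is 0 or INT_MAX,
 * and a decrement below zero is undone.  A plain-int userspace sketch of
 * that policy (not the lock-free atomic implementation); the names are
 * made up for the example.
 */
#if 0	/* standalone userspace sketch, not compiled with this driver */
#include <limits.h>
#include <stdio.h>

/* Increment and return the new value; refuse to move off 0 or past INT_MAX. */
static int inc_return_safe(int *v)
{
	if (*v == 0)
		return 0;		/* "already dead", not incremented */
	if (*v == INT_MAX)
		return -1;		/* stand-in for -EINVAL */
	return ++*v;
}

/* Decrement and return the new value; never go below 0. */
static int dec_return_safe(int *v)
{
	if (*v <= 0)
		return -1;
	return --*v;
}

int main(void)
{
	int refs = 1;

	printf("%d\n", inc_return_safe(&refs));	/* 2 */
	printf("%d\n", dec_return_safe(&refs));	/* 1 */
	refs = 0;
	printf("%d\n", inc_return_safe(&refs));	/* 0: increment refused */
	return 0;
}
#endif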
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

#define RBD_FEATURE_LAYERING		(1<<0)
#define RBD_FEATURE_STRIPINGV2		(1<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1<<2)
#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
	OBJ_OP_WRITE,
	OBJ_OP_READ,
	OBJ_OP_DISCARD,
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	struct rbd_obj_request	*obj_request;	/* STAT op */
	struct rbd_img_request	*img_request;
	u64			img_offset;
	/* links for img_request->obj_requests list */
	struct list_head	links;
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	struct bio		*bio_list;
	struct page		**pages;
	u32			page_count;
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};
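/*
 * Illustration (not part of rbd.c): the comment above distinguishes three
 * kinds of object requests by their which/img_request/obj_request fields.
 * A simplified userspace sketch of that classification, with made-up
 * struct names standing in for the real ones:
 */
#if 0	/* standalone userspace sketch, not compiled with this driver */
#include <stdio.h>

#define BAD_WHICH	0xffffffffu

struct img_req { int unused; };

struct obj_req {
	struct img_req	*img_request;
	struct obj_req	*obj_request;
	unsigned int	which;
};

static const char *classify(const struct obj_req *r)
{
	if (r->which != BAD_WHICH && r->img_request)
		return "image data request";
	if (r->obj_request)
		return "existence check for a layered-image write";
	return "standalone request";
}

int main(void)
{
	struct img_req ir = { 0 };
	struct obj_req standalone = { .which = BAD_WHICH };
	struct obj_req img_data = { .which = 0, .img_request = &ir };

	printf("%s\n", classify(&standalone));	/* standalone request */
	printf("%s\n", classify(&img_data));	/* image data request */
	return 0;
}
#endif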
enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	u64			snap_id;	/* for reads */
	struct ceph_snap_context *snapc;	/* for writes */
	struct request		*rq;		/* block request */
	struct rbd_obj_request	*obj_request;	/* obj req initiator */
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};
static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
static int rbd_img_request_submit(struct rbd_img_request *img_request);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
static void rbd_spec_put(struct rbd_spec *spec);
static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
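/*
 * Illustration (not part of rbd.c): with RBD_SINGLE_MAJOR_PART_SHIFT == 4,
 * each device id owns a block of 2^4 = 16 minors, leaving room for 15
 * partitions after the whole-device minor.  A standalone rendition of the
 * two helpers above:
 */
#if 0	/* standalone userspace sketch, not compiled with this driver */
#include <stdio.h>

#define RBD_SINGLE_MAJOR_PART_SHIFT	4

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}

int main(void)
{
	printf("dev_id 3 -> first minor %d\n", rbd_dev_id_to_minor(3));	/* 48 */
	printf("minor 53 -> dev_id %d\n", minor_to_rbd_dev_id(53));	/* 3 */
	return 0;
}
#endif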
static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
{
	return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
	       rbd_dev->spec->snap_id == CEPH_NOSNAP &&
	       !rbd_dev->mapping.read_only;
}

static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}
static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}
static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ret = 0;
	int val;
	bool ro;
	bool ro_changed = false;

	/* get_user() may sleep, so call it before taking rbd_dev->lock */
	if (get_user(val, (int __user *)(arg)))
		return -EFAULT;

	ro = val ? true : false;
	/* Snapshots can't be written to */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	/* prevent others from opening this device */
	if (rbd_dev->open_count > 1) {
		ret = -EBUSY;
		goto out;
	}

	if (rbd_dev->mapping.read_only != ro) {
		rbd_dev->mapping.read_only = ro;
		ro_changed = true;
	}

out:
	spin_unlock_irq(&rbd_dev->lock);
	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
	if (ret == 0 && ro_changed)
		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

	return ret;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret = 0;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};
701 * Initialize an rbd client instance. Success or not, this function
702 * consumes ceph_opts. Caller holds client_mutex.
704 static struct rbd_client
*rbd_client_create(struct ceph_options
*ceph_opts
)
706 struct rbd_client
*rbdc
;
709 dout("%s:\n", __func__
);
710 rbdc
= kmalloc(sizeof(struct rbd_client
), GFP_KERNEL
);
714 kref_init(&rbdc
->kref
);
715 INIT_LIST_HEAD(&rbdc
->node
);
717 rbdc
->client
= ceph_create_client(ceph_opts
, rbdc
, 0, 0);
718 if (IS_ERR(rbdc
->client
))
720 ceph_opts
= NULL
; /* Now rbdc->client is responsible for ceph_opts */
722 ret
= ceph_open_session(rbdc
->client
);
726 spin_lock(&rbd_client_list_lock
);
727 list_add_tail(&rbdc
->node
, &rbd_client_list
);
728 spin_unlock(&rbd_client_list_lock
);
730 dout("%s: rbdc %p\n", __func__
, rbdc
);
734 ceph_destroy_client(rbdc
->client
);
739 ceph_destroy_options(ceph_opts
);
740 dout("%s: error %d\n", __func__
, ret
);
745 static struct rbd_client
*__rbd_get_client(struct rbd_client
*rbdc
)
747 kref_get(&rbdc
->kref
);
753 * Find a ceph client with specific addr and configuration. If
754 * found, bump its reference count.
756 static struct rbd_client
*rbd_client_find(struct ceph_options
*ceph_opts
)
758 struct rbd_client
*client_node
;
761 if (ceph_opts
->flags
& CEPH_OPT_NOSHARE
)
764 spin_lock(&rbd_client_list_lock
);
765 list_for_each_entry(client_node
, &rbd_client_list
, node
) {
766 if (!ceph_compare_options(ceph_opts
, client_node
->client
)) {
767 __rbd_get_client(client_node
);
773 spin_unlock(&rbd_client_list_lock
);
775 return found
? client_node
: NULL
;
/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	bool	read_only;
	bool	lock_on_read;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
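/*
 * Illustration (not part of rbd.c): parse_rbd_opts_token() below classifies
 * each comma-separated map option with the kernel's match_token()/match_int()
 * and then fills struct rbd_options.  A userspace analogue of that
 * classify-then-assign flow (plain sscanf()/strcmp(), not the kernel parser
 * API; names made up for the sketch):
 */
#if 0	/* standalone userspace sketch, not compiled with this driver */
#include <stdio.h>
#include <string.h>

struct opts {
	int queue_depth;
	int read_only;
	int lock_on_read;
};

static int parse_one(const char *tok, struct opts *o)
{
	int v;

	if (sscanf(tok, "queue_depth=%d", &v) == 1) {
		if (v < 1)
			return -1;	/* out of range */
		o->queue_depth = v;
	} else if (!strcmp(tok, "read_only") || !strcmp(tok, "ro")) {
		o->read_only = 1;
	} else if (!strcmp(tok, "read_write") || !strcmp(tok, "rw")) {
		o->read_only = 0;
	} else if (!strcmp(tok, "lock_on_read")) {
		o->lock_on_read = 1;
	} else {
		return -1;		/* unknown option */
	}
	return 0;
}

int main(void)
{
	struct opts o = { .queue_depth = 128 };	/* defaults */

	parse_one("queue_depth=64", &o);
	parse_one("ro", &o);
	printf("queue_depth=%d read_only=%d\n", o.queue_depth, o.read_only);
	return 0;
}
#endif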
815 static int parse_rbd_opts_token(char *c
, void *private)
817 struct rbd_options
*rbd_opts
= private;
818 substring_t argstr
[MAX_OPT_ARGS
];
819 int token
, intval
, ret
;
821 token
= match_token(c
, rbd_opts_tokens
, argstr
);
822 if (token
< Opt_last_int
) {
823 ret
= match_int(&argstr
[0], &intval
);
825 pr_err("bad mount option arg (not int) at '%s'\n", c
);
828 dout("got int token %d val %d\n", token
, intval
);
829 } else if (token
> Opt_last_int
&& token
< Opt_last_string
) {
830 dout("got string token %d val %s\n", token
, argstr
[0].from
);
832 dout("got token %d\n", token
);
836 case Opt_queue_depth
:
838 pr_err("queue_depth out of range\n");
841 rbd_opts
->queue_depth
= intval
;
844 rbd_opts
->read_only
= true;
847 rbd_opts
->read_only
= false;
849 case Opt_lock_on_read
:
850 rbd_opts
->lock_on_read
= true;
853 /* libceph prints "bad option" msg */
860 static char* obj_op_name(enum obj_operation_type op_type
)
875 * Get a ceph client with specific addr and configuration, if one does
876 * not exist create it. Either way, ceph_opts is consumed by this
879 static struct rbd_client
*rbd_get_client(struct ceph_options
*ceph_opts
)
881 struct rbd_client
*rbdc
;
883 mutex_lock_nested(&client_mutex
, SINGLE_DEPTH_NESTING
);
884 rbdc
= rbd_client_find(ceph_opts
);
885 if (rbdc
) /* using an existing client */
886 ceph_destroy_options(ceph_opts
);
888 rbdc
= rbd_client_create(ceph_opts
);
889 mutex_unlock(&client_mutex
);
895 * Destroy ceph client
897 * Caller must hold rbd_client_list_lock.
899 static void rbd_client_release(struct kref
*kref
)
901 struct rbd_client
*rbdc
= container_of(kref
, struct rbd_client
, kref
);
903 dout("%s: rbdc %p\n", __func__
, rbdc
);
904 spin_lock(&rbd_client_list_lock
);
905 list_del(&rbdc
->node
);
906 spin_unlock(&rbd_client_list_lock
);
908 ceph_destroy_client(rbdc
->client
);
913 * Drop reference to ceph client node. If it's not referenced anymore, release
916 static void rbd_put_client(struct rbd_client
*rbdc
)
919 kref_put(&rbdc
->kref
, rbd_client_release
);
922 static bool rbd_image_format_valid(u32 image_format
)
924 return image_format
== 1 || image_format
== 2;
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	u64 size;
	u64 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */
	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */
	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
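/*
 * Illustration (not part of rbd.c): the two size checks above guarantee that
 * a ceph_snap_context plus one 64-bit id per snapshot plus the name blob can
 * never overflow a size_t.  The same arithmetic in userspace, with the
 * struct size hard-coded as an assumption for the example:
 */
#if 0	/* standalone userspace sketch, not compiled with this driver */
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

int main(void)
{
	const size_t snap_context_size = 96;	/* assumed sizeof(struct ceph_snap_context) */
	const size_t id_size = sizeof(uint64_t);
	size_t size = SIZE_MAX - snap_context_size;
	uint64_t snap_count = 510;		/* RBD_MAX_SNAP_COUNT */
	uint64_t snap_names_len = 4096;

	if (snap_count > size / id_size) {
		puts("too many snapshots");
		return 1;
	}
	size -= snap_count * id_size;
	if ((uint64_t)size < snap_names_len) {
		puts("snapshot names too long");
		return 1;
	}
	puts("snapshot header fits in a size_t");
	return 0;
}
#endif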
967 * Fill an rbd image header with information from the given format 1
970 static int rbd_header_from_disk(struct rbd_device
*rbd_dev
,
971 struct rbd_image_header_ondisk
*ondisk
)
973 struct rbd_image_header
*header
= &rbd_dev
->header
;
974 bool first_time
= header
->object_prefix
== NULL
;
975 struct ceph_snap_context
*snapc
;
976 char *object_prefix
= NULL
;
977 char *snap_names
= NULL
;
978 u64
*snap_sizes
= NULL
;
983 /* Allocate this now to avoid having to handle failure below */
988 len
= strnlen(ondisk
->object_prefix
,
989 sizeof (ondisk
->object_prefix
));
990 object_prefix
= kmalloc(len
+ 1, GFP_KERNEL
);
993 memcpy(object_prefix
, ondisk
->object_prefix
, len
);
994 object_prefix
[len
] = '\0';
997 /* Allocate the snapshot context and fill it in */
999 snap_count
= le32_to_cpu(ondisk
->snap_count
);
1000 snapc
= ceph_create_snap_context(snap_count
, GFP_KERNEL
);
1003 snapc
->seq
= le64_to_cpu(ondisk
->snap_seq
);
1005 struct rbd_image_snap_ondisk
*snaps
;
1006 u64 snap_names_len
= le64_to_cpu(ondisk
->snap_names_len
);
1008 /* We'll keep a copy of the snapshot names... */
1010 if (snap_names_len
> (u64
)SIZE_MAX
)
1012 snap_names
= kmalloc(snap_names_len
, GFP_KERNEL
);
1016 /* ...as well as the array of their sizes. */
1017 snap_sizes
= kmalloc_array(snap_count
,
1018 sizeof(*header
->snap_sizes
),
1024 * Copy the names, and fill in each snapshot's id
1027 * Note that rbd_dev_v1_header_info() guarantees the
1028 * ondisk buffer we're working with has
1029 * snap_names_len bytes beyond the end of the
1030 * snapshot id array, this memcpy() is safe.
1032 memcpy(snap_names
, &ondisk
->snaps
[snap_count
], snap_names_len
);
1033 snaps
= ondisk
->snaps
;
1034 for (i
= 0; i
< snap_count
; i
++) {
1035 snapc
->snaps
[i
] = le64_to_cpu(snaps
[i
].id
);
1036 snap_sizes
[i
] = le64_to_cpu(snaps
[i
].image_size
);
1040 /* We won't fail any more, fill in the header */
1043 header
->object_prefix
= object_prefix
;
1044 header
->obj_order
= ondisk
->options
.order
;
1045 header
->crypt_type
= ondisk
->options
.crypt_type
;
1046 header
->comp_type
= ondisk
->options
.comp_type
;
1047 /* The rest aren't used for format 1 images */
1048 header
->stripe_unit
= 0;
1049 header
->stripe_count
= 0;
1050 header
->features
= 0;
1052 ceph_put_snap_context(header
->snapc
);
1053 kfree(header
->snap_names
);
1054 kfree(header
->snap_sizes
);
1057 /* The remaining fields always get updated (when we refresh) */
1059 header
->image_size
= le64_to_cpu(ondisk
->image_size
);
1060 header
->snapc
= snapc
;
1061 header
->snap_names
= snap_names
;
1062 header
->snap_sizes
= snap_sizes
;
1070 ceph_put_snap_context(snapc
);
1071 kfree(object_prefix
);
static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}
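/*
 * Illustration (not part of rbd.c): for format 1 images, snap_names is one
 * blob of consecutive NUL-terminated strings, and the function above simply
 * skips `which` terminators to reach the wanted name.  The same walk as a
 * standalone sketch:
 */
#if 0	/* standalone userspace sketch, not compiled with this driver */
#include <stdio.h>
#include <string.h>

static const char *nth_name(const char *names, unsigned int which)
{
	while (which--)
		names += strlen(names) + 1;
	return names;
}

int main(void)
{
	/* "snap1\0snap2\0snap3\0" packed back to back */
	static const char names[] = "snap1\0snap2\0snap3";

	printf("%s\n", nth_name(names, 0));	/* snap1 */
	printf("%s\n", nth_name(names, 2));	/* snap3 */
	return 0;
}
#endif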
/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
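/*
 * Illustration (not part of rbd.c): bsearch() works on the descending
 * snapshot array because the comparison function reports the *reverse*
 * ordering.  A standalone demo of the same trick with libc bsearch():
 */
#if 0	/* standalone userspace sketch, not compiled with this driver */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

static int cmp_desc(const void *a, const void *b)
{
	uint64_t x = *(const uint64_t *)a;
	uint64_t y = *(const uint64_t *)b;

	if (x < y)
		return 1;
	return x == y ? 0 : -1;
}

int main(void)
{
	/* Highest snapshot id first, as the OSD keeps them. */
	uint64_t snaps[] = { 40, 25, 13, 7, 2 };
	uint64_t want = 13;
	uint64_t *found;

	found = bsearch(&want, snaps, 5, sizeof(want), cmp_desc);
	if (found)
		printf("index %ld\n", (long)(found - snaps));	/* index 2 */
	else
		puts("not found");
	return 0;
}
#endif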
1126 static const char *rbd_dev_v1_snap_name(struct rbd_device
*rbd_dev
,
1130 const char *snap_name
;
1132 which
= rbd_dev_snap_index(rbd_dev
, snap_id
);
1133 if (which
== BAD_SNAP_INDEX
)
1134 return ERR_PTR(-ENOENT
);
1136 snap_name
= _rbd_dev_v1_snap_name(rbd_dev
, which
);
1137 return snap_name
? snap_name
: ERR_PTR(-ENOMEM
);
1140 static const char *rbd_snap_name(struct rbd_device
*rbd_dev
, u64 snap_id
)
1142 if (snap_id
== CEPH_NOSNAP
)
1143 return RBD_SNAP_HEAD_NAME
;
1145 rbd_assert(rbd_image_format_valid(rbd_dev
->image_format
));
1146 if (rbd_dev
->image_format
== 1)
1147 return rbd_dev_v1_snap_name(rbd_dev
, snap_id
);
1149 return rbd_dev_v2_snap_name(rbd_dev
, snap_id
);
1152 static int rbd_snap_size(struct rbd_device
*rbd_dev
, u64 snap_id
,
1155 rbd_assert(rbd_image_format_valid(rbd_dev
->image_format
));
1156 if (snap_id
== CEPH_NOSNAP
) {
1157 *snap_size
= rbd_dev
->header
.image_size
;
1158 } else if (rbd_dev
->image_format
== 1) {
1161 which
= rbd_dev_snap_index(rbd_dev
, snap_id
);
1162 if (which
== BAD_SNAP_INDEX
)
1165 *snap_size
= rbd_dev
->header
.snap_sizes
[which
];
1170 ret
= _rbd_dev_v2_snap_size(rbd_dev
, snap_id
, NULL
, &size
);
1179 static int rbd_snap_features(struct rbd_device
*rbd_dev
, u64 snap_id
,
1182 rbd_assert(rbd_image_format_valid(rbd_dev
->image_format
));
1183 if (snap_id
== CEPH_NOSNAP
) {
1184 *snap_features
= rbd_dev
->header
.features
;
1185 } else if (rbd_dev
->image_format
== 1) {
1186 *snap_features
= 0; /* No features for format 1 */
1191 ret
= _rbd_dev_v2_snap_features(rbd_dev
, snap_id
, &features
);
1195 *snap_features
= features
;
1200 static int rbd_dev_mapping_set(struct rbd_device
*rbd_dev
)
1202 u64 snap_id
= rbd_dev
->spec
->snap_id
;
1207 ret
= rbd_snap_size(rbd_dev
, snap_id
, &size
);
1210 ret
= rbd_snap_features(rbd_dev
, snap_id
, &features
);
1214 rbd_dev
->mapping
.size
= size
;
1215 rbd_dev
->mapping
.features
= features
;
1220 static void rbd_dev_mapping_clear(struct rbd_device
*rbd_dev
)
1222 rbd_dev
->mapping
.size
= 0;
1223 rbd_dev
->mapping
.features
= 0;
1226 static void rbd_segment_name_free(const char *name
)
1228 /* The explicit cast here is needed to drop the const qualifier */
1230 kmem_cache_free(rbd_segment_name_cache
, (void *)name
);
1233 static const char *rbd_segment_name(struct rbd_device
*rbd_dev
, u64 offset
)
1240 name
= kmem_cache_alloc(rbd_segment_name_cache
, GFP_NOIO
);
1243 segment
= offset
>> rbd_dev
->header
.obj_order
;
1244 name_format
= "%s.%012llx";
1245 if (rbd_dev
->image_format
== 2)
1246 name_format
= "%s.%016llx";
1247 ret
= snprintf(name
, CEPH_MAX_OID_NAME_LEN
+ 1, name_format
,
1248 rbd_dev
->header
.object_prefix
, segment
);
1249 if (ret
< 0 || ret
> CEPH_MAX_OID_NAME_LEN
) {
1250 pr_err("error formatting segment name for #%llu (%d)\n",
1252 rbd_segment_name_free(name
);
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}
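/*
 * Illustration (not part of rbd.c): with the default obj_order of 22 an
 * object is 4 MiB, so an image byte offset splits into an object index
 * (offset >> obj_order), an offset within that object, and a length that
 * rbd_segment_length() clamps at the object boundary.  Worked example:
 */
#if 0	/* standalone userspace sketch, not compiled with this driver */
#include <stdint.h>
#include <stdio.h>

int main(void)
{
	unsigned int obj_order = 22;			/* 4 MiB objects */
	uint64_t segment_size = (uint64_t)1 << obj_order;
	uint64_t offset = 10ull * 1024 * 1024;		/* image offset: 10 MiB */
	uint64_t length = 3ull * 1024 * 1024;		/* request length: 3 MiB */
	uint64_t segment = offset >> obj_order;
	uint64_t seg_off = offset & (segment_size - 1);
	uint64_t seg_len = length;

	if (seg_off + seg_len > segment_size)
		seg_len = segment_size - seg_off;

	/* prints: segment 2, offset 2097152, length 2097152 */
	printf("segment %llu, offset %llu, length %llu\n",
	       (unsigned long long)segment,
	       (unsigned long long)seg_off,
	       (unsigned long long)seg_len);
	return 0;
}
#endif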
1292 static void bio_chain_put(struct bio
*chain
)
1298 chain
= chain
->bi_next
;
1304 * zeros a bio chain, starting at specific offset
1306 static void zero_bio_chain(struct bio
*chain
, int start_ofs
)
1309 struct bvec_iter iter
;
1310 unsigned long flags
;
1315 bio_for_each_segment(bv
, chain
, iter
) {
1316 if (pos
+ bv
.bv_len
> start_ofs
) {
1317 int remainder
= max(start_ofs
- pos
, 0);
1318 buf
= bvec_kmap_irq(&bv
, &flags
);
1319 memset(buf
+ remainder
, 0,
1320 bv
.bv_len
- remainder
);
1321 flush_dcache_page(bv
.bv_page
);
1322 bvec_kunmap_irq(buf
, &flags
);
1327 chain
= chain
->bi_next
;
1332 * similar to zero_bio_chain(), zeros data defined by a page array,
1333 * starting at the given byte offset from the start of the array and
1334 * continuing up to the given end offset. The pages array is
1335 * assumed to be big enough to hold all bytes up to the end.
1337 static void zero_pages(struct page
**pages
, u64 offset
, u64 end
)
1339 struct page
**page
= &pages
[offset
>> PAGE_SHIFT
];
1341 rbd_assert(end
> offset
);
1342 rbd_assert(end
- offset
<= (u64
)SIZE_MAX
);
1343 while (offset
< end
) {
1346 unsigned long flags
;
1349 page_offset
= offset
& ~PAGE_MASK
;
1350 length
= min_t(size_t, PAGE_SIZE
- page_offset
, end
- offset
);
1351 local_irq_save(flags
);
1352 kaddr
= kmap_atomic(*page
);
1353 memset(kaddr
+ page_offset
, 0, length
);
1354 flush_dcache_page(*page
);
1355 kunmap_atomic(kaddr
);
1356 local_irq_restore(flags
);
1364 * Clone a portion of a bio, starting at the given byte offset
1365 * and continuing for the number of bytes indicated.
1367 static struct bio
*bio_clone_range(struct bio
*bio_src
,
1368 unsigned int offset
,
1374 bio
= bio_clone(bio_src
, gfpmask
);
1376 return NULL
; /* ENOMEM */
1378 bio_advance(bio
, offset
);
1379 bio
->bi_iter
.bi_size
= len
;
1385 * Clone a portion of a bio chain, starting at the given byte offset
1386 * into the first bio in the source chain and continuing for the
1387 * number of bytes indicated. The result is another bio chain of
1388 * exactly the given length, or a null pointer on error.
1390 * The bio_src and offset parameters are both in-out. On entry they
1391 * refer to the first source bio and the offset into that bio where
1392 * the start of data to be cloned is located.
1394 * On return, bio_src is updated to refer to the bio in the source
1395 * chain that contains first un-cloned byte, and *offset will
1396 * contain the offset of that byte within that bio.
1398 static struct bio
*bio_chain_clone_range(struct bio
**bio_src
,
1399 unsigned int *offset
,
1403 struct bio
*bi
= *bio_src
;
1404 unsigned int off
= *offset
;
1405 struct bio
*chain
= NULL
;
1408 /* Build up a chain of clone bios up to the limit */
1410 if (!bi
|| off
>= bi
->bi_iter
.bi_size
|| !len
)
1411 return NULL
; /* Nothing to clone */
1415 unsigned int bi_size
;
1419 rbd_warn(NULL
, "bio_chain exhausted with %u left", len
);
1420 goto out_err
; /* EINVAL; ran out of bio's */
1422 bi_size
= min_t(unsigned int, bi
->bi_iter
.bi_size
- off
, len
);
1423 bio
= bio_clone_range(bi
, off
, bi_size
, gfpmask
);
1425 goto out_err
; /* ENOMEM */
1428 end
= &bio
->bi_next
;
1431 if (off
== bi
->bi_iter
.bi_size
) {
1442 bio_chain_put(chain
);
1448 * The default/initial value for all object request flags is 0. For
1449 * each flag, once its value is set to 1 it is never reset to 0
1452 static void obj_request_img_data_set(struct rbd_obj_request
*obj_request
)
1454 if (test_and_set_bit(OBJ_REQ_IMG_DATA
, &obj_request
->flags
)) {
1455 struct rbd_device
*rbd_dev
;
1457 rbd_dev
= obj_request
->img_request
->rbd_dev
;
1458 rbd_warn(rbd_dev
, "obj_request %p already marked img_data",
1463 static bool obj_request_img_data_test(struct rbd_obj_request
*obj_request
)
1466 return test_bit(OBJ_REQ_IMG_DATA
, &obj_request
->flags
) != 0;
1469 static void obj_request_done_set(struct rbd_obj_request
*obj_request
)
1471 if (test_and_set_bit(OBJ_REQ_DONE
, &obj_request
->flags
)) {
1472 struct rbd_device
*rbd_dev
= NULL
;
1474 if (obj_request_img_data_test(obj_request
))
1475 rbd_dev
= obj_request
->img_request
->rbd_dev
;
1476 rbd_warn(rbd_dev
, "obj_request %p already marked done",
1481 static bool obj_request_done_test(struct rbd_obj_request
*obj_request
)
1484 return test_bit(OBJ_REQ_DONE
, &obj_request
->flags
) != 0;
1488 * This sets the KNOWN flag after (possibly) setting the EXISTS
1489 * flag. The latter is set based on the "exists" value provided.
1491 * Note that for our purposes once an object exists it never goes
1492 * away again. It's possible that the responses from two existence
1493 * checks are separated by the creation of the target object, and
1494 * the first ("doesn't exist") response arrives *after* the second
1495 * ("does exist"). In that case we ignore the second one.
1497 static void obj_request_existence_set(struct rbd_obj_request
*obj_request
,
1501 set_bit(OBJ_REQ_EXISTS
, &obj_request
->flags
);
1502 set_bit(OBJ_REQ_KNOWN
, &obj_request
->flags
);
1506 static bool obj_request_known_test(struct rbd_obj_request
*obj_request
)
1509 return test_bit(OBJ_REQ_KNOWN
, &obj_request
->flags
) != 0;
1512 static bool obj_request_exists_test(struct rbd_obj_request
*obj_request
)
1515 return test_bit(OBJ_REQ_EXISTS
, &obj_request
->flags
) != 0;
1518 static bool obj_request_overlaps_parent(struct rbd_obj_request
*obj_request
)
1520 struct rbd_device
*rbd_dev
= obj_request
->img_request
->rbd_dev
;
1522 return obj_request
->img_offset
<
1523 round_up(rbd_dev
->parent_overlap
, rbd_obj_bytes(&rbd_dev
->header
));
1526 static void rbd_obj_request_get(struct rbd_obj_request
*obj_request
)
1528 dout("%s: obj %p (was %d)\n", __func__
, obj_request
,
1529 atomic_read(&obj_request
->kref
.refcount
));
1530 kref_get(&obj_request
->kref
);
1533 static void rbd_obj_request_destroy(struct kref
*kref
);
1534 static void rbd_obj_request_put(struct rbd_obj_request
*obj_request
)
1536 rbd_assert(obj_request
!= NULL
);
1537 dout("%s: obj %p (was %d)\n", __func__
, obj_request
,
1538 atomic_read(&obj_request
->kref
.refcount
));
1539 kref_put(&obj_request
->kref
, rbd_obj_request_destroy
);
1542 static void rbd_img_request_get(struct rbd_img_request
*img_request
)
1544 dout("%s: img %p (was %d)\n", __func__
, img_request
,
1545 atomic_read(&img_request
->kref
.refcount
));
1546 kref_get(&img_request
->kref
);
1549 static bool img_request_child_test(struct rbd_img_request
*img_request
);
1550 static void rbd_parent_request_destroy(struct kref
*kref
);
1551 static void rbd_img_request_destroy(struct kref
*kref
);
1552 static void rbd_img_request_put(struct rbd_img_request
*img_request
)
1554 rbd_assert(img_request
!= NULL
);
1555 dout("%s: img %p (was %d)\n", __func__
, img_request
,
1556 atomic_read(&img_request
->kref
.refcount
));
1557 if (img_request_child_test(img_request
))
1558 kref_put(&img_request
->kref
, rbd_parent_request_destroy
);
1560 kref_put(&img_request
->kref
, rbd_img_request_destroy
);
1563 static inline void rbd_img_obj_request_add(struct rbd_img_request
*img_request
,
1564 struct rbd_obj_request
*obj_request
)
1566 rbd_assert(obj_request
->img_request
== NULL
);
1568 /* Image request now owns object's original reference */
1569 obj_request
->img_request
= img_request
;
1570 obj_request
->which
= img_request
->obj_request_count
;
1571 rbd_assert(!obj_request_img_data_test(obj_request
));
1572 obj_request_img_data_set(obj_request
);
1573 rbd_assert(obj_request
->which
!= BAD_WHICH
);
1574 img_request
->obj_request_count
++;
1575 list_add_tail(&obj_request
->links
, &img_request
->obj_requests
);
1576 dout("%s: img %p obj %p w=%u\n", __func__
, img_request
, obj_request
,
1577 obj_request
->which
);
1580 static inline void rbd_img_obj_request_del(struct rbd_img_request
*img_request
,
1581 struct rbd_obj_request
*obj_request
)
1583 rbd_assert(obj_request
->which
!= BAD_WHICH
);
1585 dout("%s: img %p obj %p w=%u\n", __func__
, img_request
, obj_request
,
1586 obj_request
->which
);
1587 list_del(&obj_request
->links
);
1588 rbd_assert(img_request
->obj_request_count
> 0);
1589 img_request
->obj_request_count
--;
1590 rbd_assert(obj_request
->which
== img_request
->obj_request_count
);
1591 obj_request
->which
= BAD_WHICH
;
1592 rbd_assert(obj_request_img_data_test(obj_request
));
1593 rbd_assert(obj_request
->img_request
== img_request
);
1594 obj_request
->img_request
= NULL
;
1595 obj_request
->callback
= NULL
;
1596 rbd_obj_request_put(obj_request
);
1599 static bool obj_request_type_valid(enum obj_request_type type
)
1602 case OBJ_REQUEST_NODATA
:
1603 case OBJ_REQUEST_BIO
:
1604 case OBJ_REQUEST_PAGES
:
1611 static void rbd_img_obj_callback(struct rbd_obj_request
*obj_request
);
1613 static void rbd_obj_request_submit(struct rbd_obj_request
*obj_request
)
1615 struct ceph_osd_request
*osd_req
= obj_request
->osd_req
;
1617 dout("%s %p osd_req %p\n", __func__
, obj_request
, osd_req
);
1618 if (obj_request_img_data_test(obj_request
)) {
1619 WARN_ON(obj_request
->callback
!= rbd_img_obj_callback
);
1620 rbd_img_request_get(obj_request
->img_request
);
1622 ceph_osdc_start_request(osd_req
->r_osdc
, osd_req
, false);
1625 static void rbd_obj_request_end(struct rbd_obj_request
*obj_request
)
1627 dout("%s %p\n", __func__
, obj_request
);
1628 ceph_osdc_cancel_request(obj_request
->osd_req
);
1632 * Wait for an object request to complete. If interrupted, cancel the
1633 * underlying osd request.
1635 * @timeout: in jiffies, 0 means "wait forever"
1637 static int __rbd_obj_request_wait(struct rbd_obj_request
*obj_request
,
1638 unsigned long timeout
)
1642 dout("%s %p\n", __func__
, obj_request
);
1643 ret
= wait_for_completion_interruptible_timeout(
1644 &obj_request
->completion
,
1645 ceph_timeout_jiffies(timeout
));
1649 rbd_obj_request_end(obj_request
);
1654 dout("%s %p ret %d\n", __func__
, obj_request
, (int)ret
);
1658 static int rbd_obj_request_wait(struct rbd_obj_request
*obj_request
)
1660 return __rbd_obj_request_wait(obj_request
, 0);
1663 static void rbd_img_request_complete(struct rbd_img_request
*img_request
)
1666 dout("%s: img %p\n", __func__
, img_request
);
1669 * If no error occurred, compute the aggregate transfer
1670 * count for the image request. We could instead use
1671 * atomic64_cmpxchg() to update it as each object request
1672 * completes; not clear which way is better off hand.
1674 if (!img_request
->result
) {
1675 struct rbd_obj_request
*obj_request
;
1678 for_each_obj_request(img_request
, obj_request
)
1679 xferred
+= obj_request
->xferred
;
1680 img_request
->xferred
= xferred
;
1683 if (img_request
->callback
)
1684 img_request
->callback(img_request
);
1686 rbd_img_request_put(img_request
);
1690 * The default/initial value for all image request flags is 0. Each
1691 * is conditionally set to 1 at image request initialization time
1692 * and currently never changes thereafter.
1694 static void img_request_write_set(struct rbd_img_request
*img_request
)
1696 set_bit(IMG_REQ_WRITE
, &img_request
->flags
);
1700 static bool img_request_write_test(struct rbd_img_request
*img_request
)
1703 return test_bit(IMG_REQ_WRITE
, &img_request
->flags
) != 0;
1707 * Set the discard flag when the img_request is a discard request
1709 static void img_request_discard_set(struct rbd_img_request
*img_request
)
1711 set_bit(IMG_REQ_DISCARD
, &img_request
->flags
);
1715 static bool img_request_discard_test(struct rbd_img_request
*img_request
)
1718 return test_bit(IMG_REQ_DISCARD
, &img_request
->flags
) != 0;
1721 static void img_request_child_set(struct rbd_img_request
*img_request
)
1723 set_bit(IMG_REQ_CHILD
, &img_request
->flags
);
1727 static void img_request_child_clear(struct rbd_img_request
*img_request
)
1729 clear_bit(IMG_REQ_CHILD
, &img_request
->flags
);
1733 static bool img_request_child_test(struct rbd_img_request
*img_request
)
1736 return test_bit(IMG_REQ_CHILD
, &img_request
->flags
) != 0;
1739 static void img_request_layered_set(struct rbd_img_request
*img_request
)
1741 set_bit(IMG_REQ_LAYERED
, &img_request
->flags
);
1745 static void img_request_layered_clear(struct rbd_img_request
*img_request
)
1747 clear_bit(IMG_REQ_LAYERED
, &img_request
->flags
);
1751 static bool img_request_layered_test(struct rbd_img_request
*img_request
)
1754 return test_bit(IMG_REQ_LAYERED
, &img_request
->flags
) != 0;
1757 static enum obj_operation_type
1758 rbd_img_request_op_type(struct rbd_img_request
*img_request
)
1760 if (img_request_write_test(img_request
))
1761 return OBJ_OP_WRITE
;
1762 else if (img_request_discard_test(img_request
))
1763 return OBJ_OP_DISCARD
;
1769 rbd_img_obj_request_read_callback(struct rbd_obj_request
*obj_request
)
1771 u64 xferred
= obj_request
->xferred
;
1772 u64 length
= obj_request
->length
;
1774 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__
,
1775 obj_request
, obj_request
->img_request
, obj_request
->result
,
1778 * ENOENT means a hole in the image. We zero-fill the entire
1779 * length of the request. A short read also implies zero-fill
1780 * to the end of the request. An error requires the whole
1781 * length of the request to be reported finished with an error
1782 * to the block layer. In each case we update the xferred
1783 * count to indicate the whole request was satisfied.
1785 rbd_assert(obj_request
->type
!= OBJ_REQUEST_NODATA
);
1786 if (obj_request
->result
== -ENOENT
) {
1787 if (obj_request
->type
== OBJ_REQUEST_BIO
)
1788 zero_bio_chain(obj_request
->bio_list
, 0);
1790 zero_pages(obj_request
->pages
, 0, length
);
1791 obj_request
->result
= 0;
1792 } else if (xferred
< length
&& !obj_request
->result
) {
1793 if (obj_request
->type
== OBJ_REQUEST_BIO
)
1794 zero_bio_chain(obj_request
->bio_list
, xferred
);
1796 zero_pages(obj_request
->pages
, xferred
, length
);
1798 obj_request
->xferred
= length
;
1799 obj_request_done_set(obj_request
);
1802 static void rbd_obj_request_complete(struct rbd_obj_request
*obj_request
)
1804 dout("%s: obj %p cb %p\n", __func__
, obj_request
,
1805 obj_request
->callback
);
1806 if (obj_request
->callback
)
1807 obj_request
->callback(obj_request
);
1809 complete_all(&obj_request
->completion
);
1812 static void rbd_obj_request_error(struct rbd_obj_request
*obj_request
, int err
)
1814 obj_request
->result
= err
;
1815 obj_request
->xferred
= 0;
1817 * kludge - mirror rbd_obj_request_submit() to match a put in
1818 * rbd_img_obj_callback()
1820 if (obj_request_img_data_test(obj_request
)) {
1821 WARN_ON(obj_request
->callback
!= rbd_img_obj_callback
);
1822 rbd_img_request_get(obj_request
->img_request
);
1824 obj_request_done_set(obj_request
);
1825 rbd_obj_request_complete(obj_request
);
1828 static void rbd_osd_read_callback(struct rbd_obj_request
*obj_request
)
1830 struct rbd_img_request
*img_request
= NULL
;
1831 struct rbd_device
*rbd_dev
= NULL
;
1832 bool layered
= false;
1834 if (obj_request_img_data_test(obj_request
)) {
1835 img_request
= obj_request
->img_request
;
1836 layered
= img_request
&& img_request_layered_test(img_request
);
1837 rbd_dev
= img_request
->rbd_dev
;
1840 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__
,
1841 obj_request
, img_request
, obj_request
->result
,
1842 obj_request
->xferred
, obj_request
->length
);
1843 if (layered
&& obj_request
->result
== -ENOENT
&&
1844 obj_request
->img_offset
< rbd_dev
->parent_overlap
)
1845 rbd_img_parent_read(obj_request
);
1846 else if (img_request
)
1847 rbd_img_obj_request_read_callback(obj_request
);
1849 obj_request_done_set(obj_request
);
1852 static void rbd_osd_write_callback(struct rbd_obj_request
*obj_request
)
1854 dout("%s: obj %p result %d %llu\n", __func__
, obj_request
,
1855 obj_request
->result
, obj_request
->length
);
1857 * There is no such thing as a successful short write. Set
1858 * it to our originally-requested length.
1860 obj_request
->xferred
= obj_request
->length
;
1861 obj_request_done_set(obj_request
);
1864 static void rbd_osd_discard_callback(struct rbd_obj_request
*obj_request
)
1866 dout("%s: obj %p result %d %llu\n", __func__
, obj_request
,
1867 obj_request
->result
, obj_request
->length
);
1869 * There is no such thing as a successful short discard. Set
1870 * it to our originally-requested length.
1872 obj_request
->xferred
= obj_request
->length
;
1873 /* discarding a non-existent object is not a problem */
1874 if (obj_request
->result
== -ENOENT
)
1875 obj_request
->result
= 0;
1876 obj_request_done_set(obj_request
);
1880 * For a simple stat call there's nothing to do. We'll do more if
1881 * this is part of a write sequence for a layered image.
1883 static void rbd_osd_stat_callback(struct rbd_obj_request
*obj_request
)
1885 dout("%s: obj %p\n", __func__
, obj_request
);
1886 obj_request_done_set(obj_request
);
1889 static void rbd_osd_call_callback(struct rbd_obj_request
*obj_request
)
1891 dout("%s: obj %p\n", __func__
, obj_request
);
1893 if (obj_request_img_data_test(obj_request
))
1894 rbd_osd_copyup_callback(obj_request
);
1896 obj_request_done_set(obj_request
);
1899 static void rbd_osd_req_callback(struct ceph_osd_request
*osd_req
)
1901 struct rbd_obj_request
*obj_request
= osd_req
->r_priv
;
1904 dout("%s: osd_req %p\n", __func__
, osd_req
);
1905 rbd_assert(osd_req
== obj_request
->osd_req
);
1906 if (obj_request_img_data_test(obj_request
)) {
1907 rbd_assert(obj_request
->img_request
);
1908 rbd_assert(obj_request
->which
!= BAD_WHICH
);
1910 rbd_assert(obj_request
->which
== BAD_WHICH
);
1913 if (osd_req
->r_result
< 0)
1914 obj_request
->result
= osd_req
->r_result
;
1917 * We support a 64-bit length, but ultimately it has to be
1918 * passed to the block layer, which just supports a 32-bit
1921 obj_request
->xferred
= osd_req
->r_ops
[0].outdata_len
;
1922 rbd_assert(obj_request
->xferred
< (u64
)UINT_MAX
);
1924 opcode
= osd_req
->r_ops
[0].op
;
1926 case CEPH_OSD_OP_READ
:
1927 rbd_osd_read_callback(obj_request
);
1929 case CEPH_OSD_OP_SETALLOCHINT
:
1930 rbd_assert(osd_req
->r_ops
[1].op
== CEPH_OSD_OP_WRITE
||
1931 osd_req
->r_ops
[1].op
== CEPH_OSD_OP_WRITEFULL
);
1933 case CEPH_OSD_OP_WRITE
:
1934 case CEPH_OSD_OP_WRITEFULL
:
1935 rbd_osd_write_callback(obj_request
);
1937 case CEPH_OSD_OP_STAT
:
1938 rbd_osd_stat_callback(obj_request
);
1940 case CEPH_OSD_OP_DELETE
:
1941 case CEPH_OSD_OP_TRUNCATE
:
1942 case CEPH_OSD_OP_ZERO
:
1943 rbd_osd_discard_callback(obj_request
);
1945 case CEPH_OSD_OP_CALL
:
1946 rbd_osd_call_callback(obj_request
);
1949 rbd_warn(NULL
, "%s: unsupported op %hu",
1950 obj_request
->object_name
, (unsigned short) opcode
);
1954 if (obj_request_done_test(obj_request
))
1955 rbd_obj_request_complete(obj_request
);
1958 static void rbd_osd_req_format_read(struct rbd_obj_request
*obj_request
)
1960 struct ceph_osd_request
*osd_req
= obj_request
->osd_req
;
1962 rbd_assert(obj_request_img_data_test(obj_request
));
1963 osd_req
->r_snapid
= obj_request
->img_request
->snap_id
;
1966 static void rbd_osd_req_format_write(struct rbd_obj_request
*obj_request
)
1968 struct ceph_osd_request
*osd_req
= obj_request
->osd_req
;
1970 osd_req
->r_mtime
= CURRENT_TIME
;
1971 osd_req
->r_data_offset
= obj_request
->offset
;
1975 * Create an osd request. A read request has one osd op (read).
1976 * A write request has either one (watch) or two (hint+write) osd ops.
1977 * (All rbd data writes are prefixed with an allocation hint op, but
1978 * technically osd watch is a write request, hence this distinction.)
1980 static struct ceph_osd_request
*rbd_osd_req_create(
1981 struct rbd_device
*rbd_dev
,
1982 enum obj_operation_type op_type
,
1983 unsigned int num_ops
,
1984 struct rbd_obj_request
*obj_request
)
1986 struct ceph_snap_context
*snapc
= NULL
;
1987 struct ceph_osd_client
*osdc
;
1988 struct ceph_osd_request
*osd_req
;
1990 if (obj_request_img_data_test(obj_request
) &&
1991 (op_type
== OBJ_OP_DISCARD
|| op_type
== OBJ_OP_WRITE
)) {
1992 struct rbd_img_request
*img_request
= obj_request
->img_request
;
1993 if (op_type
== OBJ_OP_WRITE
) {
1994 rbd_assert(img_request_write_test(img_request
));
1996 rbd_assert(img_request_discard_test(img_request
));
1998 snapc
= img_request
->snapc
;
2001 rbd_assert(num_ops
== 1 || ((op_type
== OBJ_OP_WRITE
) && num_ops
== 2));
2003 /* Allocate and initialize the request, for the num_ops ops */
2005 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2006 osd_req
= ceph_osdc_alloc_request(osdc
, snapc
, num_ops
, false,
2011 if (op_type
== OBJ_OP_WRITE
|| op_type
== OBJ_OP_DISCARD
)
2012 osd_req
->r_flags
= CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
;
2014 osd_req
->r_flags
= CEPH_OSD_FLAG_READ
;
2016 osd_req
->r_callback
= rbd_osd_req_callback
;
2017 osd_req
->r_priv
= obj_request
;
2019 osd_req
->r_base_oloc
.pool
= rbd_dev
->layout
.pool_id
;
2020 if (ceph_oid_aprintf(&osd_req
->r_base_oid
, GFP_NOIO
, "%s",
2021 obj_request
->object_name
))
2024 if (ceph_osdc_alloc_messages(osd_req
, GFP_NOIO
))
2030 ceph_osdc_put_request(osd_req
);
2035 * Create a copyup osd request based on the information in the object
2036 * request supplied. A copyup request has two or three osd ops, a
2037 * copyup method call, potentially a hint op, and a write or truncate
2040 static struct ceph_osd_request
*
2041 rbd_osd_req_create_copyup(struct rbd_obj_request
*obj_request
)
2043 struct rbd_img_request
*img_request
;
2044 struct ceph_snap_context
*snapc
;
2045 struct rbd_device
*rbd_dev
;
2046 struct ceph_osd_client
*osdc
;
2047 struct ceph_osd_request
*osd_req
;
2048 int num_osd_ops
= 3;
2050 rbd_assert(obj_request_img_data_test(obj_request
));
2051 img_request
= obj_request
->img_request
;
2052 rbd_assert(img_request
);
2053 rbd_assert(img_request_write_test(img_request
) ||
2054 img_request_discard_test(img_request
));
2056 if (img_request_discard_test(img_request
))
2059 /* Allocate and initialize the request, for all the ops */
2061 snapc
= img_request
->snapc
;
2062 rbd_dev
= img_request
->rbd_dev
;
2063 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2064 osd_req
= ceph_osdc_alloc_request(osdc
, snapc
, num_osd_ops
,
2069 osd_req
->r_flags
= CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
;
2070 osd_req
->r_callback
= rbd_osd_req_callback
;
2071 osd_req
->r_priv
= obj_request
;
2073 osd_req
->r_base_oloc
.pool
= rbd_dev
->layout
.pool_id
;
2074 if (ceph_oid_aprintf(&osd_req
->r_base_oid
, GFP_NOIO
, "%s",
2075 obj_request
->object_name
))
2078 if (ceph_osdc_alloc_messages(osd_req
, GFP_NOIO
))
2084 ceph_osdc_put_request(osd_req
);
2089 static void rbd_osd_req_destroy(struct ceph_osd_request
*osd_req
)
2091 ceph_osdc_put_request(osd_req
);
2094 /* object_name is assumed to be a non-null pointer and NUL-terminated */
2096 static struct rbd_obj_request
*rbd_obj_request_create(const char *object_name
,
2097 u64 offset
, u64 length
,
2098 enum obj_request_type type
)
2100 struct rbd_obj_request
*obj_request
;
2104 rbd_assert(obj_request_type_valid(type
));
2106 size
= strlen(object_name
) + 1;
2107 name
= kmalloc(size
, GFP_NOIO
);
2111 obj_request
= kmem_cache_zalloc(rbd_obj_request_cache
, GFP_NOIO
);
2117 obj_request
->object_name
= memcpy(name
, object_name
, size
);
2118 obj_request
->offset
= offset
;
2119 obj_request
->length
= length
;
2120 obj_request
->flags
= 0;
2121 obj_request
->which
= BAD_WHICH
;
2122 obj_request
->type
= type
;
2123 INIT_LIST_HEAD(&obj_request
->links
);
2124 init_completion(&obj_request
->completion
);
2125 kref_init(&obj_request
->kref
);
2127 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__
, object_name
,
2128 offset
, length
, (int)type
, obj_request
);
2133 static void rbd_obj_request_destroy(struct kref
*kref
)
2135 struct rbd_obj_request
*obj_request
;
2137 obj_request
= container_of(kref
, struct rbd_obj_request
, kref
);
2139 dout("%s: obj %p\n", __func__
, obj_request
);
2141 rbd_assert(obj_request
->img_request
== NULL
);
2142 rbd_assert(obj_request
->which
== BAD_WHICH
);
2144 if (obj_request
->osd_req
)
2145 rbd_osd_req_destroy(obj_request
->osd_req
);
2147 rbd_assert(obj_request_type_valid(obj_request
->type
));
2148 switch (obj_request
->type
) {
2149 case OBJ_REQUEST_NODATA
:
2150 break; /* Nothing to do */
2151 case OBJ_REQUEST_BIO
:
2152 if (obj_request
->bio_list
)
2153 bio_chain_put(obj_request
->bio_list
);
2155 case OBJ_REQUEST_PAGES
:
2156 /* img_data requests don't own their page array */
2157 if (obj_request
->pages
&&
2158 !obj_request_img_data_test(obj_request
))
2159 ceph_release_page_vector(obj_request
->pages
,
2160 obj_request
->page_count
);
2164 kfree(obj_request
->object_name
);
2165 obj_request
->object_name
= NULL
;
2166 kmem_cache_free(rbd_obj_request_cache
, obj_request
);
2169 /* It's OK to call this for a device with no parent */
2171 static void rbd_spec_put(struct rbd_spec
*spec
);
2172 static void rbd_dev_unparent(struct rbd_device
*rbd_dev
)
2174 rbd_dev_remove_parent(rbd_dev
);
2175 rbd_spec_put(rbd_dev
->parent_spec
);
2176 rbd_dev
->parent_spec
= NULL
;
2177 rbd_dev
->parent_overlap
= 0;
2181 * Parent image reference counting is used to determine when an
2182 * image's parent fields can be safely torn down--after there are no
2183 * more in-flight requests to the parent image. When the last
2184 * reference is dropped, cleaning them up is safe.
2186 static void rbd_dev_parent_put(struct rbd_device
*rbd_dev
)
2190 if (!rbd_dev
->parent_spec
)
2193 counter
= atomic_dec_return_safe(&rbd_dev
->parent_ref
);
2197 /* Last reference; clean up parent data structures */
2200 rbd_dev_unparent(rbd_dev
);
2202 rbd_warn(rbd_dev
, "parent reference underflow");
2206 * If an image has a non-zero parent overlap, get a reference to its
2209 * Returns true if the rbd device has a parent with a non-zero
2210 * overlap and a reference for it was successfully taken, or
2213 static bool rbd_dev_parent_get(struct rbd_device
*rbd_dev
)
2217 if (!rbd_dev
->parent_spec
)
2220 down_read(&rbd_dev
->header_rwsem
);
2221 if (rbd_dev
->parent_overlap
)
2222 counter
= atomic_inc_return_safe(&rbd_dev
->parent_ref
);
2223 up_read(&rbd_dev
->header_rwsem
);
2226 rbd_warn(rbd_dev
, "parent reference overflow");
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					enum obj_operation_type op_type,
					struct ceph_snap_context *snapc)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
	if (!img_request)
		return NULL;

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (op_type == OBJ_OP_DISCARD) {
		img_request_discard_set(img_request);
		img_request->snapc = snapc;
	} else if (op_type == OBJ_OP_WRITE) {
		img_request_write_set(img_request);
		img_request->snapc = snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		obj_op_name(op_type), offset, length, img_request);

	return img_request;
}
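
/*
 * For writes and discards the image request takes ownership of the
 * snap context reference passed in by the caller; it is dropped in
 * rbd_img_request_destroy().  Callers such as rbd_queue_workfn()
 * therefore clear their local snapc pointer once creation succeeds.
 */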
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (img_request_write_test(img_request) ||
		img_request_discard_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}
static struct rbd_img_request *rbd_parent_request_create(
					struct rbd_obj_request *obj_request,
					u64 img_offset, u64 length)
{
	struct rbd_img_request *parent_request;
	struct rbd_device *rbd_dev;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;

	parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
						length, OBJ_OP_READ, NULL);
	if (!parent_request)
		return NULL;

	img_request_child_set(parent_request);
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	return parent_request;
}

static void rbd_parent_request_destroy(struct kref *kref)
{
	struct rbd_img_request *parent_request;
	struct rbd_obj_request *orig_request;

	parent_request = container_of(kref, struct rbd_img_request, kref);
	orig_request = parent_request->obj_request;

	parent_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	img_request_child_clear(parent_request);

	rbd_img_request_destroy(kref);
}
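
/*
 * A parent request is a plain read image request aimed at
 * rbd_dev->parent.  It pins the originating object request
 * (rbd_obj_request_get() above) until the parent request itself is
 * torn down here.
 */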
2341 static bool rbd_img_obj_end_request(struct rbd_obj_request
*obj_request
)
2343 struct rbd_img_request
*img_request
;
2344 unsigned int xferred
;
2348 rbd_assert(obj_request_img_data_test(obj_request
));
2349 img_request
= obj_request
->img_request
;
2351 rbd_assert(obj_request
->xferred
<= (u64
)UINT_MAX
);
2352 xferred
= (unsigned int)obj_request
->xferred
;
2353 result
= obj_request
->result
;
2355 struct rbd_device
*rbd_dev
= img_request
->rbd_dev
;
2356 enum obj_operation_type op_type
;
2358 if (img_request_discard_test(img_request
))
2359 op_type
= OBJ_OP_DISCARD
;
2360 else if (img_request_write_test(img_request
))
2361 op_type
= OBJ_OP_WRITE
;
2363 op_type
= OBJ_OP_READ
;
2365 rbd_warn(rbd_dev
, "%s %llx at %llx (%llx)",
2366 obj_op_name(op_type
), obj_request
->length
,
2367 obj_request
->img_offset
, obj_request
->offset
);
2368 rbd_warn(rbd_dev
, " result %d xferred %x",
2370 if (!img_request
->result
)
2371 img_request
->result
= result
;
2373 * Need to end I/O on the entire obj_request worth of
2374 * bytes in case of error.
2376 xferred
= obj_request
->length
;
2379 if (img_request_child_test(img_request
)) {
2380 rbd_assert(img_request
->obj_request
!= NULL
);
2381 more
= obj_request
->which
< img_request
->obj_request_count
- 1;
2383 rbd_assert(img_request
->rq
!= NULL
);
2385 more
= blk_update_request(img_request
->rq
, result
, xferred
);
2387 __blk_mq_end_request(img_request
->rq
, result
);
2393 static void rbd_img_obj_callback(struct rbd_obj_request
*obj_request
)
2395 struct rbd_img_request
*img_request
;
2396 u32 which
= obj_request
->which
;
2399 rbd_assert(obj_request_img_data_test(obj_request
));
2400 img_request
= obj_request
->img_request
;
2402 dout("%s: img %p obj %p\n", __func__
, img_request
, obj_request
);
2403 rbd_assert(img_request
!= NULL
);
2404 rbd_assert(img_request
->obj_request_count
> 0);
2405 rbd_assert(which
!= BAD_WHICH
);
2406 rbd_assert(which
< img_request
->obj_request_count
);
2408 spin_lock_irq(&img_request
->completion_lock
);
2409 if (which
!= img_request
->next_completion
)
2412 for_each_obj_request_from(img_request
, obj_request
) {
2414 rbd_assert(which
< img_request
->obj_request_count
);
2416 if (!obj_request_done_test(obj_request
))
2418 more
= rbd_img_obj_end_request(obj_request
);
2422 rbd_assert(more
^ (which
== img_request
->obj_request_count
));
2423 img_request
->next_completion
= which
;
2425 spin_unlock_irq(&img_request
->completion_lock
);
2426 rbd_img_request_put(img_request
);
2429 rbd_img_request_complete(img_request
);
2433 * Add individual osd ops to the given ceph_osd_request and prepare
2434 * them for submission. num_ops is the current number of
2435 * osd operations already to the object request.
2437 static void rbd_img_obj_request_fill(struct rbd_obj_request
*obj_request
,
2438 struct ceph_osd_request
*osd_request
,
2439 enum obj_operation_type op_type
,
2440 unsigned int num_ops
)
2442 struct rbd_img_request
*img_request
= obj_request
->img_request
;
2443 struct rbd_device
*rbd_dev
= img_request
->rbd_dev
;
2444 u64 object_size
= rbd_obj_bytes(&rbd_dev
->header
);
2445 u64 offset
= obj_request
->offset
;
2446 u64 length
= obj_request
->length
;
2450 if (op_type
== OBJ_OP_DISCARD
) {
2451 if (!offset
&& length
== object_size
&&
2452 (!img_request_layered_test(img_request
) ||
2453 !obj_request_overlaps_parent(obj_request
))) {
2454 opcode
= CEPH_OSD_OP_DELETE
;
2455 } else if ((offset
+ length
== object_size
)) {
2456 opcode
= CEPH_OSD_OP_TRUNCATE
;
2458 down_read(&rbd_dev
->header_rwsem
);
2459 img_end
= rbd_dev
->header
.image_size
;
2460 up_read(&rbd_dev
->header_rwsem
);
2462 if (obj_request
->img_offset
+ length
== img_end
)
2463 opcode
= CEPH_OSD_OP_TRUNCATE
;
2465 opcode
= CEPH_OSD_OP_ZERO
;
2467 } else if (op_type
== OBJ_OP_WRITE
) {
2468 if (!offset
&& length
== object_size
)
2469 opcode
= CEPH_OSD_OP_WRITEFULL
;
2471 opcode
= CEPH_OSD_OP_WRITE
;
2472 osd_req_op_alloc_hint_init(osd_request
, num_ops
,
2473 object_size
, object_size
);
2476 opcode
= CEPH_OSD_OP_READ
;
2479 if (opcode
== CEPH_OSD_OP_DELETE
)
2480 osd_req_op_init(osd_request
, num_ops
, opcode
, 0);
2482 osd_req_op_extent_init(osd_request
, num_ops
, opcode
,
2483 offset
, length
, 0, 0);
2485 if (obj_request
->type
== OBJ_REQUEST_BIO
)
2486 osd_req_op_extent_osd_data_bio(osd_request
, num_ops
,
2487 obj_request
->bio_list
, length
);
2488 else if (obj_request
->type
== OBJ_REQUEST_PAGES
)
2489 osd_req_op_extent_osd_data_pages(osd_request
, num_ops
,
2490 obj_request
->pages
, length
,
2491 offset
& ~PAGE_MASK
, false, false);
2493 /* Discards are also writes */
2494 if (op_type
== OBJ_OP_WRITE
|| op_type
== OBJ_OP_DISCARD
)
2495 rbd_osd_req_format_write(obj_request
);
2497 rbd_osd_req_format_read(obj_request
);
2501 * Split up an image request into one or more object requests, each
2502 * to a different object. The "type" parameter indicates whether
2503 * "data_desc" is the pointer to the head of a list of bio
2504 * structures, or the base of a page array. In either case this
2505 * function assumes data_desc describes memory sufficient to hold
2506 * all data described by the image request.
2508 static int rbd_img_request_fill(struct rbd_img_request
*img_request
,
2509 enum obj_request_type type
,
2512 struct rbd_device
*rbd_dev
= img_request
->rbd_dev
;
2513 struct rbd_obj_request
*obj_request
= NULL
;
2514 struct rbd_obj_request
*next_obj_request
;
2515 struct bio
*bio_list
= NULL
;
2516 unsigned int bio_offset
= 0;
2517 struct page
**pages
= NULL
;
2518 enum obj_operation_type op_type
;
2522 dout("%s: img %p type %d data_desc %p\n", __func__
, img_request
,
2523 (int)type
, data_desc
);
2525 img_offset
= img_request
->offset
;
2526 resid
= img_request
->length
;
2527 rbd_assert(resid
> 0);
2528 op_type
= rbd_img_request_op_type(img_request
);
2530 if (type
== OBJ_REQUEST_BIO
) {
2531 bio_list
= data_desc
;
2532 rbd_assert(img_offset
==
2533 bio_list
->bi_iter
.bi_sector
<< SECTOR_SHIFT
);
2534 } else if (type
== OBJ_REQUEST_PAGES
) {
2539 struct ceph_osd_request
*osd_req
;
2540 const char *object_name
;
2544 object_name
= rbd_segment_name(rbd_dev
, img_offset
);
2547 offset
= rbd_segment_offset(rbd_dev
, img_offset
);
2548 length
= rbd_segment_length(rbd_dev
, img_offset
, resid
);
2549 obj_request
= rbd_obj_request_create(object_name
,
2550 offset
, length
, type
);
2551 /* object request has its own copy of the object name */
2552 rbd_segment_name_free(object_name
);
2557 * set obj_request->img_request before creating the
2558 * osd_request so that it gets the right snapc
2560 rbd_img_obj_request_add(img_request
, obj_request
);
2562 if (type
== OBJ_REQUEST_BIO
) {
2563 unsigned int clone_size
;
2565 rbd_assert(length
<= (u64
)UINT_MAX
);
2566 clone_size
= (unsigned int)length
;
2567 obj_request
->bio_list
=
2568 bio_chain_clone_range(&bio_list
,
2572 if (!obj_request
->bio_list
)
2574 } else if (type
== OBJ_REQUEST_PAGES
) {
2575 unsigned int page_count
;
2577 obj_request
->pages
= pages
;
2578 page_count
= (u32
)calc_pages_for(offset
, length
);
2579 obj_request
->page_count
= page_count
;
2580 if ((offset
+ length
) & ~PAGE_MASK
)
2581 page_count
--; /* more on last page */
2582 pages
+= page_count
;
2585 osd_req
= rbd_osd_req_create(rbd_dev
, op_type
,
2586 (op_type
== OBJ_OP_WRITE
) ? 2 : 1,
2591 obj_request
->osd_req
= osd_req
;
2592 obj_request
->callback
= rbd_img_obj_callback
;
2593 obj_request
->img_offset
= img_offset
;
2595 rbd_img_obj_request_fill(obj_request
, osd_req
, op_type
, 0);
2597 img_offset
+= length
;
2604 for_each_obj_request_safe(img_request
, obj_request
, next_obj_request
)
2605 rbd_img_obj_request_del(img_request
, obj_request
);
2611 rbd_osd_copyup_callback(struct rbd_obj_request
*obj_request
)
2613 struct rbd_img_request
*img_request
;
2614 struct rbd_device
*rbd_dev
;
2615 struct page
**pages
;
2618 dout("%s: obj %p\n", __func__
, obj_request
);
2620 rbd_assert(obj_request
->type
== OBJ_REQUEST_BIO
||
2621 obj_request
->type
== OBJ_REQUEST_NODATA
);
2622 rbd_assert(obj_request_img_data_test(obj_request
));
2623 img_request
= obj_request
->img_request
;
2624 rbd_assert(img_request
);
2626 rbd_dev
= img_request
->rbd_dev
;
2627 rbd_assert(rbd_dev
);
2629 pages
= obj_request
->copyup_pages
;
2630 rbd_assert(pages
!= NULL
);
2631 obj_request
->copyup_pages
= NULL
;
2632 page_count
= obj_request
->copyup_page_count
;
2633 rbd_assert(page_count
);
2634 obj_request
->copyup_page_count
= 0;
2635 ceph_release_page_vector(pages
, page_count
);
2638 * We want the transfer count to reflect the size of the
2639 * original write request. There is no such thing as a
2640 * successful short write, so if the request was successful
2641 * we can just set it to the originally-requested length.
2643 if (!obj_request
->result
)
2644 obj_request
->xferred
= obj_request
->length
;
2646 obj_request_done_set(obj_request
);
2650 rbd_img_obj_parent_read_full_callback(struct rbd_img_request
*img_request
)
2652 struct rbd_obj_request
*orig_request
;
2653 struct ceph_osd_request
*osd_req
;
2654 struct rbd_device
*rbd_dev
;
2655 struct page
**pages
;
2656 enum obj_operation_type op_type
;
2661 rbd_assert(img_request_child_test(img_request
));
2663 /* First get what we need from the image request */
2665 pages
= img_request
->copyup_pages
;
2666 rbd_assert(pages
!= NULL
);
2667 img_request
->copyup_pages
= NULL
;
2668 page_count
= img_request
->copyup_page_count
;
2669 rbd_assert(page_count
);
2670 img_request
->copyup_page_count
= 0;
2672 orig_request
= img_request
->obj_request
;
2673 rbd_assert(orig_request
!= NULL
);
2674 rbd_assert(obj_request_type_valid(orig_request
->type
));
2675 img_result
= img_request
->result
;
2676 parent_length
= img_request
->length
;
2677 rbd_assert(img_result
|| parent_length
== img_request
->xferred
);
2678 rbd_img_request_put(img_request
);
2680 rbd_assert(orig_request
->img_request
);
2681 rbd_dev
= orig_request
->img_request
->rbd_dev
;
2682 rbd_assert(rbd_dev
);
2685 * If the overlap has become 0 (most likely because the
2686 * image has been flattened) we need to free the pages
2687 * and re-submit the original write request.
2689 if (!rbd_dev
->parent_overlap
) {
2690 ceph_release_page_vector(pages
, page_count
);
2691 rbd_obj_request_submit(orig_request
);
2699 * The original osd request is of no use to use any more.
2700 * We need a new one that can hold the three ops in a copyup
2701 * request. Allocate the new copyup osd request for the
2702 * original request, and release the old one.
2704 img_result
= -ENOMEM
;
2705 osd_req
= rbd_osd_req_create_copyup(orig_request
);
2708 rbd_osd_req_destroy(orig_request
->osd_req
);
2709 orig_request
->osd_req
= osd_req
;
2710 orig_request
->copyup_pages
= pages
;
2711 orig_request
->copyup_page_count
= page_count
;
2713 /* Initialize the copyup op */
2715 osd_req_op_cls_init(osd_req
, 0, CEPH_OSD_OP_CALL
, "rbd", "copyup");
2716 osd_req_op_cls_request_data_pages(osd_req
, 0, pages
, parent_length
, 0,
2719 /* Add the other op(s) */
2721 op_type
= rbd_img_request_op_type(orig_request
->img_request
);
2722 rbd_img_obj_request_fill(orig_request
, osd_req
, op_type
, 1);
2724 /* All set, send it off. */
2726 rbd_obj_request_submit(orig_request
);
2730 ceph_release_page_vector(pages
, page_count
);
2731 rbd_obj_request_error(orig_request
, img_result
);
2735 * Read from the parent image the range of data that covers the
2736 * entire target of the given object request. This is used for
2737 * satisfying a layered image write request when the target of an
2738 * object request from the image request does not exist.
2740 * A page array big enough to hold the returned data is allocated
2741 * and supplied to rbd_img_request_fill() as the "data descriptor."
2742 * When the read completes, this page array will be transferred to
2743 * the original object request for the copyup operation.
2745 * If an error occurs, it is recorded as the result of the original
2746 * object request in rbd_img_obj_exists_callback().
2748 static int rbd_img_obj_parent_read_full(struct rbd_obj_request
*obj_request
)
2750 struct rbd_device
*rbd_dev
= obj_request
->img_request
->rbd_dev
;
2751 struct rbd_img_request
*parent_request
= NULL
;
2754 struct page
**pages
= NULL
;
2758 rbd_assert(rbd_dev
->parent
!= NULL
);
2761 * Determine the byte range covered by the object in the
2762 * child image to which the original request was to be sent.
2764 img_offset
= obj_request
->img_offset
- obj_request
->offset
;
2765 length
= (u64
)1 << rbd_dev
->header
.obj_order
;
2768 * There is no defined parent data beyond the parent
2769 * overlap, so limit what we read at that boundary if
2772 if (img_offset
+ length
> rbd_dev
->parent_overlap
) {
2773 rbd_assert(img_offset
< rbd_dev
->parent_overlap
);
2774 length
= rbd_dev
->parent_overlap
- img_offset
;
2778 * Allocate a page array big enough to receive the data read
2781 page_count
= (u32
)calc_pages_for(0, length
);
2782 pages
= ceph_alloc_page_vector(page_count
, GFP_NOIO
);
2783 if (IS_ERR(pages
)) {
2784 result
= PTR_ERR(pages
);
2790 parent_request
= rbd_parent_request_create(obj_request
,
2791 img_offset
, length
);
2792 if (!parent_request
)
2795 result
= rbd_img_request_fill(parent_request
, OBJ_REQUEST_PAGES
, pages
);
2799 parent_request
->copyup_pages
= pages
;
2800 parent_request
->copyup_page_count
= page_count
;
2801 parent_request
->callback
= rbd_img_obj_parent_read_full_callback
;
2803 result
= rbd_img_request_submit(parent_request
);
2807 parent_request
->copyup_pages
= NULL
;
2808 parent_request
->copyup_page_count
= 0;
2809 parent_request
->obj_request
= NULL
;
2810 rbd_obj_request_put(obj_request
);
2813 ceph_release_page_vector(pages
, page_count
);
2815 rbd_img_request_put(parent_request
);
2819 static void rbd_img_obj_exists_callback(struct rbd_obj_request
*obj_request
)
2821 struct rbd_obj_request
*orig_request
;
2822 struct rbd_device
*rbd_dev
;
2825 rbd_assert(!obj_request_img_data_test(obj_request
));
2828 * All we need from the object request is the original
2829 * request and the result of the STAT op. Grab those, then
2830 * we're done with the request.
2832 orig_request
= obj_request
->obj_request
;
2833 obj_request
->obj_request
= NULL
;
2834 rbd_obj_request_put(orig_request
);
2835 rbd_assert(orig_request
);
2836 rbd_assert(orig_request
->img_request
);
2838 result
= obj_request
->result
;
2839 obj_request
->result
= 0;
2841 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__
,
2842 obj_request
, orig_request
, result
,
2843 obj_request
->xferred
, obj_request
->length
);
2844 rbd_obj_request_put(obj_request
);
2847 * If the overlap has become 0 (most likely because the
2848 * image has been flattened) we need to re-submit the
2851 rbd_dev
= orig_request
->img_request
->rbd_dev
;
2852 if (!rbd_dev
->parent_overlap
) {
2853 rbd_obj_request_submit(orig_request
);
2858 * Our only purpose here is to determine whether the object
2859 * exists, and we don't want to treat the non-existence as
2860 * an error. If something else comes back, transfer the
2861 * error to the original request and complete it now.
2864 obj_request_existence_set(orig_request
, true);
2865 } else if (result
== -ENOENT
) {
2866 obj_request_existence_set(orig_request
, false);
2868 goto fail_orig_request
;
2872 * Resubmit the original request now that we have recorded
2873 * whether the target object exists.
2875 result
= rbd_img_obj_request_submit(orig_request
);
2877 goto fail_orig_request
;
2882 rbd_obj_request_error(orig_request
, result
);
2885 static int rbd_img_obj_exists_submit(struct rbd_obj_request
*obj_request
)
2887 struct rbd_device
*rbd_dev
= obj_request
->img_request
->rbd_dev
;
2888 struct rbd_obj_request
*stat_request
;
2889 struct page
**pages
;
2894 stat_request
= rbd_obj_request_create(obj_request
->object_name
, 0, 0,
2899 stat_request
->osd_req
= rbd_osd_req_create(rbd_dev
, OBJ_OP_READ
, 1,
2901 if (!stat_request
->osd_req
) {
2903 goto fail_stat_request
;
2907 * The response data for a STAT call consists of:
2914 size
= sizeof (__le64
) + sizeof (__le32
) + sizeof (__le32
);
2915 page_count
= (u32
)calc_pages_for(0, size
);
2916 pages
= ceph_alloc_page_vector(page_count
, GFP_NOIO
);
2917 if (IS_ERR(pages
)) {
2918 ret
= PTR_ERR(pages
);
2919 goto fail_stat_request
;
2922 osd_req_op_init(stat_request
->osd_req
, 0, CEPH_OSD_OP_STAT
, 0);
2923 osd_req_op_raw_data_in_pages(stat_request
->osd_req
, 0, pages
, size
, 0,
2926 rbd_obj_request_get(obj_request
);
2927 stat_request
->obj_request
= obj_request
;
2928 stat_request
->pages
= pages
;
2929 stat_request
->page_count
= page_count
;
2930 stat_request
->callback
= rbd_img_obj_exists_callback
;
2932 rbd_obj_request_submit(stat_request
);
2936 rbd_obj_request_put(stat_request
);
static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct rbd_device *rbd_dev = img_request->rbd_dev;

	/* Reads */
	if (!img_request_write_test(img_request) &&
	    !img_request_discard_test(img_request))
		return true;

	/* Non-layered writes */
	if (!img_request_layered_test(img_request))
		return true;

	/*
	 * Layered writes outside of the parent overlap range don't
	 * share any data with the parent.
	 */
	if (!obj_request_overlaps_parent(obj_request))
		return true;

	/*
	 * Entire-object layered writes - we will overwrite whatever
	 * parent data there is anyway.
	 */
	if (!obj_request->offset &&
	    obj_request->length == rbd_obj_bytes(&rbd_dev->header))
		return true;

	/*
	 * If the object is known to already exist, its parent data has
	 * already been copied.
	 */
	if (obj_request_known_test(obj_request) &&
	    obj_request_exists_test(obj_request))
		return true;

	return false;
}

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request_type_valid(obj_request->type));
	rbd_assert(obj_request->img_request);

	if (img_obj_request_simple(obj_request)) {
		rbd_obj_request_submit(obj_request);
		return 0;
	}

	/*
	 * It's a layered write.  The target object might exist but
	 * we may not know that yet.  If we know it doesn't exist,
	 * start by reading the data for the full target object from
	 * the parent so we can use it for a copyup to the target.
	 */
	if (obj_request_known_test(obj_request))
		return rbd_img_obj_parent_read_full(obj_request);

	/* We don't know whether the target exists.  Go find out. */

	return rbd_img_obj_exists_submit(obj_request);
}

static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;
	int ret = 0;

	dout("%s: img %p\n", __func__, img_request);

	rbd_img_request_get(img_request);
	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
		ret = rbd_img_obj_request_submit(obj_request);
		if (ret)
			goto out_put_ireq;
	}

out_put_ireq:
	rbd_img_request_put(img_request);
	return ret;
}
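
/*
 * Typical submission flow for a block-layer request (see
 * rbd_queue_workfn() below), sketched for reference:
 *
 *	img_request = rbd_img_request_create(rbd_dev, off, len, op, snapc);
 *	rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
 *	rbd_img_request_submit(img_request);
 *
 * Completion then arrives per object via rbd_img_obj_callback(), and
 * rbd_img_request_complete() finishes the image request as a whole.
 */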
3025 static void rbd_img_parent_read_callback(struct rbd_img_request
*img_request
)
3027 struct rbd_obj_request
*obj_request
;
3028 struct rbd_device
*rbd_dev
;
3033 rbd_assert(img_request_child_test(img_request
));
3035 /* First get what we need from the image request and release it */
3037 obj_request
= img_request
->obj_request
;
3038 img_xferred
= img_request
->xferred
;
3039 img_result
= img_request
->result
;
3040 rbd_img_request_put(img_request
);
3043 * If the overlap has become 0 (most likely because the
3044 * image has been flattened) we need to re-submit the
3047 rbd_assert(obj_request
);
3048 rbd_assert(obj_request
->img_request
);
3049 rbd_dev
= obj_request
->img_request
->rbd_dev
;
3050 if (!rbd_dev
->parent_overlap
) {
3051 rbd_obj_request_submit(obj_request
);
3055 obj_request
->result
= img_result
;
3056 if (obj_request
->result
)
3060 * We need to zero anything beyond the parent overlap
3061 * boundary. Since rbd_img_obj_request_read_callback()
3062 * will zero anything beyond the end of a short read, an
3063 * easy way to do this is to pretend the data from the
3064 * parent came up short--ending at the overlap boundary.
3066 rbd_assert(obj_request
->img_offset
< U64_MAX
- obj_request
->length
);
3067 obj_end
= obj_request
->img_offset
+ obj_request
->length
;
3068 if (obj_end
> rbd_dev
->parent_overlap
) {
3071 if (obj_request
->img_offset
< rbd_dev
->parent_overlap
)
3072 xferred
= rbd_dev
->parent_overlap
-
3073 obj_request
->img_offset
;
3075 obj_request
->xferred
= min(img_xferred
, xferred
);
3077 obj_request
->xferred
= img_xferred
;
3080 rbd_img_obj_request_read_callback(obj_request
);
3081 rbd_obj_request_complete(obj_request
);
3084 static void rbd_img_parent_read(struct rbd_obj_request
*obj_request
)
3086 struct rbd_img_request
*img_request
;
3089 rbd_assert(obj_request_img_data_test(obj_request
));
3090 rbd_assert(obj_request
->img_request
!= NULL
);
3091 rbd_assert(obj_request
->result
== (s32
) -ENOENT
);
3092 rbd_assert(obj_request_type_valid(obj_request
->type
));
3094 /* rbd_read_finish(obj_request, obj_request->length); */
3095 img_request
= rbd_parent_request_create(obj_request
,
3096 obj_request
->img_offset
,
3097 obj_request
->length
);
3102 if (obj_request
->type
== OBJ_REQUEST_BIO
)
3103 result
= rbd_img_request_fill(img_request
, OBJ_REQUEST_BIO
,
3104 obj_request
->bio_list
);
3106 result
= rbd_img_request_fill(img_request
, OBJ_REQUEST_PAGES
,
3107 obj_request
->pages
);
3111 img_request
->callback
= rbd_img_parent_read_callback
;
3112 result
= rbd_img_request_submit(img_request
);
3119 rbd_img_request_put(img_request
);
3120 obj_request
->result
= result
;
3121 obj_request
->xferred
= 0;
3122 obj_request_done_set(obj_request
);
static const struct rbd_client_id rbd_empty_cid;

static bool rbd_cid_equal(const struct rbd_client_id *lhs,
			  const struct rbd_client_id *rhs)
{
	return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
}

static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
{
	struct rbd_client_id cid;

	mutex_lock(&rbd_dev->watch_mutex);
	cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
	cid.handle = rbd_dev->watch_cookie;
	mutex_unlock(&rbd_dev->watch_mutex);
	return cid;
}
/*
 * lock_rwsem must be held for write
 */
static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
			      const struct rbd_client_id *cid)
{
	dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
	     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
	     cid->gid, cid->handle);
	rbd_dev->owner_cid = *cid;	/* struct */
}

static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
{
	mutex_lock(&rbd_dev->watch_mutex);
	sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
	mutex_unlock(&rbd_dev->watch_mutex);
}
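
/*
 * The resulting cookie is "<RBD_LOCK_COOKIE_PREFIX> <watch_cookie>"
 * (presumably something like "auto 94223342527584"); find_watcher()
 * parses it back with the same format when matching a lock owner
 * against the object's watcher list.
 */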
/*
 * lock_rwsem must be held for write
 */
static int rbd_lock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
	char cookie[32];
	int ret;

	WARN_ON(__rbd_is_lock_owner(rbd_dev));

	format_lock_cookie(rbd_dev, cookie);
	ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
			    RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
			    RBD_LOCK_TAG, "", 0);
	if (ret)
		return ret;

	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
	rbd_set_owner_cid(rbd_dev, &cid);
	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
	return 0;
}

/*
 * lock_rwsem must be held for write
 */
static int rbd_unlock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	char cookie[32];
	int ret;

	WARN_ON(!__rbd_is_lock_owner(rbd_dev));

	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;

	format_lock_cookie(rbd_dev, cookie);
	ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
			      RBD_LOCK_NAME, cookie);
	if (ret && ret != -ENOENT) {
		rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
		return ret;
	}

	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
	queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
	return 0;
}
3214 static int __rbd_notify_op_lock(struct rbd_device
*rbd_dev
,
3215 enum rbd_notify_op notify_op
,
3216 struct page
***preply_pages
,
3219 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
3220 struct rbd_client_id cid
= rbd_get_cid(rbd_dev
);
3221 int buf_size
= 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN
;
3225 dout("%s rbd_dev %p notify_op %d\n", __func__
, rbd_dev
, notify_op
);
3227 /* encode *LockPayload NotifyMessage (op + ClientId) */
3228 ceph_start_encoding(&p
, 2, 1, buf_size
- CEPH_ENCODING_START_BLK_LEN
);
3229 ceph_encode_32(&p
, notify_op
);
3230 ceph_encode_64(&p
, cid
.gid
);
3231 ceph_encode_64(&p
, cid
.handle
);
3233 return ceph_osdc_notify(osdc
, &rbd_dev
->header_oid
,
3234 &rbd_dev
->header_oloc
, buf
, buf_size
,
3235 RBD_NOTIFY_TIMEOUT
, preply_pages
, preply_len
);
3238 static void rbd_notify_op_lock(struct rbd_device
*rbd_dev
,
3239 enum rbd_notify_op notify_op
)
3241 struct page
**reply_pages
;
3244 __rbd_notify_op_lock(rbd_dev
, notify_op
, &reply_pages
, &reply_len
);
3245 ceph_release_page_vector(reply_pages
, calc_pages_for(0, reply_len
));
static void rbd_notify_acquired_lock(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
						  acquired_lock_work);

	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
}

static void rbd_notify_released_lock(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
						  released_lock_work);

	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
}
3264 static int rbd_request_lock(struct rbd_device
*rbd_dev
)
3266 struct page
**reply_pages
;
3268 bool lock_owner_responded
= false;
3271 dout("%s rbd_dev %p\n", __func__
, rbd_dev
);
3273 ret
= __rbd_notify_op_lock(rbd_dev
, RBD_NOTIFY_OP_REQUEST_LOCK
,
3274 &reply_pages
, &reply_len
);
3275 if (ret
&& ret
!= -ETIMEDOUT
) {
3276 rbd_warn(rbd_dev
, "failed to request lock: %d", ret
);
3280 if (reply_len
> 0 && reply_len
<= PAGE_SIZE
) {
3281 void *p
= page_address(reply_pages
[0]);
3282 void *const end
= p
+ reply_len
;
3285 ceph_decode_32_safe(&p
, end
, n
, e_inval
); /* num_acks */
3290 ceph_decode_need(&p
, end
, 8 + 8, e_inval
);
3291 p
+= 8 + 8; /* skip gid and cookie */
3293 ceph_decode_32_safe(&p
, end
, len
, e_inval
);
3297 if (lock_owner_responded
) {
3299 "duplicate lock owners detected");
3304 lock_owner_responded
= true;
3305 ret
= ceph_start_decoding(&p
, end
, 1, "ResponseMessage",
3309 "failed to decode ResponseMessage: %d",
3314 ret
= ceph_decode_32(&p
);
3318 if (!lock_owner_responded
) {
3319 rbd_warn(rbd_dev
, "no lock owners detected");
3324 ceph_release_page_vector(reply_pages
, calc_pages_for(0, reply_len
));
static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
{
	dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);

	cancel_delayed_work(&rbd_dev->lock_dwork);
	if (wake_all)
		wake_up_all(&rbd_dev->lock_waitq);
	else
		wake_up(&rbd_dev->lock_waitq);
}
3343 static int get_lock_owner_info(struct rbd_device
*rbd_dev
,
3344 struct ceph_locker
**lockers
, u32
*num_lockers
)
3346 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
3351 dout("%s rbd_dev %p\n", __func__
, rbd_dev
);
3353 ret
= ceph_cls_lock_info(osdc
, &rbd_dev
->header_oid
,
3354 &rbd_dev
->header_oloc
, RBD_LOCK_NAME
,
3355 &lock_type
, &lock_tag
, lockers
, num_lockers
);
3359 if (*num_lockers
== 0) {
3360 dout("%s rbd_dev %p no lockers detected\n", __func__
, rbd_dev
);
3364 if (strcmp(lock_tag
, RBD_LOCK_TAG
)) {
3365 rbd_warn(rbd_dev
, "locked by external mechanism, tag %s",
3371 if (lock_type
== CEPH_CLS_LOCK_SHARED
) {
3372 rbd_warn(rbd_dev
, "shared lock type detected");
3377 if (strncmp((*lockers
)[0].id
.cookie
, RBD_LOCK_COOKIE_PREFIX
,
3378 strlen(RBD_LOCK_COOKIE_PREFIX
))) {
3379 rbd_warn(rbd_dev
, "locked by external mechanism, cookie %s",
3380 (*lockers
)[0].id
.cookie
);
3390 static int find_watcher(struct rbd_device
*rbd_dev
,
3391 const struct ceph_locker
*locker
)
3393 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
3394 struct ceph_watch_item
*watchers
;
3400 ret
= ceph_osdc_list_watchers(osdc
, &rbd_dev
->header_oid
,
3401 &rbd_dev
->header_oloc
, &watchers
,
3406 sscanf(locker
->id
.cookie
, RBD_LOCK_COOKIE_PREFIX
" %llu", &cookie
);
3407 for (i
= 0; i
< num_watchers
; i
++) {
3408 if (!memcmp(&watchers
[i
].addr
, &locker
->info
.addr
,
3409 sizeof(locker
->info
.addr
)) &&
3410 watchers
[i
].cookie
== cookie
) {
3411 struct rbd_client_id cid
= {
3412 .gid
= le64_to_cpu(watchers
[i
].name
.num
),
3416 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__
,
3417 rbd_dev
, cid
.gid
, cid
.handle
);
3418 rbd_set_owner_cid(rbd_dev
, &cid
);
3424 dout("%s rbd_dev %p no watchers\n", __func__
, rbd_dev
);
3432 * lock_rwsem must be held for write
3434 static int rbd_try_lock(struct rbd_device
*rbd_dev
)
3436 struct ceph_client
*client
= rbd_dev
->rbd_client
->client
;
3437 struct ceph_locker
*lockers
;
3442 ret
= rbd_lock(rbd_dev
);
3446 /* determine if the current lock holder is still alive */
3447 ret
= get_lock_owner_info(rbd_dev
, &lockers
, &num_lockers
);
3451 if (num_lockers
== 0)
3454 ret
= find_watcher(rbd_dev
, lockers
);
3457 ret
= 0; /* have to request lock */
3461 rbd_warn(rbd_dev
, "%s%llu seems dead, breaking lock",
3462 ENTITY_NAME(lockers
[0].id
.name
));
3464 ret
= ceph_monc_blacklist_add(&client
->monc
,
3465 &lockers
[0].info
.addr
);
3467 rbd_warn(rbd_dev
, "blacklist of %s%llu failed: %d",
3468 ENTITY_NAME(lockers
[0].id
.name
), ret
);
3472 ret
= ceph_cls_break_lock(&client
->osdc
, &rbd_dev
->header_oid
,
3473 &rbd_dev
->header_oloc
, RBD_LOCK_NAME
,
3474 lockers
[0].id
.cookie
,
3475 &lockers
[0].id
.name
);
3476 if (ret
&& ret
!= -ENOENT
)
3480 ceph_free_lockers(lockers
, num_lockers
);
3484 ceph_free_lockers(lockers
, num_lockers
);
3489 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3491 static enum rbd_lock_state
rbd_try_acquire_lock(struct rbd_device
*rbd_dev
,
3494 enum rbd_lock_state lock_state
;
3496 down_read(&rbd_dev
->lock_rwsem
);
3497 dout("%s rbd_dev %p read lock_state %d\n", __func__
, rbd_dev
,
3498 rbd_dev
->lock_state
);
3499 if (__rbd_is_lock_owner(rbd_dev
)) {
3500 lock_state
= rbd_dev
->lock_state
;
3501 up_read(&rbd_dev
->lock_rwsem
);
3505 up_read(&rbd_dev
->lock_rwsem
);
3506 down_write(&rbd_dev
->lock_rwsem
);
3507 dout("%s rbd_dev %p write lock_state %d\n", __func__
, rbd_dev
,
3508 rbd_dev
->lock_state
);
3509 if (!__rbd_is_lock_owner(rbd_dev
)) {
3510 *pret
= rbd_try_lock(rbd_dev
);
3512 rbd_warn(rbd_dev
, "failed to acquire lock: %d", *pret
);
3515 lock_state
= rbd_dev
->lock_state
;
3516 up_write(&rbd_dev
->lock_rwsem
);
3520 static void rbd_acquire_lock(struct work_struct
*work
)
3522 struct rbd_device
*rbd_dev
= container_of(to_delayed_work(work
),
3523 struct rbd_device
, lock_dwork
);
3524 enum rbd_lock_state lock_state
;
3527 dout("%s rbd_dev %p\n", __func__
, rbd_dev
);
3529 lock_state
= rbd_try_acquire_lock(rbd_dev
, &ret
);
3530 if (lock_state
!= RBD_LOCK_STATE_UNLOCKED
|| ret
== -EBLACKLISTED
) {
3531 if (lock_state
== RBD_LOCK_STATE_LOCKED
)
3532 wake_requests(rbd_dev
, true);
3533 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__
,
3534 rbd_dev
, lock_state
, ret
);
3538 ret
= rbd_request_lock(rbd_dev
);
3539 if (ret
== -ETIMEDOUT
) {
3540 goto again
; /* treat this as a dead client */
3541 } else if (ret
< 0) {
3542 rbd_warn(rbd_dev
, "error requesting lock: %d", ret
);
3543 mod_delayed_work(rbd_dev
->task_wq
, &rbd_dev
->lock_dwork
,
3547 * lock owner acked, but resend if we don't see them
3550 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__
,
3552 mod_delayed_work(rbd_dev
->task_wq
, &rbd_dev
->lock_dwork
,
3553 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT
* MSEC_PER_SEC
));
3558 * lock_rwsem must be held for write
3560 static bool rbd_release_lock(struct rbd_device
*rbd_dev
)
3562 dout("%s rbd_dev %p read lock_state %d\n", __func__
, rbd_dev
,
3563 rbd_dev
->lock_state
);
3564 if (rbd_dev
->lock_state
!= RBD_LOCK_STATE_LOCKED
)
3567 rbd_dev
->lock_state
= RBD_LOCK_STATE_RELEASING
;
3568 downgrade_write(&rbd_dev
->lock_rwsem
);
3570 * Ensure that all in-flight IO is flushed.
3572 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3573 * may be shared with other devices.
3575 ceph_osdc_sync(&rbd_dev
->rbd_client
->client
->osdc
);
3576 up_read(&rbd_dev
->lock_rwsem
);
3578 down_write(&rbd_dev
->lock_rwsem
);
3579 dout("%s rbd_dev %p write lock_state %d\n", __func__
, rbd_dev
,
3580 rbd_dev
->lock_state
);
3581 if (rbd_dev
->lock_state
!= RBD_LOCK_STATE_RELEASING
)
3584 if (!rbd_unlock(rbd_dev
))
3586 * Give others a chance to grab the lock - we would re-acquire
3587 * almost immediately if we got new IO during ceph_osdc_sync()
3588 * otherwise. We need to ack our own notifications, so this
3589 * lock_dwork will be requeued from rbd_wait_state_locked()
3590 * after wake_requests() in rbd_handle_released_lock().
3592 cancel_delayed_work(&rbd_dev
->lock_dwork
);
static void rbd_release_lock_work(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
						  unlock_work);

	down_write(&rbd_dev->lock_rwsem);
	rbd_release_lock(rbd_dev);
	up_write(&rbd_dev->lock_rwsem);
}
3607 static void rbd_handle_acquired_lock(struct rbd_device
*rbd_dev
, u8 struct_v
,
3610 struct rbd_client_id cid
= { 0 };
3612 if (struct_v
>= 2) {
3613 cid
.gid
= ceph_decode_64(p
);
3614 cid
.handle
= ceph_decode_64(p
);
3617 dout("%s rbd_dev %p cid %llu-%llu\n", __func__
, rbd_dev
, cid
.gid
,
3619 if (!rbd_cid_equal(&cid
, &rbd_empty_cid
)) {
3620 down_write(&rbd_dev
->lock_rwsem
);
3621 if (rbd_cid_equal(&cid
, &rbd_dev
->owner_cid
)) {
3623 * we already know that the remote client is
3626 up_write(&rbd_dev
->lock_rwsem
);
3630 rbd_set_owner_cid(rbd_dev
, &cid
);
3631 downgrade_write(&rbd_dev
->lock_rwsem
);
3633 down_read(&rbd_dev
->lock_rwsem
);
3636 if (!__rbd_is_lock_owner(rbd_dev
))
3637 wake_requests(rbd_dev
, false);
3638 up_read(&rbd_dev
->lock_rwsem
);
3641 static void rbd_handle_released_lock(struct rbd_device
*rbd_dev
, u8 struct_v
,
3644 struct rbd_client_id cid
= { 0 };
3646 if (struct_v
>= 2) {
3647 cid
.gid
= ceph_decode_64(p
);
3648 cid
.handle
= ceph_decode_64(p
);
3651 dout("%s rbd_dev %p cid %llu-%llu\n", __func__
, rbd_dev
, cid
.gid
,
3653 if (!rbd_cid_equal(&cid
, &rbd_empty_cid
)) {
3654 down_write(&rbd_dev
->lock_rwsem
);
3655 if (!rbd_cid_equal(&cid
, &rbd_dev
->owner_cid
)) {
3656 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3657 __func__
, rbd_dev
, cid
.gid
, cid
.handle
,
3658 rbd_dev
->owner_cid
.gid
, rbd_dev
->owner_cid
.handle
);
3659 up_write(&rbd_dev
->lock_rwsem
);
3663 rbd_set_owner_cid(rbd_dev
, &rbd_empty_cid
);
3664 downgrade_write(&rbd_dev
->lock_rwsem
);
3666 down_read(&rbd_dev
->lock_rwsem
);
3669 if (!__rbd_is_lock_owner(rbd_dev
))
3670 wake_requests(rbd_dev
, false);
3671 up_read(&rbd_dev
->lock_rwsem
);
3674 static bool rbd_handle_request_lock(struct rbd_device
*rbd_dev
, u8 struct_v
,
3677 struct rbd_client_id my_cid
= rbd_get_cid(rbd_dev
);
3678 struct rbd_client_id cid
= { 0 };
3681 if (struct_v
>= 2) {
3682 cid
.gid
= ceph_decode_64(p
);
3683 cid
.handle
= ceph_decode_64(p
);
3686 dout("%s rbd_dev %p cid %llu-%llu\n", __func__
, rbd_dev
, cid
.gid
,
3688 if (rbd_cid_equal(&cid
, &my_cid
))
3691 down_read(&rbd_dev
->lock_rwsem
);
3692 need_to_send
= __rbd_is_lock_owner(rbd_dev
);
3693 if (rbd_dev
->lock_state
== RBD_LOCK_STATE_LOCKED
) {
3694 if (!rbd_cid_equal(&rbd_dev
->owner_cid
, &rbd_empty_cid
)) {
3695 dout("%s rbd_dev %p queueing unlock_work\n", __func__
,
3697 queue_work(rbd_dev
->task_wq
, &rbd_dev
->unlock_work
);
3700 up_read(&rbd_dev
->lock_rwsem
);
3701 return need_to_send
;
static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
				     u64 notify_id, u64 cookie, s32 *result)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
	char buf[buf_size];
	int ret;

	if (result) {
		void *p = buf;

		/* encode ResponseMessage */
		ceph_start_encoding(&p, 1, 1,
				    buf_size - CEPH_ENCODING_START_BLK_LEN);
		ceph_encode_32(&p, *result);
	} else {
		buf_size = 0;
	}

	ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
				   &rbd_dev->header_oloc, notify_id, cookie,
				   buf, buf_size);
	if (ret)
		rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
}

static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
				   u64 cookie)
{
	dout("%s rbd_dev %p\n", __func__, rbd_dev);
	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
}

static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
					  u64 notify_id, u64 cookie, s32 result)
{
	dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
}
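
/*
 * The acknowledgement payload is an encoded ResponseMessage: a
 * ceph_start_encoding() header followed by a 32-bit result.  When no
 * result is supplied the notify is acked with an empty payload.
 */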
3744 static void rbd_watch_cb(void *arg
, u64 notify_id
, u64 cookie
,
3745 u64 notifier_id
, void *data
, size_t data_len
)
3747 struct rbd_device
*rbd_dev
= arg
;
3749 void *const end
= p
+ data_len
;
3755 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3756 __func__
, rbd_dev
, cookie
, notify_id
, data_len
);
3758 ret
= ceph_start_decoding(&p
, end
, 1, "NotifyMessage",
3761 rbd_warn(rbd_dev
, "failed to decode NotifyMessage: %d",
3766 notify_op
= ceph_decode_32(&p
);
3768 /* legacy notification for header updates */
3769 notify_op
= RBD_NOTIFY_OP_HEADER_UPDATE
;
3773 dout("%s rbd_dev %p notify_op %u\n", __func__
, rbd_dev
, notify_op
);
3774 switch (notify_op
) {
3775 case RBD_NOTIFY_OP_ACQUIRED_LOCK
:
3776 rbd_handle_acquired_lock(rbd_dev
, struct_v
, &p
);
3777 rbd_acknowledge_notify(rbd_dev
, notify_id
, cookie
);
3779 case RBD_NOTIFY_OP_RELEASED_LOCK
:
3780 rbd_handle_released_lock(rbd_dev
, struct_v
, &p
);
3781 rbd_acknowledge_notify(rbd_dev
, notify_id
, cookie
);
3783 case RBD_NOTIFY_OP_REQUEST_LOCK
:
3784 if (rbd_handle_request_lock(rbd_dev
, struct_v
, &p
))
3786 * send ResponseMessage(0) back so the client
3787 * can detect a missing owner
3789 rbd_acknowledge_notify_result(rbd_dev
, notify_id
,
3792 rbd_acknowledge_notify(rbd_dev
, notify_id
, cookie
);
3794 case RBD_NOTIFY_OP_HEADER_UPDATE
:
3795 ret
= rbd_dev_refresh(rbd_dev
);
3797 rbd_warn(rbd_dev
, "refresh failed: %d", ret
);
3799 rbd_acknowledge_notify(rbd_dev
, notify_id
, cookie
);
3802 if (rbd_is_lock_owner(rbd_dev
))
3803 rbd_acknowledge_notify_result(rbd_dev
, notify_id
,
3804 cookie
, -EOPNOTSUPP
);
3806 rbd_acknowledge_notify(rbd_dev
, notify_id
, cookie
);
static void __rbd_unregister_watch(struct rbd_device *rbd_dev);

static void rbd_watch_errcb(void *arg, u64 cookie, int err)
{
	struct rbd_device *rbd_dev = arg;

	rbd_warn(rbd_dev, "encountered watch error: %d", err);

	down_write(&rbd_dev->lock_rwsem);
	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
	up_write(&rbd_dev->lock_rwsem);

	mutex_lock(&rbd_dev->watch_mutex);
	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
		__rbd_unregister_watch(rbd_dev);
		rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;

		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
	}
	mutex_unlock(&rbd_dev->watch_mutex);
}
/*
 * watch_mutex must be locked
 */
static int __rbd_register_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_linger_request *handle;

	rbd_assert(!rbd_dev->watch_handle);
	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
				 &rbd_dev->header_oloc, rbd_watch_cb,
				 rbd_watch_errcb, rbd_dev);
	if (IS_ERR(handle))
		return PTR_ERR(handle);

	rbd_dev->watch_handle = handle;
	return 0;
}

/*
 * watch_mutex must be locked
 */
static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	rbd_assert(rbd_dev->watch_handle);
	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
	if (ret)
		rbd_warn(rbd_dev, "failed to unwatch: %d", ret);

	rbd_dev->watch_handle = NULL;
}
static int rbd_register_watch(struct rbd_device *rbd_dev)
{
	int ret;

	mutex_lock(&rbd_dev->watch_mutex);
	rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
	ret = __rbd_register_watch(rbd_dev);
	if (ret)
		goto out;

	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;

out:
	mutex_unlock(&rbd_dev->watch_mutex);
	return ret;
}
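
/*
 * watch_cookie mirrors the linger request id of the registered watch.
 * It is what peers see: it feeds the client id handle in rbd_get_cid()
 * and the lock cookie built by format_lock_cookie().
 */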
static void cancel_tasks_sync(struct rbd_device *rbd_dev)
{
	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	cancel_work_sync(&rbd_dev->acquired_lock_work);
	cancel_work_sync(&rbd_dev->released_lock_work);
	cancel_delayed_work_sync(&rbd_dev->lock_dwork);
	cancel_work_sync(&rbd_dev->unlock_work);
}

static void rbd_unregister_watch(struct rbd_device *rbd_dev)
{
	WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
	cancel_tasks_sync(rbd_dev);

	mutex_lock(&rbd_dev->watch_mutex);
	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
		__rbd_unregister_watch(rbd_dev);
	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
	mutex_unlock(&rbd_dev->watch_mutex);

	cancel_delayed_work_sync(&rbd_dev->watch_dwork);
	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
}
3915 static void rbd_reregister_watch(struct work_struct
*work
)
3917 struct rbd_device
*rbd_dev
= container_of(to_delayed_work(work
),
3918 struct rbd_device
, watch_dwork
);
3919 bool was_lock_owner
= false;
3920 bool need_to_wake
= false;
3923 dout("%s rbd_dev %p\n", __func__
, rbd_dev
);
3925 down_write(&rbd_dev
->lock_rwsem
);
3926 if (rbd_dev
->lock_state
== RBD_LOCK_STATE_LOCKED
)
3927 was_lock_owner
= rbd_release_lock(rbd_dev
);
3929 mutex_lock(&rbd_dev
->watch_mutex
);
3930 if (rbd_dev
->watch_state
!= RBD_WATCH_STATE_ERROR
) {
3931 mutex_unlock(&rbd_dev
->watch_mutex
);
3935 ret
= __rbd_register_watch(rbd_dev
);
3937 rbd_warn(rbd_dev
, "failed to reregister watch: %d", ret
);
3938 if (ret
== -EBLACKLISTED
|| ret
== -ENOENT
) {
3939 set_bit(RBD_DEV_FLAG_BLACKLISTED
, &rbd_dev
->flags
);
3940 need_to_wake
= true;
3942 queue_delayed_work(rbd_dev
->task_wq
,
3943 &rbd_dev
->watch_dwork
,
3946 mutex_unlock(&rbd_dev
->watch_mutex
);
3950 need_to_wake
= true;
3951 rbd_dev
->watch_state
= RBD_WATCH_STATE_REGISTERED
;
3952 rbd_dev
->watch_cookie
= rbd_dev
->watch_handle
->linger_id
;
3953 mutex_unlock(&rbd_dev
->watch_mutex
);
3955 ret
= rbd_dev_refresh(rbd_dev
);
3957 rbd_warn(rbd_dev
, "reregisteration refresh failed: %d", ret
);
3959 if (was_lock_owner
) {
3960 ret
= rbd_try_lock(rbd_dev
);
3962 rbd_warn(rbd_dev
, "reregisteration lock failed: %d",
3967 up_write(&rbd_dev
->lock_rwsem
);
3969 wake_requests(rbd_dev
, true);
3973 * Synchronous osd object method call. Returns the number of bytes
3974 * returned in the outbound buffer, or a negative error code.
3976 static int rbd_obj_method_sync(struct rbd_device
*rbd_dev
,
3977 const char *object_name
,
3978 const char *class_name
,
3979 const char *method_name
,
3980 const void *outbound
,
3981 size_t outbound_size
,
3983 size_t inbound_size
)
3985 struct rbd_obj_request
*obj_request
;
3986 struct page
**pages
;
3991 * Method calls are ultimately read operations. The result
3992 * should placed into the inbound buffer provided. They
3993 * also supply outbound data--parameters for the object
3994 * method. Currently if this is present it will be a
3997 page_count
= (u32
)calc_pages_for(0, inbound_size
);
3998 pages
= ceph_alloc_page_vector(page_count
, GFP_KERNEL
);
4000 return PTR_ERR(pages
);
4003 obj_request
= rbd_obj_request_create(object_name
, 0, inbound_size
,
4008 obj_request
->pages
= pages
;
4009 obj_request
->page_count
= page_count
;
4011 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
, OBJ_OP_READ
, 1,
4013 if (!obj_request
->osd_req
)
4016 osd_req_op_cls_init(obj_request
->osd_req
, 0, CEPH_OSD_OP_CALL
,
4017 class_name
, method_name
);
4018 if (outbound_size
) {
4019 struct ceph_pagelist
*pagelist
;
4021 pagelist
= kmalloc(sizeof (*pagelist
), GFP_NOFS
);
4025 ceph_pagelist_init(pagelist
);
4026 ceph_pagelist_append(pagelist
, outbound
, outbound_size
);
4027 osd_req_op_cls_request_data_pagelist(obj_request
->osd_req
, 0,
4030 osd_req_op_cls_response_data_pages(obj_request
->osd_req
, 0,
4031 obj_request
->pages
, inbound_size
,
4034 rbd_obj_request_submit(obj_request
);
4035 ret
= rbd_obj_request_wait(obj_request
);
4039 ret
= obj_request
->result
;
4043 rbd_assert(obj_request
->xferred
< (u64
)INT_MAX
);
4044 ret
= (int)obj_request
->xferred
;
4045 ceph_copy_from_page_vector(pages
, inbound
, 0, obj_request
->xferred
);
4048 rbd_obj_request_put(obj_request
);
4050 ceph_release_page_vector(pages
, page_count
);
/*
 * lock_rwsem must be held for read
 */
static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
{
	DEFINE_WAIT(wait);

	do {
		/*
		 * Note the use of mod_delayed_work() in rbd_acquire_lock()
		 * and cancel_delayed_work() in wake_requests().
		 */
		dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
		prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
					  TASK_UNINTERRUPTIBLE);
		up_read(&rbd_dev->lock_rwsem);
		schedule();
		down_read(&rbd_dev->lock_rwsem);
	} while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
		 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));

	finish_wait(&rbd_dev->lock_waitq, &wait);
}
4080 static void rbd_queue_workfn(struct work_struct
*work
)
4082 struct request
*rq
= blk_mq_rq_from_pdu(work
);
4083 struct rbd_device
*rbd_dev
= rq
->q
->queuedata
;
4084 struct rbd_img_request
*img_request
;
4085 struct ceph_snap_context
*snapc
= NULL
;
4086 u64 offset
= (u64
)blk_rq_pos(rq
) << SECTOR_SHIFT
;
4087 u64 length
= blk_rq_bytes(rq
);
4088 enum obj_operation_type op_type
;
4090 bool must_be_locked
;
4093 if (rq
->cmd_type
!= REQ_TYPE_FS
) {
4094 dout("%s: non-fs request type %d\n", __func__
,
4095 (int) rq
->cmd_type
);
4100 if (req_op(rq
) == REQ_OP_DISCARD
)
4101 op_type
= OBJ_OP_DISCARD
;
4102 else if (req_op(rq
) == REQ_OP_WRITE
)
4103 op_type
= OBJ_OP_WRITE
;
4105 op_type
= OBJ_OP_READ
;
4107 /* Ignore/skip any zero-length requests */
4110 dout("%s: zero-length request\n", __func__
);
4115 /* Only reads are allowed to a read-only device */
4117 if (op_type
!= OBJ_OP_READ
) {
4118 if (rbd_dev
->mapping
.read_only
) {
4122 rbd_assert(rbd_dev
->spec
->snap_id
== CEPH_NOSNAP
);
4126 * Quit early if the mapped snapshot no longer exists. It's
4127 * still possible the snapshot will have disappeared by the
4128 * time our request arrives at the osd, but there's no sense in
4129 * sending it if we already know.
4131 if (!test_bit(RBD_DEV_FLAG_EXISTS
, &rbd_dev
->flags
)) {
4132 dout("request for non-existent snapshot");
4133 rbd_assert(rbd_dev
->spec
->snap_id
!= CEPH_NOSNAP
);
4138 if (offset
&& length
> U64_MAX
- offset
+ 1) {
4139 rbd_warn(rbd_dev
, "bad request range (%llu~%llu)", offset
,
4142 goto err_rq
; /* Shouldn't happen */
4145 blk_mq_start_request(rq
);
4147 down_read(&rbd_dev
->header_rwsem
);
4148 mapping_size
= rbd_dev
->mapping
.size
;
4149 if (op_type
!= OBJ_OP_READ
) {
4150 snapc
= rbd_dev
->header
.snapc
;
4151 ceph_get_snap_context(snapc
);
4152 must_be_locked
= rbd_is_lock_supported(rbd_dev
);
4154 must_be_locked
= rbd_dev
->opts
->lock_on_read
&&
4155 rbd_is_lock_supported(rbd_dev
);
4157 up_read(&rbd_dev
->header_rwsem
);
4159 if (offset
+ length
> mapping_size
) {
4160 rbd_warn(rbd_dev
, "beyond EOD (%llu~%llu > %llu)", offset
,
4161 length
, mapping_size
);
4166 if (must_be_locked
) {
4167 down_read(&rbd_dev
->lock_rwsem
);
4168 if (rbd_dev
->lock_state
!= RBD_LOCK_STATE_LOCKED
&&
4169 !test_bit(RBD_DEV_FLAG_BLACKLISTED
, &rbd_dev
->flags
))
4170 rbd_wait_state_locked(rbd_dev
);
4172 WARN_ON((rbd_dev
->lock_state
== RBD_LOCK_STATE_LOCKED
) ^
4173 !test_bit(RBD_DEV_FLAG_BLACKLISTED
, &rbd_dev
->flags
));
4174 if (test_bit(RBD_DEV_FLAG_BLACKLISTED
, &rbd_dev
->flags
)) {
4175 result
= -EBLACKLISTED
;
4180 img_request
= rbd_img_request_create(rbd_dev
, offset
, length
, op_type
,
4186 img_request
->rq
= rq
;
4187 snapc
= NULL
; /* img_request consumes a ref */
4189 if (op_type
== OBJ_OP_DISCARD
)
4190 result
= rbd_img_request_fill(img_request
, OBJ_REQUEST_NODATA
,
4193 result
= rbd_img_request_fill(img_request
, OBJ_REQUEST_BIO
,
4196 goto err_img_request
;
4198 result
= rbd_img_request_submit(img_request
);
4200 goto err_img_request
;
4203 up_read(&rbd_dev
->lock_rwsem
);
4207 rbd_img_request_put(img_request
);
4210 up_read(&rbd_dev
->lock_rwsem
);
4213 rbd_warn(rbd_dev
, "%s %llx at %llx result %d",
4214 obj_op_name(op_type
), length
, offset
, result
);
4215 ceph_put_snap_context(snapc
);
4217 blk_mq_end_request(rq
, result
);
static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
		const struct blk_mq_queue_data *bd)
{
	struct request *rq = bd->rq;
	struct work_struct *work = blk_mq_rq_to_pdu(rq);

	queue_work(rbd_wq, work);
	return BLK_MQ_RQ_QUEUE_OK;
}

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_dev->disk = NULL;
	if (disk->flags & GENHD_FL_UP) {
		del_gendisk(disk);
		if (disk->queue)
			blk_cleanup_queue(disk->queue);
		blk_mq_free_tag_set(&rbd_dev->tag_set);
	}
	put_disk(disk);
}
4247 static int rbd_obj_read_sync(struct rbd_device
*rbd_dev
,
4248 const char *object_name
,
4249 u64 offset
, u64 length
, void *buf
)
4252 struct rbd_obj_request
*obj_request
;
4253 struct page
**pages
= NULL
;
4258 page_count
= (u32
) calc_pages_for(offset
, length
);
4259 pages
= ceph_alloc_page_vector(page_count
, GFP_KERNEL
);
4261 return PTR_ERR(pages
);
4264 obj_request
= rbd_obj_request_create(object_name
, offset
, length
,
4269 obj_request
->pages
= pages
;
4270 obj_request
->page_count
= page_count
;
4272 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
, OBJ_OP_READ
, 1,
4274 if (!obj_request
->osd_req
)
4277 osd_req_op_extent_init(obj_request
->osd_req
, 0, CEPH_OSD_OP_READ
,
4278 offset
, length
, 0, 0);
4279 osd_req_op_extent_osd_data_pages(obj_request
->osd_req
, 0,
4281 obj_request
->length
,
4282 obj_request
->offset
& ~PAGE_MASK
,
4285 rbd_obj_request_submit(obj_request
);
4286 ret
= rbd_obj_request_wait(obj_request
);
4290 ret
= obj_request
->result
;
4294 rbd_assert(obj_request
->xferred
<= (u64
) SIZE_MAX
);
4295 size
= (size_t) obj_request
->xferred
;
4296 ceph_copy_from_page_vector(pages
, buf
, 0, size
);
4297 rbd_assert(size
<= (size_t)INT_MAX
);
4301 rbd_obj_request_put(obj_request
);
4303 ceph_release_page_vector(pages
, page_count
);
4309 * Read the complete header for the given rbd device. On successful
4310 * return, the rbd_dev->header field will contain up-to-date
4311 * information about the image.
4313 static int rbd_dev_v1_header_info(struct rbd_device
*rbd_dev
)
4315 struct rbd_image_header_ondisk
*ondisk
= NULL
;
4322 * The complete header will include an array of its 64-bit
4323 * snapshot ids, followed by the names of those snapshots as
4324 * a contiguous block of NUL-terminated strings. Note that
4325 * the number of snapshots could change by the time we read
4326 * it in, in which case we re-read it.
4333 size
= sizeof (*ondisk
);
4334 size
+= snap_count
* sizeof (struct rbd_image_snap_ondisk
);
4336 ondisk
= kmalloc(size
, GFP_KERNEL
);
4340 ret
= rbd_obj_read_sync(rbd_dev
, rbd_dev
->header_oid
.name
,
4344 if ((size_t)ret
< size
) {
4346 rbd_warn(rbd_dev
, "short header read (want %zd got %d)",
4350 if (!rbd_dev_ondisk_valid(ondisk
)) {
4352 rbd_warn(rbd_dev
, "invalid header");
4356 names_size
= le64_to_cpu(ondisk
->snap_names_len
);
4357 want_count
= snap_count
;
4358 snap_count
= le32_to_cpu(ondisk
->snap_count
);
4359 } while (snap_count
!= want_count
);
4361 ret
= rbd_header_from_disk(rbd_dev
, ondisk
);
/*
 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
 * has disappeared from the (just updated) snapshot context.
 */
static void rbd_exists_validate(struct rbd_device *rbd_dev)
{
	u64 snap_id;

	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
		return;

	snap_id = rbd_dev->spec->snap_id;
	if (snap_id == CEPH_NOSNAP)
		return;

	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
}

static void rbd_dev_update_size(struct rbd_device *rbd_dev)
{
	sector_t size;

	/*
	 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
	 * try to update its size.  If REMOVING is set, updating size
	 * is just useless work since the device can't be opened.
	 */
	if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
	    !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
		dout("setting size to %llu sectors", (unsigned long long)size);
		set_capacity(rbd_dev->disk, size);
		revalidate_disk(rbd_dev->disk);
	}
}

static int rbd_dev_refresh(struct rbd_device *rbd_dev)
{
	u64 mapping_size;
	int ret;

	down_write(&rbd_dev->header_rwsem);
	mapping_size = rbd_dev->mapping.size;

	ret = rbd_dev_header_info(rbd_dev);
	if (ret)
		goto out;

	/*
	 * If there is a parent, see if it has disappeared due to the
	 * mapped image getting flattened.
	 */
	if (rbd_dev->parent) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto out;
	}

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
		rbd_dev->mapping.size = rbd_dev->header.image_size;
	} else {
		/* validate mapped snapshot's EXISTS flag */
		rbd_exists_validate(rbd_dev);
	}

out:
	up_write(&rbd_dev->header_rwsem);
	if (!ret && mapping_size != rbd_dev->mapping.size)
		rbd_dev_update_size(rbd_dev);

	return ret;
}
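
/*
 * rbd_dev_refresh() is invoked from the watch callback on
 * RBD_NOTIFY_OP_HEADER_UPDATE and after a watch is re-established in
 * rbd_reregister_watch(), keeping the in-memory header and mapping
 * size in sync with the OSDs.
 */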
static int rbd_init_request(void *data, struct request *rq,
		unsigned int hctx_idx, unsigned int request_idx,
		unsigned int numa_node)
{
	struct work_struct *work = blk_mq_rq_to_pdu(rq);

	INIT_WORK(work, rbd_queue_workfn);
	return 0;
}

static struct blk_mq_ops rbd_mq_ops = {
	.queue_rq	= rbd_queue_rq,
	.init_request	= rbd_init_request,
};

static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;
	int err;

	/* create gendisk info */
	disk = alloc_disk(single_major ?
			  (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
			  RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = rbd_dev->minor;
	if (single_major)
		disk->flags |= GENHD_FL_EXT_DEVT;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
	rbd_dev->tag_set.ops = &rbd_mq_ops;
	rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
	rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
	rbd_dev->tag_set.nr_hw_queues = 1;
	rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);

	err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
	if (err)
		goto out_disk;

	q = blk_mq_init_queue(&rbd_dev->tag_set);
	if (IS_ERR(q)) {
		err = PTR_ERR(q);
		goto out_tag_set;
	}

	queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
	/* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	q->limits.max_sectors = queue_max_hw_sectors(q);
	blk_queue_max_segments(q, USHRT_MAX);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);
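
	/*
	 * For example, assuming the common default rbd object size of
	 * 4 MiB, the cap above works out to 4194304 / 512 = 8192 sectors
	 * per request, with 4 MiB advertised as both the minimum and the
	 * optimal I/O size (the actual value always comes from the image
	 * header's object order).
	 */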

	/* enable the discard support */
	queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
	q->limits.discard_granularity = segment_size;
	q->limits.discard_alignment = segment_size;
	blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
	q->limits.discard_zeroes_data = 1;

	if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
		q->backing_dev_info.capabilities |= BDI_CAP_STABLE_WRITES;

	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	return 0;
out_tag_set:
	blk_mq_free_tag_set(&rbd_dev->tag_set);
out_disk:
	put_disk(disk);
	return err;
}

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		(unsigned long long)rbd_dev->mapping.size);
}

/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
				 struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
		(unsigned long long)rbd_dev->mapping.features);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->major)
		return sprintf(buf, "%d\n", rbd_dev->major);

	return sprintf(buf, "(none)\n");
}

static ssize_t rbd_minor_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->minor);
}

static ssize_t rbd_client_addr_show(struct device *dev,
				    struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct ceph_entity_addr *client_addr =
	    ceph_client_addr(rbd_dev->rbd_client->client);

	return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
		       le32_to_cpu(client_addr->nonce));
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
		       ceph_client_gid(rbd_dev->rbd_client->client));
}

static ssize_t rbd_cluster_fsid_show(struct device *dev,
				     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
}

static ssize_t rbd_config_info_show(struct device *dev,
				    struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (!capable(CAP_SYS_ADMIN))
		return -EPERM;

	return sprintf(buf, "%s\n", rbd_dev->config_info);
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
				struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
			(unsigned long long) rbd_dev->spec->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->spec->image_name)
		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);

	return sprintf(buf, "(unknown)\n");
}

static ssize_t rbd_image_id_show(struct device *dev,
				 struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}

/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}

static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
}

/*
 * For a v2 image, shows the chain of parent images, separated by empty
 * lines.  For v1 images or if there is no parent, shows "(no parent
 * image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
			       struct device_attribute *attr,
			       char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	ssize_t count = 0;

	if (!rbd_dev->parent)
		return sprintf(buf, "(no parent image)\n");

	for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
		struct rbd_spec *spec = rbd_dev->parent_spec;

		count += sprintf(&buf[count], "%s"
			    "pool_id %llu\npool_name %s\n"
			    "image_id %s\nimage_name %s\n"
			    "snap_id %llu\nsnap_name %s\n"
			    "overlap %llu\n",
			    !count ? "" : "\n", /* first? */
			    spec->pool_id, spec->pool_name,
			    spec->image_id, spec->image_name ?: "(unknown)",
			    spec->snap_id, spec->snap_name,
			    rbd_dev->parent_overlap);
	}

	return count;
}
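
/*
 * The format string above produces one block per parent in the chain,
 * e.g. (all values purely illustrative):
 *
 *   pool_id 2
 *   pool_name rbd
 *   image_id 1018e76b8b4567
 *   image_name base-image
 *   snap_id 4
 *   snap_name snap1
 *   overlap 10737418240
 */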

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	if (!capable(CAP_SYS_ADMIN))
		return -EPERM;

	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		return ret;

	return size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_minor.attr,
	&dev_attr_client_addr.attr,
	&dev_attr_client_id.attr,
	&dev_attr_cluster_fsid.attr,
	&dev_attr_config_info.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};
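
/*
 * These attributes are exposed under the per-device sysfs directory of
 * the rbd bus, for example (paths shown for illustration, assuming a
 * device id of 0):
 *
 *   /sys/bus/rbd/devices/0/size
 *   /sys/bus/rbd/devices/0/pool
 *   /sys/bus/rbd/devices/0/current_snap
 */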

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_dev_release(struct device *dev);

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_dev_release,
};

static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref);
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}

static struct rbd_spec *rbd_spec_alloc(void)
{
	struct rbd_spec *spec;

	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;

	spec->pool_id = CEPH_NOPOOL;
	spec->snap_id = CEPH_NOSNAP;
	kref_init(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}

static void rbd_dev_free(struct rbd_device *rbd_dev)
{
	WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);

	ceph_oid_destroy(&rbd_dev->header_oid);
	ceph_oloc_destroy(&rbd_dev->header_oloc);
	kfree(rbd_dev->config_info);

	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev->opts);
	kfree(rbd_dev);
}

static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	bool need_put = !!rbd_dev->opts;

	if (need_put) {
		destroy_workqueue(rbd_dev->task_wq);
		ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
	}

	rbd_dev_free(rbd_dev);

	/*
	 * This is racy, but way better than putting module outside of
	 * the release callback.  The race window is pretty small, so
	 * doing something similar to dm (dm-builtin.c) is overkill.
	 */
	if (need_put)
		module_put(THIS_MODULE);
}

static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
					   struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	init_rwsem(&rbd_dev->header_rwsem);

	ceph_oid_init(&rbd_dev->header_oid);
	ceph_oloc_init(&rbd_dev->header_oloc);

	mutex_init(&rbd_dev->watch_mutex);
	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
	INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);

	init_rwsem(&rbd_dev->lock_rwsem);
	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
	INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
	INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
	INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
	INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
	init_waitqueue_head(&rbd_dev->lock_waitq);

	rbd_dev->dev.bus = &rbd_bus_type;
	rbd_dev->dev.type = &rbd_device_type;
	rbd_dev->dev.parent = &rbd_root_dev;
	device_initialize(&rbd_dev->dev);

	rbd_dev->rbd_client = rbdc;
	rbd_dev->spec = spec;

	rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
	rbd_dev->layout.stripe_count = 1;
	rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER;
	rbd_dev->layout.pool_id = spec->pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);

	return rbd_dev;
}

/*
 * Create a mapping rbd_dev.
 */
static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
					 struct rbd_spec *spec,
					 struct rbd_options *opts)
{
	struct rbd_device *rbd_dev;

	rbd_dev = __rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		return NULL;

	rbd_dev->opts = opts;

	/* get an id and fill in device name */
	rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
					 minor_to_rbd_dev_id(1 << MINORBITS),
					 GFP_KERNEL);
	if (rbd_dev->dev_id < 0)
		goto fail_rbd_dev;

	sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
	rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
						   rbd_dev->name);
	if (!rbd_dev->task_wq)
		goto fail_dev_id;

	/* we have a ref from do_rbd_add() */
	__module_get(THIS_MODULE);

	dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
	return rbd_dev;

fail_dev_id:
	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
fail_rbd_dev:
	rbd_dev_free(rbd_dev);
	return NULL;
}

static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	if (rbd_dev)
		put_device(&rbd_dev->dev);
}

/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				"rbd", "get_size",
				&snapid, sizeof (snapid),
				&size_buf, sizeof (size_buf));
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (size_buf))
		return -ERANGE;

	if (order) {
		*order = size_buf.order;
		dout(" order %u", (unsigned int)*order);
	}
	*snap_size = le64_to_cpu(size_buf.size);

	dout("  snap_id 0x%016llx snap_size = %llu\n",
		(unsigned long long)snap_id,
		(unsigned long long)*snap_size);

	return 0;
}

static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}

static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				  "rbd", "get_object_prefix", NULL, 0,
				  reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + ret, NULL, GFP_NOIO);
	ret = 0;
	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}
out:
	kfree(reply_buf);

	return ret;
}

static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} __attribute__ ((packed)) features_buf = { 0 };
	u64 unsup;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				"rbd", "get_features",
				&snapid, sizeof (snapid),
				&features_buf, sizeof (features_buf));
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (features_buf))
		return -ERANGE;

	unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
	if (unsup) {
		rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
			 unsup);
		return -ENXIO;
	}

	*snap_features = le64_to_cpu(features_buf.features);

	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long)snap_id,
		(unsigned long long)*snap_features,
		(unsigned long long)le64_to_cpu(features_buf.incompat));

	return 0;
}

static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}

static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	u64 pool_id;
	char *image_id;
	u64 snap_id;
	u64 overlap;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	size = sizeof (__le64) +				/* pool_id */
		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
		sizeof (__le64) +				/* snap_id */
		sizeof (__le64);				/* overlap */
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(rbd_dev->spec->snap_id);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				  "rbd", "get_parent",
				  &snapid, sizeof (snapid),
				  reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, pool_id, out_err);
	if (pool_id == CEPH_NOPOOL) {
		/*
		 * Either the parent never existed, or we have
		 * record of it but the image got flattened so it no
		 * longer has a parent.  When the parent of a
		 * layered image disappears we immediately set the
		 * overlap to 0.  The effect of this is that all new
		 * requests will be treated as if the image had no
		 * parent.
		 */
		if (rbd_dev->parent_overlap) {
			rbd_dev->parent_overlap = 0;
			rbd_dev_parent_put(rbd_dev);
			pr_info("%s: clone image has been flattened\n",
				rbd_dev->disk->disk_name);
		}

		goto out;	/* No parent?  No problem. */
	}

	/* The ceph file layout needs to fit pool id in 32 bits */

	ret = -EIO;
	if (pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "parent pool id too large (%llu > %u)",
			(unsigned long long)pool_id, U32_MAX);
		goto out_err;
	}

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	ceph_decode_64_safe(&p, end, snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	/*
	 * The parent won't change (except when the clone is
	 * flattened, already handled that).  So we only need to
	 * record the parent spec if we have not already done so.
	 */
	if (!rbd_dev->parent_spec) {
		parent_spec->pool_id = pool_id;
		parent_spec->image_id = image_id;
		parent_spec->snap_id = snap_id;
		rbd_dev->parent_spec = parent_spec;
		parent_spec = NULL;	/* rbd_dev now owns this */
	} else {
		kfree(image_id);
	}

	/*
	 * We always update the parent overlap.  If it's zero we issue
	 * a warning, as we will proceed as if there was no parent.
	 */
	if (!overlap) {
		if (parent_spec) {
			/* refresh, careful to warn just once */
			if (rbd_dev->parent_overlap)
				rbd_warn(rbd_dev,
				    "clone now standalone (overlap became 0)");
		} else {
			/* initial probe */
			rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
		}
	}
	rbd_dev->parent_overlap = overlap;

out:
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);

	return ret;
}

static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
{
	struct {
		__le64 stripe_unit;
		__le64 stripe_count;
	} __attribute__ ((packed)) striping_info_buf = { 0 };
	size_t size = sizeof (striping_info_buf);
	void *p;
	u64 obj_size;
	u64 stripe_unit;
	u64 stripe_count;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				"rbd", "get_stripe_unit_count", NULL, 0,
				(char *)&striping_info_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < size)
		return -ERANGE;

	/*
	 * We don't actually support the "fancy striping" feature
	 * (STRIPINGV2) yet, but if the striping sizes are the
	 * defaults the behavior is the same as before.  So find
	 * out, and only fail if the image has non-default values.
	 */
	obj_size = (u64)1 << rbd_dev->header.obj_order;
	p = &striping_info_buf;
	stripe_unit = ceph_decode_64(&p);
	if (stripe_unit != obj_size) {
		rbd_warn(rbd_dev, "unsupported stripe unit "
				"(got %llu want %llu)",
				stripe_unit, obj_size);
		return -EINVAL;
	}
	stripe_count = ceph_decode_64(&p);
	if (stripe_count != 1) {
		rbd_warn(rbd_dev, "unsupported stripe count "
				"(got %llu want 1)", stripe_count);
		return -EINVAL;
	}
	rbd_dev->header.stripe_unit = stripe_unit;
	rbd_dev->header.stripe_count = stripe_count;

	return 0;
}

static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				reply_buf, size);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = reply_buf + ret;

	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}

static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const char *snap_name;
	u32 which = 0;

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which < snapc->num_snaps) {
		if (!strcmp(name, snap_name))
			return snapc->snaps[which];
		snap_name += strlen(snap_name) + 1;
		which++;
	}
	return CEPH_NOSNAP;
}

static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 snap_id;
	bool found = false;
	u32 which;

	for (which = 0; !found && which < snapc->num_snaps; which++) {
		const char *snap_name;

		snap_id = snapc->snaps[which];
		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
		if (IS_ERR(snap_name)) {
			/* ignore no-longer existing snapshots */
			if (PTR_ERR(snap_name) == -ENOENT)
				continue;
			else
				break;
		}
		found = !strcmp(name, snap_name);
		kfree(snap_name);
	}
	return found ? snap_id : CEPH_NOSNAP;
}

/*
 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
 * no snapshot by that name is found, or if an error occurs.
 */
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	if (rbd_dev->image_format == 1)
		return rbd_v1_snap_id_by_name(rbd_dev, name);

	return rbd_v2_snap_id_by_name(rbd_dev, name);
}

/*
 * An image being mapped will have everything but the snap id.
 */
static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;

	rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
	rbd_assert(spec->image_id && spec->image_name);
	rbd_assert(spec->snap_name);

	if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
		u64 snap_id;

		snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
		if (snap_id == CEPH_NOSNAP)
			return -ENOENT;

		spec->snap_id = snap_id;
	} else {
		spec->snap_id = CEPH_NOSNAP;
	}

	return 0;
}

/*
 * A parent image will have all ids but none of the names.
 *
 * All names in an rbd spec are dynamically allocated.  It's OK if we
 * can't figure out the name for an image id.
 */
static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_spec *spec = rbd_dev->spec;
	const char *pool_name;
	const char *image_name;
	const char *snap_name;
	int ret;

	rbd_assert(spec->pool_id != CEPH_NOPOOL);
	rbd_assert(spec->image_id);
	rbd_assert(spec->snap_id != CEPH_NOSNAP);

	/* Get the pool name; we have to make our own copy of this */

	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
	if (!pool_name) {
		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
		return -EIO;
	}
	pool_name = kstrdup(pool_name, GFP_KERNEL);
	if (!pool_name)
		return -ENOMEM;

	/* Fetch the image name; tolerate failure here */

	image_name = rbd_dev_image_name(rbd_dev);
	if (!image_name)
		rbd_warn(rbd_dev, "unable to get image name");

	/* Fetch the snapshot name */

	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out_err;
	}

	spec->pool_name = pool_name;
	spec->image_name = image_name;
	spec->snap_name = snap_name;

	return 0;

out_err:
	kfree(image_name);
	kfree(pool_name);
	return ret;
}

static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
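	/*
	 * With RBD_MAX_SNAP_COUNT at 510 this works out to
	 * 8 + 4 + 510 * 8 = 4092 bytes, so the largest snapshot context
	 * we are prepared to accept still fits within a 4 KiB buffer.
	 */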
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				  "rbd", "get_snapcontext", NULL, 0,
				  reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				 / sizeof (u64)) {
		ret = -EINVAL;
		goto out;
	}
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;
	ret = 0;

	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}
	snapc->seq = seq;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	ceph_put_snap_context(rbd_dev->header.snapc);
	rbd_dev->header.snapc = snapc;

	dout("  snap context seq = %llu, snap_count = %u\n",
		(unsigned long long)seq, (unsigned int)snap_count);
out:
	kfree(reply_buf);

	return ret;
}

static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	size_t size;
	void *reply_buf;
	__le64 snapid;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snapid = cpu_to_le64(snap_id);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				"rbd", "get_snapshot_name",
				&snapid, sizeof (snapid),
				reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0) {
		snap_name = ERR_PTR(ret);
		goto out;
	}

	p = reply_buf;
	end = reply_buf + ret;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name))
		goto out;

	dout("  snap_id 0x%016llx snap_name = %s\n",
		(unsigned long long)snap_id, snap_name);
out:
	kfree(reply_buf);

	return snap_name;
}

static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
{
	bool first_time = rbd_dev->header.object_prefix == NULL;
	int ret;

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		return ret;

	if (first_time) {
		ret = rbd_dev_v2_header_onetime(rbd_dev);
		if (ret)
			return ret;
	}

	ret = rbd_dev_v2_snap_context(rbd_dev);
	if (ret && first_time) {
		kfree(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	}

	return ret;
}

static int rbd_dev_header_info(struct rbd_device *rbd_dev)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_header_info(rbd_dev);

	return rbd_dev_v2_header_info(rbd_dev);
}

/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);   /* Return token length */
}

/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
	char *dup;
	size_t len;

	len = next_token(buf);
	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
	if (!dup)
		return NULL;
	*(dup + len) = '\0';
	*buf += len;

	if (lenp)
		*lenp = len;

	return dup;
}
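
/*
 * For example, with *buf pointing at "  rbd foo", a first dup_token()
 * call returns a newly allocated "rbd" and leaves *buf at " foo"; a
 * second call returns "foo" and leaves *buf at the terminating '\0'.
 */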

/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * storage:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *      Address of an rbd options pointer.  Fully initialized by
 *      this function; caller must release with kfree().
 *  spec
 *      Address of an rbd image specification pointer.  Fully
 *      initialized by this function based on parsed options.
 *      Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_id>
 *      An optional snapshot id.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot id is
 *      provided.  Snapshot mappings are always read-only.
 */
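
/*
 * For example, a write of the following form (monitor address, option
 * string and names purely illustrative) maps the head of image "foo"
 * from pool "rbd":
 *
 *   1.2.3.4:6789 name=admin,secret=<key> rbd foo
 *
 * Appending a snapshot name, e.g. "... rbd foo snap1", maps that
 * snapshot read-only instead.
 */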

static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	char *snap_name;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	spec->image_name = dup_token(&buf, NULL);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!snap_name)
		goto out_mem;
	*(snap_name + len) = '\0';
	spec->snap_name = snap_name;

	/* Initialize all rbd options to the defaults */

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
	rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
	rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;

	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}

/*
 * Return pool id (>= 0) or a negative error code.
 */
static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
{
	struct ceph_options *opts = rbdc->client->options;
	u64 newest_epoch;
	int tries = 0;
	int ret;

again:
	ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
	if (ret == -ENOENT && tries++ < 1) {
		ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
					    &newest_epoch);
		if (ret < 0)
			return ret;

		if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
			ceph_osdc_maybe_request_map(&rbdc->client->osdc);
			(void) ceph_monc_wait_osdmap(&rbdc->client->monc,
						     newest_epoch,
						     opts->mount_timeout);
			goto again;
		} else {
			/* the osdmap we have is new enough */
			return -ENOENT;
		}
	}

	return ret;
}

/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	char *image_id;

	/*
	 * When probing a parent image, the image id is already
	 * known (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.  We
	 * do still need to set the image format though.
	 */
	if (rbd_dev->spec->image_id) {
		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

		return 0;
	}

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* If it doesn't exist we'll assume it's a format 1 image */

	ret = rbd_obj_method_sync(rbd_dev, object_name,
				"rbd", "get_id", NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret == -ENOENT) {
		image_id = kstrdup("", GFP_KERNEL);
		ret = image_id ? 0 : -ENOMEM;
		if (!ret)
			rbd_dev->image_format = 1;
	} else if (ret >= 0) {
		void *p = response;

		image_id = ceph_extract_encoded_string(&p, p + ret,
						NULL, GFP_NOIO);
		ret = PTR_ERR_OR_ZERO(image_id);
		if (!ret)
			rbd_dev->image_format = 2;
	}

	if (!ret) {
		rbd_dev->spec->image_id = image_id;
		dout("image_id is %s\n", image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}
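
/*
 * For instance, probing an image named "foo" looks up an id object
 * whose name is RBD_ID_PREFIX followed by the image name (i.e.
 * "rbd_id.foo", assuming the "rbd_id." prefix from rbd_types.h); the
 * encoded string stored in that object becomes spec->image_id.
 */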

/*
 * Undo whatever state changes are made by v1 or v2 header info
 * call.
 */
static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
	struct rbd_image_header *header;

	rbd_dev_parent_put(rbd_dev);

	/* Free dynamic fields from the header, then zero it out */

	header = &rbd_dev->header;
	ceph_put_snap_context(header->snapc);
	kfree(header->snap_sizes);
	kfree(header->snap_names);
	kfree(header->object_prefix);
	memset(header, 0, sizeof (*header));
}

static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret)
		goto out_err;

	/*
	 * Get and check the features for the image.  Currently the
	 * features are assumed to never change.
	 */
	ret = rbd_dev_v2_features(rbd_dev);
	if (ret)
		goto out_err;

	/* If the image supports fancy striping, get its parameters */

	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
		ret = rbd_dev_v2_striping_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}
	/* No support for crypto and compression type format 2 images */

	return 0;
out_err:
	rbd_dev->header.features = 0;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}

/*
 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
 * rbd_dev_image_probe() recursion depth, which means it's also the
 * length of the already discovered part of the parent chain.
 */
static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
{
	struct rbd_device *parent = NULL;
	int ret;

	if (!rbd_dev->parent_spec)
		return 0;

	if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
		pr_info("parent chain is too long (%d)\n", depth);
		ret = -EINVAL;
		goto out_err;
	}

	parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
	if (!parent) {
		ret = -ENOMEM;
		goto out_err;
	}

	/*
	 * Images related by parent/child relationships always share
	 * rbd_client and spec/parent_spec, so bump their refcounts.
	 */
	__rbd_get_client(rbd_dev->rbd_client);
	rbd_spec_get(rbd_dev->parent_spec);

	ret = rbd_dev_image_probe(parent, depth);
	if (ret < 0)
		goto out_err;

	rbd_dev->parent = parent;
	atomic_set(&rbd_dev->parent_ref, 1);
	return 0;

out_err:
	rbd_dev_unparent(rbd_dev);
	rbd_dev_destroy(parent);
	return ret;
}

/*
 * rbd_dev->header_rwsem must be locked for write and will be unlocked
 * upon return.
 */
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
	int ret;

	/* Record our major and minor device numbers. */

	if (!single_major) {
		ret = register_blkdev(0, rbd_dev->name);
		if (ret < 0)
			goto err_out_unlock;

		rbd_dev->major = ret;
		rbd_dev->minor = 0;
	} else {
		rbd_dev->major = rbd_major;
		rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
	}

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		goto err_out_disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);

	dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
	ret = device_add(&rbd_dev->dev);
	if (ret)
		goto err_out_mapping;

	/* Everything's ready.  Announce the disk to the world. */

	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	up_write(&rbd_dev->header_rwsem);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);

	add_disk(rbd_dev->disk);
	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
		rbd_dev->header.features);

	return 0;

err_out_mapping:
	rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_unlock:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}

static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;
	int ret;

	/* Record the header object name for this rbd image. */

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	rbd_dev->header_oloc.pool = rbd_dev->layout.pool_id;
	if (rbd_dev->image_format == 1)
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       spec->image_name, RBD_SUFFIX);
	else
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       RBD_HEADER_PREFIX, spec->image_id);

	return ret;
}

static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	rbd_dev_unprobe(rbd_dev);
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	rbd_dev_destroy(rbd_dev);
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
{
	int ret;

	/*
	 * Get the id from the image id object.  Unless there's an
	 * error, rbd_dev->spec->image_id will be filled in with
	 * a dynamically-allocated string, and rbd_dev->image_format
	 * will be set to either 1 or 2.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	if (!depth) {
		ret = rbd_register_watch(rbd_dev);
		if (ret) {
			if (ret == -ENOENT)
				pr_info("image %s/%s does not exist\n",
					rbd_dev->spec->pool_name,
					rbd_dev->spec->image_name);
			goto err_out_format;
		}
	}

	ret = rbd_dev_header_info(rbd_dev);
	if (ret)
		goto err_out_watch;

	/*
	 * If this image is the one being mapped, we have pool name and
	 * id, image name and id, and snap name - need to fill snap id.
	 * Otherwise this is a parent image, identified by pool, image
	 * and snap ids - need to fill in names for those ids.
	 */
	if (!depth)
		ret = rbd_spec_fill_snap_id(rbd_dev);
	else
		ret = rbd_spec_fill_names(rbd_dev);
	if (ret) {
		if (ret == -ENOENT)
			pr_info("snap %s/%s@%s does not exist\n",
				rbd_dev->spec->pool_name,
				rbd_dev->spec->image_name,
				rbd_dev->spec->snap_name);
		goto err_out_probe;
	}

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto err_out_probe;

		/*
		 * Need to warn users if this image is the one being
		 * mapped and has a parent.
		 */
		if (!depth && rbd_dev->parent_spec)
			rbd_warn(rbd_dev,
				 "WARNING: kernel layering is EXPERIMENTAL!");
	}

	ret = rbd_dev_probe_parent(rbd_dev, depth);
	if (ret)
		goto err_out_probe;

	dout("discovered format %u image, header name is %s\n",
		rbd_dev->image_format, rbd_dev->header_oid.name);
	return 0;

err_out_probe:
	rbd_dev_unprobe(rbd_dev);
err_out_watch:
	if (!depth)
		rbd_unregister_watch(rbd_dev);
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;
	return ret;
}

static ssize_t do_rbd_add(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	bool read_only;
	int rc;

	if (!capable(CAP_SYS_ADMIN))
		return -EPERM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto out;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}

	rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
	if (rc < 0) {
		if (rc == -ENOENT)
			pr_info("pool %s does not exist\n", spec->pool_name);
		goto err_out_client;
	}
	spec->pool_id = (u64)rc;

	rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
	if (!rbd_dev) {
		rc = -ENOMEM;
		goto err_out_client;
	}
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */
	rbd_opts = NULL;	/* rbd_dev now owns this */

	rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
	if (!rbd_dev->config_info) {
		rc = -ENOMEM;
		goto err_out_rbd_dev;
	}

	down_write(&rbd_dev->header_rwsem);
	rc = rbd_dev_image_probe(rbd_dev, 0);
	if (rc < 0) {
		up_write(&rbd_dev->header_rwsem);
		goto err_out_rbd_dev;
	}

	/* If we are mapping a snapshot it must be marked read-only */

	read_only = rbd_dev->opts->read_only;
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		read_only = true;
	rbd_dev->mapping.read_only = read_only;

	rc = rbd_dev_device_setup(rbd_dev);
	if (rc) {
		/*
		 * rbd_unregister_watch() can't be moved into
		 * rbd_dev_image_release() without refactoring, see
		 * commit 1f3ef78861ac.
		 */
		rbd_unregister_watch(rbd_dev);
		rbd_dev_image_release(rbd_dev);
		goto out;
	}

	rc = count;
out:
	module_put(THIS_MODULE);
	return rc;

err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	rbd_spec_put(spec);
	kfree(rbd_opts);
	goto out;
}

static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_add(bus, buf, count);
}

static ssize_t rbd_add_single_major(struct bus_type *bus,
				    const char *buf,
				    size_t count)
{
	return do_rbd_add(bus, buf, count);
}

static void rbd_dev_device_release(struct rbd_device *rbd_dev)
{
	rbd_free_disk(rbd_dev);

	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);
	spin_unlock(&rbd_dev_list_lock);

	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	device_del(&rbd_dev->dev);
	rbd_dev_mapping_clear(rbd_dev);
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		first->parent = NULL;
		first->parent_overlap = 0;
		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}

static ssize_t do_rbd_remove(struct bus_type *bus,
			     const char *buf,
			     size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct list_head *tmp;
	int dev_id;
	char opt_buf[6];
	bool force = false;
	int ret;

	if (!capable(CAP_SYS_ADMIN))
		return -EPERM;

	dev_id = -1;
	opt_buf[0] = '\0';
	sscanf(buf, "%d %5s", &dev_id, opt_buf);
	if (dev_id < 0) {
		pr_err("dev_id out of range\n");
		return -EINVAL;
	}
	if (opt_buf[0] != '\0') {
		if (!strcmp(opt_buf, "force")) {
			force = true;
		} else {
			pr_err("bad remove option at '%s'\n", opt_buf);
			return -EINVAL;
		}
	}

	ret = -ENOENT;
	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			ret = 0;
			break;
		}
	}
	if (!ret) {
		spin_lock_irq(&rbd_dev->lock);
		if (rbd_dev->open_count && !force)
			ret = -EBUSY;
		else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
					  &rbd_dev->flags))
			ret = -EINPROGRESS;
		spin_unlock_irq(&rbd_dev->lock);
	}
	spin_unlock(&rbd_dev_list_lock);
	if (ret)
		return ret;

	if (force) {
		/*
		 * Prevent new IO from being queued and wait for existing
		 * IO to complete/fail.
		 */
		blk_mq_freeze_queue(rbd_dev->disk->queue);
		blk_set_queue_dying(rbd_dev->disk->queue);
	}

	down_write(&rbd_dev->lock_rwsem);
	if (__rbd_is_lock_owner(rbd_dev))
		rbd_unlock(rbd_dev);
	up_write(&rbd_dev->lock_rwsem);
	rbd_unregister_watch(rbd_dev);

	/*
	 * Don't free anything from rbd_dev->disk until after all
	 * notifies are completely processed.  Otherwise
	 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
	 * in a potential use after free of rbd_dev->disk or rbd_dev.
	 */
	rbd_dev_device_release(rbd_dev);
	rbd_dev_image_release(rbd_dev);

	return count;
}
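
/*
 * For example, writing "2 force" to the bus's remove attribute tears
 * down the mapping with device id 2 even while it still has openers,
 * whereas the plain form "2" is refused in that case (values shown are
 * illustrative only).
 */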

static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_remove(bus, buf, count);
}

static ssize_t rbd_remove_single_major(struct bus_type *bus,
				       const char *buf,
				       size_t count)
{
	return do_rbd_remove(bus, buf, count);
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

static int rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
	if (!rbd_obj_request_cache)
		goto out_err;

	rbd_assert(!rbd_segment_name_cache);
	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
					CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
	if (rbd_segment_name_cache)
		return 0;

	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;
out_err:
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;

	return -ENOMEM;
}

static void rbd_slab_exit(void)
{
	rbd_assert(rbd_segment_name_cache);
	kmem_cache_destroy(rbd_segment_name_cache);
	rbd_segment_name_cache = NULL;

	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");
		return -EINVAL;
	}

	rc = rbd_slab_init();
	if (rc)
		return rc;

	/*
	 * The number of active work items is limited by the number of
	 * rbd devices * queue depth, so leave @max_active at default.
	 */
	rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
	if (!rbd_wq) {
		rc = -ENOMEM;
		goto err_out_slab;
	}

	if (single_major) {
		rbd_major = register_blkdev(0, RBD_DRV_NAME);
		if (rbd_major < 0) {
			rc = rbd_major;
			goto err_out_wq;
		}
	}

	rc = rbd_sysfs_init();
	if (rc)
		goto err_out_blkdev;

	if (single_major)
		pr_info("loaded (major %d)\n", rbd_major);
	else
		pr_info("loaded\n");

	return 0;

err_out_blkdev:
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
err_out_wq:
	destroy_workqueue(rbd_wq);
err_out_slab:
	rbd_slab_exit();
	return rc;
}

static void __exit rbd_exit(void)
{
	ida_destroy(&rbd_dev_id_ida);
	rbd_sysfs_cleanup();
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
	destroy_workqueue(rbd_wq);
	rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");