2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
58 #define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
59 #define RBD_MAX_POOL_NAME_LEN 64
60 #define RBD_MAX_SNAP_NAME_LEN 32
61 #define RBD_MAX_OPT_LEN 1024
63 #define RBD_SNAP_HEAD_NAME "-"
66 * An RBD device name will be "rbd#", where the "rbd" comes from
67 * RBD_DRV_NAME above, and # is a unique integer identifier.
68 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
69 * enough to hold all possible device names.
71 #define DEV_NAME_LEN 32
72 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
74 #define RBD_READ_ONLY_DEFAULT false
77 * block device image metadata (in-memory version)
79 struct rbd_image_header
{
85 struct ceph_snap_context
*snapc
;
86 size_t snap_names_len
;
101 * an instance of the client. multiple devices may share an rbd client.
104 struct ceph_client
*client
;
105 struct rbd_options
*rbd_opts
;
107 struct list_head node
;
111 * a request completion status
113 struct rbd_req_status
{
120 * a collection of requests
122 struct rbd_req_coll
{
126 struct rbd_req_status status
[0];
130 * a single io request
133 struct request
*rq
; /* blk layer request */
134 struct bio
*bio
; /* cloned bio */
135 struct page
**pages
; /* list of used pages */
138 struct rbd_req_coll
*coll
;
145 struct list_head node
;
153 int id
; /* blkdev unique id */
155 int major
; /* blkdev assigned major */
156 struct gendisk
*disk
; /* blkdev's gendisk and rq */
157 struct request_queue
*q
;
159 struct rbd_client
*rbd_client
;
161 char name
[DEV_NAME_LEN
]; /* blkdev name, e.g. rbd3 */
163 spinlock_t lock
; /* queue lock */
165 struct rbd_image_header header
;
166 char obj
[RBD_MAX_OBJ_NAME_LEN
]; /* rbd image name */
168 char obj_md_name
[RBD_MAX_MD_NAME_LEN
]; /* hdr nm. */
169 char pool_name
[RBD_MAX_POOL_NAME_LEN
];
172 struct ceph_osd_event
*watch_event
;
173 struct ceph_osd_request
*watch_request
;
175 /* protects updating the header */
176 struct rw_semaphore header_rwsem
;
177 /* name of the snapshot this device reads from */
178 char snap_name
[RBD_MAX_SNAP_NAME_LEN
];
179 /* id of the snapshot this device reads from */
180 u64 snap_id
; /* current snapshot id */
181 /* whether the snap_id this device reads from still exists */
185 struct list_head node
;
187 /* list of snapshots */
188 struct list_head snaps
;
192 unsigned long open_count
;
195 static DEFINE_MUTEX(ctl_mutex
); /* Serialize open/close/setup/teardown */
197 static LIST_HEAD(rbd_dev_list
); /* devices */
198 static DEFINE_SPINLOCK(rbd_dev_list_lock
);
200 static LIST_HEAD(rbd_client_list
); /* clients */
201 static DEFINE_SPINLOCK(rbd_client_list_lock
);
203 static int __rbd_init_snaps_header(struct rbd_device
*rbd_dev
);
204 static void rbd_dev_release(struct device
*dev
);
205 static void __rbd_remove_snap_dev(struct rbd_device
*rbd_dev
,
206 struct rbd_snap
*snap
);
208 static ssize_t
rbd_add(struct bus_type
*bus
, const char *buf
,
210 static ssize_t
rbd_remove(struct bus_type
*bus
, const char *buf
,
213 static struct bus_attribute rbd_bus_attrs
[] = {
214 __ATTR(add
, S_IWUSR
, NULL
, rbd_add
),
215 __ATTR(remove
, S_IWUSR
, NULL
, rbd_remove
),
219 static struct bus_type rbd_bus_type
= {
221 .bus_attrs
= rbd_bus_attrs
,
224 static void rbd_root_dev_release(struct device
*dev
)
228 static struct device rbd_root_dev
= {
230 .release
= rbd_root_dev_release
,
234 static struct device
*rbd_get_dev(struct rbd_device
*rbd_dev
)
236 return get_device(&rbd_dev
->dev
);
239 static void rbd_put_dev(struct rbd_device
*rbd_dev
)
241 put_device(&rbd_dev
->dev
);
244 static int __rbd_update_snaps(struct rbd_device
*rbd_dev
);
246 static int rbd_open(struct block_device
*bdev
, fmode_t mode
)
248 struct rbd_device
*rbd_dev
= bdev
->bd_disk
->private_data
;
250 if ((mode
& FMODE_WRITE
) && rbd_dev
->read_only
)
253 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
254 rbd_get_dev(rbd_dev
);
255 set_device_ro(bdev
, rbd_dev
->read_only
);
256 rbd_dev
->open_count
++;
257 mutex_unlock(&ctl_mutex
);
262 static int rbd_release(struct gendisk
*disk
, fmode_t mode
)
264 struct rbd_device
*rbd_dev
= disk
->private_data
;
266 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
267 BUG_ON(!rbd_dev
->open_count
);
268 rbd_dev
->open_count
--;
269 rbd_put_dev(rbd_dev
);
270 mutex_unlock(&ctl_mutex
);
275 static const struct block_device_operations rbd_bd_ops
= {
276 .owner
= THIS_MODULE
,
278 .release
= rbd_release
,
282 * Initialize an rbd client instance.
285 static struct rbd_client
*rbd_client_create(struct ceph_options
*opt
,
286 struct rbd_options
*rbd_opts
)
288 struct rbd_client
*rbdc
;
291 dout("rbd_client_create\n");
292 rbdc
= kmalloc(sizeof(struct rbd_client
), GFP_KERNEL
);
296 kref_init(&rbdc
->kref
);
297 INIT_LIST_HEAD(&rbdc
->node
);
299 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
301 rbdc
->client
= ceph_create_client(opt
, rbdc
, 0, 0);
302 if (IS_ERR(rbdc
->client
))
304 opt
= NULL
; /* Now rbdc->client is responsible for opt */
306 ret
= ceph_open_session(rbdc
->client
);
310 rbdc
->rbd_opts
= rbd_opts
;
312 spin_lock(&rbd_client_list_lock
);
313 list_add_tail(&rbdc
->node
, &rbd_client_list
);
314 spin_unlock(&rbd_client_list_lock
);
316 mutex_unlock(&ctl_mutex
);
318 dout("rbd_client_create created %p\n", rbdc
);
322 ceph_destroy_client(rbdc
->client
);
324 mutex_unlock(&ctl_mutex
);
328 ceph_destroy_options(opt
);
333 * Find a ceph client with specific addr and configuration.
335 static struct rbd_client
*__rbd_client_find(struct ceph_options
*opt
)
337 struct rbd_client
*client_node
;
339 if (opt
->flags
& CEPH_OPT_NOSHARE
)
342 list_for_each_entry(client_node
, &rbd_client_list
, node
)
343 if (ceph_compare_options(opt
, client_node
->client
) == 0)
355 /* string args above */
358 /* Boolean args above */
362 static match_table_t rbdopt_tokens
= {
364 /* string args above */
365 {Opt_read_only
, "read_only"},
366 {Opt_read_only
, "ro"}, /* Alternate spelling */
367 {Opt_read_write
, "read_write"},
368 {Opt_read_write
, "rw"}, /* Alternate spelling */
369 /* Boolean args above */
373 static int parse_rbd_opts_token(char *c
, void *private)
375 struct rbd_options
*rbdopt
= private;
376 substring_t argstr
[MAX_OPT_ARGS
];
377 int token
, intval
, ret
;
379 token
= match_token(c
, rbdopt_tokens
, argstr
);
383 if (token
< Opt_last_int
) {
384 ret
= match_int(&argstr
[0], &intval
);
386 pr_err("bad mount option arg (not int) "
390 dout("got int token %d val %d\n", token
, intval
);
391 } else if (token
> Opt_last_int
&& token
< Opt_last_string
) {
392 dout("got string token %d val %s\n", token
,
394 } else if (token
> Opt_last_string
&& token
< Opt_last_bool
) {
395 dout("got Boolean token %d\n", token
);
397 dout("got token %d\n", token
);
402 rbdopt
->read_only
= true;
405 rbdopt
->read_only
= false;
414 * Get a ceph client with specific addr and configuration, if one does
415 * not exist create it.
417 static struct rbd_client
*rbd_get_client(const char *mon_addr
,
421 struct rbd_client
*rbdc
;
422 struct ceph_options
*opt
;
423 struct rbd_options
*rbd_opts
;
425 rbd_opts
= kzalloc(sizeof(*rbd_opts
), GFP_KERNEL
);
427 return ERR_PTR(-ENOMEM
);
429 rbd_opts
->read_only
= RBD_READ_ONLY_DEFAULT
;
431 opt
= ceph_parse_options(options
, mon_addr
,
432 mon_addr
+ mon_addr_len
,
433 parse_rbd_opts_token
, rbd_opts
);
436 return ERR_CAST(opt
);
439 spin_lock(&rbd_client_list_lock
);
440 rbdc
= __rbd_client_find(opt
);
442 /* using an existing client */
443 kref_get(&rbdc
->kref
);
444 spin_unlock(&rbd_client_list_lock
);
446 ceph_destroy_options(opt
);
451 spin_unlock(&rbd_client_list_lock
);
453 rbdc
= rbd_client_create(opt
, rbd_opts
);
462 * Destroy ceph client
464 * Caller must hold rbd_client_list_lock.
466 static void rbd_client_release(struct kref
*kref
)
468 struct rbd_client
*rbdc
= container_of(kref
, struct rbd_client
, kref
);
470 dout("rbd_release_client %p\n", rbdc
);
471 spin_lock(&rbd_client_list_lock
);
472 list_del(&rbdc
->node
);
473 spin_unlock(&rbd_client_list_lock
);
475 ceph_destroy_client(rbdc
->client
);
476 kfree(rbdc
->rbd_opts
);
481 * Drop reference to ceph client node. If it's not referenced anymore, release
484 static void rbd_put_client(struct rbd_device
*rbd_dev
)
486 kref_put(&rbd_dev
->rbd_client
->kref
, rbd_client_release
);
487 rbd_dev
->rbd_client
= NULL
;
491 * Destroy requests collection
493 static void rbd_coll_release(struct kref
*kref
)
495 struct rbd_req_coll
*coll
=
496 container_of(kref
, struct rbd_req_coll
, kref
);
498 dout("rbd_coll_release %p\n", coll
);
503 * Create a new header structure, translate header format from the on-disk
506 static int rbd_header_from_disk(struct rbd_image_header
*header
,
507 struct rbd_image_header_ondisk
*ondisk
,
514 if (memcmp(ondisk
, RBD_HEADER_TEXT
, sizeof(RBD_HEADER_TEXT
)))
517 snap_count
= le32_to_cpu(ondisk
->snap_count
);
518 header
->snapc
= kmalloc(sizeof(struct ceph_snap_context
) +
519 snap_count
* sizeof(u64
),
524 header
->snap_names_len
= le64_to_cpu(ondisk
->snap_names_len
);
526 header
->snap_names
= kmalloc(header
->snap_names_len
,
528 if (!header
->snap_names
)
530 header
->snap_sizes
= kmalloc(snap_count
* sizeof(u64
),
532 if (!header
->snap_sizes
)
535 header
->snap_names
= NULL
;
536 header
->snap_sizes
= NULL
;
538 memcpy(header
->block_name
, ondisk
->block_name
,
539 sizeof(ondisk
->block_name
));
541 header
->image_size
= le64_to_cpu(ondisk
->image_size
);
542 header
->obj_order
= ondisk
->options
.order
;
543 header
->crypt_type
= ondisk
->options
.crypt_type
;
544 header
->comp_type
= ondisk
->options
.comp_type
;
546 atomic_set(&header
->snapc
->nref
, 1);
547 header
->snap_seq
= le64_to_cpu(ondisk
->snap_seq
);
548 header
->snapc
->num_snaps
= snap_count
;
549 header
->total_snaps
= snap_count
;
551 if (snap_count
&& allocated_snaps
== snap_count
) {
552 for (i
= 0; i
< snap_count
; i
++) {
553 header
->snapc
->snaps
[i
] =
554 le64_to_cpu(ondisk
->snaps
[i
].id
);
555 header
->snap_sizes
[i
] =
556 le64_to_cpu(ondisk
->snaps
[i
].image_size
);
559 /* copy snapshot names */
560 memcpy(header
->snap_names
, &ondisk
->snaps
[i
],
561 header
->snap_names_len
);
567 kfree(header
->snap_names
);
569 kfree(header
->snapc
);
573 static int snap_by_name(struct rbd_image_header
*header
, const char *snap_name
,
577 char *p
= header
->snap_names
;
579 for (i
= 0; i
< header
->total_snaps
; i
++) {
580 if (!strcmp(snap_name
, p
)) {
582 /* Found it. Pass back its id and/or size */
585 *seq
= header
->snapc
->snaps
[i
];
587 *size
= header
->snap_sizes
[i
];
590 p
+= strlen(p
) + 1; /* Skip ahead to the next name */
595 static int rbd_header_set_snap(struct rbd_device
*dev
, u64
*size
)
597 struct rbd_image_header
*header
= &dev
->header
;
598 struct ceph_snap_context
*snapc
= header
->snapc
;
601 BUILD_BUG_ON(sizeof (dev
->snap_name
) < sizeof (RBD_SNAP_HEAD_NAME
));
603 down_write(&dev
->header_rwsem
);
605 if (!memcmp(dev
->snap_name
, RBD_SNAP_HEAD_NAME
,
606 sizeof (RBD_SNAP_HEAD_NAME
))) {
607 if (header
->total_snaps
)
608 snapc
->seq
= header
->snap_seq
;
611 dev
->snap_id
= CEPH_NOSNAP
;
612 dev
->snap_exists
= false;
613 dev
->read_only
= dev
->rbd_client
->rbd_opts
->read_only
;
615 *size
= header
->image_size
;
617 ret
= snap_by_name(header
, dev
->snap_name
, &snapc
->seq
, size
);
620 dev
->snap_id
= snapc
->seq
;
621 dev
->snap_exists
= true;
622 dev
->read_only
= true; /* No choice for snapshots */
627 up_write(&dev
->header_rwsem
);
631 static void rbd_header_free(struct rbd_image_header
*header
)
633 ceph_put_snap_context(header
->snapc
);
634 kfree(header
->snap_names
);
635 kfree(header
->snap_sizes
);
639 * get the actual striped segment name, offset and length
641 static u64
rbd_get_segment(struct rbd_image_header
*header
,
642 const char *block_name
,
644 char *seg_name
, u64
*segofs
)
646 u64 seg
= ofs
>> header
->obj_order
;
649 snprintf(seg_name
, RBD_MAX_SEG_NAME_LEN
,
650 "%s.%012llx", block_name
, seg
);
652 ofs
= ofs
& ((1 << header
->obj_order
) - 1);
653 len
= min_t(u64
, len
, (1 << header
->obj_order
) - ofs
);
661 static int rbd_get_num_segments(struct rbd_image_header
*header
,
664 u64 start_seg
= ofs
>> header
->obj_order
;
665 u64 end_seg
= (ofs
+ len
- 1) >> header
->obj_order
;
666 return end_seg
- start_seg
+ 1;
670 * returns the size of an object in the image
672 static u64
rbd_obj_bytes(struct rbd_image_header
*header
)
674 return 1 << header
->obj_order
;
681 static void bio_chain_put(struct bio
*chain
)
687 chain
= chain
->bi_next
;
693 * zeros a bio chain, starting at specific offset
695 static void zero_bio_chain(struct bio
*chain
, int start_ofs
)
704 bio_for_each_segment(bv
, chain
, i
) {
705 if (pos
+ bv
->bv_len
> start_ofs
) {
706 int remainder
= max(start_ofs
- pos
, 0);
707 buf
= bvec_kmap_irq(bv
, &flags
);
708 memset(buf
+ remainder
, 0,
709 bv
->bv_len
- remainder
);
710 bvec_kunmap_irq(buf
, &flags
);
715 chain
= chain
->bi_next
;
720 * bio_chain_clone - clone a chain of bios up to a certain length.
721 * might return a bio_pair that will need to be released.
723 static struct bio
*bio_chain_clone(struct bio
**old
, struct bio
**next
,
724 struct bio_pair
**bp
,
725 int len
, gfp_t gfpmask
)
727 struct bio
*tmp
, *old_chain
= *old
, *new_chain
= NULL
, *tail
= NULL
;
731 bio_pair_release(*bp
);
735 while (old_chain
&& (total
< len
)) {
736 tmp
= bio_kmalloc(gfpmask
, old_chain
->bi_max_vecs
);
740 if (total
+ old_chain
->bi_size
> len
) {
744 * this split can only happen with a single paged bio,
745 * split_bio will BUG_ON if this is not the case
747 dout("bio_chain_clone split! total=%d remaining=%d"
749 (int)total
, (int)len
-total
,
750 (int)old_chain
->bi_size
);
752 /* split the bio. We'll release it either in the next
753 call, or it will have to be released outside */
754 bp
= bio_split(old_chain
, (len
- total
) / SECTOR_SIZE
);
758 __bio_clone(tmp
, &bp
->bio1
);
762 __bio_clone(tmp
, old_chain
);
763 *next
= old_chain
->bi_next
;
767 gfpmask
&= ~__GFP_WAIT
;
771 new_chain
= tail
= tmp
;
776 old_chain
= old_chain
->bi_next
;
778 total
+= tmp
->bi_size
;
784 tail
->bi_next
= NULL
;
791 dout("bio_chain_clone with err\n");
792 bio_chain_put(new_chain
);
797 * helpers for osd request op vectors.
799 static int rbd_create_rw_ops(struct ceph_osd_req_op
**ops
,
804 *ops
= kzalloc(sizeof(struct ceph_osd_req_op
) * (num_ops
+ 1),
808 (*ops
)[0].op
= opcode
;
810 * op extent offset and length will be set later on
811 * in calc_raw_layout()
813 (*ops
)[0].payload_len
= payload_len
;
817 static void rbd_destroy_ops(struct ceph_osd_req_op
*ops
)
822 static void rbd_coll_end_req_index(struct request
*rq
,
823 struct rbd_req_coll
*coll
,
827 struct request_queue
*q
;
830 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
831 coll
, index
, ret
, len
);
837 blk_end_request(rq
, ret
, len
);
843 spin_lock_irq(q
->queue_lock
);
844 coll
->status
[index
].done
= 1;
845 coll
->status
[index
].rc
= ret
;
846 coll
->status
[index
].bytes
= len
;
847 max
= min
= coll
->num_done
;
848 while (max
< coll
->total
&& coll
->status
[max
].done
)
851 for (i
= min
; i
<max
; i
++) {
852 __blk_end_request(rq
, coll
->status
[i
].rc
,
853 coll
->status
[i
].bytes
);
855 kref_put(&coll
->kref
, rbd_coll_release
);
857 spin_unlock_irq(q
->queue_lock
);
860 static void rbd_coll_end_req(struct rbd_request
*req
,
863 rbd_coll_end_req_index(req
->rq
, req
->coll
, req
->coll_index
, ret
, len
);
867 * Send ceph osd request
869 static int rbd_do_request(struct request
*rq
,
870 struct rbd_device
*dev
,
871 struct ceph_snap_context
*snapc
,
873 const char *obj
, u64 ofs
, u64 len
,
878 struct ceph_osd_req_op
*ops
,
880 struct rbd_req_coll
*coll
,
882 void (*rbd_cb
)(struct ceph_osd_request
*req
,
883 struct ceph_msg
*msg
),
884 struct ceph_osd_request
**linger_req
,
887 struct ceph_osd_request
*req
;
888 struct ceph_file_layout
*layout
;
891 struct timespec mtime
= CURRENT_TIME
;
892 struct rbd_request
*req_data
;
893 struct ceph_osd_request_head
*reqhead
;
894 struct ceph_osd_client
*osdc
;
896 req_data
= kzalloc(sizeof(*req_data
), GFP_NOIO
);
899 rbd_coll_end_req_index(rq
, coll
, coll_index
,
905 req_data
->coll
= coll
;
906 req_data
->coll_index
= coll_index
;
909 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj
, len
, ofs
);
911 osdc
= &dev
->rbd_client
->client
->osdc
;
912 req
= ceph_osdc_alloc_request(osdc
, flags
, snapc
, ops
,
913 false, GFP_NOIO
, pages
, bio
);
919 req
->r_callback
= rbd_cb
;
923 req_data
->pages
= pages
;
926 req
->r_priv
= req_data
;
928 reqhead
= req
->r_request
->front
.iov_base
;
929 reqhead
->snapid
= cpu_to_le64(CEPH_NOSNAP
);
931 strncpy(req
->r_oid
, obj
, sizeof(req
->r_oid
));
932 req
->r_oid_len
= strlen(req
->r_oid
);
934 layout
= &req
->r_file_layout
;
935 memset(layout
, 0, sizeof(*layout
));
936 layout
->fl_stripe_unit
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
937 layout
->fl_stripe_count
= cpu_to_le32(1);
938 layout
->fl_object_size
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
939 layout
->fl_pg_preferred
= cpu_to_le32(-1);
940 layout
->fl_pg_pool
= cpu_to_le32(dev
->poolid
);
941 ret
= ceph_calc_raw_layout(osdc
, layout
, snapid
, ofs
, &len
, &bno
,
945 ceph_osdc_build_request(req
, ofs
, &len
,
949 req
->r_oid
, req
->r_oid_len
);
952 ceph_osdc_set_request_linger(osdc
, req
);
956 ret
= ceph_osdc_start_request(osdc
, req
, false);
961 ret
= ceph_osdc_wait_request(osdc
, req
);
963 *ver
= le64_to_cpu(req
->r_reassert_version
.version
);
964 dout("reassert_ver=%lld\n",
965 le64_to_cpu(req
->r_reassert_version
.version
));
966 ceph_osdc_put_request(req
);
971 bio_chain_put(req_data
->bio
);
972 ceph_osdc_put_request(req
);
974 rbd_coll_end_req(req_data
, ret
, len
);
980 * Ceph osd op callback
982 static void rbd_req_cb(struct ceph_osd_request
*req
, struct ceph_msg
*msg
)
984 struct rbd_request
*req_data
= req
->r_priv
;
985 struct ceph_osd_reply_head
*replyhead
;
986 struct ceph_osd_op
*op
;
992 replyhead
= msg
->front
.iov_base
;
993 WARN_ON(le32_to_cpu(replyhead
->num_ops
) == 0);
994 op
= (void *)(replyhead
+ 1);
995 rc
= le32_to_cpu(replyhead
->result
);
996 bytes
= le64_to_cpu(op
->extent
.length
);
997 read_op
= (le32_to_cpu(op
->op
) == CEPH_OSD_OP_READ
);
999 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes
, read_op
, rc
);
1001 if (rc
== -ENOENT
&& read_op
) {
1002 zero_bio_chain(req_data
->bio
, 0);
1004 } else if (rc
== 0 && read_op
&& bytes
< req_data
->len
) {
1005 zero_bio_chain(req_data
->bio
, bytes
);
1006 bytes
= req_data
->len
;
1009 rbd_coll_end_req(req_data
, rc
, bytes
);
1012 bio_chain_put(req_data
->bio
);
1014 ceph_osdc_put_request(req
);
1018 static void rbd_simple_req_cb(struct ceph_osd_request
*req
, struct ceph_msg
*msg
)
1020 ceph_osdc_put_request(req
);
1024 * Do a synchronous ceph osd operation
1026 static int rbd_req_sync_op(struct rbd_device
*dev
,
1027 struct ceph_snap_context
*snapc
,
1031 struct ceph_osd_req_op
*orig_ops
,
1036 struct ceph_osd_request
**linger_req
,
1040 struct page
**pages
;
1042 struct ceph_osd_req_op
*ops
= orig_ops
;
1045 num_pages
= calc_pages_for(ofs
, len
);
1046 pages
= ceph_alloc_page_vector(num_pages
, GFP_KERNEL
);
1048 return PTR_ERR(pages
);
1051 payload_len
= (flags
& CEPH_OSD_FLAG_WRITE
? len
: 0);
1052 ret
= rbd_create_rw_ops(&ops
, 1, opcode
, payload_len
);
1056 if ((flags
& CEPH_OSD_FLAG_WRITE
) && buf
) {
1057 ret
= ceph_copy_to_page_vector(pages
, buf
, ofs
, len
);
1063 ret
= rbd_do_request(NULL
, dev
, snapc
, snapid
,
1064 obj
, ofs
, len
, NULL
,
1075 if ((flags
& CEPH_OSD_FLAG_READ
) && buf
)
1076 ret
= ceph_copy_from_page_vector(pages
, buf
, ofs
, ret
);
1080 rbd_destroy_ops(ops
);
1082 ceph_release_page_vector(pages
, num_pages
);
1087 * Do an asynchronous ceph osd operation
1089 static int rbd_do_op(struct request
*rq
,
1090 struct rbd_device
*rbd_dev
,
1091 struct ceph_snap_context
*snapc
,
1093 int opcode
, int flags
, int num_reply
,
1096 struct rbd_req_coll
*coll
,
1103 struct ceph_osd_req_op
*ops
;
1106 seg_name
= kmalloc(RBD_MAX_SEG_NAME_LEN
+ 1, GFP_NOIO
);
1110 seg_len
= rbd_get_segment(&rbd_dev
->header
,
1111 rbd_dev
->header
.block_name
,
1113 seg_name
, &seg_ofs
);
1115 payload_len
= (flags
& CEPH_OSD_FLAG_WRITE
? seg_len
: 0);
1117 ret
= rbd_create_rw_ops(&ops
, 1, opcode
, payload_len
);
1121 /* we've taken care of segment sizes earlier when we
1122 cloned the bios. We should never have a segment
1123 truncated at this point */
1124 BUG_ON(seg_len
< len
);
1126 ret
= rbd_do_request(rq
, rbd_dev
, snapc
, snapid
,
1127 seg_name
, seg_ofs
, seg_len
,
1134 rbd_req_cb
, 0, NULL
);
1136 rbd_destroy_ops(ops
);
1143 * Request async osd write
1145 static int rbd_req_write(struct request
*rq
,
1146 struct rbd_device
*rbd_dev
,
1147 struct ceph_snap_context
*snapc
,
1150 struct rbd_req_coll
*coll
,
1153 return rbd_do_op(rq
, rbd_dev
, snapc
, CEPH_NOSNAP
,
1155 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1157 ofs
, len
, bio
, coll
, coll_index
);
1161 * Request async osd read
1163 static int rbd_req_read(struct request
*rq
,
1164 struct rbd_device
*rbd_dev
,
1168 struct rbd_req_coll
*coll
,
1171 return rbd_do_op(rq
, rbd_dev
, NULL
,
1172 (snapid
? snapid
: CEPH_NOSNAP
),
1176 ofs
, len
, bio
, coll
, coll_index
);
1180 * Request sync osd read
1182 static int rbd_req_sync_read(struct rbd_device
*dev
,
1183 struct ceph_snap_context
*snapc
,
1190 return rbd_req_sync_op(dev
, NULL
,
1191 (snapid
? snapid
: CEPH_NOSNAP
),
1195 1, obj
, ofs
, len
, buf
, NULL
, ver
);
1199 * Request sync osd watch
1201 static int rbd_req_sync_notify_ack(struct rbd_device
*dev
,
1206 struct ceph_osd_req_op
*ops
;
1207 struct page
**pages
= NULL
;
1210 ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_NOTIFY_ACK
, 0);
1214 ops
[0].watch
.ver
= cpu_to_le64(ver
);
1215 ops
[0].watch
.cookie
= notify_id
;
1216 ops
[0].watch
.flag
= 0;
1218 ret
= rbd_do_request(NULL
, dev
, NULL
, CEPH_NOSNAP
,
1225 rbd_simple_req_cb
, 0, NULL
);
1227 rbd_destroy_ops(ops
);
1231 static void rbd_watch_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1233 struct rbd_device
*dev
= (struct rbd_device
*)data
;
1240 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev
->obj_md_name
,
1241 notify_id
, (int)opcode
);
1242 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
1243 rc
= __rbd_update_snaps(dev
);
1244 hver
= dev
->header
.obj_version
;
1245 mutex_unlock(&ctl_mutex
);
1247 pr_warning(RBD_DRV_NAME
"%d got notification but failed to "
1248 " update snaps: %d\n", dev
->major
, rc
);
1250 rbd_req_sync_notify_ack(dev
, hver
, notify_id
, dev
->obj_md_name
);
1254 * Request sync osd watch
1256 static int rbd_req_sync_watch(struct rbd_device
*dev
,
1260 struct ceph_osd_req_op
*ops
;
1261 struct ceph_osd_client
*osdc
= &dev
->rbd_client
->client
->osdc
;
1263 int ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_WATCH
, 0);
1267 ret
= ceph_osdc_create_event(osdc
, rbd_watch_cb
, 0,
1268 (void *)dev
, &dev
->watch_event
);
1272 ops
[0].watch
.ver
= cpu_to_le64(ver
);
1273 ops
[0].watch
.cookie
= cpu_to_le64(dev
->watch_event
->cookie
);
1274 ops
[0].watch
.flag
= 1;
1276 ret
= rbd_req_sync_op(dev
, NULL
,
1279 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1282 &dev
->watch_request
, NULL
);
1287 rbd_destroy_ops(ops
);
1291 ceph_osdc_cancel_event(dev
->watch_event
);
1292 dev
->watch_event
= NULL
;
1294 rbd_destroy_ops(ops
);
1299 * Request sync osd unwatch
1301 static int rbd_req_sync_unwatch(struct rbd_device
*dev
,
1304 struct ceph_osd_req_op
*ops
;
1306 int ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_WATCH
, 0);
1310 ops
[0].watch
.ver
= 0;
1311 ops
[0].watch
.cookie
= cpu_to_le64(dev
->watch_event
->cookie
);
1312 ops
[0].watch
.flag
= 0;
1314 ret
= rbd_req_sync_op(dev
, NULL
,
1317 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1319 1, obj
, 0, 0, NULL
, NULL
, NULL
);
1321 rbd_destroy_ops(ops
);
1322 ceph_osdc_cancel_event(dev
->watch_event
);
1323 dev
->watch_event
= NULL
;
1329 * Request sync osd read
1331 static int rbd_req_sync_exec(struct rbd_device
*dev
,
1339 struct ceph_osd_req_op
*ops
;
1340 int cls_len
= strlen(cls
);
1341 int method_len
= strlen(method
);
1342 int ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_CALL
,
1343 cls_len
+ method_len
+ len
);
1347 ops
[0].cls
.class_name
= cls
;
1348 ops
[0].cls
.class_len
= (__u8
)cls_len
;
1349 ops
[0].cls
.method_name
= method
;
1350 ops
[0].cls
.method_len
= (__u8
)method_len
;
1351 ops
[0].cls
.argc
= 0;
1352 ops
[0].cls
.indata
= data
;
1353 ops
[0].cls
.indata_len
= len
;
1355 ret
= rbd_req_sync_op(dev
, NULL
,
1358 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1360 1, obj
, 0, 0, NULL
, NULL
, ver
);
1362 rbd_destroy_ops(ops
);
1364 dout("cls_exec returned %d\n", ret
);
1369 static struct rbd_req_coll
*rbd_alloc_coll(int num_reqs
)
1371 struct rbd_req_coll
*coll
=
1372 kzalloc(sizeof(struct rbd_req_coll
) +
1373 sizeof(struct rbd_req_status
) * num_reqs
,
1378 coll
->total
= num_reqs
;
1379 kref_init(&coll
->kref
);
1384 * block device queue callback
1386 static void rbd_rq_fn(struct request_queue
*q
)
1388 struct rbd_device
*rbd_dev
= q
->queuedata
;
1390 struct bio_pair
*bp
= NULL
;
1392 while ((rq
= blk_fetch_request(q
))) {
1394 struct bio
*rq_bio
, *next_bio
= NULL
;
1396 int size
, op_size
= 0;
1398 int num_segs
, cur_seg
= 0;
1399 struct rbd_req_coll
*coll
;
1400 struct ceph_snap_context
*snapc
;
1402 /* peek at request from block layer */
1406 dout("fetched request\n");
1408 /* filter out block requests we don't understand */
1409 if ((rq
->cmd_type
!= REQ_TYPE_FS
)) {
1410 __blk_end_request_all(rq
, 0);
1414 /* deduce our operation (read, write) */
1415 do_write
= (rq_data_dir(rq
) == WRITE
);
1417 size
= blk_rq_bytes(rq
);
1418 ofs
= blk_rq_pos(rq
) * SECTOR_SIZE
;
1420 if (do_write
&& rbd_dev
->read_only
) {
1421 __blk_end_request_all(rq
, -EROFS
);
1425 spin_unlock_irq(q
->queue_lock
);
1427 down_read(&rbd_dev
->header_rwsem
);
1429 if (rbd_dev
->snap_id
!= CEPH_NOSNAP
&& !rbd_dev
->snap_exists
) {
1430 up_read(&rbd_dev
->header_rwsem
);
1431 dout("request for non-existent snapshot");
1432 spin_lock_irq(q
->queue_lock
);
1433 __blk_end_request_all(rq
, -ENXIO
);
1437 snapc
= ceph_get_snap_context(rbd_dev
->header
.snapc
);
1439 up_read(&rbd_dev
->header_rwsem
);
1441 dout("%s 0x%x bytes at 0x%llx\n",
1442 do_write
? "write" : "read",
1443 size
, blk_rq_pos(rq
) * SECTOR_SIZE
);
1445 num_segs
= rbd_get_num_segments(&rbd_dev
->header
, ofs
, size
);
1446 coll
= rbd_alloc_coll(num_segs
);
1448 spin_lock_irq(q
->queue_lock
);
1449 __blk_end_request_all(rq
, -ENOMEM
);
1450 ceph_put_snap_context(snapc
);
1455 /* a bio clone to be passed down to OSD req */
1456 dout("rq->bio->bi_vcnt=%d\n", rq
->bio
->bi_vcnt
);
1457 op_size
= rbd_get_segment(&rbd_dev
->header
,
1458 rbd_dev
->header
.block_name
,
1461 kref_get(&coll
->kref
);
1462 bio
= bio_chain_clone(&rq_bio
, &next_bio
, &bp
,
1463 op_size
, GFP_ATOMIC
);
1465 rbd_coll_end_req_index(rq
, coll
, cur_seg
,
1471 /* init OSD command: write or read */
1473 rbd_req_write(rq
, rbd_dev
,
1479 rbd_req_read(rq
, rbd_dev
,
1492 kref_put(&coll
->kref
, rbd_coll_release
);
1495 bio_pair_release(bp
);
1496 spin_lock_irq(q
->queue_lock
);
1498 ceph_put_snap_context(snapc
);
1503 * a queue callback. Makes sure that we don't create a bio that spans across
1504 * multiple osd objects. One exception would be with a single page bios,
1505 * which we handle later at bio_chain_clone
1507 static int rbd_merge_bvec(struct request_queue
*q
, struct bvec_merge_data
*bmd
,
1508 struct bio_vec
*bvec
)
1510 struct rbd_device
*rbd_dev
= q
->queuedata
;
1511 unsigned int chunk_sectors
;
1513 unsigned int bio_sectors
;
1516 chunk_sectors
= 1 << (rbd_dev
->header
.obj_order
- SECTOR_SHIFT
);
1517 sector
= bmd
->bi_sector
+ get_start_sect(bmd
->bi_bdev
);
1518 bio_sectors
= bmd
->bi_size
>> SECTOR_SHIFT
;
1520 max
= (chunk_sectors
- ((sector
& (chunk_sectors
- 1))
1521 + bio_sectors
)) << SECTOR_SHIFT
;
1523 max
= 0; /* bio_add cannot handle a negative return */
1524 if (max
<= bvec
->bv_len
&& bio_sectors
== 0)
1525 return bvec
->bv_len
;
1529 static void rbd_free_disk(struct rbd_device
*rbd_dev
)
1531 struct gendisk
*disk
= rbd_dev
->disk
;
1536 rbd_header_free(&rbd_dev
->header
);
1538 if (disk
->flags
& GENHD_FL_UP
)
1541 blk_cleanup_queue(disk
->queue
);
1546 * reload the ondisk the header
1548 static int rbd_read_header(struct rbd_device
*rbd_dev
,
1549 struct rbd_image_header
*header
)
1552 struct rbd_image_header_ondisk
*dh
;
1558 * First reads the fixed-size header to determine the number
1559 * of snapshots, then re-reads it, along with all snapshot
1560 * records as well as their stored names.
1564 dh
= kmalloc(len
, GFP_KERNEL
);
1568 rc
= rbd_req_sync_read(rbd_dev
,
1570 rbd_dev
->obj_md_name
,
1576 rc
= rbd_header_from_disk(header
, dh
, snap_count
, GFP_KERNEL
);
1579 pr_warning("unrecognized header format"
1580 " for image %s", rbd_dev
->obj
);
1584 if (snap_count
== header
->total_snaps
)
1587 snap_count
= header
->total_snaps
;
1588 len
= sizeof (*dh
) +
1589 snap_count
* sizeof(struct rbd_image_snap_ondisk
) +
1590 header
->snap_names_len
;
1592 rbd_header_free(header
);
1595 header
->obj_version
= ver
;
1602 static void __rbd_remove_all_snaps(struct rbd_device
*rbd_dev
)
1604 struct rbd_snap
*snap
;
1606 while (!list_empty(&rbd_dev
->snaps
)) {
1607 snap
= list_first_entry(&rbd_dev
->snaps
, struct rbd_snap
, node
);
1608 __rbd_remove_snap_dev(rbd_dev
, snap
);
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the image header from the OSDs and swaps the freshly read
 * copy into rbd_dev->header under header_rwsem, then rebuilds the
 * in-core snapshot list.  Ownership of h's allocations (snapc,
 * snap_names, snap_sizes) transfers to rbd_dev->header; the old
 * allocations are released here.
 */
static int __rbd_update_snaps(struct rbd_device *rbd_dev)
{
	int ret;
	struct rbd_image_header h;
	u64 snap_seq;
	int follow_seq = 0;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? (only matters when mapped at the head, not a snap) */
	if (rbd_dev->snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		/* NOTE(review): dout() message has no trailing newline */
		dout("setting size to %llu sectors", (unsigned long long) size);
		set_capacity(rbd_dev->disk, size);
	}

	snap_seq = rbd_dev->header.snapc->seq;
	if (rbd_dev->header.total_snaps &&
	    rbd_dev->header.snapc->snaps[0] == snap_seq)
		/* pointing at the head, will need to follow that
		   if head moves */
		follow_seq = 1;

	/* Release the old header's allocations before adopting h's. */
	ceph_put_snap_context(rbd_dev->header.snapc);
	kfree(rbd_dev->header.snap_names);
	kfree(rbd_dev->header.snap_sizes);

	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	if (follow_seq)
		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
	else
		rbd_dev->header.snapc->seq = snap_seq;

	/* Reconcile the sysfs snap devices with the new snap context. */
	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
/*
 * Read the image header from the cluster, pick the mapped snapshot,
 * then allocate and register the gendisk / request queue that exposes
 * the image as a block device.  Returns 0 or a negative errno.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	/* Resolves the mapped snap name; fills in total_size in bytes. */
	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
/* Map the embedded struct device back to its owning rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1748 static ssize_t
rbd_size_show(struct device
*dev
,
1749 struct device_attribute
*attr
, char *buf
)
1751 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1754 down_read(&rbd_dev
->header_rwsem
);
1755 size
= get_capacity(rbd_dev
->disk
);
1756 up_read(&rbd_dev
->header_rwsem
);
1758 return sprintf(buf
, "%llu\n", (unsigned long long) size
* SECTOR_SIZE
);
1761 static ssize_t
rbd_major_show(struct device
*dev
,
1762 struct device_attribute
*attr
, char *buf
)
1764 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1766 return sprintf(buf
, "%d\n", rbd_dev
->major
);
1769 static ssize_t
rbd_client_id_show(struct device
*dev
,
1770 struct device_attribute
*attr
, char *buf
)
1772 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1774 return sprintf(buf
, "client%lld\n",
1775 ceph_client_id(rbd_dev
->rbd_client
->client
));
1778 static ssize_t
rbd_pool_show(struct device
*dev
,
1779 struct device_attribute
*attr
, char *buf
)
1781 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1783 return sprintf(buf
, "%s\n", rbd_dev
->pool_name
);
1786 static ssize_t
rbd_name_show(struct device
*dev
,
1787 struct device_attribute
*attr
, char *buf
)
1789 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1791 return sprintf(buf
, "%s\n", rbd_dev
->obj
);
1794 static ssize_t
rbd_snap_show(struct device
*dev
,
1795 struct device_attribute
*attr
,
1798 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1800 return sprintf(buf
, "%s\n", rbd_dev
->snap_name
);
1803 static ssize_t
rbd_image_refresh(struct device
*dev
,
1804 struct device_attribute
*attr
,
1808 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1812 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
1814 rc
= __rbd_update_snaps(rbd_dev
);
1818 mutex_unlock(&ctl_mutex
);
/*
 * Per-image sysfs attributes.  All are read-only except "refresh",
 * which is a write-only trigger handled by rbd_image_refresh().
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
/* Attribute table wiring the per-image attributes into the device type. */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/*
 * Intentionally empty: the real teardown is done by rbd_dev_release(),
 * installed directly as dev->release in rbd_bus_add_dev().
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1865 static ssize_t
rbd_snap_size_show(struct device
*dev
,
1866 struct device_attribute
*attr
,
1869 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
1871 return sprintf(buf
, "%zd\n", snap
->size
);
1874 static ssize_t
rbd_snap_id_show(struct device
*dev
,
1875 struct device_attribute
*attr
,
1878 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
1880 return sprintf(buf
, "%llu\n", (unsigned long long) snap
->id
);
/* Per-snapshot sysfs attributes (under snap_<name>/ of the image dev). */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/*
 * Device-core release for a snapshot device: frees the rbd_snap that
 * embeds it.  NOTE(review): the kfree() calls are restored from the
 * upstream source -- they were elided from this excerpt; confirm.
 */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
/*
 * Unlink a snapshot from the device's snap list and unregister its
 * sysfs device; the rbd_snap itself is freed by the device release
 * callback once the last reference drops.
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
1920 static int rbd_register_snap_dev(struct rbd_device
*rbd_dev
,
1921 struct rbd_snap
*snap
,
1922 struct device
*parent
)
1924 struct device
*dev
= &snap
->dev
;
1927 dev
->type
= &rbd_snap_device_type
;
1928 dev
->parent
= parent
;
1929 dev
->release
= rbd_snap_dev_release
;
1930 dev_set_name(dev
, "snap_%s", snap
->name
);
1931 ret
= device_register(dev
);
1936 static int __rbd_add_snap_dev(struct rbd_device
*rbd_dev
,
1937 int i
, const char *name
,
1938 struct rbd_snap
**snapp
)
1941 struct rbd_snap
*snap
= kzalloc(sizeof(*snap
), GFP_KERNEL
);
1944 snap
->name
= kstrdup(name
, GFP_KERNEL
);
1945 snap
->size
= rbd_dev
->header
.snap_sizes
[i
];
1946 snap
->id
= rbd_dev
->header
.snapc
->snaps
[i
];
1947 if (device_is_registered(&rbd_dev
->dev
)) {
1948 ret
= rbd_register_snap_dev(rbd_dev
, snap
,
1962 * search for the previous snap in a null delimited string list
1964 const char *rbd_prev_snap_name(const char *name
, const char *start
)
1966 if (name
< start
+ 2)
/*
 * compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in a reverse order (from newest to oldest) and we need to go from
 * older to new so that we don't get a duplicate snap name when
 * doing the process (e.g., removed snapshot and recreated a new
 * one with the same name.
 *
 * Walks the existing rbd_dev->snaps list (oldest first, via the _prev
 * iterator) in lockstep with the header's snap id array (indexed from
 * the tail) and its packed NUL-delimited name blob (walked backward
 * with rbd_prev_snap_name()), removing vanished snaps and inserting
 * new ones in place.
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	first_name = rbd_dev->header.snap_names;
	/* start one past the end; rbd_prev_snap_name() steps backward */
	name = first_name + rbd_dev->header.snap_names_len;

	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/*
			 * old_snap->id was skipped, thus was
			 * removed.  If this rbd_dev is mapped to
			 * the removed snapshot, record that it no
			 * longer exists, to prevent further I/O.
			 */
			if (rbd_dev->snap_id == old_snap->id)
				rbd_dev->snap_exists = false;
			__rbd_remove_snap_dev(rbd_dev, old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		/* header has snaps newer than old_snap: insert them */
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				/* name blob and snap count disagree */
				WARN_ON(1);
				return -EINVAL;
			}
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
/*
 * Register the rbd_dev on the rbd bus in sysfs, then register each of
 * its existing snapshots as child devices.  Runs under ctl_mutex.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret = -ENOMEM;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	/* rbd_dev_release() does final teardown when the ref drops */
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					    &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
/*
 * Drop the image's sysfs device; final cleanup happens when its
 * release callback (rbd_dev_release) runs.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
/*
 * Register a watch on the image's header object (obj_md_name).  If the
 * registration fails with -ERANGE, the cached header version is stale:
 * resync the snapshot state under ctl_mutex and try again.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
	int ret, rc;

	do {
		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
					 rbd_dev->header.obj_version);
		if (ret == -ERANGE) {
			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
			rc = __rbd_update_snaps(rbd_dev);
			mutex_unlock(&ctl_mutex);
			if (rc < 0)
				return rc;
		}
	} while (ret == -ERANGE);

	return ret;
}
/*
 * Highest device id handed out so far; rbd_id_get() increments it,
 * rbd_id_put() lowers it when the current maximum is released.
 */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	/* atomic64_inc_return() yields 1, 2, 3, ... -- never 0 */
	rbd_dev->id = atomic64_inc_return(&rbd_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	int rbd_id = rbd_dev->id;
	int max_id;

	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	max_id = 0;
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->id > max_id)
			max_id = rbd_dev->id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case -- cmpxchg only installs max_id if rbd_id_max still
	 * holds the id we just released.
	 */
	atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
}
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * Exactly the characters for which isspace() is nonzero in
	 * the "C" and "POSIX" locales.
	 */
	static const char delims[] = " \f\n\r\t\v";
	const char *start;

	start = *buf + strspn(*buf, delims);	/* skip leading spaces */
	*buf = start;

	return strcspn(start, delims);		/* token length */
}
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit (token is then untouched).
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	/* isspace() characters in the "C"/"POSIX" locales */
	static const char delims[] = " \f\n\r\t\v";
	size_t len;

	*buf += strspn(*buf, delims);	/* find token start */
	len = strcspn(*buf, delims);	/* measure it */
	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;			/* always advance past the token */

	return len;
}
/*
 * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
 * on the list of monitor addresses and other options provided via
 * the "add" sysfs write.
 *
 * Token order in buf: mon_addrs, options, pool name, image name,
 * optional snapshot name.  Returns 0 or -EINVAL on a missing or
 * over-long token.
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			      size_t options_size)
{
	size_t len;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	/* mon_addrs points into buf; size includes room for a '\0' */
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;

	buf += len;

	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return -EINVAL;

	len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
	if (!len || len >= sizeof (rbd_dev->pool_name))
		return -EINVAL;

	len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
	if (!len || len >= sizeof (rbd_dev->obj))
		return -EINVAL;

	/* We have the object length in hand, save it. */

	rbd_dev->obj_len = len;

	/* header object name is "<image>" + RBD_SUFFIX */
	BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
				< RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
	sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);

	/*
	 * The snapshot name is optional, but it's an error if it's
	 * too long.  If no snapshot is supplied, fill in the default.
	 */
	len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
	if (!len)
		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
			sizeof (RBD_SNAP_HEAD_NAME));
	else if (len >= sizeof (rbd_dev->snap_name))
		return -EINVAL;

	return 0;
}
2283 static ssize_t
rbd_add(struct bus_type
*bus
,
2287 struct rbd_device
*rbd_dev
;
2288 const char *mon_addrs
= NULL
;
2289 size_t mon_addrs_size
= 0;
2290 char *options
= NULL
;
2291 struct ceph_osd_client
*osdc
;
2294 if (!try_module_get(THIS_MODULE
))
2297 rbd_dev
= kzalloc(sizeof(*rbd_dev
), GFP_KERNEL
);
2300 options
= kmalloc(count
, GFP_KERNEL
);
2304 /* static rbd_device initialization */
2305 spin_lock_init(&rbd_dev
->lock
);
2306 INIT_LIST_HEAD(&rbd_dev
->node
);
2307 INIT_LIST_HEAD(&rbd_dev
->snaps
);
2308 init_rwsem(&rbd_dev
->header_rwsem
);
2310 init_rwsem(&rbd_dev
->header_rwsem
);
2312 /* generate unique id: find highest unique id, add one */
2313 rbd_id_get(rbd_dev
);
2315 /* Fill in the device name, now that we have its id. */
2316 BUILD_BUG_ON(DEV_NAME_LEN
2317 < sizeof (RBD_DRV_NAME
) + MAX_INT_FORMAT_WIDTH
);
2318 sprintf(rbd_dev
->name
, "%s%d", RBD_DRV_NAME
, rbd_dev
->id
);
2320 /* parse add command */
2321 rc
= rbd_add_parse_args(rbd_dev
, buf
, &mon_addrs
, &mon_addrs_size
,
2326 rbd_dev
->rbd_client
= rbd_get_client(mon_addrs
, mon_addrs_size
- 1,
2328 if (IS_ERR(rbd_dev
->rbd_client
)) {
2329 rc
= PTR_ERR(rbd_dev
->rbd_client
);
2334 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2335 rc
= ceph_pg_poolid_by_name(osdc
->osdmap
, rbd_dev
->pool_name
);
2337 goto err_out_client
;
2338 rbd_dev
->poolid
= rc
;
2340 /* register our block device */
2341 rc
= register_blkdev(0, rbd_dev
->name
);
2343 goto err_out_client
;
2344 rbd_dev
->major
= rc
;
2346 rc
= rbd_bus_add_dev(rbd_dev
);
2348 goto err_out_blkdev
;
2351 * At this point cleanup in the event of an error is the job
2352 * of the sysfs code (initiated by rbd_bus_del_dev()).
2354 * Set up and announce blkdev mapping.
2356 rc
= rbd_init_disk(rbd_dev
);
2360 rc
= rbd_init_watch_dev(rbd_dev
);
2367 /* this will also clean up rest of rbd_dev stuff */
2369 rbd_bus_del_dev(rbd_dev
);
2374 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
2376 rbd_put_client(rbd_dev
);
2378 rbd_id_put(rbd_dev
);
2383 dout("Error adding device %s\n", buf
);
2384 module_put(THIS_MODULE
);
2386 return (ssize_t
) rc
;
2389 static struct rbd_device
*__rbd_get_dev(unsigned long id
)
2391 struct list_head
*tmp
;
2392 struct rbd_device
*rbd_dev
;
2394 spin_lock(&rbd_dev_list_lock
);
2395 list_for_each(tmp
, &rbd_dev_list
) {
2396 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
2397 if (rbd_dev
->id
== id
) {
2398 spin_unlock(&rbd_dev_list_lock
);
2402 spin_unlock(&rbd_dev_list_lock
);
/*
 * Device-core release callback for the image device: tears down the
 * header watch, drops the ceph client, frees the gendisk/major, and
 * finally releases the id and the rbd_dev itself.  Runs when the last
 * sysfs reference on the device is dropped (after rbd_bus_del_dev()).
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
/*
 * Bus "remove" handler: parse the device id from buf and, if the
 * device exists and is not open, remove its snapshots and unregister
 * it (final teardown happens in rbd_dev_release()).
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	/* refuse to remove a mapped image that is still open */
	if (rbd_dev->open_count) {
		ret = -EBUSY;
		goto done;
	}

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2473 * create control files in sysfs
2476 static int rbd_sysfs_init(void)
2480 ret
= device_register(&rbd_root_dev
);
2484 ret
= bus_register(&rbd_bus_type
);
2486 device_unregister(&rbd_root_dev
);
/* Undo rbd_sysfs_init(): drop the bus first, then the root device. */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2497 int __init
rbd_init(void)
2501 rc
= rbd_sysfs_init();
2504 pr_info("loaded " RBD_DRV_NAME_LONG
"\n");
/* Module unload: tear down the sysfs interface created in rbd_init(). */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");