2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define DRV_NAME "rbd"
45 #define DRV_NAME_LONG "rbd (rados block device)"
47 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
49 #define RBD_MAX_MD_NAME_LEN (96 + sizeof(RBD_SUFFIX))
50 #define RBD_MAX_POOL_NAME_LEN 64
51 #define RBD_MAX_SNAP_NAME_LEN 32
52 #define RBD_MAX_OPT_LEN 1024
54 #define RBD_SNAP_HEAD_NAME "-"
56 #define DEV_NAME_LEN 32
58 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
61 * block device image metadata (in-memory version)
63 struct rbd_image_header
{
69 struct rw_semaphore snap_rwsem
;
70 struct ceph_snap_context
*snapc
;
71 size_t snap_names_len
;
86 * an instance of the client. multiple devices may share a client.
89 struct ceph_client
*client
;
90 struct rbd_options
*rbd_opts
;
92 struct list_head node
;
99 struct request
*rq
; /* blk layer request */
100 struct bio
*bio
; /* cloned bio */
101 struct page
**pages
; /* list of used pages */
109 struct list_head node
;
117 int id
; /* blkdev unique id */
119 int major
; /* blkdev assigned major */
120 struct gendisk
*disk
; /* blkdev's gendisk and rq */
121 struct request_queue
*q
;
123 struct ceph_client
*client
;
124 struct rbd_client
*rbd_client
;
126 char name
[DEV_NAME_LEN
]; /* blkdev name, e.g. rbd3 */
128 spinlock_t lock
; /* queue lock */
130 struct rbd_image_header header
;
131 char obj
[RBD_MAX_OBJ_NAME_LEN
]; /* rbd image name */
133 char obj_md_name
[RBD_MAX_MD_NAME_LEN
]; /* hdr nm. */
134 char pool_name
[RBD_MAX_POOL_NAME_LEN
];
137 struct ceph_osd_event
*watch_event
;
138 struct ceph_osd_request
*watch_request
;
140 char snap_name
[RBD_MAX_SNAP_NAME_LEN
];
141 u32 cur_snap
; /* index+1 of current snapshot within snap context
145 struct list_head node
;
147 /* list of snapshots */
148 struct list_head snaps
;
154 static struct bus_type rbd_bus_type
= {
158 static spinlock_t node_lock
; /* protects client get/put */
160 static DEFINE_MUTEX(ctl_mutex
); /* Serialize open/close/setup/teardown */
161 static LIST_HEAD(rbd_dev_list
); /* devices */
162 static LIST_HEAD(rbd_client_list
); /* clients */
164 static int __rbd_init_snaps_header(struct rbd_device
*rbd_dev
);
165 static void rbd_dev_release(struct device
*dev
);
166 static ssize_t
rbd_snap_rollback(struct device
*dev
,
167 struct device_attribute
*attr
,
170 static ssize_t
rbd_snap_add(struct device
*dev
,
171 struct device_attribute
*attr
,
174 static void __rbd_remove_snap_dev(struct rbd_device
*rbd_dev
,
175 struct rbd_snap
*snap
);;
178 static struct rbd_device
*dev_to_rbd(struct device
*dev
)
180 return container_of(dev
, struct rbd_device
, dev
);
183 static struct device
*rbd_get_dev(struct rbd_device
*rbd_dev
)
185 return get_device(&rbd_dev
->dev
);
188 static void rbd_put_dev(struct rbd_device
*rbd_dev
)
190 put_device(&rbd_dev
->dev
);
193 static int __rbd_update_snaps(struct rbd_device
*rbd_dev
);
195 static int rbd_open(struct block_device
*bdev
, fmode_t mode
)
197 struct gendisk
*disk
= bdev
->bd_disk
;
198 struct rbd_device
*rbd_dev
= disk
->private_data
;
200 rbd_get_dev(rbd_dev
);
202 set_device_ro(bdev
, rbd_dev
->read_only
);
204 if ((mode
& FMODE_WRITE
) && rbd_dev
->read_only
)
210 static int rbd_release(struct gendisk
*disk
, fmode_t mode
)
212 struct rbd_device
*rbd_dev
= disk
->private_data
;
214 rbd_put_dev(rbd_dev
);
219 static const struct block_device_operations rbd_bd_ops
= {
220 .owner
= THIS_MODULE
,
222 .release
= rbd_release
,
226 * Initialize an rbd client instance.
229 static struct rbd_client
*rbd_client_create(struct ceph_options
*opt
,
230 struct rbd_options
*rbd_opts
)
232 struct rbd_client
*rbdc
;
235 dout("rbd_client_create\n");
236 rbdc
= kmalloc(sizeof(struct rbd_client
), GFP_KERNEL
);
240 kref_init(&rbdc
->kref
);
241 INIT_LIST_HEAD(&rbdc
->node
);
243 rbdc
->client
= ceph_create_client(opt
, rbdc
);
244 if (IS_ERR(rbdc
->client
))
246 opt
= NULL
; /* Now rbdc->client is responsible for opt */
248 ret
= ceph_open_session(rbdc
->client
);
252 rbdc
->rbd_opts
= rbd_opts
;
254 spin_lock(&node_lock
);
255 list_add_tail(&rbdc
->node
, &rbd_client_list
);
256 spin_unlock(&node_lock
);
258 dout("rbd_client_create created %p\n", rbdc
);
262 ceph_destroy_client(rbdc
->client
);
267 ceph_destroy_options(opt
);
272 * Find a ceph client with specific addr and configuration.
274 static struct rbd_client
*__rbd_client_find(struct ceph_options
*opt
)
276 struct rbd_client
*client_node
;
278 if (opt
->flags
& CEPH_OPT_NOSHARE
)
281 list_for_each_entry(client_node
, &rbd_client_list
, node
)
282 if (ceph_compare_options(opt
, client_node
->client
) == 0)
295 /* string args above */
298 static match_table_t rbdopt_tokens
= {
299 {Opt_notify_timeout
, "notify_timeout=%d"},
301 /* string args above */
305 static int parse_rbd_opts_token(char *c
, void *private)
307 struct rbd_options
*rbdopt
= private;
308 substring_t argstr
[MAX_OPT_ARGS
];
309 int token
, intval
, ret
;
311 token
= match_token((char *)c
, rbdopt_tokens
, argstr
);
315 if (token
< Opt_last_int
) {
316 ret
= match_int(&argstr
[0], &intval
);
318 pr_err("bad mount option arg (not int) "
322 dout("got int token %d val %d\n", token
, intval
);
323 } else if (token
> Opt_last_int
&& token
< Opt_last_string
) {
324 dout("got string token %d val %s\n", token
,
327 dout("got token %d\n", token
);
331 case Opt_notify_timeout
:
332 rbdopt
->notify_timeout
= intval
;
341 * Get a ceph client with specific addr and configuration, if one does
342 * not exist create it.
344 static int rbd_get_client(struct rbd_device
*rbd_dev
, const char *mon_addr
,
347 struct rbd_client
*rbdc
;
348 struct ceph_options
*opt
;
350 struct rbd_options
*rbd_opts
;
352 rbd_opts
= kzalloc(sizeof(*rbd_opts
), GFP_KERNEL
);
356 rbd_opts
->notify_timeout
= RBD_NOTIFY_TIMEOUT_DEFAULT
;
358 ret
= ceph_parse_options(&opt
, options
, mon_addr
,
359 mon_addr
+ strlen(mon_addr
), parse_rbd_opts_token
, rbd_opts
);
363 spin_lock(&node_lock
);
364 rbdc
= __rbd_client_find(opt
);
366 ceph_destroy_options(opt
);
368 /* using an existing client */
369 kref_get(&rbdc
->kref
);
370 rbd_dev
->rbd_client
= rbdc
;
371 rbd_dev
->client
= rbdc
->client
;
372 spin_unlock(&node_lock
);
375 spin_unlock(&node_lock
);
377 rbdc
= rbd_client_create(opt
, rbd_opts
);
383 rbd_dev
->rbd_client
= rbdc
;
384 rbd_dev
->client
= rbdc
->client
;
392 * Destroy ceph client
394 static void rbd_client_release(struct kref
*kref
)
396 struct rbd_client
*rbdc
= container_of(kref
, struct rbd_client
, kref
);
398 dout("rbd_release_client %p\n", rbdc
);
399 spin_lock(&node_lock
);
400 list_del(&rbdc
->node
);
401 spin_unlock(&node_lock
);
403 ceph_destroy_client(rbdc
->client
);
404 kfree(rbdc
->rbd_opts
);
409 * Drop reference to ceph client node. If it's not referenced anymore, release
412 static void rbd_put_client(struct rbd_device
*rbd_dev
)
414 kref_put(&rbd_dev
->rbd_client
->kref
, rbd_client_release
);
415 rbd_dev
->rbd_client
= NULL
;
416 rbd_dev
->client
= NULL
;
421 * Create a new header structure, translate header format from the on-disk
424 static int rbd_header_from_disk(struct rbd_image_header
*header
,
425 struct rbd_image_header_ondisk
*ondisk
,
430 u32 snap_count
= le32_to_cpu(ondisk
->snap_count
);
433 init_rwsem(&header
->snap_rwsem
);
434 header
->snap_names_len
= le64_to_cpu(ondisk
->snap_names_len
);
435 header
->snapc
= kmalloc(sizeof(struct ceph_snap_context
) +
437 sizeof(struct rbd_image_snap_ondisk
),
442 header
->snap_names
= kmalloc(header
->snap_names_len
,
444 if (!header
->snap_names
)
446 header
->snap_sizes
= kmalloc(snap_count
* sizeof(u64
),
448 if (!header
->snap_sizes
)
451 header
->snap_names
= NULL
;
452 header
->snap_sizes
= NULL
;
454 memcpy(header
->block_name
, ondisk
->block_name
,
455 sizeof(ondisk
->block_name
));
457 header
->image_size
= le64_to_cpu(ondisk
->image_size
);
458 header
->obj_order
= ondisk
->options
.order
;
459 header
->crypt_type
= ondisk
->options
.crypt_type
;
460 header
->comp_type
= ondisk
->options
.comp_type
;
462 atomic_set(&header
->snapc
->nref
, 1);
463 header
->snap_seq
= le64_to_cpu(ondisk
->snap_seq
);
464 header
->snapc
->num_snaps
= snap_count
;
465 header
->total_snaps
= snap_count
;
468 allocated_snaps
== snap_count
) {
469 for (i
= 0; i
< snap_count
; i
++) {
470 header
->snapc
->snaps
[i
] =
471 le64_to_cpu(ondisk
->snaps
[i
].id
);
472 header
->snap_sizes
[i
] =
473 le64_to_cpu(ondisk
->snaps
[i
].image_size
);
476 /* copy snapshot names */
477 memcpy(header
->snap_names
, &ondisk
->snaps
[i
],
478 header
->snap_names_len
);
484 kfree(header
->snap_names
);
486 kfree(header
->snapc
);
490 static int snap_index(struct rbd_image_header
*header
, int snap_num
)
492 return header
->total_snaps
- snap_num
;
495 static u64
cur_snap_id(struct rbd_device
*rbd_dev
)
497 struct rbd_image_header
*header
= &rbd_dev
->header
;
499 if (!rbd_dev
->cur_snap
)
502 return header
->snapc
->snaps
[snap_index(header
, rbd_dev
->cur_snap
)];
505 static int snap_by_name(struct rbd_image_header
*header
, const char *snap_name
,
509 char *p
= header
->snap_names
;
511 for (i
= 0; i
< header
->total_snaps
; i
++, p
+= strlen(p
) + 1) {
512 if (strcmp(snap_name
, p
) == 0)
515 if (i
== header
->total_snaps
)
518 *seq
= header
->snapc
->snaps
[i
];
521 *size
= header
->snap_sizes
[i
];
526 static int rbd_header_set_snap(struct rbd_device
*dev
,
527 const char *snap_name
,
530 struct rbd_image_header
*header
= &dev
->header
;
531 struct ceph_snap_context
*snapc
= header
->snapc
;
534 down_write(&header
->snap_rwsem
);
538 strcmp(snap_name
, "-") == 0 ||
539 strcmp(snap_name
, RBD_SNAP_HEAD_NAME
) == 0) {
540 if (header
->total_snaps
)
541 snapc
->seq
= header
->snap_seq
;
547 *size
= header
->image_size
;
549 ret
= snap_by_name(header
, snap_name
, &snapc
->seq
, size
);
553 dev
->cur_snap
= header
->total_snaps
- ret
;
559 up_write(&header
->snap_rwsem
);
563 static void rbd_header_free(struct rbd_image_header
*header
)
565 kfree(header
->snapc
);
566 kfree(header
->snap_names
);
567 kfree(header
->snap_sizes
);
571 * get the actual striped segment name, offset and length
573 static u64
rbd_get_segment(struct rbd_image_header
*header
,
574 const char *block_name
,
576 char *seg_name
, u64
*segofs
)
578 u64 seg
= ofs
>> header
->obj_order
;
581 snprintf(seg_name
, RBD_MAX_SEG_NAME_LEN
,
582 "%s.%012llx", block_name
, seg
);
584 ofs
= ofs
& ((1 << header
->obj_order
) - 1);
585 len
= min_t(u64
, len
, (1 << header
->obj_order
) - ofs
);
597 static void bio_chain_put(struct bio
*chain
)
603 chain
= chain
->bi_next
;
609 * zeros a bio chain, starting at specific offset
611 static void zero_bio_chain(struct bio
*chain
, int start_ofs
)
620 bio_for_each_segment(bv
, chain
, i
) {
621 if (pos
+ bv
->bv_len
> start_ofs
) {
622 int remainder
= max(start_ofs
- pos
, 0);
623 buf
= bvec_kmap_irq(bv
, &flags
);
624 memset(buf
+ remainder
, 0,
625 bv
->bv_len
- remainder
);
626 bvec_kunmap_irq(buf
, &flags
);
631 chain
= chain
->bi_next
;
636 * bio_chain_clone - clone a chain of bios up to a certain length.
637 * might return a bio_pair that will need to be released.
639 static struct bio
*bio_chain_clone(struct bio
**old
, struct bio
**next
,
640 struct bio_pair
**bp
,
641 int len
, gfp_t gfpmask
)
643 struct bio
*tmp
, *old_chain
= *old
, *new_chain
= NULL
, *tail
= NULL
;
647 bio_pair_release(*bp
);
651 while (old_chain
&& (total
< len
)) {
652 tmp
= bio_kmalloc(gfpmask
, old_chain
->bi_max_vecs
);
656 if (total
+ old_chain
->bi_size
> len
) {
660 * this split can only happen with a single paged bio,
661 * split_bio will BUG_ON if this is not the case
663 dout("bio_chain_clone split! total=%d remaining=%d"
665 (int)total
, (int)len
-total
,
666 (int)old_chain
->bi_size
);
668 /* split the bio. We'll release it either in the next
669 call, or it will have to be released outside */
670 bp
= bio_split(old_chain
, (len
- total
) / 512ULL);
674 __bio_clone(tmp
, &bp
->bio1
);
678 __bio_clone(tmp
, old_chain
);
679 *next
= old_chain
->bi_next
;
683 gfpmask
&= ~__GFP_WAIT
;
687 new_chain
= tail
= tmp
;
692 old_chain
= old_chain
->bi_next
;
694 total
+= tmp
->bi_size
;
700 tail
->bi_next
= NULL
;
707 dout("bio_chain_clone with err\n");
708 bio_chain_put(new_chain
);
713 * helpers for osd request op vectors.
715 static int rbd_create_rw_ops(struct ceph_osd_req_op
**ops
,
720 *ops
= kzalloc(sizeof(struct ceph_osd_req_op
) * (num_ops
+ 1),
724 (*ops
)[0].op
= opcode
;
726 * op extent offset and length will be set later on
727 * in calc_raw_layout()
729 (*ops
)[0].payload_len
= payload_len
;
733 static void rbd_destroy_ops(struct ceph_osd_req_op
*ops
)
739 * Send ceph osd request
741 static int rbd_do_request(struct request
*rq
,
742 struct rbd_device
*dev
,
743 struct ceph_snap_context
*snapc
,
745 const char *obj
, u64 ofs
, u64 len
,
750 struct ceph_osd_req_op
*ops
,
752 void (*rbd_cb
)(struct ceph_osd_request
*req
,
753 struct ceph_msg
*msg
),
754 struct ceph_osd_request
**linger_req
,
757 struct ceph_osd_request
*req
;
758 struct ceph_file_layout
*layout
;
761 struct timespec mtime
= CURRENT_TIME
;
762 struct rbd_request
*req_data
;
763 struct ceph_osd_request_head
*reqhead
;
764 struct rbd_image_header
*header
= &dev
->header
;
767 req_data
= kzalloc(sizeof(*req_data
), GFP_NOIO
);
771 dout("rbd_do_request len=%lld ofs=%lld\n", len
, ofs
);
773 down_read(&header
->snap_rwsem
);
775 req
= ceph_osdc_alloc_request(&dev
->client
->osdc
, flags
,
779 GFP_NOIO
, pages
, bio
);
781 up_read(&header
->snap_rwsem
);
786 req
->r_callback
= rbd_cb
;
790 req_data
->pages
= pages
;
793 req
->r_priv
= req_data
;
795 reqhead
= req
->r_request
->front
.iov_base
;
796 reqhead
->snapid
= cpu_to_le64(CEPH_NOSNAP
);
798 strncpy(req
->r_oid
, obj
, sizeof(req
->r_oid
));
799 req
->r_oid_len
= strlen(req
->r_oid
);
801 layout
= &req
->r_file_layout
;
802 memset(layout
, 0, sizeof(*layout
));
803 layout
->fl_stripe_unit
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
804 layout
->fl_stripe_count
= cpu_to_le32(1);
805 layout
->fl_object_size
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
806 layout
->fl_pg_preferred
= cpu_to_le32(-1);
807 layout
->fl_pg_pool
= cpu_to_le32(dev
->poolid
);
808 ceph_calc_raw_layout(&dev
->client
->osdc
, layout
, snapid
,
809 ofs
, &len
, &bno
, req
, ops
);
811 ceph_osdc_build_request(req
, ofs
, &len
,
815 req
->r_oid
, req
->r_oid_len
);
816 up_read(&header
->snap_rwsem
);
819 ceph_osdc_set_request_linger(&dev
->client
->osdc
, req
);
823 ret
= ceph_osdc_start_request(&dev
->client
->osdc
, req
, false);
828 ret
= ceph_osdc_wait_request(&dev
->client
->osdc
, req
);
830 *ver
= le64_to_cpu(req
->r_reassert_version
.version
);
831 dout("reassert_ver=%lld\n", le64_to_cpu(req
->r_reassert_version
.version
));
832 ceph_osdc_put_request(req
);
837 bio_chain_put(req_data
->bio
);
838 ceph_osdc_put_request(req
);
843 blk_end_request(rq
, ret
, len
);
848 * Ceph osd op callback
850 static void rbd_req_cb(struct ceph_osd_request
*req
, struct ceph_msg
*msg
)
852 struct rbd_request
*req_data
= req
->r_priv
;
853 struct ceph_osd_reply_head
*replyhead
;
854 struct ceph_osd_op
*op
;
860 replyhead
= msg
->front
.iov_base
;
861 WARN_ON(le32_to_cpu(replyhead
->num_ops
) == 0);
862 op
= (void *)(replyhead
+ 1);
863 rc
= le32_to_cpu(replyhead
->result
);
864 bytes
= le64_to_cpu(op
->extent
.length
);
865 read_op
= (le32_to_cpu(op
->op
) == CEPH_OSD_OP_READ
);
867 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes
, read_op
, rc
);
869 if (rc
== -ENOENT
&& read_op
) {
870 zero_bio_chain(req_data
->bio
, 0);
872 } else if (rc
== 0 && read_op
&& bytes
< req_data
->len
) {
873 zero_bio_chain(req_data
->bio
, bytes
);
874 bytes
= req_data
->len
;
877 blk_end_request(req_data
->rq
, rc
, bytes
);
880 bio_chain_put(req_data
->bio
);
882 ceph_osdc_put_request(req
);
886 static void rbd_simple_req_cb(struct ceph_osd_request
*req
, struct ceph_msg
*msg
)
888 ceph_osdc_put_request(req
);
892 * Do a synchronous ceph osd operation
894 static int rbd_req_sync_op(struct rbd_device
*dev
,
895 struct ceph_snap_context
*snapc
,
899 struct ceph_osd_req_op
*orig_ops
,
904 struct ceph_osd_request
**linger_req
,
910 struct ceph_osd_req_op
*ops
= orig_ops
;
913 num_pages
= calc_pages_for(ofs
, len
);
914 pages
= ceph_alloc_page_vector(num_pages
, GFP_KERNEL
);
916 return PTR_ERR(pages
);
919 payload_len
= (flags
& CEPH_OSD_FLAG_WRITE
? len
: 0);
920 ret
= rbd_create_rw_ops(&ops
, 1, opcode
, payload_len
);
924 if ((flags
& CEPH_OSD_FLAG_WRITE
) && buf
) {
925 ret
= ceph_copy_to_page_vector(pages
, buf
, ofs
, len
);
931 ret
= rbd_do_request(NULL
, dev
, snapc
, snapid
,
942 if ((flags
& CEPH_OSD_FLAG_READ
) && buf
)
943 ret
= ceph_copy_from_page_vector(pages
, buf
, ofs
, ret
);
947 rbd_destroy_ops(ops
);
949 ceph_release_page_vector(pages
, num_pages
);
954 * Do an asynchronous ceph osd operation
956 static int rbd_do_op(struct request
*rq
,
957 struct rbd_device
*rbd_dev
,
958 struct ceph_snap_context
*snapc
,
960 int opcode
, int flags
, int num_reply
,
968 struct ceph_osd_req_op
*ops
;
971 seg_name
= kmalloc(RBD_MAX_SEG_NAME_LEN
+ 1, GFP_NOIO
);
975 seg_len
= rbd_get_segment(&rbd_dev
->header
,
976 rbd_dev
->header
.block_name
,
980 payload_len
= (flags
& CEPH_OSD_FLAG_WRITE
? seg_len
: 0);
982 ret
= rbd_create_rw_ops(&ops
, 1, opcode
, payload_len
);
986 /* we've taken care of segment sizes earlier when we
987 cloned the bios. We should never have a segment
988 truncated at this point */
989 BUG_ON(seg_len
< len
);
991 ret
= rbd_do_request(rq
, rbd_dev
, snapc
, snapid
,
992 seg_name
, seg_ofs
, seg_len
,
998 rbd_req_cb
, 0, NULL
);
1005 * Request async osd write
1007 static int rbd_req_write(struct request
*rq
,
1008 struct rbd_device
*rbd_dev
,
1009 struct ceph_snap_context
*snapc
,
1013 return rbd_do_op(rq
, rbd_dev
, snapc
, CEPH_NOSNAP
,
1015 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1021 * Request async osd read
1023 static int rbd_req_read(struct request
*rq
,
1024 struct rbd_device
*rbd_dev
,
1029 return rbd_do_op(rq
, rbd_dev
, NULL
,
1030 (snapid
? snapid
: CEPH_NOSNAP
),
1038 * Request sync osd read
1040 static int rbd_req_sync_read(struct rbd_device
*dev
,
1041 struct ceph_snap_context
*snapc
,
1048 return rbd_req_sync_op(dev
, NULL
,
1049 (snapid
? snapid
: CEPH_NOSNAP
),
1053 1, obj
, ofs
, len
, buf
, NULL
, ver
);
1057 * Request sync osd watch
1059 static int rbd_req_sync_notify_ack(struct rbd_device
*dev
,
1064 struct ceph_osd_req_op
*ops
;
1065 struct page
**pages
= NULL
;
1066 int ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_NOTIFY_ACK
, 0);
1070 ops
[0].watch
.ver
= cpu_to_le64(dev
->header
.obj_version
);
1071 ops
[0].watch
.cookie
= notify_id
;
1072 ops
[0].watch
.flag
= 0;
1074 ret
= rbd_do_request(NULL
, dev
, NULL
, CEPH_NOSNAP
,
1080 rbd_simple_req_cb
, 0, NULL
);
1082 rbd_destroy_ops(ops
);
1086 static void rbd_watch_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1088 struct rbd_device
*dev
= (struct rbd_device
*)data
;
1092 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev
->obj_md_name
,
1093 notify_id
, (int)opcode
);
1094 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
1095 __rbd_update_snaps(dev
);
1096 mutex_unlock(&ctl_mutex
);
1098 rbd_req_sync_notify_ack(dev
, ver
, notify_id
, dev
->obj_md_name
);
1102 * Request sync osd watch
1104 static int rbd_req_sync_watch(struct rbd_device
*dev
,
1108 struct ceph_osd_req_op
*ops
;
1109 struct ceph_osd_client
*osdc
= &dev
->client
->osdc
;
1111 int ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_WATCH
, 0);
1115 ret
= ceph_osdc_create_event(osdc
, rbd_watch_cb
, 0,
1116 (void *)dev
, &dev
->watch_event
);
1120 ops
[0].watch
.ver
= cpu_to_le64(ver
);
1121 ops
[0].watch
.cookie
= cpu_to_le64(dev
->watch_event
->cookie
);
1122 ops
[0].watch
.flag
= 1;
1124 ret
= rbd_req_sync_op(dev
, NULL
,
1127 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1130 &dev
->watch_request
, NULL
);
1135 rbd_destroy_ops(ops
);
1139 ceph_osdc_cancel_event(dev
->watch_event
);
1140 dev
->watch_event
= NULL
;
1142 rbd_destroy_ops(ops
);
1146 struct rbd_notify_info
{
1147 struct rbd_device
*dev
;
1150 static void rbd_notify_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1152 struct rbd_device
*dev
= (struct rbd_device
*)data
;
1156 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev
->obj_md_name
,
1157 notify_id
, (int)opcode
);
1161 * Request sync osd notify
1163 static int rbd_req_sync_notify(struct rbd_device
*dev
,
1166 struct ceph_osd_req_op
*ops
;
1167 struct ceph_osd_client
*osdc
= &dev
->client
->osdc
;
1168 struct ceph_osd_event
*event
;
1169 struct rbd_notify_info info
;
1170 int payload_len
= sizeof(u32
) + sizeof(u32
);
1173 ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_NOTIFY
, payload_len
);
1179 ret
= ceph_osdc_create_event(osdc
, rbd_notify_cb
, 1,
1180 (void *)&info
, &event
);
1184 ops
[0].watch
.ver
= 1;
1185 ops
[0].watch
.flag
= 1;
1186 ops
[0].watch
.cookie
= event
->cookie
;
1187 ops
[0].watch
.prot_ver
= RADOS_NOTIFY_VER
;
1188 ops
[0].watch
.timeout
= 12;
1190 ret
= rbd_req_sync_op(dev
, NULL
,
1193 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1195 1, obj
, 0, 0, NULL
, NULL
, NULL
);
1199 ret
= ceph_osdc_wait_event(event
, CEPH_OSD_TIMEOUT_DEFAULT
);
1200 dout("ceph_osdc_wait_event returned %d\n", ret
);
1201 rbd_destroy_ops(ops
);
1205 ceph_osdc_cancel_event(event
);
1207 rbd_destroy_ops(ops
);
1212 * Request sync osd rollback
1214 static int rbd_req_sync_rollback_obj(struct rbd_device
*dev
,
1218 struct ceph_osd_req_op
*ops
;
1219 int ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_ROLLBACK
, 0);
1223 ops
[0].snap
.snapid
= snapid
;
1225 ret
= rbd_req_sync_op(dev
, NULL
,
1228 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1230 1, obj
, 0, 0, NULL
, NULL
, NULL
);
1232 rbd_destroy_ops(ops
);
1238 * Request sync osd read
1240 static int rbd_req_sync_exec(struct rbd_device
*dev
,
1248 struct ceph_osd_req_op
*ops
;
1249 int cls_len
= strlen(cls
);
1250 int method_len
= strlen(method
);
1251 int ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_CALL
,
1252 cls_len
+ method_len
+ len
);
1256 ops
[0].cls
.class_name
= cls
;
1257 ops
[0].cls
.class_len
= (__u8
)cls_len
;
1258 ops
[0].cls
.method_name
= method
;
1259 ops
[0].cls
.method_len
= (__u8
)method_len
;
1260 ops
[0].cls
.argc
= 0;
1261 ops
[0].cls
.indata
= data
;
1262 ops
[0].cls
.indata_len
= len
;
1264 ret
= rbd_req_sync_op(dev
, NULL
,
1267 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1269 1, obj
, 0, 0, NULL
, NULL
, ver
);
1271 rbd_destroy_ops(ops
);
1273 dout("cls_exec returned %d\n", ret
);
1278 * block device queue callback
1280 static void rbd_rq_fn(struct request_queue
*q
)
1282 struct rbd_device
*rbd_dev
= q
->queuedata
;
1284 struct bio_pair
*bp
= NULL
;
1286 rq
= blk_fetch_request(q
);
1290 struct bio
*rq_bio
, *next_bio
= NULL
;
1292 int size
, op_size
= 0;
1295 /* peek at request from block layer */
1299 dout("fetched request\n");
1301 /* filter out block requests we don't understand */
1302 if ((rq
->cmd_type
!= REQ_TYPE_FS
)) {
1303 __blk_end_request_all(rq
, 0);
1307 /* deduce our operation (read, write) */
1308 do_write
= (rq_data_dir(rq
) == WRITE
);
1310 size
= blk_rq_bytes(rq
);
1311 ofs
= blk_rq_pos(rq
) * 512ULL;
1313 if (do_write
&& rbd_dev
->read_only
) {
1314 __blk_end_request_all(rq
, -EROFS
);
1318 spin_unlock_irq(q
->queue_lock
);
1320 dout("%s 0x%x bytes at 0x%llx\n",
1321 do_write
? "write" : "read",
1322 size
, blk_rq_pos(rq
) * 512ULL);
1325 /* a bio clone to be passed down to OSD req */
1326 dout("rq->bio->bi_vcnt=%d\n", rq
->bio
->bi_vcnt
);
1327 op_size
= rbd_get_segment(&rbd_dev
->header
,
1328 rbd_dev
->header
.block_name
,
1331 bio
= bio_chain_clone(&rq_bio
, &next_bio
, &bp
,
1332 op_size
, GFP_ATOMIC
);
1334 spin_lock_irq(q
->queue_lock
);
1335 __blk_end_request_all(rq
, -ENOMEM
);
1339 /* init OSD command: write or read */
1341 rbd_req_write(rq
, rbd_dev
,
1342 rbd_dev
->header
.snapc
,
1346 rbd_req_read(rq
, rbd_dev
,
1347 cur_snap_id(rbd_dev
),
1358 bio_pair_release(bp
);
1360 spin_lock_irq(q
->queue_lock
);
1362 rq
= blk_fetch_request(q
);
1367 * a queue callback. Makes sure that we don't create a bio that spans across
1368 * multiple osd objects. One exception would be with a single page bios,
1369 * which we handle later at bio_chain_clone
1371 static int rbd_merge_bvec(struct request_queue
*q
, struct bvec_merge_data
*bmd
,
1372 struct bio_vec
*bvec
)
1374 struct rbd_device
*rbd_dev
= q
->queuedata
;
1375 unsigned int chunk_sectors
= 1 << (rbd_dev
->header
.obj_order
- 9);
1376 sector_t sector
= bmd
->bi_sector
+ get_start_sect(bmd
->bi_bdev
);
1377 unsigned int bio_sectors
= bmd
->bi_size
>> 9;
1380 max
= (chunk_sectors
- ((sector
& (chunk_sectors
- 1))
1381 + bio_sectors
)) << 9;
1383 max
= 0; /* bio_add cannot handle a negative return */
1384 if (max
<= bvec
->bv_len
&& bio_sectors
== 0)
1385 return bvec
->bv_len
;
1389 static void rbd_free_disk(struct rbd_device
*rbd_dev
)
1391 struct gendisk
*disk
= rbd_dev
->disk
;
1396 rbd_header_free(&rbd_dev
->header
);
1398 if (disk
->flags
& GENHD_FL_UP
)
1401 blk_cleanup_queue(disk
->queue
);
1406 * reload the ondisk the header
1408 static int rbd_read_header(struct rbd_device
*rbd_dev
,
1409 struct rbd_image_header
*header
)
1412 struct rbd_image_header_ondisk
*dh
;
1414 u64 snap_names_len
= 0;
1418 int len
= sizeof(*dh
) +
1419 snap_count
* sizeof(struct rbd_image_snap_ondisk
) +
1423 dh
= kmalloc(len
, GFP_KERNEL
);
1427 rc
= rbd_req_sync_read(rbd_dev
,
1429 rbd_dev
->obj_md_name
,
1435 rc
= rbd_header_from_disk(header
, dh
, snap_count
, GFP_KERNEL
);
1439 if (snap_count
!= header
->total_snaps
) {
1440 snap_count
= header
->total_snaps
;
1441 snap_names_len
= header
->snap_names_len
;
1442 rbd_header_free(header
);
1448 header
->obj_version
= ver
;
1458 static int rbd_header_add_snap(struct rbd_device
*dev
,
1459 const char *snap_name
,
1462 int name_len
= strlen(snap_name
);
1465 void *data
, *data_start
, *data_end
;
1468 /* we should create a snapshot only if we're pointing at the head */
1472 ret
= ceph_monc_create_snapid(&dev
->client
->monc
, dev
->poolid
,
1474 dout("created snapid=%lld\n", new_snapid
);
1478 data
= kmalloc(name_len
+ 16, gfp_flags
);
1483 data_end
= data
+ name_len
+ 16;
1485 ceph_encode_string_safe(&data
, data_end
, snap_name
, name_len
, bad
);
1486 ceph_encode_64_safe(&data
, data_end
, new_snapid
, bad
);
1488 ret
= rbd_req_sync_exec(dev
, dev
->obj_md_name
, "rbd", "snap_add",
1489 data_start
, data
- data_start
, &ver
);
1496 dev
->header
.snapc
->seq
= new_snapid
;
1503 static void __rbd_remove_all_snaps(struct rbd_device
*rbd_dev
)
1505 struct rbd_snap
*snap
;
1507 while (!list_empty(&rbd_dev
->snaps
)) {
1508 snap
= list_first_entry(&rbd_dev
->snaps
, struct rbd_snap
, node
);
1509 __rbd_remove_snap_dev(rbd_dev
, snap
);
1514 * only read the first part of the ondisk header, without the snaps info
1516 static int __rbd_update_snaps(struct rbd_device
*rbd_dev
)
1519 struct rbd_image_header h
;
1523 ret
= rbd_read_header(rbd_dev
, &h
);
1527 down_write(&rbd_dev
->header
.snap_rwsem
);
1529 snap_seq
= rbd_dev
->header
.snapc
->seq
;
1530 if (rbd_dev
->header
.total_snaps
&&
1531 rbd_dev
->header
.snapc
->snaps
[0] == snap_seq
)
1532 /* pointing at the head, will need to follow that
1536 kfree(rbd_dev
->header
.snapc
);
1537 kfree(rbd_dev
->header
.snap_names
);
1538 kfree(rbd_dev
->header
.snap_sizes
);
1540 rbd_dev
->header
.total_snaps
= h
.total_snaps
;
1541 rbd_dev
->header
.snapc
= h
.snapc
;
1542 rbd_dev
->header
.snap_names
= h
.snap_names
;
1543 rbd_dev
->header
.snap_names_len
= h
.snap_names_len
;
1544 rbd_dev
->header
.snap_sizes
= h
.snap_sizes
;
1546 rbd_dev
->header
.snapc
->seq
= rbd_dev
->header
.snapc
->snaps
[0];
1548 rbd_dev
->header
.snapc
->seq
= snap_seq
;
1550 ret
= __rbd_init_snaps_header(rbd_dev
);
1552 up_write(&rbd_dev
->header
.snap_rwsem
);
1557 static int rbd_init_disk(struct rbd_device
*rbd_dev
)
1559 struct gendisk
*disk
;
1560 struct request_queue
*q
;
1564 /* contact OSD, request size info about the object being mapped */
1565 rc
= rbd_read_header(rbd_dev
, &rbd_dev
->header
);
1569 /* no need to lock here, as rbd_dev is not registered yet */
1570 rc
= __rbd_init_snaps_header(rbd_dev
);
1574 rc
= rbd_header_set_snap(rbd_dev
, rbd_dev
->snap_name
, &total_size
);
1578 /* create gendisk info */
1580 disk
= alloc_disk(RBD_MINORS_PER_MAJOR
);
1584 sprintf(disk
->disk_name
, DRV_NAME
"%d", rbd_dev
->id
);
1585 disk
->major
= rbd_dev
->major
;
1586 disk
->first_minor
= 0;
1587 disk
->fops
= &rbd_bd_ops
;
1588 disk
->private_data
= rbd_dev
;
1592 q
= blk_init_queue(rbd_rq_fn
, &rbd_dev
->lock
);
1595 blk_queue_merge_bvec(q
, rbd_merge_bvec
);
1598 q
->queuedata
= rbd_dev
;
1600 rbd_dev
->disk
= disk
;
1603 /* finally, announce the disk to the world */
1604 set_capacity(disk
, total_size
/ 512ULL);
1607 pr_info("%s: added with size 0x%llx\n",
1608 disk
->disk_name
, (unsigned long long)total_size
);
1621 static ssize_t
rbd_size_show(struct device
*dev
,
1622 struct device_attribute
*attr
, char *buf
)
1624 struct rbd_device
*rbd_dev
= dev_to_rbd(dev
);
1626 return sprintf(buf
, "%llu\n", (unsigned long long)rbd_dev
->header
.image_size
);
1629 static ssize_t
rbd_major_show(struct device
*dev
,
1630 struct device_attribute
*attr
, char *buf
)
1632 struct rbd_device
*rbd_dev
= dev_to_rbd(dev
);
1634 return sprintf(buf
, "%d\n", rbd_dev
->major
);
1637 static ssize_t
rbd_client_id_show(struct device
*dev
,
1638 struct device_attribute
*attr
, char *buf
)
1640 struct rbd_device
*rbd_dev
= dev_to_rbd(dev
);
1642 return sprintf(buf
, "client%lld\n", ceph_client_id(rbd_dev
->client
));
1645 static ssize_t
rbd_pool_show(struct device
*dev
,
1646 struct device_attribute
*attr
, char *buf
)
1648 struct rbd_device
*rbd_dev
= dev_to_rbd(dev
);
1650 return sprintf(buf
, "%s\n", rbd_dev
->pool_name
);
1653 static ssize_t
rbd_name_show(struct device
*dev
,
1654 struct device_attribute
*attr
, char *buf
)
1656 struct rbd_device
*rbd_dev
= dev_to_rbd(dev
);
1658 return sprintf(buf
, "%s\n", rbd_dev
->obj
);
1661 static ssize_t
rbd_snap_show(struct device
*dev
,
1662 struct device_attribute
*attr
,
1665 struct rbd_device
*rbd_dev
= dev_to_rbd(dev
);
1667 return sprintf(buf
, "%s\n", rbd_dev
->snap_name
);
1670 static ssize_t
rbd_image_refresh(struct device
*dev
,
1671 struct device_attribute
*attr
,
1675 struct rbd_device
*rbd_dev
= dev_to_rbd(dev
);
1679 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
1681 rc
= __rbd_update_snaps(rbd_dev
);
1685 mutex_unlock(&ctl_mutex
);
/*
 * Per-device sysfs attributes: read-only state (size, major, client_id,
 * pool, name, current_snap) plus write-only actions (refresh,
 * create_snap, rollback_snap).
 */
1689 static DEVICE_ATTR(size
, S_IRUGO
, rbd_size_show
, NULL
);
1690 static DEVICE_ATTR(major
, S_IRUGO
, rbd_major_show
, NULL
);
1691 static DEVICE_ATTR(client_id
, S_IRUGO
, rbd_client_id_show
, NULL
);
1692 static DEVICE_ATTR(pool
, S_IRUGO
, rbd_pool_show
, NULL
);
1693 static DEVICE_ATTR(name
, S_IRUGO
, rbd_name_show
, NULL
);
1694 static DEVICE_ATTR(refresh
, S_IWUSR
, NULL
, rbd_image_refresh
);
1695 static DEVICE_ATTR(current_snap
, S_IRUGO
, rbd_snap_show
, NULL
);
1696 static DEVICE_ATTR(create_snap
, S_IWUSR
, NULL
, rbd_snap_add
);
1697 static DEVICE_ATTR(rollback_snap
, S_IWUSR
, NULL
, rbd_snap_rollback
);
/*
 * Attribute pointer table wired into rbd_attr_group below.
 * NOTE(review): the terminating NULL entry and closing "};" were lost
 * in extraction.
 */
1699 static struct attribute
*rbd_attrs
[] = {
1700 &dev_attr_size
.attr
,
1701 &dev_attr_major
.attr
,
1702 &dev_attr_client_id
.attr
,
1703 &dev_attr_pool
.attr
,
1704 &dev_attr_name
.attr
,
1705 &dev_attr_current_snap
.attr
,
1706 &dev_attr_refresh
.attr
,
1707 &dev_attr_create_snap
.attr
,
1708 &dev_attr_rollback_snap
.attr
,
/*
 * Group the per-device attributes and attach them to the device type so
 * the driver core creates/removes the sysfs files automatically.
 * NOTE(review): the initializer bodies of rbd_attr_group /
 * rbd_attr_groups and the empty rbd_sysfs_dev_release body were
 * partially lost in extraction.
 */
1712 static struct attribute_group rbd_attr_group
= {
1716 static const struct attribute_group
*rbd_attr_groups
[] = {
/* no-op release: rbd_dev_release (bus-level) does the real teardown */
1721 static void rbd_sysfs_dev_release(struct device
*dev
)
1725 static struct device_type rbd_device_type
= {
1727 .groups
= rbd_attr_groups
,
1728 .release
= rbd_sysfs_dev_release
,
1736 static ssize_t
rbd_snap_size_show(struct device
*dev
,
1737 struct device_attribute
*attr
,
1740 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
1742 return sprintf(buf
, "%lld\n", (long long)snap
->size
);
1745 static ssize_t
rbd_snap_id_show(struct device
*dev
,
1746 struct device_attribute
*attr
,
1749 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
1751 return sprintf(buf
, "%lld\n", (long long)snap
->id
);
/* Per-snapshot sysfs attributes (size and id), grouped for the snap
 * device type below.
 * NOTE(review): the NULL terminator of rbd_snap_attrs[] and the
 * closing braces were lost in extraction. */
1754 static DEVICE_ATTR(snap_size
, S_IRUGO
, rbd_snap_size_show
, NULL
);
1755 static DEVICE_ATTR(snap_id
, S_IRUGO
, rbd_snap_id_show
, NULL
);
1757 static struct attribute
*rbd_snap_attrs
[] = {
1758 &dev_attr_snap_size
.attr
,
1759 &dev_attr_snap_id
.attr
,
1763 static struct attribute_group rbd_snap_attr_group
= {
1764 .attrs
= rbd_snap_attrs
,
/*
 * device release callback for a snapshot device.
 * NOTE(review): the body lines freeing the snap (presumably
 * kfree(snap->name); kfree(snap);) were lost in extraction — confirm
 * against the upstream file.
 */
1767 static void rbd_snap_dev_release(struct device
*dev
)
1769 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
/* Wire the snapshot attribute group and release hook into the device
 * type used for snap devices. */
1774 static const struct attribute_group
*rbd_snap_attr_groups
[] = {
1775 &rbd_snap_attr_group
,
1779 static struct device_type rbd_snap_device_type
= {
1780 .groups
= rbd_snap_attr_groups
,
1781 .release
= rbd_snap_dev_release
,
1784 static void __rbd_remove_snap_dev(struct rbd_device
*rbd_dev
,
1785 struct rbd_snap
*snap
)
1787 list_del(&snap
->node
);
1788 device_unregister(&snap
->dev
);
1791 static int rbd_register_snap_dev(struct rbd_device
*rbd_dev
,
1792 struct rbd_snap
*snap
,
1793 struct device
*parent
)
1795 struct device
*dev
= &snap
->dev
;
1798 dev
->type
= &rbd_snap_device_type
;
1799 dev
->parent
= parent
;
1800 dev
->release
= rbd_snap_dev_release
;
1801 dev_set_name(dev
, "snap_%s", snap
->name
);
1802 ret
= device_register(dev
);
/*
 * Allocate and populate an rbd_snap for header slot @i, register its
 * sysfs device if the parent is already registered, and return it via
 * @snapp.
 * NOTE(review): extraction dropped the allocation-failure checks, the
 * register-failure cleanup and the return paths — confirm against the
 * upstream file.
 */
1807 static int __rbd_add_snap_dev(struct rbd_device
*rbd_dev
,
1808 int i
, const char *name
,
1809 struct rbd_snap
**snapp
)
1812 struct rbd_snap
*snap
= kzalloc(sizeof(*snap
), GFP_KERNEL
);
1815 snap
->name
= kstrdup(name
, GFP_KERNEL
);
/* size/id come from the parallel arrays in the parsed header */
1816 snap
->size
= rbd_dev
->header
.snap_sizes
[i
];
1817 snap
->id
= rbd_dev
->header
.snapc
->snaps
[i
];
1818 if (device_is_registered(&rbd_dev
->dev
)) {
1819 ret
= rbd_register_snap_dev(rbd_dev
, snap
,
/*
 * Walk backwards to the previous name in a NUL-delimited string list;
 * @start bounds the walk.
 * NOTE(review): most of the body (the backwards scan and returns) was
 * lost in extraction — confirm against the upstream file.
 */
1833 * search for the previous snap in a null delimited string list
1835 const char *rbd_prev_snap_name(const char *name
, const char *start
)
/* need at least one NUL plus one char before @name for a previous entry */
1837 if (name
< start
+ 2)
/*
 * Reconcile the in-memory snap device list with the freshly read
 * header.  Caller holds ctl_mutex.
 * NOTE(review): extraction dropped many lines of this function (loop
 * headers, declarations of cur_id/ret, error handling, returns) — the
 * surviving fragments below must be checked against the upstream file
 * before any behavioral change.
 */
1850 * compare the old list of snapshots that we have to what's in the header
1851 * and update it accordingly. Note that the header holds the snapshots
1852 * in a reverse order (from newest to oldest) and we need to go from
1853 * older to new so that we don't get a duplicate snap name when
1854 * doing the process (e.g., removed snapshot and recreated a new
1855 * one with the same name.
1857 static int __rbd_init_snaps_header(struct rbd_device
*rbd_dev
)
1859 const char *name
, *first_name
;
/* i counts down through the header's snap slots (newest first) */
1860 int i
= rbd_dev
->header
.total_snaps
;
1861 struct rbd_snap
*snap
, *old_snap
= NULL
;
1863 struct list_head
*p
, *n
;
/* name starts one past the end of the packed NUL-delimited name blob */
1865 first_name
= rbd_dev
->header
.snap_names
;
1866 name
= first_name
+ rbd_dev
->header
.snap_names_len
;
/* iterate existing snaps oldest-to-newest; safe against removal */
1868 list_for_each_prev_safe(p
, n
, &rbd_dev
->snaps
) {
1871 old_snap
= list_entry(p
, struct rbd_snap
, node
);
1874 cur_id
= rbd_dev
->header
.snapc
->snaps
[i
- 1];
1876 if (!i
|| old_snap
->id
< cur_id
) {
1877 /* old_snap->id was skipped, thus was removed */
1878 __rbd_remove_snap_dev(rbd_dev
, old_snap
);
1881 if (old_snap
->id
== cur_id
) {
1882 /* we have this snapshot already */
1884 name
= rbd_prev_snap_name(name
, first_name
);
1888 i
--, name
= rbd_prev_snap_name(name
, first_name
)) {
1893 cur_id
= rbd_dev
->header
.snapc
->snaps
[i
];
1894 /* snapshot removal? handle it above */
1895 if (cur_id
>= old_snap
->id
)
1897 /* a new snapshot */
1898 ret
= __rbd_add_snap_dev(rbd_dev
, i
- 1, name
, &snap
);
1902 /* note that we add it backward so using n and not p */
1903 list_add(&snap
->node
, n
);
1907 /* we're done going over the old snap list, just add what's left */
1908 for (; i
> 0; i
--) {
1909 name
= rbd_prev_snap_name(name
, first_name
);
1914 ret
= __rbd_add_snap_dev(rbd_dev
, i
- 1, name
, &snap
);
1917 list_add(&snap
->node
, &rbd_dev
->snaps
);
/* Root device under which all rbd devices are parented in sysfs;
 * release is a no-op since the object is static. */
1924 static void rbd_root_dev_release(struct device
*dev
)
1928 static struct device rbd_root_dev
= {
1930 .release
= rbd_root_dev_release
,
/*
 * Register the rbd device (named by its numeric id) on the rbd bus and
 * register a sysfs device for each of its snapshots, all under
 * ctl_mutex.
 * NOTE(review): extraction dropped the ret/dev declarations, the
 * error-path labels and the return statements — confirm against the
 * upstream file.
 */
1933 static int rbd_bus_add_dev(struct rbd_device
*rbd_dev
)
1937 struct rbd_snap
*snap
;
1939 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
1940 dev
= &rbd_dev
->dev
;
1942 dev
->bus
= &rbd_bus_type
;
1943 dev
->type
= &rbd_device_type
;
1944 dev
->parent
= &rbd_root_dev
;
/* rbd_dev_release tears down the client/disk when the device goes away */
1945 dev
->release
= rbd_dev_release
;
1946 dev_set_name(dev
, "%d", rbd_dev
->id
);
1947 ret
= device_register(dev
);
1951 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
1952 ret
= rbd_register_snap_dev(rbd_dev
, snap
,
1958 mutex_unlock(&ctl_mutex
);
1961 mutex_unlock(&ctl_mutex
);
1965 static void rbd_bus_del_dev(struct rbd_device
*rbd_dev
)
1967 device_unregister(&rbd_dev
->dev
);
/*
 * Establish an osd watch on the image's metadata object, retrying after
 * a snapshot-list refresh whenever the watch fails with -ERANGE
 * (header changed underneath us).
 * NOTE(review): extraction dropped the ret/rc declarations, the
 * enclosing do { } scaffolding and the return — confirm against the
 * upstream file.
 */
1970 static int rbd_init_watch_dev(struct rbd_device
*rbd_dev
)
1975 ret
= rbd_req_sync_watch(rbd_dev
, rbd_dev
->obj_md_name
,
1976 rbd_dev
->header
.obj_version
);
1977 if (ret
== -ERANGE
) {
1978 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
1979 rc
= __rbd_update_snaps(rbd_dev
);
1980 mutex_unlock(&ctl_mutex
);
1984 } while (ret
== -ERANGE
);
/*
 * Bus-level "add" store handler: parse "mon_addrs options pool obj
 * [snap]" from @buf, allocate and id a new rbd_device, connect a ceph
 * client, register the block device and sysfs objects, then set up the
 * disk and metadata watch.
 * NOTE(review): extraction dropped many lines of this function
 * (parameter list, allocation-failure checks, goto labels, returns) —
 * the surviving fragments must be checked against the upstream file.
 */
1989 static ssize_t
rbd_add(struct bus_type
*bus
,
1993 struct ceph_osd_client
*osdc
;
1994 struct rbd_device
*rbd_dev
;
1995 ssize_t rc
= -ENOMEM
;
1996 int irc
, new_id
= 0;
1997 struct list_head
*tmp
;
/* hold a module ref for the lifetime of the mapped device */
2001 if (!try_module_get(THIS_MODULE
))
2004 mon_dev_name
= kmalloc(RBD_MAX_OPT_LEN
, GFP_KERNEL
);
2008 options
= kmalloc(RBD_MAX_OPT_LEN
, GFP_KERNEL
);
2012 /* new rbd_device object */
2013 rbd_dev
= kzalloc(sizeof(*rbd_dev
), GFP_KERNEL
);
2017 /* static rbd_device initialization */
2018 spin_lock_init(&rbd_dev
->lock
);
2019 INIT_LIST_HEAD(&rbd_dev
->node
);
2020 INIT_LIST_HEAD(&rbd_dev
->snaps
);
2022 /* generate unique id: find highest unique id, add one */
2023 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2025 list_for_each(tmp
, &rbd_dev_list
) {
2026 struct rbd_device
*rbd_dev
;
2028 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
2029 if (rbd_dev
->id
>= new_id
)
2030 new_id
= rbd_dev
->id
+ 1;
2033 rbd_dev
->id
= new_id
;
2035 /* add to global list */
2036 list_add_tail(&rbd_dev
->node
, &rbd_dev_list
);
2038 /* parse add command */
/* < 4 matched fields means the snap name is the only optional token */
2039 if (sscanf(buf
, "%" __stringify(RBD_MAX_OPT_LEN
) "s "
2040 "%" __stringify(RBD_MAX_OPT_LEN
) "s "
2041 "%" __stringify(RBD_MAX_POOL_NAME_LEN
) "s "
2042 "%" __stringify(RBD_MAX_OBJ_NAME_LEN
) "s"
2043 "%" __stringify(RBD_MAX_SNAP_NAME_LEN
) "s",
2044 mon_dev_name
, options
, rbd_dev
->pool_name
,
2045 rbd_dev
->obj
, rbd_dev
->snap_name
) < 4) {
/* empty snap name means "head": use RBD_SNAP_HEAD_NAME ("-") */
2050 if (rbd_dev
->snap_name
[0] == 0)
2051 rbd_dev
->snap_name
[0] = '-';
2053 rbd_dev
->obj_len
= strlen(rbd_dev
->obj
);
2054 snprintf(rbd_dev
->obj_md_name
, sizeof(rbd_dev
->obj_md_name
), "%s%s",
2055 rbd_dev
->obj
, RBD_SUFFIX
);
2057 /* initialize rest of new object */
2058 snprintf(rbd_dev
->name
, DEV_NAME_LEN
, DRV_NAME
"%d", rbd_dev
->id
);
2059 rc
= rbd_get_client(rbd_dev
, mon_dev_name
, options
);
2063 mutex_unlock(&ctl_mutex
);
/* look up the pool id for the requested pool name */
2066 osdc
= &rbd_dev
->client
->osdc
;
2067 rc
= ceph_pg_poolid_by_name(osdc
->osdmap
, rbd_dev
->pool_name
);
2069 goto err_out_client
;
2070 rbd_dev
->poolid
= rc
;
2072 /* register our block device */
2073 irc
= register_blkdev(0, rbd_dev
->name
);
2076 goto err_out_client
;
2078 rbd_dev
->major
= irc
;
2080 rc
= rbd_bus_add_dev(rbd_dev
);
2082 goto err_out_blkdev
;
2084 /* set up and announce blkdev mapping */
2085 rc
= rbd_init_disk(rbd_dev
);
2089 rc
= rbd_init_watch_dev(rbd_dev
);
/* error path: unlink from the global list before tearing down */
2096 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2097 list_del_init(&rbd_dev
->node
);
2098 mutex_unlock(&ctl_mutex
);
2100 /* this will also clean up rest of rbd_dev stuff */
2102 rbd_bus_del_dev(rbd_dev
);
2104 kfree(mon_dev_name
);
2108 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
2110 rbd_put_client(rbd_dev
);
2111 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2113 list_del_init(&rbd_dev
->node
);
2114 mutex_unlock(&ctl_mutex
);
2120 kfree(mon_dev_name
);
2122 dout("Error adding device %s\n", buf
);
2123 module_put(THIS_MODULE
);
2127 static struct rbd_device
*__rbd_get_dev(unsigned long id
)
2129 struct list_head
*tmp
;
2130 struct rbd_device
*rbd_dev
;
2132 list_for_each(tmp
, &rbd_dev_list
) {
2133 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
2134 if (rbd_dev
->id
== id
)
/*
 * Final teardown for an rbd device, run by the driver core when the
 * last reference to the embedded struct device drops: cancel the
 * metadata watch, drop the ceph client, free the gendisk and major,
 * and release the module reference taken in rbd_add().
 * NOTE(review): extraction dropped some lines (e.g. any final kfree of
 * rbd_dev) — confirm against the upstream file.
 */
2140 static void rbd_dev_release(struct device
*dev
)
2142 struct rbd_device
*rbd_dev
=
2143 container_of(dev
, struct rbd_device
, dev
);
2145 if (rbd_dev
->watch_request
)
2146 ceph_osdc_unregister_linger_request(&rbd_dev
->client
->osdc
,
2147 rbd_dev
->watch_request
);
2148 if (rbd_dev
->watch_event
)
2149 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
2151 rbd_put_client(rbd_dev
);
2153 /* clean up and free blkdev */
2154 rbd_free_disk(rbd_dev
);
2155 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
2158 /* release module ref */
2159 module_put(THIS_MODULE
);
/*
 * Bus-level "remove" store handler: parse a decimal device id from
 * @buf, find the matching device under ctl_mutex, unlink it, drop its
 * snapshots and unregister it from the bus.
 * NOTE(review): extraction dropped the buf/count parameters, the
 * rc/ul/target_id declarations, the not-found path and the return —
 * confirm against the upstream file.
 */
2162 static ssize_t
rbd_remove(struct bus_type
*bus
,
2166 struct rbd_device
*rbd_dev
= NULL
;
2171 rc
= strict_strtoul(buf
, 10, &ul
);
2175 /* convert to int; abort if we lost anything in the conversion */
2176 target_id
= (int) ul
;
2177 if (target_id
!= ul
)
2180 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2182 rbd_dev
= __rbd_get_dev(target_id
);
2188 list_del_init(&rbd_dev
->node
);
2190 __rbd_remove_all_snaps(rbd_dev
);
2191 rbd_bus_del_dev(rbd_dev
);
2194 mutex_unlock(&ctl_mutex
);
/*
 * Per-device "create_snap" store handler: copy the snapshot name from
 * @buf, add the snapshot to the image header under ctl_mutex, refresh
 * the in-memory snap list, then (outside the lock) notify watchers.
 * NOTE(review): extraction dropped the buf/count parameters, the
 * allocation check, the error labels and the returns — confirm against
 * the upstream file.
 */
2198 static ssize_t
rbd_snap_add(struct device
*dev
,
2199 struct device_attribute
*attr
,
2203 struct rbd_device
*rbd_dev
= dev_to_rbd(dev
);
2205 char *name
= kmalloc(count
+ 1, GFP_KERNEL
);
2209 snprintf(name
, count
, "%s", buf
);
2211 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2213 ret
= rbd_header_add_snap(rbd_dev
,
2218 ret
= __rbd_update_snaps(rbd_dev
);
2222 /* shouldn't hold ctl_mutex when notifying.. notify might
2223 trigger a watch callback that would need to get that mutex */
2224 mutex_unlock(&ctl_mutex
);
2226 /* make a best effort, don't error if failed */
2227 rbd_req_sync_notify(rbd_dev
, rbd_dev
->obj_md_name
);
2234 mutex_unlock(&ctl_mutex
);
/*
 * Per-device "rollback_snap" store handler: resolve the named snapshot
 * to a snapid, then roll back every object segment of the image to that
 * snapshot and refresh the snap list, all under ctl_mutex.
 * NOTE(review): extraction dropped the buf/count parameters, the
 * ret/snapid/cur_ofs declarations, allocation checks, loop body detail
 * and returns — confirm against the upstream file.
 */
2239 static ssize_t
rbd_snap_rollback(struct device
*dev
,
2240 struct device_attribute
*attr
,
2244 struct rbd_device
*rbd_dev
= dev_to_rbd(dev
);
2248 char *seg_name
= NULL
;
2249 char *snap_name
= kmalloc(count
+ 1, GFP_KERNEL
);
2254 /* parse snaps add command */
2255 snprintf(snap_name
, count
, "%s", buf
);
/* GFP_NOIO: allocated on a path that may be entangled with block I/O */
2256 seg_name
= kmalloc(RBD_MAX_SEG_NAME_LEN
+ 1, GFP_NOIO
);
2260 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2262 ret
= snap_by_name(&rbd_dev
->header
, snap_name
, &snapid
, NULL
);
2266 dout("snapid=%lld\n", snapid
);
/* roll back one segment object at a time across the whole image */
2269 while (cur_ofs
< rbd_dev
->header
.image_size
) {
2270 cur_ofs
+= rbd_get_segment(&rbd_dev
->header
,
2274 dout("seg_name=%s\n", seg_name
);
2276 ret
= rbd_req_sync_rollback_obj(rbd_dev
, snapid
, seg_name
);
2278 pr_warning("could not roll back obj %s err=%d\n",
2282 ret
= __rbd_update_snaps(rbd_dev
);
2289 mutex_unlock(&ctl_mutex
);
/* Bus-level write-only control files: /sys/bus/rbd/add and
 * /sys/bus/rbd/remove.
 * NOTE(review): the terminating __ATTR_NULL entry and closing "};"
 * were lost in extraction. */
2297 static struct bus_attribute rbd_bus_attrs
[] = {
2298 __ATTR(add
, S_IWUSR
, NULL
, rbd_add
),
2299 __ATTR(remove
, S_IWUSR
, NULL
, rbd_remove
),
/*
 * Register the rbd bus type (with its add/remove control files) and
 * the root device all rbd devices hang from.
 * NOTE(review): the error checks between/after the two registrations
 * and the return were lost in extraction — confirm against upstream.
 */
2304 * create control files in sysfs
2307 static int rbd_sysfs_init(void)
2311 rbd_bus_type
.bus_attrs
= rbd_bus_attrs
;
2313 ret
= bus_register(&rbd_bus_type
);
2317 ret
= device_register(&rbd_root_dev
);
2322 static void rbd_sysfs_cleanup(void)
2324 device_unregister(&rbd_root_dev
);
2325 bus_unregister(&rbd_bus_type
);
/*
 * Module init: set up sysfs (bus + root device), init the client-list
 * lock, and announce the driver.
 * NOTE(review): the rc declaration, the error check after
 * rbd_sysfs_init() and the return were lost in extraction — confirm
 * against the upstream file.
 */
2328 int __init
rbd_init(void)
2332 rc
= rbd_sysfs_init();
2335 spin_lock_init(&node_lock
);
2336 pr_info("loaded " DRV_NAME_LONG
"\n");
2340 void __exit
rbd_exit(void)
2342 rbd_sysfs_cleanup();
/* Module entry/exit hooks and metadata. */
2345 module_init(rbd_init
);
2346 module_exit(rbd_exit
);
2348 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2349 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2350 MODULE_DESCRIPTION("rados block device");
2352 /* following authorship retained from original osdblk.c */
2353 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2355 MODULE_LICENSE("GPL");