Merge branch 'move-drivers' of git://git.kernel.org/pub/scm/linux/kernel/git/davej...
[linux-2.6/next.git] / drivers / block / rbd.c
blob9712fad82bc6cf2be78daaa503432e8725ab2a4f
1 /*
2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define DRV_NAME "rbd"
45 #define DRV_NAME_LONG "rbd (rados block device)"
47 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
49 #define RBD_MAX_MD_NAME_LEN (96 + sizeof(RBD_SUFFIX))
50 #define RBD_MAX_POOL_NAME_LEN 64
51 #define RBD_MAX_SNAP_NAME_LEN 32
52 #define RBD_MAX_OPT_LEN 1024
54 #define RBD_SNAP_HEAD_NAME "-"
56 #define DEV_NAME_LEN 32
58 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
61 * block device image metadata (in-memory version)
63 struct rbd_image_header {
64 u64 image_size;
65 char block_name[32];
66 __u8 obj_order;
67 __u8 crypt_type;
68 __u8 comp_type;
69 struct rw_semaphore snap_rwsem;
70 struct ceph_snap_context *snapc;
71 size_t snap_names_len;
72 u64 snap_seq;
73 u32 total_snaps;
75 char *snap_names;
76 u64 *snap_sizes;
78 u64 obj_version;
/* rbd-specific options parsed by parse_rbd_opts_token(). */
struct rbd_options {
	int notify_timeout;	/* defaults to RBD_NOTIFY_TIMEOUT_DEFAULT */
};
86 * an instance of the client. multiple devices may share a client.
88 struct rbd_client {
89 struct ceph_client *client;
90 struct rbd_options *rbd_opts;
91 struct kref kref;
92 struct list_head node;
95 struct rbd_req_coll;
98 * a single io request
100 struct rbd_request {
101 struct request *rq; /* blk layer request */
102 struct bio *bio; /* cloned bio */
103 struct page **pages; /* list of used pages */
104 u64 len;
105 int coll_index;
106 struct rbd_req_coll *coll;
109 struct rbd_req_status {
110 int done;
111 int rc;
112 u64 bytes;
116 * a collection of requests
118 struct rbd_req_coll {
119 int total;
120 int num_done;
121 struct kref kref;
122 struct rbd_req_status status[0];
125 struct rbd_snap {
126 struct device dev;
127 const char *name;
128 size_t size;
129 struct list_head node;
130 u64 id;
134 * a single device
136 struct rbd_device {
137 int id; /* blkdev unique id */
139 int major; /* blkdev assigned major */
140 struct gendisk *disk; /* blkdev's gendisk and rq */
141 struct request_queue *q;
143 struct ceph_client *client;
144 struct rbd_client *rbd_client;
146 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
148 spinlock_t lock; /* queue lock */
150 struct rbd_image_header header;
151 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
152 int obj_len;
153 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
154 char pool_name[RBD_MAX_POOL_NAME_LEN];
155 int poolid;
157 struct ceph_osd_event *watch_event;
158 struct ceph_osd_request *watch_request;
160 char snap_name[RBD_MAX_SNAP_NAME_LEN];
161 u32 cur_snap; /* index+1 of current snapshot within snap context
162 0 - for the head */
163 int read_only;
165 struct list_head node;
167 /* list of snapshots */
168 struct list_head snaps;
170 /* sysfs related */
171 struct device dev;
174 static struct bus_type rbd_bus_type = {
175 .name = "rbd",
178 static spinlock_t node_lock; /* protects client get/put */
180 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
181 static LIST_HEAD(rbd_dev_list); /* devices */
182 static LIST_HEAD(rbd_client_list); /* clients */
184 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
185 static void rbd_dev_release(struct device *dev);
186 static ssize_t rbd_snap_rollback(struct device *dev,
187 struct device_attribute *attr,
188 const char *buf,
189 size_t size);
190 static ssize_t rbd_snap_add(struct device *dev,
191 struct device_attribute *attr,
192 const char *buf,
193 size_t count);
194 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
195 struct rbd_snap *snap);;
198 static struct rbd_device *dev_to_rbd(struct device *dev)
200 return container_of(dev, struct rbd_device, dev);
203 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
205 return get_device(&rbd_dev->dev);
208 static void rbd_put_dev(struct rbd_device *rbd_dev)
210 put_device(&rbd_dev->dev);
213 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
215 static int rbd_open(struct block_device *bdev, fmode_t mode)
217 struct gendisk *disk = bdev->bd_disk;
218 struct rbd_device *rbd_dev = disk->private_data;
220 rbd_get_dev(rbd_dev);
222 set_device_ro(bdev, rbd_dev->read_only);
224 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
225 return -EROFS;
227 return 0;
230 static int rbd_release(struct gendisk *disk, fmode_t mode)
232 struct rbd_device *rbd_dev = disk->private_data;
234 rbd_put_dev(rbd_dev);
236 return 0;
239 static const struct block_device_operations rbd_bd_ops = {
240 .owner = THIS_MODULE,
241 .open = rbd_open,
242 .release = rbd_release,
246 * Initialize an rbd client instance.
247 * We own *opt.
249 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
250 struct rbd_options *rbd_opts)
252 struct rbd_client *rbdc;
253 int ret = -ENOMEM;
255 dout("rbd_client_create\n");
256 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
257 if (!rbdc)
258 goto out_opt;
260 kref_init(&rbdc->kref);
261 INIT_LIST_HEAD(&rbdc->node);
263 rbdc->client = ceph_create_client(opt, rbdc);
264 if (IS_ERR(rbdc->client))
265 goto out_rbdc;
266 opt = NULL; /* Now rbdc->client is responsible for opt */
268 ret = ceph_open_session(rbdc->client);
269 if (ret < 0)
270 goto out_err;
272 rbdc->rbd_opts = rbd_opts;
274 spin_lock(&node_lock);
275 list_add_tail(&rbdc->node, &rbd_client_list);
276 spin_unlock(&node_lock);
278 dout("rbd_client_create created %p\n", rbdc);
279 return rbdc;
281 out_err:
282 ceph_destroy_client(rbdc->client);
283 out_rbdc:
284 kfree(rbdc);
285 out_opt:
286 if (opt)
287 ceph_destroy_options(opt);
288 return ERR_PTR(ret);
292 * Find a ceph client with specific addr and configuration.
294 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
296 struct rbd_client *client_node;
298 if (opt->flags & CEPH_OPT_NOSHARE)
299 return NULL;
301 list_for_each_entry(client_node, &rbd_client_list, node)
302 if (ceph_compare_options(opt, client_node->client) == 0)
303 return client_node;
304 return NULL;
/*
 * mount options
 *
 * Tokens below Opt_last_int take an integer argument; tokens between
 * Opt_last_int and Opt_last_string take a string argument.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};
318 static match_table_t rbdopt_tokens = {
319 {Opt_notify_timeout, "notify_timeout=%d"},
320 /* int args above */
321 /* string args above */
322 {-1, NULL}
325 static int parse_rbd_opts_token(char *c, void *private)
327 struct rbd_options *rbdopt = private;
328 substring_t argstr[MAX_OPT_ARGS];
329 int token, intval, ret;
331 token = match_token((char *)c, rbdopt_tokens, argstr);
332 if (token < 0)
333 return -EINVAL;
335 if (token < Opt_last_int) {
336 ret = match_int(&argstr[0], &intval);
337 if (ret < 0) {
338 pr_err("bad mount option arg (not int) "
339 "at '%s'\n", c);
340 return ret;
342 dout("got int token %d val %d\n", token, intval);
343 } else if (token > Opt_last_int && token < Opt_last_string) {
344 dout("got string token %d val %s\n", token,
345 argstr[0].from);
346 } else {
347 dout("got token %d\n", token);
350 switch (token) {
351 case Opt_notify_timeout:
352 rbdopt->notify_timeout = intval;
353 break;
354 default:
355 BUG_ON(token);
357 return 0;
361 * Get a ceph client with specific addr and configuration, if one does
362 * not exist create it.
364 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
365 char *options)
367 struct rbd_client *rbdc;
368 struct ceph_options *opt;
369 int ret;
370 struct rbd_options *rbd_opts;
372 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
373 if (!rbd_opts)
374 return -ENOMEM;
376 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
378 ret = ceph_parse_options(&opt, options, mon_addr,
379 mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
380 if (ret < 0)
381 goto done_err;
383 spin_lock(&node_lock);
384 rbdc = __rbd_client_find(opt);
385 if (rbdc) {
386 ceph_destroy_options(opt);
388 /* using an existing client */
389 kref_get(&rbdc->kref);
390 rbd_dev->rbd_client = rbdc;
391 rbd_dev->client = rbdc->client;
392 spin_unlock(&node_lock);
393 return 0;
395 spin_unlock(&node_lock);
397 rbdc = rbd_client_create(opt, rbd_opts);
398 if (IS_ERR(rbdc)) {
399 ret = PTR_ERR(rbdc);
400 goto done_err;
403 rbd_dev->rbd_client = rbdc;
404 rbd_dev->client = rbdc->client;
405 return 0;
406 done_err:
407 kfree(rbd_opts);
408 return ret;
412 * Destroy ceph client
414 static void rbd_client_release(struct kref *kref)
416 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
418 dout("rbd_release_client %p\n", rbdc);
419 spin_lock(&node_lock);
420 list_del(&rbdc->node);
421 spin_unlock(&node_lock);
423 ceph_destroy_client(rbdc->client);
424 kfree(rbdc->rbd_opts);
425 kfree(rbdc);
429 * Drop reference to ceph client node. If it's not referenced anymore, release
430 * it.
432 static void rbd_put_client(struct rbd_device *rbd_dev)
434 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
435 rbd_dev->rbd_client = NULL;
436 rbd_dev->client = NULL;
440 * Destroy requests collection
442 static void rbd_coll_release(struct kref *kref)
444 struct rbd_req_coll *coll =
445 container_of(kref, struct rbd_req_coll, kref);
447 dout("rbd_coll_release %p\n", coll);
448 kfree(coll);
452 * Create a new header structure, translate header format from the on-disk
453 * header.
455 static int rbd_header_from_disk(struct rbd_image_header *header,
456 struct rbd_image_header_ondisk *ondisk,
457 int allocated_snaps,
458 gfp_t gfp_flags)
460 int i;
461 u32 snap_count = le32_to_cpu(ondisk->snap_count);
462 int ret = -ENOMEM;
464 init_rwsem(&header->snap_rwsem);
465 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
466 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
467 snap_count *
468 sizeof(struct rbd_image_snap_ondisk),
469 gfp_flags);
470 if (!header->snapc)
471 return -ENOMEM;
472 if (snap_count) {
473 header->snap_names = kmalloc(header->snap_names_len,
474 GFP_KERNEL);
475 if (!header->snap_names)
476 goto err_snapc;
477 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
478 GFP_KERNEL);
479 if (!header->snap_sizes)
480 goto err_names;
481 } else {
482 header->snap_names = NULL;
483 header->snap_sizes = NULL;
485 memcpy(header->block_name, ondisk->block_name,
486 sizeof(ondisk->block_name));
488 header->image_size = le64_to_cpu(ondisk->image_size);
489 header->obj_order = ondisk->options.order;
490 header->crypt_type = ondisk->options.crypt_type;
491 header->comp_type = ondisk->options.comp_type;
493 atomic_set(&header->snapc->nref, 1);
494 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
495 header->snapc->num_snaps = snap_count;
496 header->total_snaps = snap_count;
498 if (snap_count &&
499 allocated_snaps == snap_count) {
500 for (i = 0; i < snap_count; i++) {
501 header->snapc->snaps[i] =
502 le64_to_cpu(ondisk->snaps[i].id);
503 header->snap_sizes[i] =
504 le64_to_cpu(ondisk->snaps[i].image_size);
507 /* copy snapshot names */
508 memcpy(header->snap_names, &ondisk->snaps[i],
509 header->snap_names_len);
512 return 0;
514 err_names:
515 kfree(header->snap_names);
516 err_snapc:
517 kfree(header->snapc);
518 return ret;
521 static int snap_index(struct rbd_image_header *header, int snap_num)
523 return header->total_snaps - snap_num;
526 static u64 cur_snap_id(struct rbd_device *rbd_dev)
528 struct rbd_image_header *header = &rbd_dev->header;
530 if (!rbd_dev->cur_snap)
531 return 0;
533 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
536 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
537 u64 *seq, u64 *size)
539 int i;
540 char *p = header->snap_names;
542 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
543 if (strcmp(snap_name, p) == 0)
544 break;
546 if (i == header->total_snaps)
547 return -ENOENT;
548 if (seq)
549 *seq = header->snapc->snaps[i];
551 if (size)
552 *size = header->snap_sizes[i];
554 return i;
557 static int rbd_header_set_snap(struct rbd_device *dev,
558 const char *snap_name,
559 u64 *size)
561 struct rbd_image_header *header = &dev->header;
562 struct ceph_snap_context *snapc = header->snapc;
563 int ret = -ENOENT;
565 down_write(&header->snap_rwsem);
567 if (!snap_name ||
568 !*snap_name ||
569 strcmp(snap_name, "-") == 0 ||
570 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
571 if (header->total_snaps)
572 snapc->seq = header->snap_seq;
573 else
574 snapc->seq = 0;
575 dev->cur_snap = 0;
576 dev->read_only = 0;
577 if (size)
578 *size = header->image_size;
579 } else {
580 ret = snap_by_name(header, snap_name, &snapc->seq, size);
581 if (ret < 0)
582 goto done;
584 dev->cur_snap = header->total_snaps - ret;
585 dev->read_only = 1;
588 ret = 0;
589 done:
590 up_write(&header->snap_rwsem);
591 return ret;
594 static void rbd_header_free(struct rbd_image_header *header)
596 kfree(header->snapc);
597 kfree(header->snap_names);
598 kfree(header->snap_sizes);
602 * get the actual striped segment name, offset and length
604 static u64 rbd_get_segment(struct rbd_image_header *header,
605 const char *block_name,
606 u64 ofs, u64 len,
607 char *seg_name, u64 *segofs)
609 u64 seg = ofs >> header->obj_order;
611 if (seg_name)
612 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
613 "%s.%012llx", block_name, seg);
615 ofs = ofs & ((1 << header->obj_order) - 1);
616 len = min_t(u64, len, (1 << header->obj_order) - ofs);
618 if (segofs)
619 *segofs = ofs;
621 return len;
624 static int rbd_get_num_segments(struct rbd_image_header *header,
625 u64 ofs, u64 len)
627 u64 start_seg = ofs >> header->obj_order;
628 u64 end_seg = (ofs + len - 1) >> header->obj_order;
629 return end_seg - start_seg + 1;
633 * bio helpers
636 static void bio_chain_put(struct bio *chain)
638 struct bio *tmp;
640 while (chain) {
641 tmp = chain;
642 chain = chain->bi_next;
643 bio_put(tmp);
648 * zeros a bio chain, starting at specific offset
650 static void zero_bio_chain(struct bio *chain, int start_ofs)
652 struct bio_vec *bv;
653 unsigned long flags;
654 void *buf;
655 int i;
656 int pos = 0;
658 while (chain) {
659 bio_for_each_segment(bv, chain, i) {
660 if (pos + bv->bv_len > start_ofs) {
661 int remainder = max(start_ofs - pos, 0);
662 buf = bvec_kmap_irq(bv, &flags);
663 memset(buf + remainder, 0,
664 bv->bv_len - remainder);
665 bvec_kunmap_irq(buf, &flags);
667 pos += bv->bv_len;
670 chain = chain->bi_next;
675 * bio_chain_clone - clone a chain of bios up to a certain length.
676 * might return a bio_pair that will need to be released.
678 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
679 struct bio_pair **bp,
680 int len, gfp_t gfpmask)
682 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
683 int total = 0;
685 if (*bp) {
686 bio_pair_release(*bp);
687 *bp = NULL;
690 while (old_chain && (total < len)) {
691 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
692 if (!tmp)
693 goto err_out;
695 if (total + old_chain->bi_size > len) {
696 struct bio_pair *bp;
699 * this split can only happen with a single paged bio,
700 * split_bio will BUG_ON if this is not the case
702 dout("bio_chain_clone split! total=%d remaining=%d"
703 "bi_size=%d\n",
704 (int)total, (int)len-total,
705 (int)old_chain->bi_size);
707 /* split the bio. We'll release it either in the next
708 call, or it will have to be released outside */
709 bp = bio_split(old_chain, (len - total) / 512ULL);
710 if (!bp)
711 goto err_out;
713 __bio_clone(tmp, &bp->bio1);
715 *next = &bp->bio2;
716 } else {
717 __bio_clone(tmp, old_chain);
718 *next = old_chain->bi_next;
721 tmp->bi_bdev = NULL;
722 gfpmask &= ~__GFP_WAIT;
723 tmp->bi_next = NULL;
725 if (!new_chain) {
726 new_chain = tail = tmp;
727 } else {
728 tail->bi_next = tmp;
729 tail = tmp;
731 old_chain = old_chain->bi_next;
733 total += tmp->bi_size;
736 BUG_ON(total < len);
738 if (tail)
739 tail->bi_next = NULL;
741 *old = old_chain;
743 return new_chain;
745 err_out:
746 dout("bio_chain_clone with err\n");
747 bio_chain_put(new_chain);
748 return NULL;
752 * helpers for osd request op vectors.
754 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
755 int num_ops,
756 int opcode,
757 u32 payload_len)
759 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
760 GFP_NOIO);
761 if (!*ops)
762 return -ENOMEM;
763 (*ops)[0].op = opcode;
765 * op extent offset and length will be set later on
766 * in calc_raw_layout()
768 (*ops)[0].payload_len = payload_len;
769 return 0;
/* Free an op vector obtained from rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	struct ceph_osd_req_op *vec = ops;

	kfree(vec);
}
777 static void rbd_coll_end_req_index(struct request *rq,
778 struct rbd_req_coll *coll,
779 int index,
780 int ret, u64 len)
782 struct request_queue *q;
783 int min, max, i;
785 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
786 coll, index, ret, len);
788 if (!rq)
789 return;
791 if (!coll) {
792 blk_end_request(rq, ret, len);
793 return;
796 q = rq->q;
798 spin_lock_irq(q->queue_lock);
799 coll->status[index].done = 1;
800 coll->status[index].rc = ret;
801 coll->status[index].bytes = len;
802 max = min = coll->num_done;
803 while (max < coll->total && coll->status[max].done)
804 max++;
806 for (i = min; i<max; i++) {
807 __blk_end_request(rq, coll->status[i].rc,
808 coll->status[i].bytes);
809 coll->num_done++;
810 kref_put(&coll->kref, rbd_coll_release);
812 spin_unlock_irq(q->queue_lock);
815 static void rbd_coll_end_req(struct rbd_request *req,
816 int ret, u64 len)
818 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
822 * Send ceph osd request
824 static int rbd_do_request(struct request *rq,
825 struct rbd_device *dev,
826 struct ceph_snap_context *snapc,
827 u64 snapid,
828 const char *obj, u64 ofs, u64 len,
829 struct bio *bio,
830 struct page **pages,
831 int num_pages,
832 int flags,
833 struct ceph_osd_req_op *ops,
834 int num_reply,
835 struct rbd_req_coll *coll,
836 int coll_index,
837 void (*rbd_cb)(struct ceph_osd_request *req,
838 struct ceph_msg *msg),
839 struct ceph_osd_request **linger_req,
840 u64 *ver)
842 struct ceph_osd_request *req;
843 struct ceph_file_layout *layout;
844 int ret;
845 u64 bno;
846 struct timespec mtime = CURRENT_TIME;
847 struct rbd_request *req_data;
848 struct ceph_osd_request_head *reqhead;
849 struct rbd_image_header *header = &dev->header;
851 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
852 if (!req_data) {
853 if (coll)
854 rbd_coll_end_req_index(rq, coll, coll_index,
855 -ENOMEM, len);
856 return -ENOMEM;
859 if (coll) {
860 req_data->coll = coll;
861 req_data->coll_index = coll_index;
864 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
866 down_read(&header->snap_rwsem);
868 req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
869 snapc,
870 ops,
871 false,
872 GFP_NOIO, pages, bio);
873 if (!req) {
874 up_read(&header->snap_rwsem);
875 ret = -ENOMEM;
876 goto done_pages;
879 req->r_callback = rbd_cb;
881 req_data->rq = rq;
882 req_data->bio = bio;
883 req_data->pages = pages;
884 req_data->len = len;
886 req->r_priv = req_data;
888 reqhead = req->r_request->front.iov_base;
889 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
891 strncpy(req->r_oid, obj, sizeof(req->r_oid));
892 req->r_oid_len = strlen(req->r_oid);
894 layout = &req->r_file_layout;
895 memset(layout, 0, sizeof(*layout));
896 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
897 layout->fl_stripe_count = cpu_to_le32(1);
898 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
899 layout->fl_pg_preferred = cpu_to_le32(-1);
900 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
901 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
902 ofs, &len, &bno, req, ops);
904 ceph_osdc_build_request(req, ofs, &len,
905 ops,
906 snapc,
907 &mtime,
908 req->r_oid, req->r_oid_len);
909 up_read(&header->snap_rwsem);
911 if (linger_req) {
912 ceph_osdc_set_request_linger(&dev->client->osdc, req);
913 *linger_req = req;
916 ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
917 if (ret < 0)
918 goto done_err;
920 if (!rbd_cb) {
921 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
922 if (ver)
923 *ver = le64_to_cpu(req->r_reassert_version.version);
924 dout("reassert_ver=%lld\n",
925 le64_to_cpu(req->r_reassert_version.version));
926 ceph_osdc_put_request(req);
928 return ret;
930 done_err:
931 bio_chain_put(req_data->bio);
932 ceph_osdc_put_request(req);
933 done_pages:
934 rbd_coll_end_req(req_data, ret, len);
935 kfree(req_data);
936 return ret;
940 * Ceph osd op callback
942 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
944 struct rbd_request *req_data = req->r_priv;
945 struct ceph_osd_reply_head *replyhead;
946 struct ceph_osd_op *op;
947 __s32 rc;
948 u64 bytes;
949 int read_op;
951 /* parse reply */
952 replyhead = msg->front.iov_base;
953 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
954 op = (void *)(replyhead + 1);
955 rc = le32_to_cpu(replyhead->result);
956 bytes = le64_to_cpu(op->extent.length);
957 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
959 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
961 if (rc == -ENOENT && read_op) {
962 zero_bio_chain(req_data->bio, 0);
963 rc = 0;
964 } else if (rc == 0 && read_op && bytes < req_data->len) {
965 zero_bio_chain(req_data->bio, bytes);
966 bytes = req_data->len;
969 rbd_coll_end_req(req_data, rc, bytes);
971 if (req_data->bio)
972 bio_chain_put(req_data->bio);
974 ceph_osdc_put_request(req);
975 kfree(req_data);
978 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
980 ceph_osdc_put_request(req);
984 * Do a synchronous ceph osd operation
/*
 * Runs one OSD operation through rbd_do_request() with no completion
 * callback, which makes rbd_do_request() wait for the result.
 *
 * A page vector covering [ofs, ofs + len) is allocated for the data.
 * If orig_ops is NULL a single op with the given opcode is built here
 * (and freed before returning), and for writes buf is copied into the
 * pages first.  After a successful read, the number of bytes returned
 * by the request is copied from the pages into buf.
 *
 * Returns a negative errno on failure; otherwise the value produced
 * by rbd_do_request() or, for reads with a buffer, by
 * ceph_copy_from_page_vector().
 */
986 static int rbd_req_sync_op(struct rbd_device *dev,
987 struct ceph_snap_context *snapc,
988 u64 snapid,
989 int opcode,
990 int flags,
991 struct ceph_osd_req_op *orig_ops,
992 int num_reply,
993 const char *obj,
994 u64 ofs, u64 len,
995 char *buf,
996 struct ceph_osd_request **linger_req,
997 u64 *ver)
999 int ret;
1000 struct page **pages;
1001 int num_pages;
1002 struct ceph_osd_req_op *ops = orig_ops;
1003 u32 payload_len;
1005 num_pages = calc_pages_for(ofs , len);
1006 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1007 if (IS_ERR(pages))
1008 return PTR_ERR(pages);
/* no caller-supplied ops: build a one-op vector for opcode */
1010 if (!orig_ops) {
1011 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1012 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1013 if (ret < 0)
1014 goto done;
1016 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1017 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1018 if (ret < 0)
1019 goto done_ops;
/*
 * NOTE(review): this listing jumps from original line 1027 to 1029,
 * so the argument between "ops" and "NULL, 0" in the call below is
 * not visible here.
 */
1023 ret = rbd_do_request(NULL, dev, snapc, snapid,
1024 obj, ofs, len, NULL,
1025 pages, num_pages,
1026 flags,
1027 ops,
1029 NULL, 0,
1030 NULL,
1031 linger_req, ver);
1032 if (ret < 0)
1033 goto done_ops;
/* ret is passed as the length to copy back to the caller's buffer */
1035 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1036 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1038 done_ops:
/* only free the op vector if it was created in this function */
1039 if (!orig_ops)
1040 rbd_destroy_ops(ops);
1041 done:
1042 ceph_release_page_vector(pages, num_pages);
1043 return ret;
1047 * Do an asynchronous ceph osd operation
1049 static int rbd_do_op(struct request *rq,
1050 struct rbd_device *rbd_dev ,
1051 struct ceph_snap_context *snapc,
1052 u64 snapid,
1053 int opcode, int flags, int num_reply,
1054 u64 ofs, u64 len,
1055 struct bio *bio,
1056 struct rbd_req_coll *coll,
1057 int coll_index)
1059 char *seg_name;
1060 u64 seg_ofs;
1061 u64 seg_len;
1062 int ret;
1063 struct ceph_osd_req_op *ops;
1064 u32 payload_len;
1066 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1067 if (!seg_name)
1068 return -ENOMEM;
1070 seg_len = rbd_get_segment(&rbd_dev->header,
1071 rbd_dev->header.block_name,
1072 ofs, len,
1073 seg_name, &seg_ofs);
1075 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1077 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1078 if (ret < 0)
1079 goto done;
1081 /* we've taken care of segment sizes earlier when we
1082 cloned the bios. We should never have a segment
1083 truncated at this point */
1084 BUG_ON(seg_len < len);
1086 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1087 seg_name, seg_ofs, seg_len,
1088 bio,
1089 NULL, 0,
1090 flags,
1091 ops,
1092 num_reply,
1093 coll, coll_index,
1094 rbd_req_cb, 0, NULL);
1096 rbd_destroy_ops(ops);
1097 done:
1098 kfree(seg_name);
1099 return ret;
1103 * Request async osd write
/*
 * Submits an asynchronous OSD write of (ofs, len) from bio through
 * rbd_do_op(), using snapc as snapshot context and CEPH_NOSNAP as
 * snapshot id.  Returns whatever rbd_do_op() returns.
 */
1105 static int rbd_req_write(struct request *rq,
1106 struct rbd_device *rbd_dev,
1107 struct ceph_snap_context *snapc,
1108 u64 ofs, u64 len,
1109 struct bio *bio,
1110 struct rbd_req_coll *coll,
1111 int coll_index)
/*
 * NOTE(review): this listing skips original line 1116, so the argument
 * between the flags and "ofs" in the call below is not visible here.
 */
1113 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1114 CEPH_OSD_OP_WRITE,
1115 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1117 ofs, len, bio, coll, coll_index);
1121 * Request async osd read
/*
 * Submits an asynchronous OSD read of (ofs, len) into bio through
 * rbd_do_op().  No snapshot context is passed; a snapid of 0 is
 * translated to CEPH_NOSNAP.  Returns whatever rbd_do_op() returns.
 */
1123 static int rbd_req_read(struct request *rq,
1124 struct rbd_device *rbd_dev,
1125 u64 snapid,
1126 u64 ofs, u64 len,
1127 struct bio *bio,
1128 struct rbd_req_coll *coll,
1129 int coll_index)
/*
 * NOTE(review): this listing skips original line 1135, so the argument
 * between the flags and "ofs" in the call below is not visible here.
 */
1131 return rbd_do_op(rq, rbd_dev, NULL,
1132 (snapid ? snapid : CEPH_NOSNAP),
1133 CEPH_OSD_OP_READ,
1134 CEPH_OSD_FLAG_READ,
1136 ofs, len, bio, coll, coll_index);
1140 * Request sync osd read
1142 static int rbd_req_sync_read(struct rbd_device *dev,
1143 struct ceph_snap_context *snapc,
1144 u64 snapid,
1145 const char *obj,
1146 u64 ofs, u64 len,
1147 char *buf,
1148 u64 *ver)
1150 return rbd_req_sync_op(dev, NULL,
1151 (snapid ? snapid : CEPH_NOSNAP),
1152 CEPH_OSD_OP_READ,
1153 CEPH_OSD_FLAG_READ,
1154 NULL,
1155 1, obj, ofs, len, buf, NULL, ver);
1159 * Request osd notify ack
/*
 * Sends a CEPH_OSD_OP_NOTIFY_ACK for notify_id on object obj through
 * rbd_do_request(), with rbd_simple_req_cb as completion callback (so
 * the call does not wait for the reply).  Returns a negative errno if
 * the op vector cannot be allocated, else the rbd_do_request() result.
 */
1161 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1162 u64 ver,
1163 u64 notify_id,
1164 const char *obj)
1166 struct ceph_osd_req_op *ops;
1167 struct page **pages = NULL;
1168 int ret;
1170 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1171 if (ret < 0)
1172 return ret;
/* the version sent is the cached header version, not the ver argument */
1174 ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1175 ops[0].watch.cookie = notify_id;
1176 ops[0].watch.flag = 0;
/*
 * NOTE(review): this listing skips original line 1183, so the argument
 * between "ops" and "NULL, 0" in the call below is not visible here.
 */
1178 ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1179 obj, 0, 0, NULL,
1180 pages, 0,
1181 CEPH_OSD_FLAG_READ,
1182 ops,
1184 NULL, 0,
1185 rbd_simple_req_cb, 0, NULL);
1187 rbd_destroy_ops(ops);
1188 return ret;
1191 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1193 struct rbd_device *dev = (struct rbd_device *)data;
1194 if (!dev)
1195 return;
1197 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1198 notify_id, (int)opcode);
1199 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1200 __rbd_update_snaps(dev);
1201 mutex_unlock(&ctl_mutex);
1203 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1207 * Request sync osd watch
/*
 * Registers a watch on object obj: creates an OSD event that invokes
 * rbd_watch_cb with dev as its data, then sends a CEPH_OSD_OP_WATCH
 * op carrying the event cookie.  The request is kept as a lingering
 * request in dev->watch_request.
 *
 * Returns 0 on success or a negative errno; on failure of the request
 * the event is cancelled and dev->watch_event reset to NULL.
 */
1209 static int rbd_req_sync_watch(struct rbd_device *dev,
1210 const char *obj,
1211 u64 ver)
1213 struct ceph_osd_req_op *ops;
1214 struct ceph_osd_client *osdc = &dev->client->osdc;
1216 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1217 if (ret < 0)
1218 return ret;
1220 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1221 (void *)dev, &dev->watch_event);
1222 if (ret < 0)
1223 goto fail;
1225 ops[0].watch.ver = cpu_to_le64(ver);
1226 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1227 ops[0].watch.flag = 1;
/*
 * NOTE(review): this listing skips original line 1231, so the argument
 * between CEPH_NOSNAP and the flags in the call below is not visible.
 */
1229 ret = rbd_req_sync_op(dev, NULL,
1230 CEPH_NOSNAP,
1232 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1233 ops,
1234 1, obj, 0, 0, NULL,
1235 &dev->watch_request, NULL);
1237 if (ret < 0)
1238 goto fail_event;
1240 rbd_destroy_ops(ops);
1241 return 0;
1243 fail_event:
1244 ceph_osdc_cancel_event(dev->watch_event);
1245 dev->watch_event = NULL;
1246 fail:
1247 rbd_destroy_ops(ops);
1248 return ret;
/* Context handed to the notify event created in rbd_req_sync_notify(). */
struct rbd_notify_info {
	struct rbd_device *dev;		/* device that issued the notify */
};
1255 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1257 struct rbd_device *dev = (struct rbd_device *)data;
1258 if (!dev)
1259 return;
1261 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1262 notify_id, (int)opcode);
1266 * Request sync osd notify
/*
 * Sends a CEPH_OSD_OP_NOTIFY on object obj and waits for the matching
 * event.  An event invoking rbd_notify_cb is created first and its
 * cookie is placed in the op.
 *
 * Returns 0 once the request has been sent (the result of waiting for
 * the event is only logged), or a negative errno if setting up or
 * sending the request fails.
 */
1268 static int rbd_req_sync_notify(struct rbd_device *dev,
1269 const char *obj)
1271 struct ceph_osd_req_op *ops;
1272 struct ceph_osd_client *osdc = &dev->client->osdc;
1273 struct ceph_osd_event *event;
1274 struct rbd_notify_info info;
1275 int payload_len = sizeof(u32) + sizeof(u32);
1276 int ret;
1278 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1279 if (ret < 0)
1280 return ret;
1282 info.dev = dev;
/*
 * NOTE(review): the event data is the address of the on-stack info,
 * while rbd_notify_cb as shown casts its data argument to
 * struct rbd_device * -- the two do not agree; confirm which is meant.
 */
1284 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1285 (void *)&info, &event);
1286 if (ret < 0)
1287 goto fail;
1289 ops[0].watch.ver = 1;
1290 ops[0].watch.flag = 1;
1291 ops[0].watch.cookie = event->cookie;
1292 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1293 ops[0].watch.timeout = 12;
/*
 * NOTE(review): this listing skips original line 1297, so the argument
 * between CEPH_NOSNAP and the flags in the call below is not visible.
 */
1295 ret = rbd_req_sync_op(dev, NULL,
1296 CEPH_NOSNAP,
1298 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1299 ops,
1300 1, obj, 0, 0, NULL, NULL, NULL);
1301 if (ret < 0)
1302 goto fail_event;
/* the wait result is logged but not returned to the caller */
1304 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1305 dout("ceph_osdc_wait_event returned %d\n", ret);
1306 rbd_destroy_ops(ops);
1307 return 0;
1309 fail_event:
1310 ceph_osdc_cancel_event(event);
1311 fail:
1312 rbd_destroy_ops(ops);
1313 return ret;
1317 * Request sync osd rollback
/*
 * Sends a CEPH_OSD_OP_ROLLBACK for snapshot snapid on object obj via
 * rbd_req_sync_op() and returns its result, or a negative errno if
 * the op vector cannot be allocated.
 */
1319 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
1320 u64 snapid,
1321 const char *obj)
1323 struct ceph_osd_req_op *ops;
1324 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
1325 if (ret < 0)
1326 return ret;
1328 ops[0].snap.snapid = snapid;
/*
 * NOTE(review): this listing skips original line 1332, so the argument
 * between CEPH_NOSNAP and the flags in the call below is not visible.
 */
1330 ret = rbd_req_sync_op(dev, NULL,
1331 CEPH_NOSNAP,
1333 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1334 ops,
1335 1, obj, 0, 0, NULL, NULL, NULL);
1337 rbd_destroy_ops(ops);
1339 return ret;
1343 * Request sync osd class method call
/*
 * Invokes method "method" of object class "cls" on object obj with
 * len bytes of input from data, using a CEPH_OSD_OP_CALL op sent via
 * rbd_req_sync_op().  If ver is non-NULL it is passed through to
 * receive the object version.  Returns the rbd_req_sync_op() result,
 * or a negative errno if the op vector cannot be allocated.
 */
1345 static int rbd_req_sync_exec(struct rbd_device *dev,
1346 const char *obj,
1347 const char *cls,
1348 const char *method,
1349 const char *data,
1350 int len,
1351 u64 *ver)
1353 struct ceph_osd_req_op *ops;
1354 int cls_len = strlen(cls);
1355 int method_len = strlen(method);
/* payload is the class name, the method name and the input data */
1356 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1357 cls_len + method_len + len);
1358 if (ret < 0)
1359 return ret;
/* name lengths are narrowed to __u8 by the casts below */
1361 ops[0].cls.class_name = cls;
1362 ops[0].cls.class_len = (__u8)cls_len;
1363 ops[0].cls.method_name = method;
1364 ops[0].cls.method_len = (__u8)method_len;
1365 ops[0].cls.argc = 0;
1366 ops[0].cls.indata = data;
1367 ops[0].cls.indata_len = len;
/*
 * NOTE(review): this listing skips original line 1371, so the argument
 * between CEPH_NOSNAP and the flags in the call below is not visible.
 */
1369 ret = rbd_req_sync_op(dev, NULL,
1370 CEPH_NOSNAP,
1372 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1373 ops,
1374 1, obj, 0, 0, NULL, NULL, ver);
1376 rbd_destroy_ops(ops);
1378 dout("cls_exec returned %d\n", ret);
1379 return ret;
1382 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1384 struct rbd_req_coll *coll =
1385 kzalloc(sizeof(struct rbd_req_coll) +
1386 sizeof(struct rbd_req_status) * num_reqs,
1387 GFP_ATOMIC);
1389 if (!coll)
1390 return NULL;
1391 coll->total = num_reqs;
1392 kref_init(&coll->kref);
1393 return coll;
1397 * block device queue callback
1399 static void rbd_rq_fn(struct request_queue *q)
1401 struct rbd_device *rbd_dev = q->queuedata;
1402 struct request *rq;
1403 struct bio_pair *bp = NULL;
1405 rq = blk_fetch_request(q);
1407 while (1) {
1408 struct bio *bio;
1409 struct bio *rq_bio, *next_bio = NULL;
1410 bool do_write;
1411 int size, op_size = 0;
1412 u64 ofs;
1413 int num_segs, cur_seg = 0;
1414 struct rbd_req_coll *coll;
1416 /* peek at request from block layer */
1417 if (!rq)
1418 break;
1420 dout("fetched request\n");
1422 /* filter out block requests we don't understand */
1423 if ((rq->cmd_type != REQ_TYPE_FS)) {
1424 __blk_end_request_all(rq, 0);
1425 goto next;
1428 /* deduce our operation (read, write) */
1429 do_write = (rq_data_dir(rq) == WRITE);
1431 size = blk_rq_bytes(rq);
1432 ofs = blk_rq_pos(rq) * 512ULL;
1433 rq_bio = rq->bio;
1434 if (do_write && rbd_dev->read_only) {
1435 __blk_end_request_all(rq, -EROFS);
1436 goto next;
1439 spin_unlock_irq(q->queue_lock);
1441 dout("%s 0x%x bytes at 0x%llx\n",
1442 do_write ? "write" : "read",
1443 size, blk_rq_pos(rq) * 512ULL);
1445 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1446 coll = rbd_alloc_coll(num_segs);
1447 if (!coll) {
1448 spin_lock_irq(q->queue_lock);
1449 __blk_end_request_all(rq, -ENOMEM);
1450 goto next;
1453 do {
1454 /* a bio clone to be passed down to OSD req */
1455 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1456 op_size = rbd_get_segment(&rbd_dev->header,
1457 rbd_dev->header.block_name,
1458 ofs, size,
1459 NULL, NULL);
1460 kref_get(&coll->kref);
1461 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1462 op_size, GFP_ATOMIC);
1463 if (!bio) {
1464 rbd_coll_end_req_index(rq, coll, cur_seg,
1465 -ENOMEM, op_size);
1466 goto next_seg;
1470 /* init OSD command: write or read */
1471 if (do_write)
1472 rbd_req_write(rq, rbd_dev,
1473 rbd_dev->header.snapc,
1474 ofs,
1475 op_size, bio,
1476 coll, cur_seg);
1477 else
1478 rbd_req_read(rq, rbd_dev,
1479 cur_snap_id(rbd_dev),
1480 ofs,
1481 op_size, bio,
1482 coll, cur_seg);
1484 next_seg:
1485 size -= op_size;
1486 ofs += op_size;
1488 cur_seg++;
1489 rq_bio = next_bio;
1490 } while (size > 0);
1491 kref_put(&coll->kref, rbd_coll_release);
1493 if (bp)
1494 bio_pair_release(bp);
1495 spin_lock_irq(q->queue_lock);
1496 next:
1497 rq = blk_fetch_request(q);
1502 * a queue callback. Makes sure that we don't create a bio that spans across
1503 * multiple osd objects. One exception would be with a single page bios,
1504 * which we handle later at bio_chain_clone
1506 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1507 struct bio_vec *bvec)
1509 struct rbd_device *rbd_dev = q->queuedata;
1510 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1511 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1512 unsigned int bio_sectors = bmd->bi_size >> 9;
1513 int max;
1515 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1516 + bio_sectors)) << 9;
1517 if (max < 0)
1518 max = 0; /* bio_add cannot handle a negative return */
1519 if (max <= bvec->bv_len && bio_sectors == 0)
1520 return bvec->bv_len;
1521 return max;
1524 static void rbd_free_disk(struct rbd_device *rbd_dev)
1526 struct gendisk *disk = rbd_dev->disk;
1528 if (!disk)
1529 return;
1531 rbd_header_free(&rbd_dev->header);
1533 if (disk->flags & GENHD_FL_UP)
1534 del_gendisk(disk);
1535 if (disk->queue)
1536 blk_cleanup_queue(disk->queue);
1537 put_disk(disk);
1541 * reload the ondisk the header
1543 static int rbd_read_header(struct rbd_device *rbd_dev,
1544 struct rbd_image_header *header)
1546 ssize_t rc;
1547 struct rbd_image_header_ondisk *dh;
1548 int snap_count = 0;
1549 u64 snap_names_len = 0;
1550 u64 ver;
1552 while (1) {
1553 int len = sizeof(*dh) +
1554 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1555 snap_names_len;
1557 rc = -ENOMEM;
1558 dh = kmalloc(len, GFP_KERNEL);
1559 if (!dh)
1560 return -ENOMEM;
1562 rc = rbd_req_sync_read(rbd_dev,
1563 NULL, CEPH_NOSNAP,
1564 rbd_dev->obj_md_name,
1565 0, len,
1566 (char *)dh, &ver);
1567 if (rc < 0)
1568 goto out_dh;
1570 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1571 if (rc < 0)
1572 goto out_dh;
1574 if (snap_count != header->total_snaps) {
1575 snap_count = header->total_snaps;
1576 snap_names_len = header->snap_names_len;
1577 rbd_header_free(header);
1578 kfree(dh);
1579 continue;
1581 break;
1583 header->obj_version = ver;
1585 out_dh:
1586 kfree(dh);
1587 return rc;
1591 * create a snapshot
1593 static int rbd_header_add_snap(struct rbd_device *dev,
1594 const char *snap_name,
1595 gfp_t gfp_flags)
1597 int name_len = strlen(snap_name);
1598 u64 new_snapid;
1599 int ret;
1600 void *data, *data_start, *data_end;
1601 u64 ver;
1603 /* we should create a snapshot only if we're pointing at the head */
1604 if (dev->cur_snap)
1605 return -EINVAL;
1607 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1608 &new_snapid);
1609 dout("created snapid=%lld\n", new_snapid);
1610 if (ret < 0)
1611 return ret;
1613 data = kmalloc(name_len + 16, gfp_flags);
1614 if (!data)
1615 return -ENOMEM;
1617 data_start = data;
1618 data_end = data + name_len + 16;
1620 ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1621 ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1623 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1624 data_start, data - data_start, &ver);
1626 kfree(data_start);
1628 if (ret < 0)
1629 return ret;
1631 dev->header.snapc->seq = new_snapid;
1633 return 0;
1634 bad:
1635 return -ERANGE;
1638 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1640 struct rbd_snap *snap;
1642 while (!list_empty(&rbd_dev->snaps)) {
1643 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1644 __rbd_remove_snap_dev(rbd_dev, snap);
1649 * only read the first part of the ondisk header, without the snaps info
1651 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1653 int ret;
1654 struct rbd_image_header h;
1655 u64 snap_seq;
1656 int follow_seq = 0;
1658 ret = rbd_read_header(rbd_dev, &h);
1659 if (ret < 0)
1660 return ret;
1662 down_write(&rbd_dev->header.snap_rwsem);
1664 snap_seq = rbd_dev->header.snapc->seq;
1665 if (rbd_dev->header.total_snaps &&
1666 rbd_dev->header.snapc->snaps[0] == snap_seq)
1667 /* pointing at the head, will need to follow that
1668 if head moves */
1669 follow_seq = 1;
1671 kfree(rbd_dev->header.snapc);
1672 kfree(rbd_dev->header.snap_names);
1673 kfree(rbd_dev->header.snap_sizes);
1675 rbd_dev->header.total_snaps = h.total_snaps;
1676 rbd_dev->header.snapc = h.snapc;
1677 rbd_dev->header.snap_names = h.snap_names;
1678 rbd_dev->header.snap_names_len = h.snap_names_len;
1679 rbd_dev->header.snap_sizes = h.snap_sizes;
1680 if (follow_seq)
1681 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1682 else
1683 rbd_dev->header.snapc->seq = snap_seq;
1685 ret = __rbd_init_snaps_header(rbd_dev);
1687 up_write(&rbd_dev->header.snap_rwsem);
1689 return ret;
1692 static int rbd_init_disk(struct rbd_device *rbd_dev)
1694 struct gendisk *disk;
1695 struct request_queue *q;
1696 int rc;
1697 u64 total_size = 0;
1699 /* contact OSD, request size info about the object being mapped */
1700 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1701 if (rc)
1702 return rc;
1704 /* no need to lock here, as rbd_dev is not registered yet */
1705 rc = __rbd_init_snaps_header(rbd_dev);
1706 if (rc)
1707 return rc;
1709 rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1710 if (rc)
1711 return rc;
1713 /* create gendisk info */
1714 rc = -ENOMEM;
1715 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1716 if (!disk)
1717 goto out;
1719 sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1720 disk->major = rbd_dev->major;
1721 disk->first_minor = 0;
1722 disk->fops = &rbd_bd_ops;
1723 disk->private_data = rbd_dev;
1725 /* init rq */
1726 rc = -ENOMEM;
1727 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1728 if (!q)
1729 goto out_disk;
1730 blk_queue_merge_bvec(q, rbd_merge_bvec);
1731 disk->queue = q;
1733 q->queuedata = rbd_dev;
1735 rbd_dev->disk = disk;
1736 rbd_dev->q = q;
1738 /* finally, announce the disk to the world */
1739 set_capacity(disk, total_size / 512ULL);
1740 add_disk(disk);
1742 pr_info("%s: added with size 0x%llx\n",
1743 disk->disk_name, (unsigned long long)total_size);
1744 return 0;
1746 out_disk:
1747 put_disk(disk);
1748 out:
1749 return rc;
1753 sysfs
1756 static ssize_t rbd_size_show(struct device *dev,
1757 struct device_attribute *attr, char *buf)
1759 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1761 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1764 static ssize_t rbd_major_show(struct device *dev,
1765 struct device_attribute *attr, char *buf)
1767 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1769 return sprintf(buf, "%d\n", rbd_dev->major);
1772 static ssize_t rbd_client_id_show(struct device *dev,
1773 struct device_attribute *attr, char *buf)
1775 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1777 return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1780 static ssize_t rbd_pool_show(struct device *dev,
1781 struct device_attribute *attr, char *buf)
1783 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1785 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1788 static ssize_t rbd_name_show(struct device *dev,
1789 struct device_attribute *attr, char *buf)
1791 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1793 return sprintf(buf, "%s\n", rbd_dev->obj);
1796 static ssize_t rbd_snap_show(struct device *dev,
1797 struct device_attribute *attr,
1798 char *buf)
1800 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1802 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1805 static ssize_t rbd_image_refresh(struct device *dev,
1806 struct device_attribute *attr,
1807 const char *buf,
1808 size_t size)
1810 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1811 int rc;
1812 int ret = size;
1814 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1816 rc = __rbd_update_snaps(rbd_dev);
1817 if (rc < 0)
1818 ret = rc;
1820 mutex_unlock(&ctl_mutex);
1821 return ret;
1824 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1825 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1826 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1827 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1828 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1829 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1830 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1831 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1832 static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
1834 static struct attribute *rbd_attrs[] = {
1835 &dev_attr_size.attr,
1836 &dev_attr_major.attr,
1837 &dev_attr_client_id.attr,
1838 &dev_attr_pool.attr,
1839 &dev_attr_name.attr,
1840 &dev_attr_current_snap.attr,
1841 &dev_attr_refresh.attr,
1842 &dev_attr_create_snap.attr,
1843 &dev_attr_rollback_snap.attr,
1844 NULL
1847 static struct attribute_group rbd_attr_group = {
1848 .attrs = rbd_attrs,
1851 static const struct attribute_group *rbd_attr_groups[] = {
1852 &rbd_attr_group,
1853 NULL
/* Release callback of the rbd device type; intentionally does nothing. */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
1860 static struct device_type rbd_device_type = {
1861 .name = "rbd",
1862 .groups = rbd_attr_groups,
1863 .release = rbd_sysfs_dev_release,
1868 sysfs - snapshots
1871 static ssize_t rbd_snap_size_show(struct device *dev,
1872 struct device_attribute *attr,
1873 char *buf)
1875 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1877 return sprintf(buf, "%lld\n", (long long)snap->size);
1880 static ssize_t rbd_snap_id_show(struct device *dev,
1881 struct device_attribute *attr,
1882 char *buf)
1884 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1886 return sprintf(buf, "%lld\n", (long long)snap->id);
1889 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1890 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1892 static struct attribute *rbd_snap_attrs[] = {
1893 &dev_attr_snap_size.attr,
1894 &dev_attr_snap_id.attr,
1895 NULL,
1898 static struct attribute_group rbd_snap_attr_group = {
1899 .attrs = rbd_snap_attrs,
1902 static void rbd_snap_dev_release(struct device *dev)
1904 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1905 kfree(snap->name);
1906 kfree(snap);
1909 static const struct attribute_group *rbd_snap_attr_groups[] = {
1910 &rbd_snap_attr_group,
1911 NULL
1914 static struct device_type rbd_snap_device_type = {
1915 .groups = rbd_snap_attr_groups,
1916 .release = rbd_snap_dev_release,
1919 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1920 struct rbd_snap *snap)
1922 list_del(&snap->node);
1923 device_unregister(&snap->dev);
1926 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1927 struct rbd_snap *snap,
1928 struct device *parent)
1930 struct device *dev = &snap->dev;
1931 int ret;
1933 dev->type = &rbd_snap_device_type;
1934 dev->parent = parent;
1935 dev->release = rbd_snap_dev_release;
1936 dev_set_name(dev, "snap_%s", snap->name);
1937 ret = device_register(dev);
1939 return ret;
1942 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1943 int i, const char *name,
1944 struct rbd_snap **snapp)
1946 int ret;
1947 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1948 if (!snap)
1949 return -ENOMEM;
1950 snap->name = kstrdup(name, GFP_KERNEL);
1951 snap->size = rbd_dev->header.snap_sizes[i];
1952 snap->id = rbd_dev->header.snapc->snaps[i];
1953 if (device_is_registered(&rbd_dev->dev)) {
1954 ret = rbd_register_snap_dev(rbd_dev, snap,
1955 &rbd_dev->dev);
1956 if (ret < 0)
1957 goto err;
1959 *snapp = snap;
1960 return 0;
1961 err:
1962 kfree(snap->name);
1963 kfree(snap);
1964 return ret;
/*
 * Find the string that precedes @name in a list of NUL-terminated
 * strings stored back to back, the first of which begins at @start.
 *
 * @name points at the beginning of an entry, or just past the
 * terminator of the last one. Returns a pointer to the beginning of the
 * preceding entry, or NULL when @name is less than two bytes past
 * @start and therefore has no predecessor.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cur;

	if (name < start + 2)
		return NULL;

	/* name[-1] is the previous terminator; scan back from name[-2] */
	for (cur = name - 2; *cur != '\0'; cur--) {
		if (cur == start)
			return start;
	}

	/* stopped on the terminator of the entry before the previous one */
	return cur + 1;
}
1985 * compare the old list of snapshots that we have to what's in the header
1986 * and update it accordingly. Note that the header holds the snapshots
1987 * in a reverse order (from newest to oldest) and we need to go from
1988 * older to new so that we don't get a duplicate snap name when
1989 * doing the process (e.g., removed snapshot and recreated a new
1990 * one with the same name.
1992 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
1994 const char *name, *first_name;
1995 int i = rbd_dev->header.total_snaps;
1996 struct rbd_snap *snap, *old_snap = NULL;
1997 int ret;
1998 struct list_head *p, *n;
2000 first_name = rbd_dev->header.snap_names;
2001 name = first_name + rbd_dev->header.snap_names_len;
2003 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2004 u64 cur_id;
2006 old_snap = list_entry(p, struct rbd_snap, node);
2008 if (i)
2009 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2011 if (!i || old_snap->id < cur_id) {
2012 /* old_snap->id was skipped, thus was removed */
2013 __rbd_remove_snap_dev(rbd_dev, old_snap);
2014 continue;
2016 if (old_snap->id == cur_id) {
2017 /* we have this snapshot already */
2018 i--;
2019 name = rbd_prev_snap_name(name, first_name);
2020 continue;
2022 for (; i > 0;
2023 i--, name = rbd_prev_snap_name(name, first_name)) {
2024 if (!name) {
2025 WARN_ON(1);
2026 return -EINVAL;
2028 cur_id = rbd_dev->header.snapc->snaps[i];
2029 /* snapshot removal? handle it above */
2030 if (cur_id >= old_snap->id)
2031 break;
2032 /* a new snapshot */
2033 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2034 if (ret < 0)
2035 return ret;
2037 /* note that we add it backward so using n and not p */
2038 list_add(&snap->node, n);
2039 p = &snap->node;
2042 /* we're done going over the old snap list, just add what's left */
2043 for (; i > 0; i--) {
2044 name = rbd_prev_snap_name(name, first_name);
2045 if (!name) {
2046 WARN_ON(1);
2047 return -EINVAL;
2049 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2050 if (ret < 0)
2051 return ret;
2052 list_add(&snap->node, &rbd_dev->snaps);
2055 return 0;
/* Release callback of the root device; intentionally does nothing. */
static void rbd_root_dev_release(struct device *dev)
{
}
2063 static struct device rbd_root_dev = {
2064 .init_name = "rbd",
2065 .release = rbd_root_dev_release,
2068 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2070 int ret = -ENOMEM;
2071 struct device *dev;
2072 struct rbd_snap *snap;
2074 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2075 dev = &rbd_dev->dev;
2077 dev->bus = &rbd_bus_type;
2078 dev->type = &rbd_device_type;
2079 dev->parent = &rbd_root_dev;
2080 dev->release = rbd_dev_release;
2081 dev_set_name(dev, "%d", rbd_dev->id);
2082 ret = device_register(dev);
2083 if (ret < 0)
2084 goto done_free;
2086 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2087 ret = rbd_register_snap_dev(rbd_dev, snap,
2088 &rbd_dev->dev);
2089 if (ret < 0)
2090 break;
2093 mutex_unlock(&ctl_mutex);
2094 return 0;
2095 done_free:
2096 mutex_unlock(&ctl_mutex);
2097 return ret;
2100 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2102 device_unregister(&rbd_dev->dev);
2105 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2107 int ret, rc;
2109 do {
2110 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2111 rbd_dev->header.obj_version);
2112 if (ret == -ERANGE) {
2113 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2114 rc = __rbd_update_snaps(rbd_dev);
2115 mutex_unlock(&ctl_mutex);
2116 if (rc < 0)
2117 return rc;
2119 } while (ret == -ERANGE);
2121 return ret;
2124 static ssize_t rbd_add(struct bus_type *bus,
2125 const char *buf,
2126 size_t count)
2128 struct ceph_osd_client *osdc;
2129 struct rbd_device *rbd_dev;
2130 ssize_t rc = -ENOMEM;
2131 int irc, new_id = 0;
2132 struct list_head *tmp;
2133 char *mon_dev_name;
2134 char *options;
2136 if (!try_module_get(THIS_MODULE))
2137 return -ENODEV;
2139 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2140 if (!mon_dev_name)
2141 goto err_out_mod;
2143 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2144 if (!options)
2145 goto err_mon_dev;
2147 /* new rbd_device object */
2148 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2149 if (!rbd_dev)
2150 goto err_out_opt;
2152 /* static rbd_device initialization */
2153 spin_lock_init(&rbd_dev->lock);
2154 INIT_LIST_HEAD(&rbd_dev->node);
2155 INIT_LIST_HEAD(&rbd_dev->snaps);
2157 /* generate unique id: find highest unique id, add one */
2158 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2160 list_for_each(tmp, &rbd_dev_list) {
2161 struct rbd_device *rbd_dev;
2163 rbd_dev = list_entry(tmp, struct rbd_device, node);
2164 if (rbd_dev->id >= new_id)
2165 new_id = rbd_dev->id + 1;
2168 rbd_dev->id = new_id;
2170 /* add to global list */
2171 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2173 /* parse add command */
2174 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2175 "%" __stringify(RBD_MAX_OPT_LEN) "s "
2176 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2177 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2178 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2179 mon_dev_name, options, rbd_dev->pool_name,
2180 rbd_dev->obj, rbd_dev->snap_name) < 4) {
2181 rc = -EINVAL;
2182 goto err_out_slot;
2185 if (rbd_dev->snap_name[0] == 0)
2186 rbd_dev->snap_name[0] = '-';
2188 rbd_dev->obj_len = strlen(rbd_dev->obj);
2189 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2190 rbd_dev->obj, RBD_SUFFIX);
2192 /* initialize rest of new object */
2193 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2194 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2195 if (rc < 0)
2196 goto err_out_slot;
2198 mutex_unlock(&ctl_mutex);
2200 /* pick the pool */
2201 osdc = &rbd_dev->client->osdc;
2202 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2203 if (rc < 0)
2204 goto err_out_client;
2205 rbd_dev->poolid = rc;
2207 /* register our block device */
2208 irc = register_blkdev(0, rbd_dev->name);
2209 if (irc < 0) {
2210 rc = irc;
2211 goto err_out_client;
2213 rbd_dev->major = irc;
2215 rc = rbd_bus_add_dev(rbd_dev);
2216 if (rc)
2217 goto err_out_blkdev;
2219 /* set up and announce blkdev mapping */
2220 rc = rbd_init_disk(rbd_dev);
2221 if (rc)
2222 goto err_out_bus;
2224 rc = rbd_init_watch_dev(rbd_dev);
2225 if (rc)
2226 goto err_out_bus;
2228 return count;
2230 err_out_bus:
2231 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2232 list_del_init(&rbd_dev->node);
2233 mutex_unlock(&ctl_mutex);
2235 /* this will also clean up rest of rbd_dev stuff */
2237 rbd_bus_del_dev(rbd_dev);
2238 kfree(options);
2239 kfree(mon_dev_name);
2240 return rc;
2242 err_out_blkdev:
2243 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2244 err_out_client:
2245 rbd_put_client(rbd_dev);
2246 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2247 err_out_slot:
2248 list_del_init(&rbd_dev->node);
2249 mutex_unlock(&ctl_mutex);
2251 kfree(rbd_dev);
2252 err_out_opt:
2253 kfree(options);
2254 err_mon_dev:
2255 kfree(mon_dev_name);
2256 err_out_mod:
2257 dout("Error adding device %s\n", buf);
2258 module_put(THIS_MODULE);
2259 return rc;
2262 static struct rbd_device *__rbd_get_dev(unsigned long id)
2264 struct list_head *tmp;
2265 struct rbd_device *rbd_dev;
2267 list_for_each(tmp, &rbd_dev_list) {
2268 rbd_dev = list_entry(tmp, struct rbd_device, node);
2269 if (rbd_dev->id == id)
2270 return rbd_dev;
2272 return NULL;
2275 static void rbd_dev_release(struct device *dev)
2277 struct rbd_device *rbd_dev =
2278 container_of(dev, struct rbd_device, dev);
2280 if (rbd_dev->watch_request)
2281 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2282 rbd_dev->watch_request);
2283 if (rbd_dev->watch_event)
2284 ceph_osdc_cancel_event(rbd_dev->watch_event);
2286 rbd_put_client(rbd_dev);
2288 /* clean up and free blkdev */
2289 rbd_free_disk(rbd_dev);
2290 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2291 kfree(rbd_dev);
2293 /* release module ref */
2294 module_put(THIS_MODULE);
2297 static ssize_t rbd_remove(struct bus_type *bus,
2298 const char *buf,
2299 size_t count)
2301 struct rbd_device *rbd_dev = NULL;
2302 int target_id, rc;
2303 unsigned long ul;
2304 int ret = count;
2306 rc = strict_strtoul(buf, 10, &ul);
2307 if (rc)
2308 return rc;
2310 /* convert to int; abort if we lost anything in the conversion */
2311 target_id = (int) ul;
2312 if (target_id != ul)
2313 return -EINVAL;
2315 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2317 rbd_dev = __rbd_get_dev(target_id);
2318 if (!rbd_dev) {
2319 ret = -ENOENT;
2320 goto done;
2323 list_del_init(&rbd_dev->node);
2325 __rbd_remove_all_snaps(rbd_dev);
2326 rbd_bus_del_dev(rbd_dev);
2328 done:
2329 mutex_unlock(&ctl_mutex);
2330 return ret;
2333 static ssize_t rbd_snap_add(struct device *dev,
2334 struct device_attribute *attr,
2335 const char *buf,
2336 size_t count)
2338 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2339 int ret;
2340 char *name = kmalloc(count + 1, GFP_KERNEL);
2341 if (!name)
2342 return -ENOMEM;
2344 snprintf(name, count, "%s", buf);
2346 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2348 ret = rbd_header_add_snap(rbd_dev,
2349 name, GFP_KERNEL);
2350 if (ret < 0)
2351 goto err_unlock;
2353 ret = __rbd_update_snaps(rbd_dev);
2354 if (ret < 0)
2355 goto err_unlock;
2357 /* shouldn't hold ctl_mutex when notifying.. notify might
2358 trigger a watch callback that would need to get that mutex */
2359 mutex_unlock(&ctl_mutex);
2361 /* make a best effort, don't error if failed */
2362 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2364 ret = count;
2365 kfree(name);
2366 return ret;
2368 err_unlock:
2369 mutex_unlock(&ctl_mutex);
2370 kfree(name);
2371 return ret;
2374 static ssize_t rbd_snap_rollback(struct device *dev,
2375 struct device_attribute *attr,
2376 const char *buf,
2377 size_t count)
2379 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2380 int ret;
2381 u64 snapid;
2382 u64 cur_ofs;
2383 char *seg_name = NULL;
2384 char *snap_name = kmalloc(count + 1, GFP_KERNEL);
2385 ret = -ENOMEM;
2386 if (!snap_name)
2387 return ret;
2389 /* parse snaps add command */
2390 snprintf(snap_name, count, "%s", buf);
2391 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
2392 if (!seg_name)
2393 goto done;
2395 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2397 ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
2398 if (ret < 0)
2399 goto done_unlock;
2401 dout("snapid=%lld\n", snapid);
2403 cur_ofs = 0;
2404 while (cur_ofs < rbd_dev->header.image_size) {
2405 cur_ofs += rbd_get_segment(&rbd_dev->header,
2406 rbd_dev->obj,
2407 cur_ofs, (u64)-1,
2408 seg_name, NULL);
2409 dout("seg_name=%s\n", seg_name);
2411 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
2412 if (ret < 0)
2413 pr_warning("could not roll back obj %s err=%d\n",
2414 seg_name, ret);
2417 ret = __rbd_update_snaps(rbd_dev);
2418 if (ret < 0)
2419 goto done_unlock;
2421 ret = count;
2423 done_unlock:
2424 mutex_unlock(&ctl_mutex);
2425 done:
2426 kfree(seg_name);
2427 kfree(snap_name);
2429 return ret;
2432 static struct bus_attribute rbd_bus_attrs[] = {
2433 __ATTR(add, S_IWUSR, NULL, rbd_add),
2434 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
2435 __ATTR_NULL
2439 * create control files in sysfs
2440 * /sys/bus/rbd/...
2442 static int rbd_sysfs_init(void)
2444 int ret;
2446 rbd_bus_type.bus_attrs = rbd_bus_attrs;
2448 ret = bus_register(&rbd_bus_type);
2449 if (ret < 0)
2450 return ret;
2452 ret = device_register(&rbd_root_dev);
2454 return ret;
2457 static void rbd_sysfs_cleanup(void)
2459 device_unregister(&rbd_root_dev);
2460 bus_unregister(&rbd_bus_type);
2463 int __init rbd_init(void)
2465 int rc;
2467 rc = rbd_sysfs_init();
2468 if (rc)
2469 return rc;
2470 spin_lock_init(&node_lock);
2471 pr_info("loaded " DRV_NAME_LONG "\n");
2472 return 0;
2475 void __exit rbd_exit(void)
2477 rbd_sysfs_cleanup();
2480 module_init(rbd_init);
2481 module_exit(rbd_exit);
2483 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2484 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2485 MODULE_DESCRIPTION("rados block device");
2487 /* following authorship retained from original osdblk.c */
2488 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2490 MODULE_LICENSE("GPL");