1 /*
2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
58 #define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
59 #define RBD_MAX_POOL_NAME_LEN 64
60 #define RBD_MAX_SNAP_NAME_LEN 32
61 #define RBD_MAX_OPT_LEN 1024
63 #define RBD_SNAP_HEAD_NAME "-"
66 * An RBD device name will be "rbd#", where the "rbd" comes from
67 * RBD_DRV_NAME above, and # is a unique integer identifier.
68 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
69 * enough to hold all possible device names.
71 #define DEV_NAME_LEN 32
72 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
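/*
 * Worked example, assuming a 4-byte int: MAX_INT_FORMAT_WIDTH is
 * (5 * 4) / 2 + 1 = 11 characters, enough for the longest decimal
 * rendering of a 32-bit value ("-2147483648"), so DEV_NAME_LEN (32)
 * easily holds "rbd" plus any id (see the BUILD_BUG_ON in rbd_add()).
 */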
74 #define RBD_READ_ONLY_DEFAULT false
77 * block device image metadata (in-memory version)
79 struct rbd_image_header {
80 u64 image_size;
81 char block_name[32];
82 __u8 obj_order;
83 __u8 crypt_type;
84 __u8 comp_type;
85 struct ceph_snap_context *snapc;
86 size_t snap_names_len;
87 u64 snap_seq;
88 u32 total_snaps;
90 char *snap_names;
91 u64 *snap_sizes;
93 u64 obj_version;
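/*
 * The snapshot name buffer above is a packed list of NUL-terminated
 * strings, in the same newest-to-oldest order as snapc->snaps[] and
 * snap_sizes[].  For example, with two hypothetical snapshots where
 * "s2" is newer than "s1":
 *
 *	snapc->snaps[] = { <id of s2>, <id of s1> }
 *	snap_names     = "s2\0s1\0"
 *	snap_names_len = 6
 *
 * snap_by_name() below walks this buffer in strlen() + 1 strides to
 * keep the names and the parallel arrays in step.
 */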
96 struct rbd_options {
97 bool read_only;
101 * an instance of the client. multiple devices may share an rbd client.
103 struct rbd_client {
104 struct ceph_client *client;
105 struct rbd_options *rbd_opts;
106 struct kref kref;
107 struct list_head node;
111 * a request completion status
113 struct rbd_req_status {
114 int done;
115 int rc;
116 u64 bytes;
120 * a collection of requests
122 struct rbd_req_coll {
123 int total;
124 int num_done;
125 struct kref kref;
126 struct rbd_req_status status[0];
130 * a single io request
132 struct rbd_request {
133 struct request *rq; /* blk layer request */
134 struct bio *bio; /* cloned bio */
135 struct page **pages; /* list of used pages */
136 u64 len;
137 int coll_index;
138 struct rbd_req_coll *coll;
141 struct rbd_snap {
142 struct device dev;
143 const char *name;
144 size_t size;
145 struct list_head node;
146 u64 id;
150 * a single device
152 struct rbd_device {
153 int id; /* blkdev unique id */
155 int major; /* blkdev assigned major */
156 struct gendisk *disk; /* blkdev's gendisk and rq */
157 struct request_queue *q;
159 struct rbd_client *rbd_client;
161 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
163 spinlock_t lock; /* queue lock */
165 struct rbd_image_header header;
166 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
167 int obj_len;
168 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
169 char pool_name[RBD_MAX_POOL_NAME_LEN];
170 int poolid;
172 struct ceph_osd_event *watch_event;
173 struct ceph_osd_request *watch_request;
175 /* protects updating the header */
176 struct rw_semaphore header_rwsem;
177 /* name of the snapshot this device reads from */
178 char snap_name[RBD_MAX_SNAP_NAME_LEN];
179 /* id of the snapshot this device reads from */
180 u64 snap_id; /* current snapshot id */
181 /* whether the snap_id this device reads from still exists */
182 bool snap_exists;
183 bool read_only;
185 struct list_head node;
187 /* list of snapshots */
188 struct list_head snaps;
190 /* sysfs related */
191 struct device dev;
192 unsigned long open_count;
195 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
197 static LIST_HEAD(rbd_dev_list); /* devices */
198 static DEFINE_SPINLOCK(rbd_dev_list_lock);
200 static LIST_HEAD(rbd_client_list); /* clients */
201 static DEFINE_SPINLOCK(rbd_client_list_lock);
203 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
204 static void rbd_dev_release(struct device *dev);
205 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
206 struct rbd_snap *snap);
208 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
209 size_t count);
210 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
211 size_t count);
213 static struct bus_attribute rbd_bus_attrs[] = {
214 __ATTR(add, S_IWUSR, NULL, rbd_add),
215 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
216 __ATTR_NULL
219 static struct bus_type rbd_bus_type = {
220 .name = "rbd",
221 .bus_attrs = rbd_bus_attrs,
224 static void rbd_root_dev_release(struct device *dev)
228 static struct device rbd_root_dev = {
229 .init_name = "rbd",
230 .release = rbd_root_dev_release,
234 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
236 return get_device(&rbd_dev->dev);
239 static void rbd_put_dev(struct rbd_device *rbd_dev)
241 put_device(&rbd_dev->dev);
244 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
246 static int rbd_open(struct block_device *bdev, fmode_t mode)
248 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
250 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
251 return -EROFS;
253 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
254 rbd_get_dev(rbd_dev);
255 set_device_ro(bdev, rbd_dev->read_only);
256 rbd_dev->open_count++;
257 mutex_unlock(&ctl_mutex);
259 return 0;
262 static int rbd_release(struct gendisk *disk, fmode_t mode)
264 struct rbd_device *rbd_dev = disk->private_data;
266 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
267 BUG_ON(!rbd_dev->open_count);
268 rbd_dev->open_count--;
269 rbd_put_dev(rbd_dev);
270 mutex_unlock(&ctl_mutex);
272 return 0;
275 static const struct block_device_operations rbd_bd_ops = {
276 .owner = THIS_MODULE,
277 .open = rbd_open,
278 .release = rbd_release,
282 * Initialize an rbd client instance.
283 * We own *opt.
285 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
286 struct rbd_options *rbd_opts)
288 struct rbd_client *rbdc;
289 int ret = -ENOMEM;
291 dout("rbd_client_create\n");
292 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
293 if (!rbdc)
294 goto out_opt;
296 kref_init(&rbdc->kref);
297 INIT_LIST_HEAD(&rbdc->node);
299 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
301 rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
302 if (IS_ERR(rbdc->client))
303 goto out_mutex;
304 opt = NULL; /* Now rbdc->client is responsible for opt */
306 ret = ceph_open_session(rbdc->client);
307 if (ret < 0)
308 goto out_err;
310 rbdc->rbd_opts = rbd_opts;
312 spin_lock(&rbd_client_list_lock);
313 list_add_tail(&rbdc->node, &rbd_client_list);
314 spin_unlock(&rbd_client_list_lock);
316 mutex_unlock(&ctl_mutex);
318 dout("rbd_client_create created %p\n", rbdc);
319 return rbdc;
321 out_err:
322 ceph_destroy_client(rbdc->client);
323 out_mutex:
324 mutex_unlock(&ctl_mutex);
325 kfree(rbdc);
326 out_opt:
327 if (opt)
328 ceph_destroy_options(opt);
329 return ERR_PTR(ret);
333 * Find a ceph client with specific addr and configuration.
335 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
337 struct rbd_client *client_node;
339 if (opt->flags & CEPH_OPT_NOSHARE)
340 return NULL;
342 list_for_each_entry(client_node, &rbd_client_list, node)
343 if (ceph_compare_options(opt, client_node->client) == 0)
344 return client_node;
345 return NULL;
349 * mount options
351 enum {
352 Opt_last_int,
353 /* int args above */
354 Opt_last_string,
355 /* string args above */
356 Opt_read_only,
357 Opt_read_write,
358 /* Boolean args above */
359 Opt_last_bool,
362 static match_table_t rbdopt_tokens = {
363 /* int args above */
364 /* string args above */
365 {Opt_read_only, "read_only"},
366 {Opt_read_only, "ro"}, /* Alternate spelling */
367 {Opt_read_write, "read_write"},
368 {Opt_read_write, "rw"}, /* Alternate spelling */
369 /* Boolean args above */
370 {-1, NULL}
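/*
 * Example option strings matched by the parser below (illustrative):
 *
 *	"ro" / "read_only"	-> rbd_opts->read_only = true
 *	"rw" / "read_write"	-> rbd_opts->read_only = false
 *
 * ceph_parse_options() hands parse_rbd_opts_token() any token that
 * libceph itself does not recognize; a token unknown here as well is
 * rejected with -EINVAL.
 */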
373 static int parse_rbd_opts_token(char *c, void *private)
375 struct rbd_options *rbdopt = private;
376 substring_t argstr[MAX_OPT_ARGS];
377 int token, intval, ret;
379 token = match_token(c, rbdopt_tokens, argstr);
380 if (token < 0)
381 return -EINVAL;
383 if (token < Opt_last_int) {
384 ret = match_int(&argstr[0], &intval);
385 if (ret < 0) {
386 pr_err("bad mount option arg (not int) "
387 "at '%s'\n", c);
388 return ret;
390 dout("got int token %d val %d\n", token, intval);
391 } else if (token > Opt_last_int && token < Opt_last_string) {
392 dout("got string token %d val %s\n", token,
393 argstr[0].from);
394 } else if (token > Opt_last_string && token < Opt_last_bool) {
395 dout("got Boolean token %d\n", token);
396 } else {
397 dout("got token %d\n", token);
400 switch (token) {
401 case Opt_read_only:
402 rbdopt->read_only = true;
403 break;
404 case Opt_read_write:
405 rbdopt->read_only = false;
406 break;
407 default:
408 BUG_ON(token);
410 return 0;
414 * Get a ceph client with specific addr and configuration, if one does
415 * not exist create it.
417 static struct rbd_client *rbd_get_client(const char *mon_addr,
418 size_t mon_addr_len,
419 char *options)
421 struct rbd_client *rbdc;
422 struct ceph_options *opt;
423 struct rbd_options *rbd_opts;
425 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
426 if (!rbd_opts)
427 return ERR_PTR(-ENOMEM);
429 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
431 opt = ceph_parse_options(options, mon_addr,
432 mon_addr + mon_addr_len,
433 parse_rbd_opts_token, rbd_opts);
434 if (IS_ERR(opt)) {
435 kfree(rbd_opts);
436 return ERR_CAST(opt);
439 spin_lock(&rbd_client_list_lock);
440 rbdc = __rbd_client_find(opt);
441 if (rbdc) {
442 /* using an existing client */
443 kref_get(&rbdc->kref);
444 spin_unlock(&rbd_client_list_lock);
446 ceph_destroy_options(opt);
447 kfree(rbd_opts);
449 return rbdc;
451 spin_unlock(&rbd_client_list_lock);
453 rbdc = rbd_client_create(opt, rbd_opts);
455 if (IS_ERR(rbdc))
456 kfree(rbd_opts);
458 return rbdc;
462 * Destroy ceph client
464 * rbd_client_list_lock is acquired here, so the caller must not hold it.
466 static void rbd_client_release(struct kref *kref)
468 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
470 dout("rbd_release_client %p\n", rbdc);
471 spin_lock(&rbd_client_list_lock);
472 list_del(&rbdc->node);
473 spin_unlock(&rbd_client_list_lock);
475 ceph_destroy_client(rbdc->client);
476 kfree(rbdc->rbd_opts);
477 kfree(rbdc);
481 * Drop reference to ceph client node. If it's not referenced anymore, release
482 * it.
484 static void rbd_put_client(struct rbd_device *rbd_dev)
486 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
487 rbd_dev->rbd_client = NULL;
491 * Destroy requests collection
493 static void rbd_coll_release(struct kref *kref)
495 struct rbd_req_coll *coll =
496 container_of(kref, struct rbd_req_coll, kref);
498 dout("rbd_coll_release %p\n", coll);
499 kfree(coll);
503 * Create a new header structure, translate header format from the on-disk
504 * header.
506 static int rbd_header_from_disk(struct rbd_image_header *header,
507 struct rbd_image_header_ondisk *ondisk,
508 int allocated_snaps,
509 gfp_t gfp_flags)
511 int i;
512 u32 snap_count;
514 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
515 return -ENXIO;
517 snap_count = le32_to_cpu(ondisk->snap_count);
518 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
519 snap_count * sizeof(u64),
520 gfp_flags);
521 if (!header->snapc)
522 return -ENOMEM;
524 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
525 if (snap_count) {
526 header->snap_names = kmalloc(header->snap_names_len,
527 GFP_KERNEL);
528 if (!header->snap_names)
529 goto err_snapc;
530 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
531 GFP_KERNEL);
532 if (!header->snap_sizes)
533 goto err_names;
534 } else {
535 header->snap_names = NULL;
536 header->snap_sizes = NULL;
538 memcpy(header->block_name, ondisk->block_name,
539 sizeof(ondisk->block_name));
541 header->image_size = le64_to_cpu(ondisk->image_size);
542 header->obj_order = ondisk->options.order;
543 header->crypt_type = ondisk->options.crypt_type;
544 header->comp_type = ondisk->options.comp_type;
546 atomic_set(&header->snapc->nref, 1);
547 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
548 header->snapc->num_snaps = snap_count;
549 header->total_snaps = snap_count;
551 if (snap_count && allocated_snaps == snap_count) {
552 for (i = 0; i < snap_count; i++) {
553 header->snapc->snaps[i] =
554 le64_to_cpu(ondisk->snaps[i].id);
555 header->snap_sizes[i] =
556 le64_to_cpu(ondisk->snaps[i].image_size);
559 /* copy snapshot names */
560 memcpy(header->snap_names, &ondisk->snaps[i],
561 header->snap_names_len);
564 return 0;
566 err_names:
567 kfree(header->snap_names);
568 err_snapc:
569 kfree(header->snapc);
570 return -ENOMEM;
573 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
574 u64 *seq, u64 *size)
576 int i;
577 char *p = header->snap_names;
579 for (i = 0; i < header->total_snaps; i++) {
580 if (!strcmp(snap_name, p)) {
582 /* Found it. Pass back its id and/or size */
584 if (seq)
585 *seq = header->snapc->snaps[i];
586 if (size)
587 *size = header->snap_sizes[i];
588 return i;
590 p += strlen(p) + 1; /* Skip ahead to the next name */
592 return -ENOENT;
595 static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
597 struct rbd_image_header *header = &dev->header;
598 struct ceph_snap_context *snapc = header->snapc;
599 int ret = -ENOENT;
601 BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
603 down_write(&dev->header_rwsem);
605 if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
606 sizeof (RBD_SNAP_HEAD_NAME))) {
607 if (header->total_snaps)
608 snapc->seq = header->snap_seq;
609 else
610 snapc->seq = 0;
611 dev->snap_id = CEPH_NOSNAP;
612 dev->snap_exists = false;
613 dev->read_only = dev->rbd_client->rbd_opts->read_only;
614 if (size)
615 *size = header->image_size;
616 } else {
617 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
618 if (ret < 0)
619 goto done;
620 dev->snap_id = snapc->seq;
621 dev->snap_exists = true;
622 dev->read_only = true; /* No choice for snapshots */
625 ret = 0;
626 done:
627 up_write(&dev->header_rwsem);
628 return ret;
631 static void rbd_header_free(struct rbd_image_header *header)
633 ceph_put_snap_context(header->snapc);
634 kfree(header->snap_names);
635 kfree(header->snap_sizes);
639 * get the actual striped segment name, offset and length
641 static u64 rbd_get_segment(struct rbd_image_header *header,
642 const char *block_name,
643 u64 ofs, u64 len,
644 char *seg_name, u64 *segofs)
646 u64 seg = ofs >> header->obj_order;
648 if (seg_name)
649 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
650 "%s.%012llx", block_name, seg);
652 ofs = ofs & ((1 << header->obj_order) - 1);
653 len = min_t(u64, len, (1 << header->obj_order) - ofs);
655 if (segofs)
656 *segofs = ofs;
658 return len;
661 static int rbd_get_num_segments(struct rbd_image_header *header,
662 u64 ofs, u64 len)
664 u64 start_seg = ofs >> header->obj_order;
665 u64 end_seg = (ofs + len - 1) >> header->obj_order;
666 return end_seg - start_seg + 1;
670 * returns the size of an object in the image
672 static u64 rbd_obj_bytes(struct rbd_image_header *header)
674 return 1 << header->obj_order;
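/*
 * Worked example of the segment math above, assuming the default
 * 4 MB objects (obj_order = 22), for a request with ofs = 0x500000
 * (5 MB) and len = 0x300000 (3 MB):
 *
 *	seg      = 0x500000 >> 22 = 1
 *	seg_name = "<block_name>.000000000001"
 *	segofs   = 0x500000 & ((1 << 22) - 1) = 0x100000
 *	seg_len  = min(0x300000, 0x400000 - 0x100000) = 0x300000
 *	num_segs = ((0x800000 - 1) >> 22) - (0x500000 >> 22) + 1 = 1
 *
 * i.e. the request ends exactly on the object boundary; one byte
 * more and it would span two segments.
 */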
678 * bio helpers
681 static void bio_chain_put(struct bio *chain)
683 struct bio *tmp;
685 while (chain) {
686 tmp = chain;
687 chain = chain->bi_next;
688 bio_put(tmp);
693 * zeros a bio chain, starting at specific offset
695 static void zero_bio_chain(struct bio *chain, int start_ofs)
697 struct bio_vec *bv;
698 unsigned long flags;
699 void *buf;
700 int i;
701 int pos = 0;
703 while (chain) {
704 bio_for_each_segment(bv, chain, i) {
705 if (pos + bv->bv_len > start_ofs) {
706 int remainder = max(start_ofs - pos, 0);
707 buf = bvec_kmap_irq(bv, &flags);
708 memset(buf + remainder, 0,
709 bv->bv_len - remainder);
710 bvec_kunmap_irq(buf, &flags);
712 pos += bv->bv_len;
715 chain = chain->bi_next;
720 * bio_chain_clone - clone a chain of bios up to a certain length.
721 * might return a bio_pair that will need to be released.
723 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
724 struct bio_pair **bp,
725 int len, gfp_t gfpmask)
727 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
728 int total = 0;
730 if (*bp) {
731 bio_pair_release(*bp);
732 *bp = NULL;
735 while (old_chain && (total < len)) {
736 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
737 if (!tmp)
738 goto err_out;
740 if (total + old_chain->bi_size > len) {
741 struct bio_pair *bp;
744 * this split can only happen with a single-page bio;
745 * bio_split() will BUG_ON() if this is not the case
747 dout("bio_chain_clone split! total=%d remaining=%d"
748 "bi_size=%d\n",
749 (int)total, (int)len-total,
750 (int)old_chain->bi_size);
752 /* split the bio. We'll release it either in the next
753 call, or it will have to be released outside */
754 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
755 if (!bp)
756 goto err_out;
758 __bio_clone(tmp, &bp->bio1);
760 *next = &bp->bio2;
761 } else {
762 __bio_clone(tmp, old_chain);
763 *next = old_chain->bi_next;
766 tmp->bi_bdev = NULL;
767 gfpmask &= ~__GFP_WAIT;
768 tmp->bi_next = NULL;
770 if (!new_chain) {
771 new_chain = tail = tmp;
772 } else {
773 tail->bi_next = tmp;
774 tail = tmp;
776 old_chain = old_chain->bi_next;
778 total += tmp->bi_size;
781 BUG_ON(total < len);
783 if (tail)
784 tail->bi_next = NULL;
786 *old = old_chain;
788 return new_chain;
790 err_out:
791 dout("bio_chain_clone with err\n");
792 bio_chain_put(new_chain);
793 return NULL;
797 * helpers for osd request op vectors.
799 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
800 int num_ops,
801 int opcode,
802 u32 payload_len)
804 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
805 GFP_NOIO);
806 if (!*ops)
807 return -ENOMEM;
808 (*ops)[0].op = opcode;
810 * op extent offset and length will be set later on
811 * in calc_raw_layout()
813 (*ops)[0].payload_len = payload_len;
814 return 0;
817 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
819 kfree(ops);
822 static void rbd_coll_end_req_index(struct request *rq,
823 struct rbd_req_coll *coll,
824 int index,
825 int ret, u64 len)
827 struct request_queue *q;
828 int min, max, i;
830 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
831 coll, index, ret, len);
833 if (!rq)
834 return;
836 if (!coll) {
837 blk_end_request(rq, ret, len);
838 return;
841 q = rq->q;
843 spin_lock_irq(q->queue_lock);
844 coll->status[index].done = 1;
845 coll->status[index].rc = ret;
846 coll->status[index].bytes = len;
847 max = min = coll->num_done;
848 while (max < coll->total && coll->status[max].done)
849 max++;
851 for (i = min; i < max; i++) {
852 __blk_end_request(rq, coll->status[i].rc,
853 coll->status[i].bytes);
854 coll->num_done++;
855 kref_put(&coll->kref, rbd_coll_release);
857 spin_unlock_irq(q->queue_lock);
860 static void rbd_coll_end_req(struct rbd_request *req,
861 int ret, u64 len)
863 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
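/*
 * Completion ordering example: __blk_end_request() must retire a
 * request's bytes in order, so with coll->total = 3, if status[1]
 * and status[2] finish before status[0], nothing is ended until
 * status[0] completes; at that point all three are retired in a
 * single pass of the loop above.
 */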
867 * Send ceph osd request
869 static int rbd_do_request(struct request *rq,
870 struct rbd_device *dev,
871 struct ceph_snap_context *snapc,
872 u64 snapid,
873 const char *obj, u64 ofs, u64 len,
874 struct bio *bio,
875 struct page **pages,
876 int num_pages,
877 int flags,
878 struct ceph_osd_req_op *ops,
879 int num_reply,
880 struct rbd_req_coll *coll,
881 int coll_index,
882 void (*rbd_cb)(struct ceph_osd_request *req,
883 struct ceph_msg *msg),
884 struct ceph_osd_request **linger_req,
885 u64 *ver)
887 struct ceph_osd_request *req;
888 struct ceph_file_layout *layout;
889 int ret;
890 u64 bno;
891 struct timespec mtime = CURRENT_TIME;
892 struct rbd_request *req_data;
893 struct ceph_osd_request_head *reqhead;
894 struct ceph_osd_client *osdc;
896 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
897 if (!req_data) {
898 if (coll)
899 rbd_coll_end_req_index(rq, coll, coll_index,
900 -ENOMEM, len);
901 return -ENOMEM;
904 if (coll) {
905 req_data->coll = coll;
906 req_data->coll_index = coll_index;
909 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, ofs, len);
911 osdc = &dev->rbd_client->client->osdc;
912 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
913 false, GFP_NOIO, pages, bio);
914 if (!req) {
915 ret = -ENOMEM;
916 goto done_pages;
919 req->r_callback = rbd_cb;
921 req_data->rq = rq;
922 req_data->bio = bio;
923 req_data->pages = pages;
924 req_data->len = len;
926 req->r_priv = req_data;
928 reqhead = req->r_request->front.iov_base;
929 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
931 strncpy(req->r_oid, obj, sizeof(req->r_oid));
932 req->r_oid_len = strlen(req->r_oid);
934 layout = &req->r_file_layout;
935 memset(layout, 0, sizeof(*layout));
936 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
937 layout->fl_stripe_count = cpu_to_le32(1);
938 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
939 layout->fl_pg_preferred = cpu_to_le32(-1);
940 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
941 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
942 req, ops);
943 BUG_ON(ret != 0);
945 ceph_osdc_build_request(req, ofs, &len,
946 ops,
947 snapc,
948 &mtime,
949 req->r_oid, req->r_oid_len);
951 if (linger_req) {
952 ceph_osdc_set_request_linger(osdc, req);
953 *linger_req = req;
956 ret = ceph_osdc_start_request(osdc, req, false);
957 if (ret < 0)
958 goto done_err;
960 if (!rbd_cb) {
961 ret = ceph_osdc_wait_request(osdc, req);
962 if (ver)
963 *ver = le64_to_cpu(req->r_reassert_version.version);
964 dout("reassert_ver=%lld\n",
965 le64_to_cpu(req->r_reassert_version.version));
966 ceph_osdc_put_request(req);
968 return ret;
970 done_err:
971 bio_chain_put(req_data->bio);
972 ceph_osdc_put_request(req);
973 done_pages:
974 rbd_coll_end_req(req_data, ret, len);
975 kfree(req_data);
976 return ret;
980 * Ceph osd op callback
982 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
984 struct rbd_request *req_data = req->r_priv;
985 struct ceph_osd_reply_head *replyhead;
986 struct ceph_osd_op *op;
987 __s32 rc;
988 u64 bytes;
989 int read_op;
991 /* parse reply */
992 replyhead = msg->front.iov_base;
993 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
994 op = (void *)(replyhead + 1);
995 rc = le32_to_cpu(replyhead->result);
996 bytes = le64_to_cpu(op->extent.length);
997 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
999 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
1001 if (rc == -ENOENT && read_op) {
1002 zero_bio_chain(req_data->bio, 0);
1003 rc = 0;
1004 } else if (rc == 0 && read_op && bytes < req_data->len) {
1005 zero_bio_chain(req_data->bio, bytes);
1006 bytes = req_data->len;
1009 rbd_coll_end_req(req_data, rc, bytes);
1011 if (req_data->bio)
1012 bio_chain_put(req_data->bio);
1014 ceph_osdc_put_request(req);
1015 kfree(req_data);
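/*
 * Short-read example for the callback above: if a 4096-byte read
 * comes back with only 512 bytes (the tail of the object was never
 * written), the bio is zero-filled from offset 512 and the full
 * 4096 bytes are reported to the block layer; a read of a hole
 * (-ENOENT) is zero-filled from offset 0 and reported as success.
 */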
1018 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1020 ceph_osdc_put_request(req);
1024 * Do a synchronous ceph osd operation
1026 static int rbd_req_sync_op(struct rbd_device *dev,
1027 struct ceph_snap_context *snapc,
1028 u64 snapid,
1029 int opcode,
1030 int flags,
1031 struct ceph_osd_req_op *orig_ops,
1032 int num_reply,
1033 const char *obj,
1034 u64 ofs, u64 len,
1035 char *buf,
1036 struct ceph_osd_request **linger_req,
1037 u64 *ver)
1039 int ret;
1040 struct page **pages;
1041 int num_pages;
1042 struct ceph_osd_req_op *ops = orig_ops;
1043 u32 payload_len;
1045 num_pages = calc_pages_for(ofs, len);
1046 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1047 if (IS_ERR(pages))
1048 return PTR_ERR(pages);
1050 if (!orig_ops) {
1051 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1052 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1053 if (ret < 0)
1054 goto done;
1056 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1057 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1058 if (ret < 0)
1059 goto done_ops;
1063 ret = rbd_do_request(NULL, dev, snapc, snapid,
1064 obj, ofs, len, NULL,
1065 pages, num_pages,
1066 flags,
1067 ops,
1069 NULL, 0,
1070 NULL,
1071 linger_req, ver);
1072 if (ret < 0)
1073 goto done_ops;
1075 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1076 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1078 done_ops:
1079 if (!orig_ops)
1080 rbd_destroy_ops(ops);
1081 done:
1082 ceph_release_page_vector(pages, num_pages);
1083 return ret;
1087 * Do an asynchronous ceph osd operation
1089 static int rbd_do_op(struct request *rq,
1090 struct rbd_device *rbd_dev,
1091 struct ceph_snap_context *snapc,
1092 u64 snapid,
1093 int opcode, int flags, int num_reply,
1094 u64 ofs, u64 len,
1095 struct bio *bio,
1096 struct rbd_req_coll *coll,
1097 int coll_index)
1099 char *seg_name;
1100 u64 seg_ofs;
1101 u64 seg_len;
1102 int ret;
1103 struct ceph_osd_req_op *ops;
1104 u32 payload_len;
1106 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1107 if (!seg_name)
1108 return -ENOMEM;
1110 seg_len = rbd_get_segment(&rbd_dev->header,
1111 rbd_dev->header.block_name,
1112 ofs, len,
1113 seg_name, &seg_ofs);
1115 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1117 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1118 if (ret < 0)
1119 goto done;
1121 /* we've taken care of segment sizes earlier when we
1122 cloned the bios. We should never have a segment
1123 truncated at this point */
1124 BUG_ON(seg_len < len);
1126 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1127 seg_name, seg_ofs, seg_len,
1128 bio,
1129 NULL, 0,
1130 flags,
1131 ops,
1132 num_reply,
1133 coll, coll_index,
1134 rbd_req_cb, 0, NULL);
1136 rbd_destroy_ops(ops);
1137 done:
1138 kfree(seg_name);
1139 return ret;
1143 * Request async osd write
1145 static int rbd_req_write(struct request *rq,
1146 struct rbd_device *rbd_dev,
1147 struct ceph_snap_context *snapc,
1148 u64 ofs, u64 len,
1149 struct bio *bio,
1150 struct rbd_req_coll *coll,
1151 int coll_index)
1153 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1154 CEPH_OSD_OP_WRITE,
1155 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1157 ofs, len, bio, coll, coll_index);
1161 * Request async osd read
1163 static int rbd_req_read(struct request *rq,
1164 struct rbd_device *rbd_dev,
1165 u64 snapid,
1166 u64 ofs, u64 len,
1167 struct bio *bio,
1168 struct rbd_req_coll *coll,
1169 int coll_index)
1171 return rbd_do_op(rq, rbd_dev, NULL,
1172 (snapid ? snapid : CEPH_NOSNAP),
1173 CEPH_OSD_OP_READ,
1174 CEPH_OSD_FLAG_READ,
1176 ofs, len, bio, coll, coll_index);
1180 * Request sync osd read
1182 static int rbd_req_sync_read(struct rbd_device *dev,
1183 struct ceph_snap_context *snapc,
1184 u64 snapid,
1185 const char *obj,
1186 u64 ofs, u64 len,
1187 char *buf,
1188 u64 *ver)
1190 return rbd_req_sync_op(dev, NULL,
1191 (snapid ? snapid : CEPH_NOSNAP),
1192 CEPH_OSD_OP_READ,
1193 CEPH_OSD_FLAG_READ,
1194 NULL,
1195 1, obj, ofs, len, buf, NULL, ver);
1199 * Request sync osd notify_ack
1201 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1202 u64 ver,
1203 u64 notify_id,
1204 const char *obj)
1206 struct ceph_osd_req_op *ops;
1207 struct page **pages = NULL;
1208 int ret;
1210 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1211 if (ret < 0)
1212 return ret;
1214 ops[0].watch.ver = cpu_to_le64(ver);
1215 ops[0].watch.cookie = notify_id;
1216 ops[0].watch.flag = 0;
1218 ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1219 obj, 0, 0, NULL,
1220 pages, 0,
1221 CEPH_OSD_FLAG_READ,
1222 ops,
1224 NULL, 0,
1225 rbd_simple_req_cb, 0, NULL);
1227 rbd_destroy_ops(ops);
1228 return ret;
1231 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1233 struct rbd_device *dev = (struct rbd_device *)data;
1234 u64 hver;
1235 int rc;
1237 if (!dev)
1238 return;
1240 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1241 notify_id, (int)opcode);
1242 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1243 rc = __rbd_update_snaps(dev);
1244 hver = dev->header.obj_version;
1245 mutex_unlock(&ctl_mutex);
1246 if (rc)
1247 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1248 " update snaps: %d\n", dev->major, rc);
1250 rbd_req_sync_notify_ack(dev, hver, notify_id, dev->obj_md_name);
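/*
 * Notification flow: a header change made elsewhere (e.g. a snapshot
 * created by another client) notifies the watch we registered on the
 * header object; the callback above re-reads the header and then
 * acks so the notifying client does not have to wait out the notify
 * timeout.
 */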
1254 * Request sync osd watch
1256 static int rbd_req_sync_watch(struct rbd_device *dev,
1257 const char *obj,
1258 u64 ver)
1260 struct ceph_osd_req_op *ops;
1261 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1263 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1264 if (ret < 0)
1265 return ret;
1267 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1268 (void *)dev, &dev->watch_event);
1269 if (ret < 0)
1270 goto fail;
1272 ops[0].watch.ver = cpu_to_le64(ver);
1273 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1274 ops[0].watch.flag = 1;
1276 ret = rbd_req_sync_op(dev, NULL,
1277 CEPH_NOSNAP,
1279 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1280 ops,
1281 1, obj, 0, 0, NULL,
1282 &dev->watch_request, NULL);
1284 if (ret < 0)
1285 goto fail_event;
1287 rbd_destroy_ops(ops);
1288 return 0;
1290 fail_event:
1291 ceph_osdc_cancel_event(dev->watch_event);
1292 dev->watch_event = NULL;
1293 fail:
1294 rbd_destroy_ops(ops);
1295 return ret;
1299 * Request sync osd unwatch
1301 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1302 const char *obj)
1304 struct ceph_osd_req_op *ops;
1306 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1307 if (ret < 0)
1308 return ret;
1310 ops[0].watch.ver = 0;
1311 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1312 ops[0].watch.flag = 0;
1314 ret = rbd_req_sync_op(dev, NULL,
1315 CEPH_NOSNAP,
1317 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1318 ops,
1319 1, obj, 0, 0, NULL, NULL, NULL);
1321 rbd_destroy_ops(ops);
1322 ceph_osdc_cancel_event(dev->watch_event);
1323 dev->watch_event = NULL;
1324 return ret;
1327 #if 0
1329 * Synchronous osd object method call
1331 static int rbd_req_sync_exec(struct rbd_device *dev,
1332 const char *obj,
1333 const char *cls,
1334 const char *method,
1335 const char *data,
1336 int len,
1337 u64 *ver)
1339 struct ceph_osd_req_op *ops;
1340 int cls_len = strlen(cls);
1341 int method_len = strlen(method);
1342 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1343 cls_len + method_len + len);
1344 if (ret < 0)
1345 return ret;
1347 ops[0].cls.class_name = cls;
1348 ops[0].cls.class_len = (__u8)cls_len;
1349 ops[0].cls.method_name = method;
1350 ops[0].cls.method_len = (__u8)method_len;
1351 ops[0].cls.argc = 0;
1352 ops[0].cls.indata = data;
1353 ops[0].cls.indata_len = len;
1355 ret = rbd_req_sync_op(dev, NULL,
1356 CEPH_NOSNAP,
1358 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1359 ops,
1360 1, obj, 0, 0, NULL, NULL, ver);
1362 rbd_destroy_ops(ops);
1364 dout("cls_exec returned %d\n", ret);
1365 return ret;
1367 #endif
1369 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1371 struct rbd_req_coll *coll =
1372 kzalloc(sizeof(struct rbd_req_coll) +
1373 sizeof(struct rbd_req_status) * num_reqs,
1374 GFP_ATOMIC);
1376 if (!coll)
1377 return NULL;
1378 coll->total = num_reqs;
1379 kref_init(&coll->kref);
1380 return coll;
1384 * block device queue callback
1386 static void rbd_rq_fn(struct request_queue *q)
1388 struct rbd_device *rbd_dev = q->queuedata;
1389 struct request *rq;
1390 struct bio_pair *bp = NULL;
1392 while ((rq = blk_fetch_request(q))) {
1393 struct bio *bio;
1394 struct bio *rq_bio, *next_bio = NULL;
1395 bool do_write;
1396 int size, op_size = 0;
1397 u64 ofs;
1398 int num_segs, cur_seg = 0;
1399 struct rbd_req_coll *coll;
1400 struct ceph_snap_context *snapc;
1402 /* peek at request from block layer */
1403 if (!rq)
1404 break;
1406 dout("fetched request\n");
1408 /* filter out block requests we don't understand */
1409 if (rq->cmd_type != REQ_TYPE_FS) {
1410 __blk_end_request_all(rq, 0);
1411 continue;
1414 /* deduce our operation (read, write) */
1415 do_write = (rq_data_dir(rq) == WRITE);
1417 size = blk_rq_bytes(rq);
1418 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1419 rq_bio = rq->bio;
1420 if (do_write && rbd_dev->read_only) {
1421 __blk_end_request_all(rq, -EROFS);
1422 continue;
1425 spin_unlock_irq(q->queue_lock);
1427 down_read(&rbd_dev->header_rwsem);
1429 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1430 up_read(&rbd_dev->header_rwsem);
1431 dout("request for non-existent snapshot\n");
1432 spin_lock_irq(q->queue_lock);
1433 __blk_end_request_all(rq, -ENXIO);
1434 continue;
1437 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1439 up_read(&rbd_dev->header_rwsem);
1441 dout("%s 0x%x bytes at 0x%llx\n",
1442 do_write ? "write" : "read",
1443 size, blk_rq_pos(rq) * SECTOR_SIZE);
1445 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1446 coll = rbd_alloc_coll(num_segs);
1447 if (!coll) {
1448 spin_lock_irq(q->queue_lock);
1449 __blk_end_request_all(rq, -ENOMEM);
1450 ceph_put_snap_context(snapc);
1451 continue;
1454 do {
1455 /* a bio clone to be passed down to OSD req */
1456 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1457 op_size = rbd_get_segment(&rbd_dev->header,
1458 rbd_dev->header.block_name,
1459 ofs, size,
1460 NULL, NULL);
1461 kref_get(&coll->kref);
1462 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1463 op_size, GFP_ATOMIC);
1464 if (!bio) {
1465 rbd_coll_end_req_index(rq, coll, cur_seg,
1466 -ENOMEM, op_size);
1467 goto next_seg;
1471 /* init OSD command: write or read */
1472 if (do_write)
1473 rbd_req_write(rq, rbd_dev,
1474 snapc,
1475 ofs,
1476 op_size, bio,
1477 coll, cur_seg);
1478 else
1479 rbd_req_read(rq, rbd_dev,
1480 rbd_dev->snap_id,
1481 ofs,
1482 op_size, bio,
1483 coll, cur_seg);
1485 next_seg:
1486 size -= op_size;
1487 ofs += op_size;
1489 cur_seg++;
1490 rq_bio = next_bio;
1491 } while (size > 0);
1492 kref_put(&coll->kref, rbd_coll_release);
1494 if (bp)
1495 bio_pair_release(bp);
1496 spin_lock_irq(q->queue_lock);
1498 ceph_put_snap_context(snapc);
1503 * a queue callback. Makes sure that we don't create a bio that spans across
1504 * multiple osd objects. One exception would be with single-page bios,
1505 * which we handle later in bio_chain_clone().
1507 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1508 struct bio_vec *bvec)
1510 struct rbd_device *rbd_dev = q->queuedata;
1511 unsigned int chunk_sectors;
1512 sector_t sector;
1513 unsigned int bio_sectors;
1514 int max;
1516 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1517 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1518 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1520 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1521 + bio_sectors)) << SECTOR_SHIFT;
1522 if (max < 0)
1523 max = 0; /* bio_add cannot handle a negative return */
1524 if (max <= bvec->bv_len && bio_sectors == 0)
1525 return bvec->bv_len;
1526 return max;
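/*
 * Worked example, again assuming 4 MB objects (obj_order = 22, so
 * chunk_sectors = 8192): for a bio whose mapped sector falls 8000
 * sectors into its object with 64 sectors already queued in it,
 *
 *	max = (8192 - (8000 + 64)) << 9 = 65536
 *
 * so another 4 KB bio_vec is still accepted; at the 4 MB boundary
 * max reaches 0 and the block layer must start a new bio.
 */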
1529 static void rbd_free_disk(struct rbd_device *rbd_dev)
1531 struct gendisk *disk = rbd_dev->disk;
1533 if (!disk)
1534 return;
1536 rbd_header_free(&rbd_dev->header);
1538 if (disk->flags & GENHD_FL_UP)
1539 del_gendisk(disk);
1540 if (disk->queue)
1541 blk_cleanup_queue(disk->queue);
1542 put_disk(disk);
1546 * re-read the on-disk header
1548 static int rbd_read_header(struct rbd_device *rbd_dev,
1549 struct rbd_image_header *header)
1551 ssize_t rc;
1552 struct rbd_image_header_ondisk *dh;
1553 int snap_count = 0;
1554 u64 ver;
1555 size_t len;
1558 * First reads the fixed-size header to determine the number
1559 * of snapshots, then re-reads it, along with all snapshot
1560 * records as well as their stored names.
1562 len = sizeof (*dh);
1563 while (1) {
1564 dh = kmalloc(len, GFP_KERNEL);
1565 if (!dh)
1566 return -ENOMEM;
1568 rc = rbd_req_sync_read(rbd_dev,
1569 NULL, CEPH_NOSNAP,
1570 rbd_dev->obj_md_name,
1571 0, len,
1572 (char *)dh, &ver);
1573 if (rc < 0)
1574 goto out_dh;
1576 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1577 if (rc < 0) {
1578 if (rc == -ENXIO)
1579 pr_warning("unrecognized header format"
1580 " for image %s", rbd_dev->obj);
1581 goto out_dh;
1584 if (snap_count == header->total_snaps)
1585 break;
1587 snap_count = header->total_snaps;
1588 len = sizeof (*dh) +
1589 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1590 header->snap_names_len;
1592 rbd_header_free(header);
1593 kfree(dh);
1595 header->obj_version = ver;
1597 out_dh:
1598 kfree(dh);
1599 return rc;
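/*
 * Example of the retry loop above: the first pass reads just
 * sizeof(*dh) bytes and learns total_snaps (say 2); since zero
 * snapshot records were allocated, the loop re-reads with
 *
 *	len = sizeof(*dh)
 *	    + 2 * sizeof(struct rbd_image_snap_ondisk)
 *	    + header->snap_names_len
 *
 * If a snapshot is created between the two reads, the counts differ
 * again and a third pass is made.
 */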
1602 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1604 struct rbd_snap *snap;
1606 while (!list_empty(&rbd_dev->snaps)) {
1607 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1608 __rbd_remove_snap_dev(rbd_dev, snap);
1613 * re-read the on-disk header and bring the snapshot list up to date
1615 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1617 int ret;
1618 struct rbd_image_header h;
1619 u64 snap_seq;
1620 int follow_seq = 0;
1622 ret = rbd_read_header(rbd_dev, &h);
1623 if (ret < 0)
1624 return ret;
1626 down_write(&rbd_dev->header_rwsem);
1628 /* resized? */
1629 if (rbd_dev->snap_id == CEPH_NOSNAP) {
1630 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1632 dout("setting size to %llu sectors", (unsigned long long) size);
1633 set_capacity(rbd_dev->disk, size);
1636 snap_seq = rbd_dev->header.snapc->seq;
1637 if (rbd_dev->header.total_snaps &&
1638 rbd_dev->header.snapc->snaps[0] == snap_seq)
1639 /* pointing at the head, will need to follow that
1640 if head moves */
1641 follow_seq = 1;
1643 ceph_put_snap_context(rbd_dev->header.snapc);
1644 kfree(rbd_dev->header.snap_names);
1645 kfree(rbd_dev->header.snap_sizes);
1647 rbd_dev->header.obj_version = h.obj_version;
1648 rbd_dev->header.image_size = h.image_size;
1649 rbd_dev->header.total_snaps = h.total_snaps;
1650 rbd_dev->header.snapc = h.snapc;
1651 rbd_dev->header.snap_names = h.snap_names;
1652 rbd_dev->header.snap_names_len = h.snap_names_len;
1653 rbd_dev->header.snap_sizes = h.snap_sizes;
1654 if (follow_seq)
1655 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1656 else
1657 rbd_dev->header.snapc->seq = snap_seq;
1659 ret = __rbd_init_snaps_header(rbd_dev);
1661 up_write(&rbd_dev->header_rwsem);
1663 return ret;
1666 static int rbd_init_disk(struct rbd_device *rbd_dev)
1668 struct gendisk *disk;
1669 struct request_queue *q;
1670 int rc;
1671 u64 segment_size;
1672 u64 total_size = 0;
1674 /* contact OSD, request size info about the object being mapped */
1675 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1676 if (rc)
1677 return rc;
1679 /* no need to lock here, as rbd_dev is not registered yet */
1680 rc = __rbd_init_snaps_header(rbd_dev);
1681 if (rc)
1682 return rc;
1684 rc = rbd_header_set_snap(rbd_dev, &total_size);
1685 if (rc)
1686 return rc;
1688 /* create gendisk info */
1689 rc = -ENOMEM;
1690 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1691 if (!disk)
1692 goto out;
1694 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1695 rbd_dev->id);
1696 disk->major = rbd_dev->major;
1697 disk->first_minor = 0;
1698 disk->fops = &rbd_bd_ops;
1699 disk->private_data = rbd_dev;
1701 /* init rq */
1702 rc = -ENOMEM;
1703 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1704 if (!q)
1705 goto out_disk;
1707 /* We use the default size, but let's be explicit about it. */
1708 blk_queue_physical_block_size(q, SECTOR_SIZE);
1710 /* set io sizes to object size */
1711 segment_size = rbd_obj_bytes(&rbd_dev->header);
1712 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1713 blk_queue_max_segment_size(q, segment_size);
1714 blk_queue_io_min(q, segment_size);
1715 blk_queue_io_opt(q, segment_size);
1717 blk_queue_merge_bvec(q, rbd_merge_bvec);
1718 disk->queue = q;
1720 q->queuedata = rbd_dev;
1722 rbd_dev->disk = disk;
1723 rbd_dev->q = q;
1725 /* finally, announce the disk to the world */
1726 set_capacity(disk, total_size / SECTOR_SIZE);
1727 add_disk(disk);
1729 pr_info("%s: added with size 0x%llx\n",
1730 disk->disk_name, (unsigned long long)total_size);
1731 return 0;
1733 out_disk:
1734 put_disk(disk);
1735 out:
1736 return rc;
1740 sysfs
1743 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1745 return container_of(dev, struct rbd_device, dev);
1748 static ssize_t rbd_size_show(struct device *dev,
1749 struct device_attribute *attr, char *buf)
1751 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1752 sector_t size;
1754 down_read(&rbd_dev->header_rwsem);
1755 size = get_capacity(rbd_dev->disk);
1756 up_read(&rbd_dev->header_rwsem);
1758 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1761 static ssize_t rbd_major_show(struct device *dev,
1762 struct device_attribute *attr, char *buf)
1764 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1766 return sprintf(buf, "%d\n", rbd_dev->major);
1769 static ssize_t rbd_client_id_show(struct device *dev,
1770 struct device_attribute *attr, char *buf)
1772 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1774 return sprintf(buf, "client%lld\n",
1775 ceph_client_id(rbd_dev->rbd_client->client));
1778 static ssize_t rbd_pool_show(struct device *dev,
1779 struct device_attribute *attr, char *buf)
1781 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1783 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1786 static ssize_t rbd_name_show(struct device *dev,
1787 struct device_attribute *attr, char *buf)
1789 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1791 return sprintf(buf, "%s\n", rbd_dev->obj);
1794 static ssize_t rbd_snap_show(struct device *dev,
1795 struct device_attribute *attr,
1796 char *buf)
1798 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1800 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1803 static ssize_t rbd_image_refresh(struct device *dev,
1804 struct device_attribute *attr,
1805 const char *buf,
1806 size_t size)
1808 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1809 int rc;
1810 int ret = size;
1812 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1814 rc = __rbd_update_snaps(rbd_dev);
1815 if (rc < 0)
1816 ret = rc;
1818 mutex_unlock(&ctl_mutex);
1819 return ret;
1822 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1823 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1824 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1825 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1826 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1827 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1828 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1830 static struct attribute *rbd_attrs[] = {
1831 &dev_attr_size.attr,
1832 &dev_attr_major.attr,
1833 &dev_attr_client_id.attr,
1834 &dev_attr_pool.attr,
1835 &dev_attr_name.attr,
1836 &dev_attr_current_snap.attr,
1837 &dev_attr_refresh.attr,
1838 NULL
1841 static struct attribute_group rbd_attr_group = {
1842 .attrs = rbd_attrs,
1845 static const struct attribute_group *rbd_attr_groups[] = {
1846 &rbd_attr_group,
1847 NULL
1850 static void rbd_sysfs_dev_release(struct device *dev)
1854 static struct device_type rbd_device_type = {
1855 .name = "rbd",
1856 .groups = rbd_attr_groups,
1857 .release = rbd_sysfs_dev_release,
1862 sysfs - snapshots
1865 static ssize_t rbd_snap_size_show(struct device *dev,
1866 struct device_attribute *attr,
1867 char *buf)
1869 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1871 return sprintf(buf, "%zd\n", snap->size);
1874 static ssize_t rbd_snap_id_show(struct device *dev,
1875 struct device_attribute *attr,
1876 char *buf)
1878 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1880 return sprintf(buf, "%llu\n", (unsigned long long) snap->id);
1883 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1884 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1886 static struct attribute *rbd_snap_attrs[] = {
1887 &dev_attr_snap_size.attr,
1888 &dev_attr_snap_id.attr,
1889 NULL,
1892 static struct attribute_group rbd_snap_attr_group = {
1893 .attrs = rbd_snap_attrs,
1896 static void rbd_snap_dev_release(struct device *dev)
1898 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1899 kfree(snap->name);
1900 kfree(snap);
1903 static const struct attribute_group *rbd_snap_attr_groups[] = {
1904 &rbd_snap_attr_group,
1905 NULL
1908 static struct device_type rbd_snap_device_type = {
1909 .groups = rbd_snap_attr_groups,
1910 .release = rbd_snap_dev_release,
1913 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1914 struct rbd_snap *snap)
1916 list_del(&snap->node);
1917 device_unregister(&snap->dev);
1920 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1921 struct rbd_snap *snap,
1922 struct device *parent)
1924 struct device *dev = &snap->dev;
1925 int ret;
1927 dev->type = &rbd_snap_device_type;
1928 dev->parent = parent;
1929 dev->release = rbd_snap_dev_release;
1930 dev_set_name(dev, "snap_%s", snap->name);
1931 ret = device_register(dev);
1933 return ret;
1936 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1937 int i, const char *name,
1938 struct rbd_snap **snapp)
1940 int ret;
1941 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1942 if (!snap)
1943 return -ENOMEM;
1944 snap->name = kstrdup(name, GFP_KERNEL);
1945 snap->size = rbd_dev->header.snap_sizes[i];
1946 snap->id = rbd_dev->header.snapc->snaps[i];
1947 if (device_is_registered(&rbd_dev->dev)) {
1948 ret = rbd_register_snap_dev(rbd_dev, snap,
1949 &rbd_dev->dev);
1950 if (ret < 0)
1951 goto err;
1953 *snapp = snap;
1954 return 0;
1955 err:
1956 kfree(snap->name);
1957 kfree(snap);
1958 return ret;
1962 * search for the previous snap in a NUL-delimited string list
1964 const char *rbd_prev_snap_name(const char *name, const char *start)
1966 if (name < start + 2)
1967 return NULL;
1969 name -= 2;
1970 while (*name) {
1971 if (name == start)
1972 return start;
1973 name--;
1975 return name + 1;
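/*
 * Example: with start = "s2\0s1\0" and name = start + 6 (just past
 * the final NUL), the walk backs up over "s1" and returns
 * start + 3; called again with that result it returns start
 * ("s2"), and one more call returns NULL.
 */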
1979 * compare the old list of snapshots that we have with what's in the header
1980 * and update it accordingly. Note that the header holds the snapshots
1981 * in reverse order (from newest to oldest), so we need to go from
1982 * oldest to newest in order not to hit a duplicate snap name while
1983 * updating (e.g., a snapshot that was removed and then recreated
1984 * with the same name).
1986 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
1988 const char *name, *first_name;
1989 int i = rbd_dev->header.total_snaps;
1990 struct rbd_snap *snap, *old_snap = NULL;
1991 int ret;
1992 struct list_head *p, *n;
1994 first_name = rbd_dev->header.snap_names;
1995 name = first_name + rbd_dev->header.snap_names_len;
1997 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
1998 u64 cur_id;
2000 old_snap = list_entry(p, struct rbd_snap, node);
2002 if (i)
2003 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2005 if (!i || old_snap->id < cur_id) {
2007 * old_snap->id was skipped, thus was
2008 * removed. If this rbd_dev is mapped to
2009 * the removed snapshot, record that it no
2010 * longer exists, to prevent further I/O.
2012 if (rbd_dev->snap_id == old_snap->id)
2013 rbd_dev->snap_exists = false;
2014 __rbd_remove_snap_dev(rbd_dev, old_snap);
2015 continue;
2017 if (old_snap->id == cur_id) {
2018 /* we have this snapshot already */
2019 i--;
2020 name = rbd_prev_snap_name(name, first_name);
2021 continue;
2023 for (; i > 0;
2024 i--, name = rbd_prev_snap_name(name, first_name)) {
2025 if (!name) {
2026 WARN_ON(1);
2027 return -EINVAL;
2029 cur_id = rbd_dev->header.snapc->snaps[i];
2030 /* snapshot removal? handle it above */
2031 if (cur_id >= old_snap->id)
2032 break;
2033 /* a new snapshot */
2034 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2035 if (ret < 0)
2036 return ret;
2038 /* note that we add it backward so using n and not p */
2039 list_add(&snap->node, n);
2040 p = &snap->node;
2043 /* we're done going over the old snap list, just add what's left */
2044 for (; i > 0; i--) {
2045 name = rbd_prev_snap_name(name, first_name);
2046 if (!name) {
2047 WARN_ON(1);
2048 return -EINVAL;
2050 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2051 if (ret < 0)
2052 return ret;
2053 list_add(&snap->node, &rbd_dev->snaps);
2056 return 0;
2059 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2061 int ret;
2062 struct device *dev;
2063 struct rbd_snap *snap;
2065 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2066 dev = &rbd_dev->dev;
2068 dev->bus = &rbd_bus_type;
2069 dev->type = &rbd_device_type;
2070 dev->parent = &rbd_root_dev;
2071 dev->release = rbd_dev_release;
2072 dev_set_name(dev, "%d", rbd_dev->id);
2073 ret = device_register(dev);
2074 if (ret < 0)
2075 goto out;
2077 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2078 ret = rbd_register_snap_dev(rbd_dev, snap,
2079 &rbd_dev->dev);
2080 if (ret < 0)
2081 break;
2083 out:
2084 mutex_unlock(&ctl_mutex);
2085 return ret;
2088 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2090 device_unregister(&rbd_dev->dev);
2093 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2095 int ret, rc;
2097 do {
2098 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2099 rbd_dev->header.obj_version);
2100 if (ret == -ERANGE) {
2101 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2102 rc = __rbd_update_snaps(rbd_dev);
2103 mutex_unlock(&ctl_mutex);
2104 if (rc < 0)
2105 return rc;
2107 } while (ret == -ERANGE);
2109 return ret;
2112 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2115 * Get a unique rbd identifier for the given new rbd_dev, and add
2116 * the rbd_dev to the global list. The minimum rbd id is 1.
2118 static void rbd_id_get(struct rbd_device *rbd_dev)
2120 rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2122 spin_lock(&rbd_dev_list_lock);
2123 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2124 spin_unlock(&rbd_dev_list_lock);
2128 * Remove an rbd_dev from the global list, and record that its
2129 * identifier is no longer in use.
2131 static void rbd_id_put(struct rbd_device *rbd_dev)
2133 struct list_head *tmp;
2134 int rbd_id = rbd_dev->id;
2135 int max_id;
2137 BUG_ON(rbd_id < 1);
2139 spin_lock(&rbd_dev_list_lock);
2140 list_del_init(&rbd_dev->node);
2143 * If the id being "put" is not the current maximum, there
2144 * is nothing special we need to do.
2146 if (rbd_id != atomic64_read(&rbd_id_max)) {
2147 spin_unlock(&rbd_dev_list_lock);
2148 return;
2152 * We need to update the current maximum id. Search the
2153 * list to find out what it is. We're more likely to find
2154 * the maximum at the end, so search the list backward.
2156 max_id = 0;
2157 list_for_each_prev(tmp, &rbd_dev_list) {
2158 struct rbd_device *rbd_dev;
2160 rbd_dev = list_entry(tmp, struct rbd_device, node);
2161 if (rbd_dev->id > max_id)
2162 max_id = rbd_dev->id;
2164 spin_unlock(&rbd_dev_list_lock);
2167 * The max id could have been updated by rbd_id_get(), in
2168 * which case it now accurately reflects the new maximum.
2169 * Be careful not to overwrite the maximum value in that
2170 * case.
2172 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
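/*
 * Example: after mapping three images the ids are 1, 2 and 3 and
 * rbd_id_max is 3.  Removing id 3 rescans the list and drops
 * rbd_id_max to 2, so the next rbd_id_get() hands out 3 again;
 * removing id 2 while 3 is still mapped returns early above and
 * leaves rbd_id_max at 3, so ids are only ever reused from the top.
 */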
2176 * Skips over white space at *buf, and updates *buf to point to the
2177 * first found non-space character (if any). Returns the length of
2178 * the token (string of non-white space characters) found. Note
2179 * that *buf must be terminated with '\0'.
2181 static inline size_t next_token(const char **buf)
2184 * These are the characters that produce nonzero for
2185 * isspace() in the "C" and "POSIX" locales.
2187 const char *spaces = " \f\n\r\t\v";
2189 *buf += strspn(*buf, spaces); /* Find start of token */
2191 return strcspn(*buf, spaces); /* Return token length */
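/*
 * Example: given *buf = "  1.2.3.4:6789 rest", next_token() leaves
 * *buf pointing at the '1' and returns 12 (the length of
 * "1.2.3.4:6789"); note it does not advance *buf past the token
 * itself.
 */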
2195 * Finds the next token in *buf, and if the provided token buffer is
2196 * big enough, copies the found token into it. The result, if
2197 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2198 * must be terminated with '\0' on entry.
2200 * Returns the length of the token found (not including the '\0').
2201 * Return value will be 0 if no token is found, and it will be >=
2202 * token_size if the token would not fit.
2204 * The *buf pointer will be updated to point beyond the end of the
2205 * found token. Note that this occurs even if the token buffer is
2206 * too small to hold it.
2208 static inline size_t copy_token(const char **buf,
2209 char *token,
2210 size_t token_size)
2212 size_t len;
2214 len = next_token(buf);
2215 if (len < token_size) {
2216 memcpy(token, *buf, len);
2217 *(token + len) = '\0';
2219 *buf += len;
2221 return len;
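/*
 * Example: copy_token(&buf, token, 8) on "read_only ..." finds a
 * 9-character token; since 9 >= 8 nothing is copied and 9 is
 * returned (callers treat len >= token_size as an error), but buf
 * is still advanced past the token.
 */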
2225 * This fills in the pool_name, obj, obj_len, snap_name, and
2226 * obj_md_name fields of the given rbd_dev, based
2227 * on the list of monitor addresses and other options provided via
2228 * /sys/bus/rbd/add.
2230 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2231 const char *buf,
2232 const char **mon_addrs,
2233 size_t *mon_addrs_size,
2234 char *options,
2235 size_t options_size)
2237 size_t len;
2239 /* The first four tokens are required */
2241 len = next_token(&buf);
2242 if (!len)
2243 return -EINVAL;
2244 *mon_addrs_size = len + 1;
2245 *mon_addrs = buf;
2247 buf += len;
2249 len = copy_token(&buf, options, options_size);
2250 if (!len || len >= options_size)
2251 return -EINVAL;
2253 len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2254 if (!len || len >= sizeof (rbd_dev->pool_name))
2255 return -EINVAL;
2257 len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2258 if (!len || len >= sizeof (rbd_dev->obj))
2259 return -EINVAL;
2261 /* We have the object length in hand, save it. */
2263 rbd_dev->obj_len = len;
2265 BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2266 < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2267 sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2270 * The snapshot name is optional, but it's an error if it's
2271 * too long. If no snapshot is supplied, fill in the default.
2273 len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2274 if (!len)
2275 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2276 sizeof (RBD_SNAP_HEAD_NAME));
2277 else if (len >= sizeof (rbd_dev->snap_name))
2278 return -EINVAL;
2280 return 0;
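/*
 * Example add string, with illustrative values, in the token order
 * parsed above (monitors, options, pool, image, optional snapshot):
 *
 *	echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage snap1" \
 *		> /sys/bus/rbd/add
 *
 * Omitting the trailing "snap1" maps the image head
 * (RBD_SNAP_HEAD_NAME, "-").
 */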
2283 static ssize_t rbd_add(struct bus_type *bus,
2284 const char *buf,
2285 size_t count)
2287 struct rbd_device *rbd_dev;
2288 const char *mon_addrs = NULL;
2289 size_t mon_addrs_size = 0;
2290 char *options = NULL;
2291 struct ceph_osd_client *osdc;
2292 int rc = -ENOMEM;
2294 if (!try_module_get(THIS_MODULE))
2295 return -ENODEV;
2297 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2298 if (!rbd_dev)
2299 goto err_nomem;
2300 options = kmalloc(count, GFP_KERNEL);
2301 if (!options)
2302 goto err_nomem;
2304 /* static rbd_device initialization */
2305 spin_lock_init(&rbd_dev->lock);
2306 INIT_LIST_HEAD(&rbd_dev->node);
2307 INIT_LIST_HEAD(&rbd_dev->snaps);
2308 init_rwsem(&rbd_dev->header_rwsem);
2312 /* generate unique id: find highest unique id, add one */
2313 rbd_id_get(rbd_dev);
2315 /* Fill in the device name, now that we have its id. */
2316 BUILD_BUG_ON(DEV_NAME_LEN
2317 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2318 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2320 /* parse add command */
2321 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2322 options, count);
2323 if (rc)
2324 goto err_put_id;
2326 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2327 options);
2328 if (IS_ERR(rbd_dev->rbd_client)) {
2329 rc = PTR_ERR(rbd_dev->rbd_client);
2330 goto err_put_id;
2333 /* pick the pool */
2334 osdc = &rbd_dev->rbd_client->client->osdc;
2335 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2336 if (rc < 0)
2337 goto err_out_client;
2338 rbd_dev->poolid = rc;
2340 /* register our block device */
2341 rc = register_blkdev(0, rbd_dev->name);
2342 if (rc < 0)
2343 goto err_out_client;
2344 rbd_dev->major = rc;
2346 rc = rbd_bus_add_dev(rbd_dev);
2347 if (rc)
2348 goto err_out_blkdev;
2351 * At this point cleanup in the event of an error is the job
2352 * of the sysfs code (initiated by rbd_bus_del_dev()).
2354 * Set up and announce blkdev mapping.
2356 rc = rbd_init_disk(rbd_dev);
2357 if (rc)
2358 goto err_out_bus;
2360 rc = rbd_init_watch_dev(rbd_dev);
2361 if (rc)
2362 goto err_out_bus;
2364 return count;
2366 err_out_bus:
2367 /* this will also clean up rest of rbd_dev stuff */
2369 rbd_bus_del_dev(rbd_dev);
2370 kfree(options);
2371 return rc;
2373 err_out_blkdev:
2374 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2375 err_out_client:
2376 rbd_put_client(rbd_dev);
2377 err_put_id:
2378 rbd_id_put(rbd_dev);
2379 err_nomem:
2380 kfree(options);
2381 kfree(rbd_dev);
2383 dout("Error adding device %s\n", buf);
2384 module_put(THIS_MODULE);
2386 return (ssize_t) rc;
2389 static struct rbd_device *__rbd_get_dev(unsigned long id)
2391 struct list_head *tmp;
2392 struct rbd_device *rbd_dev;
2394 spin_lock(&rbd_dev_list_lock);
2395 list_for_each(tmp, &rbd_dev_list) {
2396 rbd_dev = list_entry(tmp, struct rbd_device, node);
2397 if (rbd_dev->id == id) {
2398 spin_unlock(&rbd_dev_list_lock);
2399 return rbd_dev;
2402 spin_unlock(&rbd_dev_list_lock);
2403 return NULL;
2406 static void rbd_dev_release(struct device *dev)
2408 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2410 if (rbd_dev->watch_request) {
2411 struct ceph_client *client = rbd_dev->rbd_client->client;
2413 ceph_osdc_unregister_linger_request(&client->osdc,
2414 rbd_dev->watch_request);
2416 if (rbd_dev->watch_event)
2417 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2419 rbd_put_client(rbd_dev);
2421 /* clean up and free blkdev */
2422 rbd_free_disk(rbd_dev);
2423 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2425 /* done with the id, and with the rbd_dev */
2426 rbd_id_put(rbd_dev);
2427 kfree(rbd_dev);
2429 /* release module ref */
2430 module_put(THIS_MODULE);
2433 static ssize_t rbd_remove(struct bus_type *bus,
2434 const char *buf,
2435 size_t count)
2437 struct rbd_device *rbd_dev = NULL;
2438 int target_id, rc;
2439 unsigned long ul;
2440 int ret = count;
2442 rc = strict_strtoul(buf, 10, &ul);
2443 if (rc)
2444 return rc;
2446 /* convert to int; abort if we lost anything in the conversion */
2447 target_id = (int) ul;
2448 if (target_id != ul)
2449 return -EINVAL;
2451 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2453 rbd_dev = __rbd_get_dev(target_id);
2454 if (!rbd_dev) {
2455 ret = -ENOENT;
2456 goto done;
2459 if (rbd_dev->open_count) {
2460 ret = -EBUSY;
2461 goto done;
2464 __rbd_remove_all_snaps(rbd_dev);
2465 rbd_bus_del_dev(rbd_dev);
2467 done:
2468 mutex_unlock(&ctl_mutex);
2469 return ret;
2473 * create control files in sysfs
2474 * /sys/bus/rbd/...
2476 static int rbd_sysfs_init(void)
2478 int ret;
2480 ret = device_register(&rbd_root_dev);
2481 if (ret < 0)
2482 return ret;
2484 ret = bus_register(&rbd_bus_type);
2485 if (ret < 0)
2486 device_unregister(&rbd_root_dev);
2488 return ret;
2491 static void rbd_sysfs_cleanup(void)
2493 bus_unregister(&rbd_bus_type);
2494 device_unregister(&rbd_root_dev);
2497 int __init rbd_init(void)
2499 int rc;
2501 rc = rbd_sysfs_init();
2502 if (rc)
2503 return rc;
2504 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2505 return 0;
2508 void __exit rbd_exit(void)
2510 rbd_sysfs_cleanup();
2513 module_init(rbd_init);
2514 module_exit(rbd_exit);
2516 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2517 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2518 MODULE_DESCRIPTION("rados block device");
2520 /* following authorship retained from original osdblk.c */
2521 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2523 MODULE_LICENSE("GPL");