IPVS: fix netns if reading ip_vs_* procfs entries
[linux-2.6/linux-mips.git] / drivers / block / rbd.c
blob16dc3645291cd7bbb3eab203b83dde68939b61d7
1 /*
2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define DRV_NAME "rbd"
45 #define DRV_NAME_LONG "rbd (rados block device)"
47 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
49 #define RBD_MAX_MD_NAME_LEN (96 + sizeof(RBD_SUFFIX))
50 #define RBD_MAX_POOL_NAME_LEN 64
51 #define RBD_MAX_SNAP_NAME_LEN 32
52 #define RBD_MAX_OPT_LEN 1024
54 #define RBD_SNAP_HEAD_NAME "-"
56 #define DEV_NAME_LEN 32
58 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
61 * block device image metadata (in-memory version)
63 struct rbd_image_header {
64 u64 image_size;
65 char block_name[32];
66 __u8 obj_order;
67 __u8 crypt_type;
68 __u8 comp_type;
69 struct rw_semaphore snap_rwsem;
70 struct ceph_snap_context *snapc;
71 size_t snap_names_len;
72 u64 snap_seq;
73 u32 total_snaps;
75 char *snap_names;
76 u64 *snap_sizes;
78 u64 obj_version;
81 struct rbd_options {
82 int notify_timeout;
86 * an instance of the client. multiple devices may share a client.
88 struct rbd_client {
89 struct ceph_client *client;
90 struct rbd_options *rbd_opts;
91 struct kref kref;
92 struct list_head node;
96 * a single io request
98 struct rbd_request {
99 struct request *rq; /* blk layer request */
100 struct bio *bio; /* cloned bio */
101 struct page **pages; /* list of used pages */
102 u64 len;
105 struct rbd_snap {
106 struct device dev;
107 const char *name;
108 size_t size;
109 struct list_head node;
110 u64 id;
114 * a single device
116 struct rbd_device {
117 int id; /* blkdev unique id */
119 int major; /* blkdev assigned major */
120 struct gendisk *disk; /* blkdev's gendisk and rq */
121 struct request_queue *q;
123 struct ceph_client *client;
124 struct rbd_client *rbd_client;
126 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
128 spinlock_t lock; /* queue lock */
130 struct rbd_image_header header;
131 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
132 int obj_len;
133 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
134 char pool_name[RBD_MAX_POOL_NAME_LEN];
135 int poolid;
137 struct ceph_osd_event *watch_event;
138 struct ceph_osd_request *watch_request;
140 char snap_name[RBD_MAX_SNAP_NAME_LEN];
141 u32 cur_snap; /* index+1 of current snapshot within snap context
142 0 - for the head */
143 int read_only;
145 struct list_head node;
147 /* list of snapshots */
148 struct list_head snaps;
150 /* sysfs related */
151 struct device dev;
154 static struct bus_type rbd_bus_type = {
155 .name = "rbd",
158 static spinlock_t node_lock; /* protects client get/put */
160 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
161 static LIST_HEAD(rbd_dev_list); /* devices */
162 static LIST_HEAD(rbd_client_list); /* clients */
164 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
165 static void rbd_dev_release(struct device *dev);
166 static ssize_t rbd_snap_rollback(struct device *dev,
167 struct device_attribute *attr,
168 const char *buf,
169 size_t size);
170 static ssize_t rbd_snap_add(struct device *dev,
171 struct device_attribute *attr,
172 const char *buf,
173 size_t count);
174 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
175 struct rbd_snap *snap);;
178 static struct rbd_device *dev_to_rbd(struct device *dev)
180 return container_of(dev, struct rbd_device, dev);
183 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
185 return get_device(&rbd_dev->dev);
188 static void rbd_put_dev(struct rbd_device *rbd_dev)
190 put_device(&rbd_dev->dev);
193 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
195 static int rbd_open(struct block_device *bdev, fmode_t mode)
197 struct gendisk *disk = bdev->bd_disk;
198 struct rbd_device *rbd_dev = disk->private_data;
200 rbd_get_dev(rbd_dev);
202 set_device_ro(bdev, rbd_dev->read_only);
204 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
205 return -EROFS;
207 return 0;
210 static int rbd_release(struct gendisk *disk, fmode_t mode)
212 struct rbd_device *rbd_dev = disk->private_data;
214 rbd_put_dev(rbd_dev);
216 return 0;
219 static const struct block_device_operations rbd_bd_ops = {
220 .owner = THIS_MODULE,
221 .open = rbd_open,
222 .release = rbd_release,
226 * Initialize an rbd client instance.
227 * We own *opt.
229 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
230 struct rbd_options *rbd_opts)
232 struct rbd_client *rbdc;
233 int ret = -ENOMEM;
235 dout("rbd_client_create\n");
236 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
237 if (!rbdc)
238 goto out_opt;
240 kref_init(&rbdc->kref);
241 INIT_LIST_HEAD(&rbdc->node);
243 rbdc->client = ceph_create_client(opt, rbdc);
244 if (IS_ERR(rbdc->client))
245 goto out_rbdc;
246 opt = NULL; /* Now rbdc->client is responsible for opt */
248 ret = ceph_open_session(rbdc->client);
249 if (ret < 0)
250 goto out_err;
252 rbdc->rbd_opts = rbd_opts;
254 spin_lock(&node_lock);
255 list_add_tail(&rbdc->node, &rbd_client_list);
256 spin_unlock(&node_lock);
258 dout("rbd_client_create created %p\n", rbdc);
259 return rbdc;
261 out_err:
262 ceph_destroy_client(rbdc->client);
263 out_rbdc:
264 kfree(rbdc);
265 out_opt:
266 if (opt)
267 ceph_destroy_options(opt);
268 return ERR_PTR(ret);
272 * Find a ceph client with specific addr and configuration.
274 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
276 struct rbd_client *client_node;
278 if (opt->flags & CEPH_OPT_NOSHARE)
279 return NULL;
281 list_for_each_entry(client_node, &rbd_client_list, node)
282 if (ceph_compare_options(opt, client_node->client) == 0)
283 return client_node;
284 return NULL;
288 * mount options
290 enum {
291 Opt_notify_timeout,
292 Opt_last_int,
293 /* int args above */
294 Opt_last_string,
295 /* string args above */
298 static match_table_t rbdopt_tokens = {
299 {Opt_notify_timeout, "notify_timeout=%d"},
300 /* int args above */
301 /* string args above */
302 {-1, NULL}
305 static int parse_rbd_opts_token(char *c, void *private)
307 struct rbd_options *rbdopt = private;
308 substring_t argstr[MAX_OPT_ARGS];
309 int token, intval, ret;
311 token = match_token((char *)c, rbdopt_tokens, argstr);
312 if (token < 0)
313 return -EINVAL;
315 if (token < Opt_last_int) {
316 ret = match_int(&argstr[0], &intval);
317 if (ret < 0) {
318 pr_err("bad mount option arg (not int) "
319 "at '%s'\n", c);
320 return ret;
322 dout("got int token %d val %d\n", token, intval);
323 } else if (token > Opt_last_int && token < Opt_last_string) {
324 dout("got string token %d val %s\n", token,
325 argstr[0].from);
326 } else {
327 dout("got token %d\n", token);
330 switch (token) {
331 case Opt_notify_timeout:
332 rbdopt->notify_timeout = intval;
333 break;
334 default:
335 BUG_ON(token);
337 return 0;
341 * Get a ceph client with specific addr and configuration, if one does
342 * not exist create it.
344 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
345 char *options)
347 struct rbd_client *rbdc;
348 struct ceph_options *opt;
349 int ret;
350 struct rbd_options *rbd_opts;
352 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
353 if (!rbd_opts)
354 return -ENOMEM;
356 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
358 ret = ceph_parse_options(&opt, options, mon_addr,
359 mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
360 if (ret < 0)
361 goto done_err;
363 spin_lock(&node_lock);
364 rbdc = __rbd_client_find(opt);
365 if (rbdc) {
366 ceph_destroy_options(opt);
368 /* using an existing client */
369 kref_get(&rbdc->kref);
370 rbd_dev->rbd_client = rbdc;
371 rbd_dev->client = rbdc->client;
372 spin_unlock(&node_lock);
373 return 0;
375 spin_unlock(&node_lock);
377 rbdc = rbd_client_create(opt, rbd_opts);
378 if (IS_ERR(rbdc)) {
379 ret = PTR_ERR(rbdc);
380 goto done_err;
383 rbd_dev->rbd_client = rbdc;
384 rbd_dev->client = rbdc->client;
385 return 0;
386 done_err:
387 kfree(rbd_opts);
388 return ret;
392 * Destroy ceph client
394 static void rbd_client_release(struct kref *kref)
396 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
398 dout("rbd_release_client %p\n", rbdc);
399 spin_lock(&node_lock);
400 list_del(&rbdc->node);
401 spin_unlock(&node_lock);
403 ceph_destroy_client(rbdc->client);
404 kfree(rbdc->rbd_opts);
405 kfree(rbdc);
409 * Drop reference to ceph client node. If it's not referenced anymore, release
410 * it.
412 static void rbd_put_client(struct rbd_device *rbd_dev)
414 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
415 rbd_dev->rbd_client = NULL;
416 rbd_dev->client = NULL;
421 * Create a new header structure, translate header format from the on-disk
422 * header.
424 static int rbd_header_from_disk(struct rbd_image_header *header,
425 struct rbd_image_header_ondisk *ondisk,
426 int allocated_snaps,
427 gfp_t gfp_flags)
429 int i;
430 u32 snap_count = le32_to_cpu(ondisk->snap_count);
431 int ret = -ENOMEM;
433 init_rwsem(&header->snap_rwsem);
434 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
435 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
436 snap_count *
437 sizeof(struct rbd_image_snap_ondisk),
438 gfp_flags);
439 if (!header->snapc)
440 return -ENOMEM;
441 if (snap_count) {
442 header->snap_names = kmalloc(header->snap_names_len,
443 GFP_KERNEL);
444 if (!header->snap_names)
445 goto err_snapc;
446 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
447 GFP_KERNEL);
448 if (!header->snap_sizes)
449 goto err_names;
450 } else {
451 header->snap_names = NULL;
452 header->snap_sizes = NULL;
454 memcpy(header->block_name, ondisk->block_name,
455 sizeof(ondisk->block_name));
457 header->image_size = le64_to_cpu(ondisk->image_size);
458 header->obj_order = ondisk->options.order;
459 header->crypt_type = ondisk->options.crypt_type;
460 header->comp_type = ondisk->options.comp_type;
462 atomic_set(&header->snapc->nref, 1);
463 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
464 header->snapc->num_snaps = snap_count;
465 header->total_snaps = snap_count;
467 if (snap_count &&
468 allocated_snaps == snap_count) {
469 for (i = 0; i < snap_count; i++) {
470 header->snapc->snaps[i] =
471 le64_to_cpu(ondisk->snaps[i].id);
472 header->snap_sizes[i] =
473 le64_to_cpu(ondisk->snaps[i].image_size);
476 /* copy snapshot names */
477 memcpy(header->snap_names, &ondisk->snaps[i],
478 header->snap_names_len);
481 return 0;
483 err_names:
484 kfree(header->snap_names);
485 err_snapc:
486 kfree(header->snapc);
487 return ret;
490 static int snap_index(struct rbd_image_header *header, int snap_num)
492 return header->total_snaps - snap_num;
495 static u64 cur_snap_id(struct rbd_device *rbd_dev)
497 struct rbd_image_header *header = &rbd_dev->header;
499 if (!rbd_dev->cur_snap)
500 return 0;
502 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
505 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
506 u64 *seq, u64 *size)
508 int i;
509 char *p = header->snap_names;
511 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
512 if (strcmp(snap_name, p) == 0)
513 break;
515 if (i == header->total_snaps)
516 return -ENOENT;
517 if (seq)
518 *seq = header->snapc->snaps[i];
520 if (size)
521 *size = header->snap_sizes[i];
523 return i;
526 static int rbd_header_set_snap(struct rbd_device *dev,
527 const char *snap_name,
528 u64 *size)
530 struct rbd_image_header *header = &dev->header;
531 struct ceph_snap_context *snapc = header->snapc;
532 int ret = -ENOENT;
534 down_write(&header->snap_rwsem);
536 if (!snap_name ||
537 !*snap_name ||
538 strcmp(snap_name, "-") == 0 ||
539 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
540 if (header->total_snaps)
541 snapc->seq = header->snap_seq;
542 else
543 snapc->seq = 0;
544 dev->cur_snap = 0;
545 dev->read_only = 0;
546 if (size)
547 *size = header->image_size;
548 } else {
549 ret = snap_by_name(header, snap_name, &snapc->seq, size);
550 if (ret < 0)
551 goto done;
553 dev->cur_snap = header->total_snaps - ret;
554 dev->read_only = 1;
557 ret = 0;
558 done:
559 up_write(&header->snap_rwsem);
560 return ret;
563 static void rbd_header_free(struct rbd_image_header *header)
565 kfree(header->snapc);
566 kfree(header->snap_names);
567 kfree(header->snap_sizes);
571 * get the actual striped segment name, offset and length
573 static u64 rbd_get_segment(struct rbd_image_header *header,
574 const char *block_name,
575 u64 ofs, u64 len,
576 char *seg_name, u64 *segofs)
578 u64 seg = ofs >> header->obj_order;
580 if (seg_name)
581 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
582 "%s.%012llx", block_name, seg);
584 ofs = ofs & ((1 << header->obj_order) - 1);
585 len = min_t(u64, len, (1 << header->obj_order) - ofs);
587 if (segofs)
588 *segofs = ofs;
590 return len;
594 * bio helpers
597 static void bio_chain_put(struct bio *chain)
599 struct bio *tmp;
601 while (chain) {
602 tmp = chain;
603 chain = chain->bi_next;
604 bio_put(tmp);
609 * zeros a bio chain, starting at specific offset
611 static void zero_bio_chain(struct bio *chain, int start_ofs)
613 struct bio_vec *bv;
614 unsigned long flags;
615 void *buf;
616 int i;
617 int pos = 0;
619 while (chain) {
620 bio_for_each_segment(bv, chain, i) {
621 if (pos + bv->bv_len > start_ofs) {
622 int remainder = max(start_ofs - pos, 0);
623 buf = bvec_kmap_irq(bv, &flags);
624 memset(buf + remainder, 0,
625 bv->bv_len - remainder);
626 bvec_kunmap_irq(buf, &flags);
628 pos += bv->bv_len;
631 chain = chain->bi_next;
636 * bio_chain_clone - clone a chain of bios up to a certain length.
637 * might return a bio_pair that will need to be released.
639 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
640 struct bio_pair **bp,
641 int len, gfp_t gfpmask)
643 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
644 int total = 0;
646 if (*bp) {
647 bio_pair_release(*bp);
648 *bp = NULL;
651 while (old_chain && (total < len)) {
652 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
653 if (!tmp)
654 goto err_out;
656 if (total + old_chain->bi_size > len) {
657 struct bio_pair *bp;
660 * this split can only happen with a single paged bio,
661 * split_bio will BUG_ON if this is not the case
663 dout("bio_chain_clone split! total=%d remaining=%d"
664 "bi_size=%d\n",
665 (int)total, (int)len-total,
666 (int)old_chain->bi_size);
668 /* split the bio. We'll release it either in the next
669 call, or it will have to be released outside */
670 bp = bio_split(old_chain, (len - total) / 512ULL);
671 if (!bp)
672 goto err_out;
674 __bio_clone(tmp, &bp->bio1);
676 *next = &bp->bio2;
677 } else {
678 __bio_clone(tmp, old_chain);
679 *next = old_chain->bi_next;
682 tmp->bi_bdev = NULL;
683 gfpmask &= ~__GFP_WAIT;
684 tmp->bi_next = NULL;
686 if (!new_chain) {
687 new_chain = tail = tmp;
688 } else {
689 tail->bi_next = tmp;
690 tail = tmp;
692 old_chain = old_chain->bi_next;
694 total += tmp->bi_size;
697 BUG_ON(total < len);
699 if (tail)
700 tail->bi_next = NULL;
702 *old = old_chain;
704 return new_chain;
706 err_out:
707 dout("bio_chain_clone with err\n");
708 bio_chain_put(new_chain);
709 return NULL;
713 * helpers for osd request op vectors.
715 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
716 int num_ops,
717 int opcode,
718 u32 payload_len)
720 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
721 GFP_NOIO);
722 if (!*ops)
723 return -ENOMEM;
724 (*ops)[0].op = opcode;
726 * op extent offset and length will be set later on
727 * in calc_raw_layout()
729 (*ops)[0].payload_len = payload_len;
730 return 0;
733 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
735 kfree(ops);
739 * Send ceph osd request
741 static int rbd_do_request(struct request *rq,
742 struct rbd_device *dev,
743 struct ceph_snap_context *snapc,
744 u64 snapid,
745 const char *obj, u64 ofs, u64 len,
746 struct bio *bio,
747 struct page **pages,
748 int num_pages,
749 int flags,
750 struct ceph_osd_req_op *ops,
751 int num_reply,
752 void (*rbd_cb)(struct ceph_osd_request *req,
753 struct ceph_msg *msg),
754 struct ceph_osd_request **linger_req,
755 u64 *ver)
757 struct ceph_osd_request *req;
758 struct ceph_file_layout *layout;
759 int ret;
760 u64 bno;
761 struct timespec mtime = CURRENT_TIME;
762 struct rbd_request *req_data;
763 struct ceph_osd_request_head *reqhead;
764 struct rbd_image_header *header = &dev->header;
766 ret = -ENOMEM;
767 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
768 if (!req_data)
769 goto done;
771 dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
773 down_read(&header->snap_rwsem);
775 req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
776 snapc,
777 ops,
778 false,
779 GFP_NOIO, pages, bio);
780 if (IS_ERR(req)) {
781 up_read(&header->snap_rwsem);
782 ret = PTR_ERR(req);
783 goto done_pages;
786 req->r_callback = rbd_cb;
788 req_data->rq = rq;
789 req_data->bio = bio;
790 req_data->pages = pages;
791 req_data->len = len;
793 req->r_priv = req_data;
795 reqhead = req->r_request->front.iov_base;
796 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
798 strncpy(req->r_oid, obj, sizeof(req->r_oid));
799 req->r_oid_len = strlen(req->r_oid);
801 layout = &req->r_file_layout;
802 memset(layout, 0, sizeof(*layout));
803 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
804 layout->fl_stripe_count = cpu_to_le32(1);
805 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
806 layout->fl_pg_preferred = cpu_to_le32(-1);
807 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
808 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
809 ofs, &len, &bno, req, ops);
811 ceph_osdc_build_request(req, ofs, &len,
812 ops,
813 snapc,
814 &mtime,
815 req->r_oid, req->r_oid_len);
816 up_read(&header->snap_rwsem);
818 if (linger_req) {
819 ceph_osdc_set_request_linger(&dev->client->osdc, req);
820 *linger_req = req;
823 ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
824 if (ret < 0)
825 goto done_err;
827 if (!rbd_cb) {
828 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
829 if (ver)
830 *ver = le64_to_cpu(req->r_reassert_version.version);
831 dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version));
832 ceph_osdc_put_request(req);
834 return ret;
836 done_err:
837 bio_chain_put(req_data->bio);
838 ceph_osdc_put_request(req);
839 done_pages:
840 kfree(req_data);
841 done:
842 if (rq)
843 blk_end_request(rq, ret, len);
844 return ret;
848 * Ceph osd op callback
850 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
852 struct rbd_request *req_data = req->r_priv;
853 struct ceph_osd_reply_head *replyhead;
854 struct ceph_osd_op *op;
855 __s32 rc;
856 u64 bytes;
857 int read_op;
859 /* parse reply */
860 replyhead = msg->front.iov_base;
861 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
862 op = (void *)(replyhead + 1);
863 rc = le32_to_cpu(replyhead->result);
864 bytes = le64_to_cpu(op->extent.length);
865 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
867 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
869 if (rc == -ENOENT && read_op) {
870 zero_bio_chain(req_data->bio, 0);
871 rc = 0;
872 } else if (rc == 0 && read_op && bytes < req_data->len) {
873 zero_bio_chain(req_data->bio, bytes);
874 bytes = req_data->len;
877 blk_end_request(req_data->rq, rc, bytes);
879 if (req_data->bio)
880 bio_chain_put(req_data->bio);
882 ceph_osdc_put_request(req);
883 kfree(req_data);
886 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
888 ceph_osdc_put_request(req);
892 * Do a synchronous ceph osd operation
894 static int rbd_req_sync_op(struct rbd_device *dev,
895 struct ceph_snap_context *snapc,
896 u64 snapid,
897 int opcode,
898 int flags,
899 struct ceph_osd_req_op *orig_ops,
900 int num_reply,
901 const char *obj,
902 u64 ofs, u64 len,
903 char *buf,
904 struct ceph_osd_request **linger_req,
905 u64 *ver)
907 int ret;
908 struct page **pages;
909 int num_pages;
910 struct ceph_osd_req_op *ops = orig_ops;
911 u32 payload_len;
913 num_pages = calc_pages_for(ofs , len);
914 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
915 if (IS_ERR(pages))
916 return PTR_ERR(pages);
918 if (!orig_ops) {
919 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
920 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
921 if (ret < 0)
922 goto done;
924 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
925 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
926 if (ret < 0)
927 goto done_ops;
931 ret = rbd_do_request(NULL, dev, snapc, snapid,
932 obj, ofs, len, NULL,
933 pages, num_pages,
934 flags,
935 ops,
937 NULL,
938 linger_req, ver);
939 if (ret < 0)
940 goto done_ops;
942 if ((flags & CEPH_OSD_FLAG_READ) && buf)
943 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
945 done_ops:
946 if (!orig_ops)
947 rbd_destroy_ops(ops);
948 done:
949 ceph_release_page_vector(pages, num_pages);
950 return ret;
954 * Do an asynchronous ceph osd operation
956 static int rbd_do_op(struct request *rq,
957 struct rbd_device *rbd_dev ,
958 struct ceph_snap_context *snapc,
959 u64 snapid,
960 int opcode, int flags, int num_reply,
961 u64 ofs, u64 len,
962 struct bio *bio)
964 char *seg_name;
965 u64 seg_ofs;
966 u64 seg_len;
967 int ret;
968 struct ceph_osd_req_op *ops;
969 u32 payload_len;
971 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
972 if (!seg_name)
973 return -ENOMEM;
975 seg_len = rbd_get_segment(&rbd_dev->header,
976 rbd_dev->header.block_name,
977 ofs, len,
978 seg_name, &seg_ofs);
980 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
982 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
983 if (ret < 0)
984 goto done;
986 /* we've taken care of segment sizes earlier when we
987 cloned the bios. We should never have a segment
988 truncated at this point */
989 BUG_ON(seg_len < len);
991 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
992 seg_name, seg_ofs, seg_len,
993 bio,
994 NULL, 0,
995 flags,
996 ops,
997 num_reply,
998 rbd_req_cb, 0, NULL);
999 done:
1000 kfree(seg_name);
1001 return ret;
1005 * Request async osd write
1007 static int rbd_req_write(struct request *rq,
1008 struct rbd_device *rbd_dev,
1009 struct ceph_snap_context *snapc,
1010 u64 ofs, u64 len,
1011 struct bio *bio)
1013 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1014 CEPH_OSD_OP_WRITE,
1015 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1017 ofs, len, bio);
1021 * Request async osd read
1023 static int rbd_req_read(struct request *rq,
1024 struct rbd_device *rbd_dev,
1025 u64 snapid,
1026 u64 ofs, u64 len,
1027 struct bio *bio)
1029 return rbd_do_op(rq, rbd_dev, NULL,
1030 (snapid ? snapid : CEPH_NOSNAP),
1031 CEPH_OSD_OP_READ,
1032 CEPH_OSD_FLAG_READ,
1034 ofs, len, bio);
1038 * Request sync osd read
1040 static int rbd_req_sync_read(struct rbd_device *dev,
1041 struct ceph_snap_context *snapc,
1042 u64 snapid,
1043 const char *obj,
1044 u64 ofs, u64 len,
1045 char *buf,
1046 u64 *ver)
1048 return rbd_req_sync_op(dev, NULL,
1049 (snapid ? snapid : CEPH_NOSNAP),
1050 CEPH_OSD_OP_READ,
1051 CEPH_OSD_FLAG_READ,
1052 NULL,
1053 1, obj, ofs, len, buf, NULL, ver);
1057 * Request sync osd watch
1059 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1060 u64 ver,
1061 u64 notify_id,
1062 const char *obj)
1064 struct ceph_osd_req_op *ops;
1065 struct page **pages = NULL;
1066 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1067 if (ret < 0)
1068 return ret;
1070 ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1071 ops[0].watch.cookie = notify_id;
1072 ops[0].watch.flag = 0;
1074 ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1075 obj, 0, 0, NULL,
1076 pages, 0,
1077 CEPH_OSD_FLAG_READ,
1078 ops,
1080 rbd_simple_req_cb, 0, NULL);
1082 rbd_destroy_ops(ops);
1083 return ret;
1086 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1088 struct rbd_device *dev = (struct rbd_device *)data;
1089 if (!dev)
1090 return;
1092 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1093 notify_id, (int)opcode);
1094 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1095 __rbd_update_snaps(dev);
1096 mutex_unlock(&ctl_mutex);
1098 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1102 * Request sync osd watch
1104 static int rbd_req_sync_watch(struct rbd_device *dev,
1105 const char *obj,
1106 u64 ver)
1108 struct ceph_osd_req_op *ops;
1109 struct ceph_osd_client *osdc = &dev->client->osdc;
1111 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1112 if (ret < 0)
1113 return ret;
1115 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1116 (void *)dev, &dev->watch_event);
1117 if (ret < 0)
1118 goto fail;
1120 ops[0].watch.ver = cpu_to_le64(ver);
1121 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1122 ops[0].watch.flag = 1;
1124 ret = rbd_req_sync_op(dev, NULL,
1125 CEPH_NOSNAP,
1127 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1128 ops,
1129 1, obj, 0, 0, NULL,
1130 &dev->watch_request, NULL);
1132 if (ret < 0)
1133 goto fail_event;
1135 rbd_destroy_ops(ops);
1136 return 0;
1138 fail_event:
1139 ceph_osdc_cancel_event(dev->watch_event);
1140 dev->watch_event = NULL;
1141 fail:
1142 rbd_destroy_ops(ops);
1143 return ret;
1146 struct rbd_notify_info {
1147 struct rbd_device *dev;
1150 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1152 struct rbd_device *dev = (struct rbd_device *)data;
1153 if (!dev)
1154 return;
1156 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1157 notify_id, (int)opcode);
1161 * Request sync osd notify
1163 static int rbd_req_sync_notify(struct rbd_device *dev,
1164 const char *obj)
1166 struct ceph_osd_req_op *ops;
1167 struct ceph_osd_client *osdc = &dev->client->osdc;
1168 struct ceph_osd_event *event;
1169 struct rbd_notify_info info;
1170 int payload_len = sizeof(u32) + sizeof(u32);
1171 int ret;
1173 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1174 if (ret < 0)
1175 return ret;
1177 info.dev = dev;
1179 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1180 (void *)&info, &event);
1181 if (ret < 0)
1182 goto fail;
1184 ops[0].watch.ver = 1;
1185 ops[0].watch.flag = 1;
1186 ops[0].watch.cookie = event->cookie;
1187 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1188 ops[0].watch.timeout = 12;
1190 ret = rbd_req_sync_op(dev, NULL,
1191 CEPH_NOSNAP,
1193 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1194 ops,
1195 1, obj, 0, 0, NULL, NULL, NULL);
1196 if (ret < 0)
1197 goto fail_event;
1199 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1200 dout("ceph_osdc_wait_event returned %d\n", ret);
1201 rbd_destroy_ops(ops);
1202 return 0;
1204 fail_event:
1205 ceph_osdc_cancel_event(event);
1206 fail:
1207 rbd_destroy_ops(ops);
1208 return ret;
1212 * Request sync osd rollback
1214 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
1215 u64 snapid,
1216 const char *obj)
1218 struct ceph_osd_req_op *ops;
1219 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
1220 if (ret < 0)
1221 return ret;
1223 ops[0].snap.snapid = snapid;
1225 ret = rbd_req_sync_op(dev, NULL,
1226 CEPH_NOSNAP,
1228 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1229 ops,
1230 1, obj, 0, 0, NULL, NULL, NULL);
1232 rbd_destroy_ops(ops);
1234 return ret;
1238 * Request sync osd read
1240 static int rbd_req_sync_exec(struct rbd_device *dev,
1241 const char *obj,
1242 const char *cls,
1243 const char *method,
1244 const char *data,
1245 int len,
1246 u64 *ver)
1248 struct ceph_osd_req_op *ops;
1249 int cls_len = strlen(cls);
1250 int method_len = strlen(method);
1251 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1252 cls_len + method_len + len);
1253 if (ret < 0)
1254 return ret;
1256 ops[0].cls.class_name = cls;
1257 ops[0].cls.class_len = (__u8)cls_len;
1258 ops[0].cls.method_name = method;
1259 ops[0].cls.method_len = (__u8)method_len;
1260 ops[0].cls.argc = 0;
1261 ops[0].cls.indata = data;
1262 ops[0].cls.indata_len = len;
1264 ret = rbd_req_sync_op(dev, NULL,
1265 CEPH_NOSNAP,
1267 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1268 ops,
1269 1, obj, 0, 0, NULL, NULL, ver);
1271 rbd_destroy_ops(ops);
1273 dout("cls_exec returned %d\n", ret);
1274 return ret;
1278 * block device queue callback
1280 static void rbd_rq_fn(struct request_queue *q)
1282 struct rbd_device *rbd_dev = q->queuedata;
1283 struct request *rq;
1284 struct bio_pair *bp = NULL;
1286 rq = blk_fetch_request(q);
1288 while (1) {
1289 struct bio *bio;
1290 struct bio *rq_bio, *next_bio = NULL;
1291 bool do_write;
1292 int size, op_size = 0;
1293 u64 ofs;
1295 /* peek at request from block layer */
1296 if (!rq)
1297 break;
1299 dout("fetched request\n");
1301 /* filter out block requests we don't understand */
1302 if ((rq->cmd_type != REQ_TYPE_FS)) {
1303 __blk_end_request_all(rq, 0);
1304 goto next;
1307 /* deduce our operation (read, write) */
1308 do_write = (rq_data_dir(rq) == WRITE);
1310 size = blk_rq_bytes(rq);
1311 ofs = blk_rq_pos(rq) * 512ULL;
1312 rq_bio = rq->bio;
1313 if (do_write && rbd_dev->read_only) {
1314 __blk_end_request_all(rq, -EROFS);
1315 goto next;
1318 spin_unlock_irq(q->queue_lock);
1320 dout("%s 0x%x bytes at 0x%llx\n",
1321 do_write ? "write" : "read",
1322 size, blk_rq_pos(rq) * 512ULL);
1324 do {
1325 /* a bio clone to be passed down to OSD req */
1326 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1327 op_size = rbd_get_segment(&rbd_dev->header,
1328 rbd_dev->header.block_name,
1329 ofs, size,
1330 NULL, NULL);
1331 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1332 op_size, GFP_ATOMIC);
1333 if (!bio) {
1334 spin_lock_irq(q->queue_lock);
1335 __blk_end_request_all(rq, -ENOMEM);
1336 goto next;
1339 /* init OSD command: write or read */
1340 if (do_write)
1341 rbd_req_write(rq, rbd_dev,
1342 rbd_dev->header.snapc,
1343 ofs,
1344 op_size, bio);
1345 else
1346 rbd_req_read(rq, rbd_dev,
1347 cur_snap_id(rbd_dev),
1348 ofs,
1349 op_size, bio);
1351 size -= op_size;
1352 ofs += op_size;
1354 rq_bio = next_bio;
1355 } while (size > 0);
1357 if (bp)
1358 bio_pair_release(bp);
1360 spin_lock_irq(q->queue_lock);
1361 next:
1362 rq = blk_fetch_request(q);
1367 * a queue callback. Makes sure that we don't create a bio that spans across
1368 * multiple osd objects. One exception would be with a single page bios,
1369 * which we handle later at bio_chain_clone
1371 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1372 struct bio_vec *bvec)
1374 struct rbd_device *rbd_dev = q->queuedata;
1375 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1376 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1377 unsigned int bio_sectors = bmd->bi_size >> 9;
1378 int max;
1380 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1381 + bio_sectors)) << 9;
1382 if (max < 0)
1383 max = 0; /* bio_add cannot handle a negative return */
1384 if (max <= bvec->bv_len && bio_sectors == 0)
1385 return bvec->bv_len;
1386 return max;
1389 static void rbd_free_disk(struct rbd_device *rbd_dev)
1391 struct gendisk *disk = rbd_dev->disk;
1393 if (!disk)
1394 return;
1396 rbd_header_free(&rbd_dev->header);
1398 if (disk->flags & GENHD_FL_UP)
1399 del_gendisk(disk);
1400 if (disk->queue)
1401 blk_cleanup_queue(disk->queue);
1402 put_disk(disk);
1406 * reload the ondisk the header
1408 static int rbd_read_header(struct rbd_device *rbd_dev,
1409 struct rbd_image_header *header)
1411 ssize_t rc;
1412 struct rbd_image_header_ondisk *dh;
1413 int snap_count = 0;
1414 u64 snap_names_len = 0;
1415 u64 ver;
1417 while (1) {
1418 int len = sizeof(*dh) +
1419 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1420 snap_names_len;
1422 rc = -ENOMEM;
1423 dh = kmalloc(len, GFP_KERNEL);
1424 if (!dh)
1425 return -ENOMEM;
1427 rc = rbd_req_sync_read(rbd_dev,
1428 NULL, CEPH_NOSNAP,
1429 rbd_dev->obj_md_name,
1430 0, len,
1431 (char *)dh, &ver);
1432 if (rc < 0)
1433 goto out_dh;
1435 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1436 if (rc < 0)
1437 goto out_dh;
1439 if (snap_count != header->total_snaps) {
1440 snap_count = header->total_snaps;
1441 snap_names_len = header->snap_names_len;
1442 rbd_header_free(header);
1443 kfree(dh);
1444 continue;
1446 break;
1448 header->obj_version = ver;
1450 out_dh:
1451 kfree(dh);
1452 return rc;
1456 * create a snapshot
1458 static int rbd_header_add_snap(struct rbd_device *dev,
1459 const char *snap_name,
1460 gfp_t gfp_flags)
1462 int name_len = strlen(snap_name);
1463 u64 new_snapid;
1464 int ret;
1465 void *data, *data_start, *data_end;
1466 u64 ver;
1468 /* we should create a snapshot only if we're pointing at the head */
1469 if (dev->cur_snap)
1470 return -EINVAL;
1472 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1473 &new_snapid);
1474 dout("created snapid=%lld\n", new_snapid);
1475 if (ret < 0)
1476 return ret;
1478 data = kmalloc(name_len + 16, gfp_flags);
1479 if (!data)
1480 return -ENOMEM;
1482 data_start = data;
1483 data_end = data + name_len + 16;
1485 ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1486 ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1488 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1489 data_start, data - data_start, &ver);
1491 kfree(data_start);
1493 if (ret < 0)
1494 return ret;
1496 dev->header.snapc->seq = new_snapid;
1498 return 0;
1499 bad:
1500 return -ERANGE;
1503 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1505 struct rbd_snap *snap;
1507 while (!list_empty(&rbd_dev->snaps)) {
1508 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1509 __rbd_remove_snap_dev(rbd_dev, snap);
1514 * only read the first part of the ondisk header, without the snaps info
1516 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1518 int ret;
1519 struct rbd_image_header h;
1520 u64 snap_seq;
1521 int follow_seq = 0;
1523 ret = rbd_read_header(rbd_dev, &h);
1524 if (ret < 0)
1525 return ret;
1527 down_write(&rbd_dev->header.snap_rwsem);
1529 snap_seq = rbd_dev->header.snapc->seq;
1530 if (rbd_dev->header.total_snaps &&
1531 rbd_dev->header.snapc->snaps[0] == snap_seq)
1532 /* pointing at the head, will need to follow that
1533 if head moves */
1534 follow_seq = 1;
1536 kfree(rbd_dev->header.snapc);
1537 kfree(rbd_dev->header.snap_names);
1538 kfree(rbd_dev->header.snap_sizes);
1540 rbd_dev->header.total_snaps = h.total_snaps;
1541 rbd_dev->header.snapc = h.snapc;
1542 rbd_dev->header.snap_names = h.snap_names;
1543 rbd_dev->header.snap_names_len = h.snap_names_len;
1544 rbd_dev->header.snap_sizes = h.snap_sizes;
1545 if (follow_seq)
1546 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1547 else
1548 rbd_dev->header.snapc->seq = snap_seq;
1550 ret = __rbd_init_snaps_header(rbd_dev);
1552 up_write(&rbd_dev->header.snap_rwsem);
1554 return ret;
1557 static int rbd_init_disk(struct rbd_device *rbd_dev)
1559 struct gendisk *disk;
1560 struct request_queue *q;
1561 int rc;
1562 u64 total_size = 0;
1564 /* contact OSD, request size info about the object being mapped */
1565 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1566 if (rc)
1567 return rc;
1569 /* no need to lock here, as rbd_dev is not registered yet */
1570 rc = __rbd_init_snaps_header(rbd_dev);
1571 if (rc)
1572 return rc;
1574 rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1575 if (rc)
1576 return rc;
1578 /* create gendisk info */
1579 rc = -ENOMEM;
1580 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1581 if (!disk)
1582 goto out;
1584 sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1585 disk->major = rbd_dev->major;
1586 disk->first_minor = 0;
1587 disk->fops = &rbd_bd_ops;
1588 disk->private_data = rbd_dev;
1590 /* init rq */
1591 rc = -ENOMEM;
1592 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1593 if (!q)
1594 goto out_disk;
1595 blk_queue_merge_bvec(q, rbd_merge_bvec);
1596 disk->queue = q;
1598 q->queuedata = rbd_dev;
1600 rbd_dev->disk = disk;
1601 rbd_dev->q = q;
1603 /* finally, announce the disk to the world */
1604 set_capacity(disk, total_size / 512ULL);
1605 add_disk(disk);
1607 pr_info("%s: added with size 0x%llx\n",
1608 disk->disk_name, (unsigned long long)total_size);
1609 return 0;
1611 out_disk:
1612 put_disk(disk);
1613 out:
1614 return rc;
1618 sysfs
1621 static ssize_t rbd_size_show(struct device *dev,
1622 struct device_attribute *attr, char *buf)
1624 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1626 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1629 static ssize_t rbd_major_show(struct device *dev,
1630 struct device_attribute *attr, char *buf)
1632 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1634 return sprintf(buf, "%d\n", rbd_dev->major);
1637 static ssize_t rbd_client_id_show(struct device *dev,
1638 struct device_attribute *attr, char *buf)
1640 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1642 return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1645 static ssize_t rbd_pool_show(struct device *dev,
1646 struct device_attribute *attr, char *buf)
1648 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1650 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1653 static ssize_t rbd_name_show(struct device *dev,
1654 struct device_attribute *attr, char *buf)
1656 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1658 return sprintf(buf, "%s\n", rbd_dev->obj);
1661 static ssize_t rbd_snap_show(struct device *dev,
1662 struct device_attribute *attr,
1663 char *buf)
1665 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1667 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1670 static ssize_t rbd_image_refresh(struct device *dev,
1671 struct device_attribute *attr,
1672 const char *buf,
1673 size_t size)
1675 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1676 int rc;
1677 int ret = size;
1679 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1681 rc = __rbd_update_snaps(rbd_dev);
1682 if (rc < 0)
1683 ret = rc;
1685 mutex_unlock(&ctl_mutex);
1686 return ret;
1689 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1690 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1691 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1692 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1693 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1694 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1695 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1696 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1697 static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
1699 static struct attribute *rbd_attrs[] = {
1700 &dev_attr_size.attr,
1701 &dev_attr_major.attr,
1702 &dev_attr_client_id.attr,
1703 &dev_attr_pool.attr,
1704 &dev_attr_name.attr,
1705 &dev_attr_current_snap.attr,
1706 &dev_attr_refresh.attr,
1707 &dev_attr_create_snap.attr,
1708 &dev_attr_rollback_snap.attr,
1709 NULL
1712 static struct attribute_group rbd_attr_group = {
1713 .attrs = rbd_attrs,
1716 static const struct attribute_group *rbd_attr_groups[] = {
1717 &rbd_attr_group,
1718 NULL
1721 static void rbd_sysfs_dev_release(struct device *dev)
1725 static struct device_type rbd_device_type = {
1726 .name = "rbd",
1727 .groups = rbd_attr_groups,
1728 .release = rbd_sysfs_dev_release,
1733 sysfs - snapshots
1736 static ssize_t rbd_snap_size_show(struct device *dev,
1737 struct device_attribute *attr,
1738 char *buf)
1740 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1742 return sprintf(buf, "%lld\n", (long long)snap->size);
1745 static ssize_t rbd_snap_id_show(struct device *dev,
1746 struct device_attribute *attr,
1747 char *buf)
1749 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1751 return sprintf(buf, "%lld\n", (long long)snap->id);
1754 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1755 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1757 static struct attribute *rbd_snap_attrs[] = {
1758 &dev_attr_snap_size.attr,
1759 &dev_attr_snap_id.attr,
1760 NULL,
1763 static struct attribute_group rbd_snap_attr_group = {
1764 .attrs = rbd_snap_attrs,
1767 static void rbd_snap_dev_release(struct device *dev)
1769 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1770 kfree(snap->name);
1771 kfree(snap);
1774 static const struct attribute_group *rbd_snap_attr_groups[] = {
1775 &rbd_snap_attr_group,
1776 NULL
1779 static struct device_type rbd_snap_device_type = {
1780 .groups = rbd_snap_attr_groups,
1781 .release = rbd_snap_dev_release,
1784 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1785 struct rbd_snap *snap)
1787 list_del(&snap->node);
1788 device_unregister(&snap->dev);
1791 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1792 struct rbd_snap *snap,
1793 struct device *parent)
1795 struct device *dev = &snap->dev;
1796 int ret;
1798 dev->type = &rbd_snap_device_type;
1799 dev->parent = parent;
1800 dev->release = rbd_snap_dev_release;
1801 dev_set_name(dev, "snap_%s", snap->name);
1802 ret = device_register(dev);
1804 return ret;
1807 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1808 int i, const char *name,
1809 struct rbd_snap **snapp)
1811 int ret;
1812 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1813 if (!snap)
1814 return -ENOMEM;
1815 snap->name = kstrdup(name, GFP_KERNEL);
1816 snap->size = rbd_dev->header.snap_sizes[i];
1817 snap->id = rbd_dev->header.snapc->snaps[i];
1818 if (device_is_registered(&rbd_dev->dev)) {
1819 ret = rbd_register_snap_dev(rbd_dev, snap,
1820 &rbd_dev->dev);
1821 if (ret < 0)
1822 goto err;
1824 *snapp = snap;
1825 return 0;
1826 err:
1827 kfree(snap->name);
1828 kfree(snap);
1829 return ret;
1833 * search for the previous snap in a null delimited string list
1835 const char *rbd_prev_snap_name(const char *name, const char *start)
1837 if (name < start + 2)
1838 return NULL;
1840 name -= 2;
1841 while (*name) {
1842 if (name == start)
1843 return start;
1844 name--;
1846 return name + 1;
1850 * compare the old list of snapshots that we have to what's in the header
1851 * and update it accordingly. Note that the header holds the snapshots
1852 * in a reverse order (from newest to oldest) and we need to go from
1853 * older to new so that we don't get a duplicate snap name when
1854 * doing the process (e.g., removed snapshot and recreated a new
1855 * one with the same name.
1857 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
1859 const char *name, *first_name;
1860 int i = rbd_dev->header.total_snaps;
1861 struct rbd_snap *snap, *old_snap = NULL;
1862 int ret;
1863 struct list_head *p, *n;
1865 first_name = rbd_dev->header.snap_names;
1866 name = first_name + rbd_dev->header.snap_names_len;
1868 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
1869 u64 cur_id;
1871 old_snap = list_entry(p, struct rbd_snap, node);
1873 if (i)
1874 cur_id = rbd_dev->header.snapc->snaps[i - 1];
1876 if (!i || old_snap->id < cur_id) {
1877 /* old_snap->id was skipped, thus was removed */
1878 __rbd_remove_snap_dev(rbd_dev, old_snap);
1879 continue;
1881 if (old_snap->id == cur_id) {
1882 /* we have this snapshot already */
1883 i--;
1884 name = rbd_prev_snap_name(name, first_name);
1885 continue;
1887 for (; i > 0;
1888 i--, name = rbd_prev_snap_name(name, first_name)) {
1889 if (!name) {
1890 WARN_ON(1);
1891 return -EINVAL;
1893 cur_id = rbd_dev->header.snapc->snaps[i];
1894 /* snapshot removal? handle it above */
1895 if (cur_id >= old_snap->id)
1896 break;
1897 /* a new snapshot */
1898 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
1899 if (ret < 0)
1900 return ret;
1902 /* note that we add it backward so using n and not p */
1903 list_add(&snap->node, n);
1904 p = &snap->node;
1907 /* we're done going over the old snap list, just add what's left */
1908 for (; i > 0; i--) {
1909 name = rbd_prev_snap_name(name, first_name);
1910 if (!name) {
1911 WARN_ON(1);
1912 return -EINVAL;
1914 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
1915 if (ret < 0)
1916 return ret;
1917 list_add(&snap->node, &rbd_dev->snaps);
1920 return 0;
1924 static void rbd_root_dev_release(struct device *dev)
1928 static struct device rbd_root_dev = {
1929 .init_name = "rbd",
1930 .release = rbd_root_dev_release,
1933 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
1935 int ret = -ENOMEM;
1936 struct device *dev;
1937 struct rbd_snap *snap;
1939 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1940 dev = &rbd_dev->dev;
1942 dev->bus = &rbd_bus_type;
1943 dev->type = &rbd_device_type;
1944 dev->parent = &rbd_root_dev;
1945 dev->release = rbd_dev_release;
1946 dev_set_name(dev, "%d", rbd_dev->id);
1947 ret = device_register(dev);
1948 if (ret < 0)
1949 goto done_free;
1951 list_for_each_entry(snap, &rbd_dev->snaps, node) {
1952 ret = rbd_register_snap_dev(rbd_dev, snap,
1953 &rbd_dev->dev);
1954 if (ret < 0)
1955 break;
1958 mutex_unlock(&ctl_mutex);
1959 return 0;
1960 done_free:
1961 mutex_unlock(&ctl_mutex);
1962 return ret;
1965 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
1967 device_unregister(&rbd_dev->dev);
1970 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
1972 int ret, rc;
1974 do {
1975 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
1976 rbd_dev->header.obj_version);
1977 if (ret == -ERANGE) {
1978 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1979 rc = __rbd_update_snaps(rbd_dev);
1980 mutex_unlock(&ctl_mutex);
1981 if (rc < 0)
1982 return rc;
1984 } while (ret == -ERANGE);
1986 return ret;
1989 static ssize_t rbd_add(struct bus_type *bus,
1990 const char *buf,
1991 size_t count)
1993 struct ceph_osd_client *osdc;
1994 struct rbd_device *rbd_dev;
1995 ssize_t rc = -ENOMEM;
1996 int irc, new_id = 0;
1997 struct list_head *tmp;
1998 char *mon_dev_name;
1999 char *options;
2001 if (!try_module_get(THIS_MODULE))
2002 return -ENODEV;
2004 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2005 if (!mon_dev_name)
2006 goto err_out_mod;
2008 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2009 if (!options)
2010 goto err_mon_dev;
2012 /* new rbd_device object */
2013 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2014 if (!rbd_dev)
2015 goto err_out_opt;
2017 /* static rbd_device initialization */
2018 spin_lock_init(&rbd_dev->lock);
2019 INIT_LIST_HEAD(&rbd_dev->node);
2020 INIT_LIST_HEAD(&rbd_dev->snaps);
2022 /* generate unique id: find highest unique id, add one */
2023 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2025 list_for_each(tmp, &rbd_dev_list) {
2026 struct rbd_device *rbd_dev;
2028 rbd_dev = list_entry(tmp, struct rbd_device, node);
2029 if (rbd_dev->id >= new_id)
2030 new_id = rbd_dev->id + 1;
2033 rbd_dev->id = new_id;
2035 /* add to global list */
2036 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2038 /* parse add command */
2039 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2040 "%" __stringify(RBD_MAX_OPT_LEN) "s "
2041 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2042 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2043 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2044 mon_dev_name, options, rbd_dev->pool_name,
2045 rbd_dev->obj, rbd_dev->snap_name) < 4) {
2046 rc = -EINVAL;
2047 goto err_out_slot;
2050 if (rbd_dev->snap_name[0] == 0)
2051 rbd_dev->snap_name[0] = '-';
2053 rbd_dev->obj_len = strlen(rbd_dev->obj);
2054 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2055 rbd_dev->obj, RBD_SUFFIX);
2057 /* initialize rest of new object */
2058 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2059 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2060 if (rc < 0)
2061 goto err_out_slot;
2063 mutex_unlock(&ctl_mutex);
2065 /* pick the pool */
2066 osdc = &rbd_dev->client->osdc;
2067 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2068 if (rc < 0)
2069 goto err_out_client;
2070 rbd_dev->poolid = rc;
2072 /* register our block device */
2073 irc = register_blkdev(0, rbd_dev->name);
2074 if (irc < 0) {
2075 rc = irc;
2076 goto err_out_client;
2078 rbd_dev->major = irc;
2080 rc = rbd_bus_add_dev(rbd_dev);
2081 if (rc)
2082 goto err_out_blkdev;
2084 /* set up and announce blkdev mapping */
2085 rc = rbd_init_disk(rbd_dev);
2086 if (rc)
2087 goto err_out_bus;
2089 rc = rbd_init_watch_dev(rbd_dev);
2090 if (rc)
2091 goto err_out_bus;
2093 return count;
2095 err_out_bus:
2096 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2097 list_del_init(&rbd_dev->node);
2098 mutex_unlock(&ctl_mutex);
2100 /* this will also clean up rest of rbd_dev stuff */
2102 rbd_bus_del_dev(rbd_dev);
2103 kfree(options);
2104 kfree(mon_dev_name);
2105 return rc;
2107 err_out_blkdev:
2108 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2109 err_out_client:
2110 rbd_put_client(rbd_dev);
2111 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2112 err_out_slot:
2113 list_del_init(&rbd_dev->node);
2114 mutex_unlock(&ctl_mutex);
2116 kfree(rbd_dev);
2117 err_out_opt:
2118 kfree(options);
2119 err_mon_dev:
2120 kfree(mon_dev_name);
2121 err_out_mod:
2122 dout("Error adding device %s\n", buf);
2123 module_put(THIS_MODULE);
2124 return rc;
2127 static struct rbd_device *__rbd_get_dev(unsigned long id)
2129 struct list_head *tmp;
2130 struct rbd_device *rbd_dev;
2132 list_for_each(tmp, &rbd_dev_list) {
2133 rbd_dev = list_entry(tmp, struct rbd_device, node);
2134 if (rbd_dev->id == id)
2135 return rbd_dev;
2137 return NULL;
2140 static void rbd_dev_release(struct device *dev)
2142 struct rbd_device *rbd_dev =
2143 container_of(dev, struct rbd_device, dev);
2145 if (rbd_dev->watch_request)
2146 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2147 rbd_dev->watch_request);
2148 if (rbd_dev->watch_event)
2149 ceph_osdc_cancel_event(rbd_dev->watch_event);
2151 rbd_put_client(rbd_dev);
2153 /* clean up and free blkdev */
2154 rbd_free_disk(rbd_dev);
2155 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2156 kfree(rbd_dev);
2158 /* release module ref */
2159 module_put(THIS_MODULE);
2162 static ssize_t rbd_remove(struct bus_type *bus,
2163 const char *buf,
2164 size_t count)
2166 struct rbd_device *rbd_dev = NULL;
2167 int target_id, rc;
2168 unsigned long ul;
2169 int ret = count;
2171 rc = strict_strtoul(buf, 10, &ul);
2172 if (rc)
2173 return rc;
2175 /* convert to int; abort if we lost anything in the conversion */
2176 target_id = (int) ul;
2177 if (target_id != ul)
2178 return -EINVAL;
2180 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2182 rbd_dev = __rbd_get_dev(target_id);
2183 if (!rbd_dev) {
2184 ret = -ENOENT;
2185 goto done;
2188 list_del_init(&rbd_dev->node);
2190 __rbd_remove_all_snaps(rbd_dev);
2191 rbd_bus_del_dev(rbd_dev);
2193 done:
2194 mutex_unlock(&ctl_mutex);
2195 return ret;
2198 static ssize_t rbd_snap_add(struct device *dev,
2199 struct device_attribute *attr,
2200 const char *buf,
2201 size_t count)
2203 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2204 int ret;
2205 char *name = kmalloc(count + 1, GFP_KERNEL);
2206 if (!name)
2207 return -ENOMEM;
2209 snprintf(name, count, "%s", buf);
2211 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2213 ret = rbd_header_add_snap(rbd_dev,
2214 name, GFP_KERNEL);
2215 if (ret < 0)
2216 goto err_unlock;
2218 ret = __rbd_update_snaps(rbd_dev);
2219 if (ret < 0)
2220 goto err_unlock;
2222 /* shouldn't hold ctl_mutex when notifying.. notify might
2223 trigger a watch callback that would need to get that mutex */
2224 mutex_unlock(&ctl_mutex);
2226 /* make a best effort, don't error if failed */
2227 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2229 ret = count;
2230 kfree(name);
2231 return ret;
2233 err_unlock:
2234 mutex_unlock(&ctl_mutex);
2235 kfree(name);
2236 return ret;
2239 static ssize_t rbd_snap_rollback(struct device *dev,
2240 struct device_attribute *attr,
2241 const char *buf,
2242 size_t count)
2244 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2245 int ret;
2246 u64 snapid;
2247 u64 cur_ofs;
2248 char *seg_name = NULL;
2249 char *snap_name = kmalloc(count + 1, GFP_KERNEL);
2250 ret = -ENOMEM;
2251 if (!snap_name)
2252 return ret;
2254 /* parse snaps add command */
2255 snprintf(snap_name, count, "%s", buf);
2256 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
2257 if (!seg_name)
2258 goto done;
2260 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2262 ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
2263 if (ret < 0)
2264 goto done_unlock;
2266 dout("snapid=%lld\n", snapid);
2268 cur_ofs = 0;
2269 while (cur_ofs < rbd_dev->header.image_size) {
2270 cur_ofs += rbd_get_segment(&rbd_dev->header,
2271 rbd_dev->obj,
2272 cur_ofs, (u64)-1,
2273 seg_name, NULL);
2274 dout("seg_name=%s\n", seg_name);
2276 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
2277 if (ret < 0)
2278 pr_warning("could not roll back obj %s err=%d\n",
2279 seg_name, ret);
2282 ret = __rbd_update_snaps(rbd_dev);
2283 if (ret < 0)
2284 goto done_unlock;
2286 ret = count;
2288 done_unlock:
2289 mutex_unlock(&ctl_mutex);
2290 done:
2291 kfree(seg_name);
2292 kfree(snap_name);
2294 return ret;
2297 static struct bus_attribute rbd_bus_attrs[] = {
2298 __ATTR(add, S_IWUSR, NULL, rbd_add),
2299 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
2300 __ATTR_NULL
2304 * create control files in sysfs
2305 * /sys/bus/rbd/...
2307 static int rbd_sysfs_init(void)
2309 int ret;
2311 rbd_bus_type.bus_attrs = rbd_bus_attrs;
2313 ret = bus_register(&rbd_bus_type);
2314 if (ret < 0)
2315 return ret;
2317 ret = device_register(&rbd_root_dev);
2319 return ret;
2322 static void rbd_sysfs_cleanup(void)
2324 device_unregister(&rbd_root_dev);
2325 bus_unregister(&rbd_bus_type);
2328 int __init rbd_init(void)
2330 int rc;
2332 rc = rbd_sysfs_init();
2333 if (rc)
2334 return rc;
2335 spin_lock_init(&node_lock);
2336 pr_info("loaded " DRV_NAME_LONG "\n");
2337 return 0;
2340 void __exit rbd_exit(void)
2342 rbd_sysfs_cleanup();
2345 module_init(rbd_init);
2346 module_exit(rbd_exit);
2348 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2349 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2350 MODULE_DESCRIPTION("rados block device");
2352 /* following authorship retained from original osdblk.c */
2353 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2355 MODULE_LICENSE("GPL");