Merge branch 'media_fixes' of git://git.kernel.org/pub/scm/linux/kernel/git/mchehab...
[cris-mirror.git] / net / ceph / osd_client.c
blob3e20a122ffa2f2bf40a91596fe12bb93e6874b64
1 #include <linux/ceph/ceph_debug.h>
3 #include <linux/module.h>
4 #include <linux/err.h>
5 #include <linux/highmem.h>
6 #include <linux/mm.h>
7 #include <linux/pagemap.h>
8 #include <linux/slab.h>
9 #include <linux/uaccess.h>
10 #ifdef CONFIG_BLOCK
11 #include <linux/bio.h>
12 #endif
14 #include <linux/ceph/libceph.h>
15 #include <linux/ceph/osd_client.h>
16 #include <linux/ceph/messenger.h>
17 #include <linux/ceph/decode.h>
18 #include <linux/ceph/auth.h>
19 #include <linux/ceph/pagelist.h>
21 #define OSD_OP_FRONT_LEN 4096
22 #define OSD_OPREPLY_FRONT_LEN 512
24 static const struct ceph_connection_operations osd_con_ops;
25 static int __kick_requests(struct ceph_osd_client *osdc,
26 struct ceph_osd *kickosd);
28 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
30 static int op_needs_trail(int op)
32 switch (op) {
33 case CEPH_OSD_OP_GETXATTR:
34 case CEPH_OSD_OP_SETXATTR:
35 case CEPH_OSD_OP_CMPXATTR:
36 case CEPH_OSD_OP_CALL:
37 return 1;
38 default:
39 return 0;
43 static int op_has_extent(int op)
45 return (op == CEPH_OSD_OP_READ ||
46 op == CEPH_OSD_OP_WRITE);
49 void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
50 struct ceph_file_layout *layout,
51 u64 snapid,
52 u64 off, u64 *plen, u64 *bno,
53 struct ceph_osd_request *req,
54 struct ceph_osd_req_op *op)
56 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
57 u64 orig_len = *plen;
58 u64 objoff, objlen; /* extent in object */
60 reqhead->snapid = cpu_to_le64(snapid);
62 /* object extent? */
63 ceph_calc_file_object_mapping(layout, off, plen, bno,
64 &objoff, &objlen);
65 if (*plen < orig_len)
66 dout(" skipping last %llu, final file extent %llu~%llu\n",
67 orig_len - *plen, off, *plen);
69 if (op_has_extent(op->op)) {
70 op->extent.offset = objoff;
71 op->extent.length = objlen;
73 req->r_num_pages = calc_pages_for(off, *plen);
74 req->r_page_alignment = off & ~PAGE_MASK;
75 if (op->op == CEPH_OSD_OP_WRITE)
76 op->payload_len = *plen;
78 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
79 *bno, objoff, objlen, req->r_num_pages);
82 EXPORT_SYMBOL(ceph_calc_raw_layout);
85 * Implement client access to distributed object storage cluster.
87 * All data objects are stored within a cluster/cloud of OSDs, or
88 * "object storage devices." (Note that Ceph OSDs have _nothing_ to
89 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply
90 * remote daemons serving up and coordinating consistent and safe
91 * access to storage.
93 * Cluster membership and the mapping of data objects onto storage devices
94 * are described by the osd map.
96 * We keep track of pending OSD requests (read, write), resubmit
97 * requests to different OSDs when the cluster topology/data layout
98 * change, or retry the affected requests when the communications
99 * channel with an OSD is reset.
103 * calculate the mapping of a file extent onto an object, and fill out the
104 * request accordingly. shorten extent as necessary if it crosses an
105 * object boundary.
107 * fill osd op in request message.
109 static void calc_layout(struct ceph_osd_client *osdc,
110 struct ceph_vino vino,
111 struct ceph_file_layout *layout,
112 u64 off, u64 *plen,
113 struct ceph_osd_request *req,
114 struct ceph_osd_req_op *op)
116 u64 bno;
118 ceph_calc_raw_layout(osdc, layout, vino.snap, off,
119 plen, &bno, req, op);
121 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
122 req->r_oid_len = strlen(req->r_oid);
126 * requests
128 void ceph_osdc_release_request(struct kref *kref)
130 struct ceph_osd_request *req = container_of(kref,
131 struct ceph_osd_request,
132 r_kref);
134 if (req->r_request)
135 ceph_msg_put(req->r_request);
136 if (req->r_reply)
137 ceph_msg_put(req->r_reply);
138 if (req->r_con_filling_msg) {
139 dout("release_request revoking pages %p from con %p\n",
140 req->r_pages, req->r_con_filling_msg);
141 ceph_con_revoke_message(req->r_con_filling_msg,
142 req->r_reply);
143 ceph_con_put(req->r_con_filling_msg);
145 if (req->r_own_pages)
146 ceph_release_page_vector(req->r_pages,
147 req->r_num_pages);
148 #ifdef CONFIG_BLOCK
149 if (req->r_bio)
150 bio_put(req->r_bio);
151 #endif
152 ceph_put_snap_context(req->r_snapc);
153 if (req->r_trail) {
154 ceph_pagelist_release(req->r_trail);
155 kfree(req->r_trail);
157 if (req->r_mempool)
158 mempool_free(req, req->r_osdc->req_mempool);
159 else
160 kfree(req);
162 EXPORT_SYMBOL(ceph_osdc_release_request);
164 static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
166 int i = 0;
168 if (needs_trail)
169 *needs_trail = 0;
170 while (ops[i].op) {
171 if (needs_trail && op_needs_trail(ops[i].op))
172 *needs_trail = 1;
173 i++;
176 return i;
179 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
180 int flags,
181 struct ceph_snap_context *snapc,
182 struct ceph_osd_req_op *ops,
183 bool use_mempool,
184 gfp_t gfp_flags,
185 struct page **pages,
186 struct bio *bio)
188 struct ceph_osd_request *req;
189 struct ceph_msg *msg;
190 int needs_trail;
191 int num_op = get_num_ops(ops, &needs_trail);
192 size_t msg_size = sizeof(struct ceph_osd_request_head);
194 msg_size += num_op*sizeof(struct ceph_osd_op);
196 if (use_mempool) {
197 req = mempool_alloc(osdc->req_mempool, gfp_flags);
198 memset(req, 0, sizeof(*req));
199 } else {
200 req = kzalloc(sizeof(*req), gfp_flags);
202 if (req == NULL)
203 return NULL;
205 req->r_osdc = osdc;
206 req->r_mempool = use_mempool;
208 kref_init(&req->r_kref);
209 init_completion(&req->r_completion);
210 init_completion(&req->r_safe_completion);
211 INIT_LIST_HEAD(&req->r_unsafe_item);
212 req->r_flags = flags;
214 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
216 /* create reply message */
217 if (use_mempool)
218 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
219 else
220 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
221 OSD_OPREPLY_FRONT_LEN, gfp_flags);
222 if (!msg) {
223 ceph_osdc_put_request(req);
224 return NULL;
226 req->r_reply = msg;
228 /* allocate space for the trailing data */
229 if (needs_trail) {
230 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
231 if (!req->r_trail) {
232 ceph_osdc_put_request(req);
233 return NULL;
235 ceph_pagelist_init(req->r_trail);
237 /* create request message; allow space for oid */
238 msg_size += 40;
239 if (snapc)
240 msg_size += sizeof(u64) * snapc->num_snaps;
241 if (use_mempool)
242 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
243 else
244 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
245 if (!msg) {
246 ceph_osdc_put_request(req);
247 return NULL;
250 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
251 memset(msg->front.iov_base, 0, msg->front.iov_len);
253 req->r_request = msg;
254 req->r_pages = pages;
255 #ifdef CONFIG_BLOCK
256 if (bio) {
257 req->r_bio = bio;
258 bio_get(req->r_bio);
260 #endif
262 return req;
264 EXPORT_SYMBOL(ceph_osdc_alloc_request);
266 static void osd_req_encode_op(struct ceph_osd_request *req,
267 struct ceph_osd_op *dst,
268 struct ceph_osd_req_op *src)
270 dst->op = cpu_to_le16(src->op);
272 switch (dst->op) {
273 case CEPH_OSD_OP_READ:
274 case CEPH_OSD_OP_WRITE:
275 dst->extent.offset =
276 cpu_to_le64(src->extent.offset);
277 dst->extent.length =
278 cpu_to_le64(src->extent.length);
279 dst->extent.truncate_size =
280 cpu_to_le64(src->extent.truncate_size);
281 dst->extent.truncate_seq =
282 cpu_to_le32(src->extent.truncate_seq);
283 break;
285 case CEPH_OSD_OP_GETXATTR:
286 case CEPH_OSD_OP_SETXATTR:
287 case CEPH_OSD_OP_CMPXATTR:
288 BUG_ON(!req->r_trail);
290 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
291 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
292 dst->xattr.cmp_op = src->xattr.cmp_op;
293 dst->xattr.cmp_mode = src->xattr.cmp_mode;
294 ceph_pagelist_append(req->r_trail, src->xattr.name,
295 src->xattr.name_len);
296 ceph_pagelist_append(req->r_trail, src->xattr.val,
297 src->xattr.value_len);
298 break;
299 case CEPH_OSD_OP_CALL:
300 BUG_ON(!req->r_trail);
302 dst->cls.class_len = src->cls.class_len;
303 dst->cls.method_len = src->cls.method_len;
304 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
306 ceph_pagelist_append(req->r_trail, src->cls.class_name,
307 src->cls.class_len);
308 ceph_pagelist_append(req->r_trail, src->cls.method_name,
309 src->cls.method_len);
310 ceph_pagelist_append(req->r_trail, src->cls.indata,
311 src->cls.indata_len);
312 break;
313 case CEPH_OSD_OP_ROLLBACK:
314 dst->snap.snapid = cpu_to_le64(src->snap.snapid);
315 break;
316 case CEPH_OSD_OP_STARTSYNC:
317 break;
318 default:
319 pr_err("unrecognized osd opcode %d\n", dst->op);
320 WARN_ON(1);
321 break;
323 dst->payload_len = cpu_to_le32(src->payload_len);
327 * build new request AND message
330 void ceph_osdc_build_request(struct ceph_osd_request *req,
331 u64 off, u64 *plen,
332 struct ceph_osd_req_op *src_ops,
333 struct ceph_snap_context *snapc,
334 struct timespec *mtime,
335 const char *oid,
336 int oid_len)
338 struct ceph_msg *msg = req->r_request;
339 struct ceph_osd_request_head *head;
340 struct ceph_osd_req_op *src_op;
341 struct ceph_osd_op *op;
342 void *p;
343 int num_op = get_num_ops(src_ops, NULL);
344 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
345 int flags = req->r_flags;
346 u64 data_len = 0;
347 int i;
349 head = msg->front.iov_base;
350 op = (void *)(head + 1);
351 p = (void *)(op + num_op);
353 req->r_snapc = ceph_get_snap_context(snapc);
355 head->client_inc = cpu_to_le32(1); /* always, for now. */
356 head->flags = cpu_to_le32(flags);
357 if (flags & CEPH_OSD_FLAG_WRITE)
358 ceph_encode_timespec(&head->mtime, mtime);
359 head->num_ops = cpu_to_le16(num_op);
362 /* fill in oid */
363 head->object_len = cpu_to_le32(oid_len);
364 memcpy(p, oid, oid_len);
365 p += oid_len;
367 src_op = src_ops;
368 while (src_op->op) {
369 osd_req_encode_op(req, op, src_op);
370 src_op++;
371 op++;
374 if (req->r_trail)
375 data_len += req->r_trail->length;
377 if (snapc) {
378 head->snap_seq = cpu_to_le64(snapc->seq);
379 head->num_snaps = cpu_to_le32(snapc->num_snaps);
380 for (i = 0; i < snapc->num_snaps; i++) {
381 put_unaligned_le64(snapc->snaps[i], p);
382 p += sizeof(u64);
386 if (flags & CEPH_OSD_FLAG_WRITE) {
387 req->r_request->hdr.data_off = cpu_to_le16(off);
388 req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
389 } else if (data_len) {
390 req->r_request->hdr.data_off = 0;
391 req->r_request->hdr.data_len = cpu_to_le32(data_len);
394 req->r_request->page_alignment = req->r_page_alignment;
396 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
397 msg_size = p - msg->front.iov_base;
398 msg->front.iov_len = msg_size;
399 msg->hdr.front_len = cpu_to_le32(msg_size);
400 return;
402 EXPORT_SYMBOL(ceph_osdc_build_request);
405 * build new request AND message, calculate layout, and adjust file
406 * extent as needed.
408 * if the file was recently truncated, we include information about its
409 * old and new size so that the object can be updated appropriately. (we
410 * avoid synchronously deleting truncated objects because it's slow.)
412 * if @do_sync, include a 'startsync' command so that the osd will flush
413 * data quickly.
415 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
416 struct ceph_file_layout *layout,
417 struct ceph_vino vino,
418 u64 off, u64 *plen,
419 int opcode, int flags,
420 struct ceph_snap_context *snapc,
421 int do_sync,
422 u32 truncate_seq,
423 u64 truncate_size,
424 struct timespec *mtime,
425 bool use_mempool, int num_reply,
426 int page_align)
428 struct ceph_osd_req_op ops[3];
429 struct ceph_osd_request *req;
431 ops[0].op = opcode;
432 ops[0].extent.truncate_seq = truncate_seq;
433 ops[0].extent.truncate_size = truncate_size;
434 ops[0].payload_len = 0;
436 if (do_sync) {
437 ops[1].op = CEPH_OSD_OP_STARTSYNC;
438 ops[1].payload_len = 0;
439 ops[2].op = 0;
440 } else
441 ops[1].op = 0;
443 req = ceph_osdc_alloc_request(osdc, flags,
444 snapc, ops,
445 use_mempool,
446 GFP_NOFS, NULL, NULL);
447 if (IS_ERR(req))
448 return req;
450 /* calculate max write size */
451 calc_layout(osdc, vino, layout, off, plen, req, ops);
452 req->r_file_layout = *layout; /* keep a copy */
454 /* in case it differs from natural alignment that calc_layout
455 filled in for us */
456 req->r_page_alignment = page_align;
458 ceph_osdc_build_request(req, off, plen, ops,
459 snapc,
460 mtime,
461 req->r_oid, req->r_oid_len);
463 return req;
465 EXPORT_SYMBOL(ceph_osdc_new_request);
468 * We keep osd requests in an rbtree, sorted by ->r_tid.
470 static void __insert_request(struct ceph_osd_client *osdc,
471 struct ceph_osd_request *new)
473 struct rb_node **p = &osdc->requests.rb_node;
474 struct rb_node *parent = NULL;
475 struct ceph_osd_request *req = NULL;
477 while (*p) {
478 parent = *p;
479 req = rb_entry(parent, struct ceph_osd_request, r_node);
480 if (new->r_tid < req->r_tid)
481 p = &(*p)->rb_left;
482 else if (new->r_tid > req->r_tid)
483 p = &(*p)->rb_right;
484 else
485 BUG();
488 rb_link_node(&new->r_node, parent, p);
489 rb_insert_color(&new->r_node, &osdc->requests);
492 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
493 u64 tid)
495 struct ceph_osd_request *req;
496 struct rb_node *n = osdc->requests.rb_node;
498 while (n) {
499 req = rb_entry(n, struct ceph_osd_request, r_node);
500 if (tid < req->r_tid)
501 n = n->rb_left;
502 else if (tid > req->r_tid)
503 n = n->rb_right;
504 else
505 return req;
507 return NULL;
510 static struct ceph_osd_request *
511 __lookup_request_ge(struct ceph_osd_client *osdc,
512 u64 tid)
514 struct ceph_osd_request *req;
515 struct rb_node *n = osdc->requests.rb_node;
517 while (n) {
518 req = rb_entry(n, struct ceph_osd_request, r_node);
519 if (tid < req->r_tid) {
520 if (!n->rb_left)
521 return req;
522 n = n->rb_left;
523 } else if (tid > req->r_tid) {
524 n = n->rb_right;
525 } else {
526 return req;
529 return NULL;
534 * If the osd connection drops, we need to resubmit all requests.
536 static void osd_reset(struct ceph_connection *con)
538 struct ceph_osd *osd = con->private;
539 struct ceph_osd_client *osdc;
541 if (!osd)
542 return;
543 dout("osd_reset osd%d\n", osd->o_osd);
544 osdc = osd->o_osdc;
545 down_read(&osdc->map_sem);
546 kick_requests(osdc, osd);
547 up_read(&osdc->map_sem);
551 * Track open sessions with osds.
553 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
555 struct ceph_osd *osd;
557 osd = kzalloc(sizeof(*osd), GFP_NOFS);
558 if (!osd)
559 return NULL;
561 atomic_set(&osd->o_ref, 1);
562 osd->o_osdc = osdc;
563 INIT_LIST_HEAD(&osd->o_requests);
564 INIT_LIST_HEAD(&osd->o_osd_lru);
565 osd->o_incarnation = 1;
567 ceph_con_init(osdc->client->msgr, &osd->o_con);
568 osd->o_con.private = osd;
569 osd->o_con.ops = &osd_con_ops;
570 osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
572 INIT_LIST_HEAD(&osd->o_keepalive_item);
573 return osd;
576 static struct ceph_osd *get_osd(struct ceph_osd *osd)
578 if (atomic_inc_not_zero(&osd->o_ref)) {
579 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
580 atomic_read(&osd->o_ref));
581 return osd;
582 } else {
583 dout("get_osd %p FAIL\n", osd);
584 return NULL;
588 static void put_osd(struct ceph_osd *osd)
590 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
591 atomic_read(&osd->o_ref) - 1);
592 if (atomic_dec_and_test(&osd->o_ref)) {
593 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
595 if (osd->o_authorizer)
596 ac->ops->destroy_authorizer(ac, osd->o_authorizer);
597 kfree(osd);
602 * remove an osd from our map
604 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
606 dout("__remove_osd %p\n", osd);
607 BUG_ON(!list_empty(&osd->o_requests));
608 rb_erase(&osd->o_node, &osdc->osds);
609 list_del_init(&osd->o_osd_lru);
610 ceph_con_close(&osd->o_con);
611 put_osd(osd);
614 static void __move_osd_to_lru(struct ceph_osd_client *osdc,
615 struct ceph_osd *osd)
617 dout("__move_osd_to_lru %p\n", osd);
618 BUG_ON(!list_empty(&osd->o_osd_lru));
619 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
620 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
623 static void __remove_osd_from_lru(struct ceph_osd *osd)
625 dout("__remove_osd_from_lru %p\n", osd);
626 if (!list_empty(&osd->o_osd_lru))
627 list_del_init(&osd->o_osd_lru);
630 static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
632 struct ceph_osd *osd, *nosd;
634 dout("__remove_old_osds %p\n", osdc);
635 mutex_lock(&osdc->request_mutex);
636 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
637 if (!remove_all && time_before(jiffies, osd->lru_ttl))
638 break;
639 __remove_osd(osdc, osd);
641 mutex_unlock(&osdc->request_mutex);
645 * reset osd connect
647 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
649 struct ceph_osd_request *req;
650 int ret = 0;
652 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
653 if (list_empty(&osd->o_requests)) {
654 __remove_osd(osdc, osd);
655 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
656 &osd->o_con.peer_addr,
657 sizeof(osd->o_con.peer_addr)) == 0 &&
658 !ceph_con_opened(&osd->o_con)) {
659 dout(" osd addr hasn't changed and connection never opened,"
660 " letting msgr retry");
661 /* touch each r_stamp for handle_timeout()'s benfit */
662 list_for_each_entry(req, &osd->o_requests, r_osd_item)
663 req->r_stamp = jiffies;
664 ret = -EAGAIN;
665 } else {
666 ceph_con_close(&osd->o_con);
667 ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
668 osd->o_incarnation++;
670 return ret;
673 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
675 struct rb_node **p = &osdc->osds.rb_node;
676 struct rb_node *parent = NULL;
677 struct ceph_osd *osd = NULL;
679 while (*p) {
680 parent = *p;
681 osd = rb_entry(parent, struct ceph_osd, o_node);
682 if (new->o_osd < osd->o_osd)
683 p = &(*p)->rb_left;
684 else if (new->o_osd > osd->o_osd)
685 p = &(*p)->rb_right;
686 else
687 BUG();
690 rb_link_node(&new->o_node, parent, p);
691 rb_insert_color(&new->o_node, &osdc->osds);
694 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
696 struct ceph_osd *osd;
697 struct rb_node *n = osdc->osds.rb_node;
699 while (n) {
700 osd = rb_entry(n, struct ceph_osd, o_node);
701 if (o < osd->o_osd)
702 n = n->rb_left;
703 else if (o > osd->o_osd)
704 n = n->rb_right;
705 else
706 return osd;
708 return NULL;
711 static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
713 schedule_delayed_work(&osdc->timeout_work,
714 osdc->client->options->osd_keepalive_timeout * HZ);
717 static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
719 cancel_delayed_work(&osdc->timeout_work);
723 * Register request, assign tid. If this is the first request, set up
724 * the timeout event.
726 static void register_request(struct ceph_osd_client *osdc,
727 struct ceph_osd_request *req)
729 mutex_lock(&osdc->request_mutex);
730 req->r_tid = ++osdc->last_tid;
731 req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
732 INIT_LIST_HEAD(&req->r_req_lru_item);
734 dout("register_request %p tid %lld\n", req, req->r_tid);
735 __insert_request(osdc, req);
736 ceph_osdc_get_request(req);
737 osdc->num_requests++;
739 if (osdc->num_requests == 1) {
740 dout(" first request, scheduling timeout\n");
741 __schedule_osd_timeout(osdc);
743 mutex_unlock(&osdc->request_mutex);
747 * called under osdc->request_mutex
749 static void __unregister_request(struct ceph_osd_client *osdc,
750 struct ceph_osd_request *req)
752 dout("__unregister_request %p tid %lld\n", req, req->r_tid);
753 rb_erase(&req->r_node, &osdc->requests);
754 osdc->num_requests--;
756 if (req->r_osd) {
757 /* make sure the original request isn't in flight. */
758 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
760 list_del_init(&req->r_osd_item);
761 if (list_empty(&req->r_osd->o_requests))
762 __move_osd_to_lru(osdc, req->r_osd);
763 req->r_osd = NULL;
766 ceph_osdc_put_request(req);
768 list_del_init(&req->r_req_lru_item);
769 if (osdc->num_requests == 0) {
770 dout(" no requests, canceling timeout\n");
771 __cancel_osd_timeout(osdc);
776 * Cancel a previously queued request message
778 static void __cancel_request(struct ceph_osd_request *req)
780 if (req->r_sent && req->r_osd) {
781 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
782 req->r_sent = 0;
784 list_del_init(&req->r_req_lru_item);
788 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
789 * (as needed), and set the request r_osd appropriately. If there is
790 * no up osd, set r_osd to NULL.
792 * Return 0 if unchanged, 1 if changed, or negative on error.
794 * Caller should hold map_sem for read and request_mutex.
796 static int __map_osds(struct ceph_osd_client *osdc,
797 struct ceph_osd_request *req)
799 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
800 struct ceph_pg pgid;
801 int acting[CEPH_PG_MAX_SIZE];
802 int o = -1, num = 0;
803 int err;
805 dout("map_osds %p tid %lld\n", req, req->r_tid);
806 err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
807 &req->r_file_layout, osdc->osdmap);
808 if (err)
809 return err;
810 pgid = reqhead->layout.ol_pgid;
811 req->r_pgid = pgid;
813 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
814 if (err > 0) {
815 o = acting[0];
816 num = err;
819 if ((req->r_osd && req->r_osd->o_osd == o &&
820 req->r_sent >= req->r_osd->o_incarnation &&
821 req->r_num_pg_osds == num &&
822 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
823 (req->r_osd == NULL && o == -1))
824 return 0; /* no change */
826 dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
827 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
828 req->r_osd ? req->r_osd->o_osd : -1);
830 /* record full pg acting set */
831 memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
832 req->r_num_pg_osds = num;
834 if (req->r_osd) {
835 __cancel_request(req);
836 list_del_init(&req->r_osd_item);
837 req->r_osd = NULL;
840 req->r_osd = __lookup_osd(osdc, o);
841 if (!req->r_osd && o >= 0) {
842 err = -ENOMEM;
843 req->r_osd = create_osd(osdc);
844 if (!req->r_osd)
845 goto out;
847 dout("map_osds osd %p is osd%d\n", req->r_osd, o);
848 req->r_osd->o_osd = o;
849 req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
850 __insert_osd(osdc, req->r_osd);
852 ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
855 if (req->r_osd) {
856 __remove_osd_from_lru(req->r_osd);
857 list_add(&req->r_osd_item, &req->r_osd->o_requests);
859 err = 1; /* osd or pg changed */
861 out:
862 return err;
866 * caller should hold map_sem (for read) and request_mutex
868 static int __send_request(struct ceph_osd_client *osdc,
869 struct ceph_osd_request *req)
871 struct ceph_osd_request_head *reqhead;
872 int err;
874 err = __map_osds(osdc, req);
875 if (err < 0)
876 return err;
877 if (req->r_osd == NULL) {
878 dout("send_request %p no up osds in pg\n", req);
879 ceph_monc_request_next_osdmap(&osdc->client->monc);
880 return 0;
883 dout("send_request %p tid %llu to osd%d flags %d\n",
884 req, req->r_tid, req->r_osd->o_osd, req->r_flags);
886 reqhead = req->r_request->front.iov_base;
887 reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
888 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
889 reqhead->reassert_version = req->r_reassert_version;
891 req->r_stamp = jiffies;
892 list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
894 ceph_msg_get(req->r_request); /* send consumes a ref */
895 ceph_con_send(&req->r_osd->o_con, req->r_request);
896 req->r_sent = req->r_osd->o_incarnation;
897 return 0;
901 * Timeout callback, called every N seconds when 1 or more osd
902 * requests has been active for more than N seconds. When this
903 * happens, we ping all OSDs with requests who have timed out to
904 * ensure any communications channel reset is detected. Reset the
905 * request timeouts another N seconds in the future as we go.
906 * Reschedule the timeout event another N seconds in future (unless
907 * there are no open requests).
909 static void handle_timeout(struct work_struct *work)
911 struct ceph_osd_client *osdc =
912 container_of(work, struct ceph_osd_client, timeout_work.work);
913 struct ceph_osd_request *req, *last_req = NULL;
914 struct ceph_osd *osd;
915 unsigned long timeout = osdc->client->options->osd_timeout * HZ;
916 unsigned long keepalive =
917 osdc->client->options->osd_keepalive_timeout * HZ;
918 unsigned long last_stamp = 0;
919 struct rb_node *p;
920 struct list_head slow_osds;
922 dout("timeout\n");
923 down_read(&osdc->map_sem);
925 ceph_monc_request_next_osdmap(&osdc->client->monc);
927 mutex_lock(&osdc->request_mutex);
928 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
929 req = rb_entry(p, struct ceph_osd_request, r_node);
931 if (req->r_resend) {
932 int err;
934 dout("osdc resending prev failed %lld\n", req->r_tid);
935 err = __send_request(osdc, req);
936 if (err)
937 dout("osdc failed again on %lld\n", req->r_tid);
938 else
939 req->r_resend = false;
940 continue;
945 * reset osds that appear to be _really_ unresponsive. this
946 * is a failsafe measure.. we really shouldn't be getting to
947 * this point if the system is working properly. the monitors
948 * should mark the osd as failed and we should find out about
949 * it from an updated osd map.
951 while (timeout && !list_empty(&osdc->req_lru)) {
952 req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
953 r_req_lru_item);
955 if (time_before(jiffies, req->r_stamp + timeout))
956 break;
958 BUG_ON(req == last_req && req->r_stamp == last_stamp);
959 last_req = req;
960 last_stamp = req->r_stamp;
962 osd = req->r_osd;
963 BUG_ON(!osd);
964 pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
965 req->r_tid, osd->o_osd);
966 __kick_requests(osdc, osd);
970 * ping osds that are a bit slow. this ensures that if there
971 * is a break in the TCP connection we will notice, and reopen
972 * a connection with that osd (from the fault callback).
974 INIT_LIST_HEAD(&slow_osds);
975 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
976 if (time_before(jiffies, req->r_stamp + keepalive))
977 break;
979 osd = req->r_osd;
980 BUG_ON(!osd);
981 dout(" tid %llu is slow, will send keepalive on osd%d\n",
982 req->r_tid, osd->o_osd);
983 list_move_tail(&osd->o_keepalive_item, &slow_osds);
985 while (!list_empty(&slow_osds)) {
986 osd = list_entry(slow_osds.next, struct ceph_osd,
987 o_keepalive_item);
988 list_del_init(&osd->o_keepalive_item);
989 ceph_con_keepalive(&osd->o_con);
992 __schedule_osd_timeout(osdc);
993 mutex_unlock(&osdc->request_mutex);
995 up_read(&osdc->map_sem);
998 static void handle_osds_timeout(struct work_struct *work)
1000 struct ceph_osd_client *osdc =
1001 container_of(work, struct ceph_osd_client,
1002 osds_timeout_work.work);
1003 unsigned long delay =
1004 osdc->client->options->osd_idle_ttl * HZ >> 2;
1006 dout("osds timeout\n");
1007 down_read(&osdc->map_sem);
1008 remove_old_osds(osdc, 0);
1009 up_read(&osdc->map_sem);
1011 schedule_delayed_work(&osdc->osds_timeout_work,
1012 round_jiffies_relative(delay));
1016 * handle osd op reply. either call the callback if it is specified,
1017 * or do the completion to wake up the waiting thread.
1019 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1020 struct ceph_connection *con)
1022 struct ceph_osd_reply_head *rhead = msg->front.iov_base;
1023 struct ceph_osd_request *req;
1024 u64 tid;
1025 int numops, object_len, flags;
1026 s32 result;
1028 tid = le64_to_cpu(msg->hdr.tid);
1029 if (msg->front.iov_len < sizeof(*rhead))
1030 goto bad;
1031 numops = le32_to_cpu(rhead->num_ops);
1032 object_len = le32_to_cpu(rhead->object_len);
1033 result = le32_to_cpu(rhead->result);
1034 if (msg->front.iov_len != sizeof(*rhead) + object_len +
1035 numops * sizeof(struct ceph_osd_op))
1036 goto bad;
1037 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
1039 /* lookup */
1040 mutex_lock(&osdc->request_mutex);
1041 req = __lookup_request(osdc, tid);
1042 if (req == NULL) {
1043 dout("handle_reply tid %llu dne\n", tid);
1044 mutex_unlock(&osdc->request_mutex);
1045 return;
1047 ceph_osdc_get_request(req);
1048 flags = le32_to_cpu(rhead->flags);
1051 * if this connection filled our message, drop our reference now, to
1052 * avoid a (safe but slower) revoke later.
1054 if (req->r_con_filling_msg == con && req->r_reply == msg) {
1055 dout(" dropping con_filling_msg ref %p\n", con);
1056 req->r_con_filling_msg = NULL;
1057 ceph_con_put(con);
1060 if (!req->r_got_reply) {
1061 unsigned bytes;
1063 req->r_result = le32_to_cpu(rhead->result);
1064 bytes = le32_to_cpu(msg->hdr.data_len);
1065 dout("handle_reply result %d bytes %d\n", req->r_result,
1066 bytes);
1067 if (req->r_result == 0)
1068 req->r_result = bytes;
1070 /* in case this is a write and we need to replay, */
1071 req->r_reassert_version = rhead->reassert_version;
1073 req->r_got_reply = 1;
1074 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
1075 dout("handle_reply tid %llu dup ack\n", tid);
1076 mutex_unlock(&osdc->request_mutex);
1077 goto done;
1080 dout("handle_reply tid %llu flags %d\n", tid, flags);
1082 /* either this is a read, or we got the safe response */
1083 if (result < 0 ||
1084 (flags & CEPH_OSD_FLAG_ONDISK) ||
1085 ((flags & CEPH_OSD_FLAG_WRITE) == 0))
1086 __unregister_request(osdc, req);
1088 mutex_unlock(&osdc->request_mutex);
1090 if (req->r_callback)
1091 req->r_callback(req, msg);
1092 else
1093 complete_all(&req->r_completion);
1095 if (flags & CEPH_OSD_FLAG_ONDISK) {
1096 if (req->r_safe_callback)
1097 req->r_safe_callback(req, msg);
1098 complete_all(&req->r_safe_completion); /* fsync waiter */
1101 done:
1102 ceph_osdc_put_request(req);
1103 return;
1105 bad:
1106 pr_err("corrupt osd_op_reply got %d %d expected %d\n",
1107 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
1108 (int)sizeof(*rhead));
1109 ceph_msg_dump(msg);
1113 static int __kick_requests(struct ceph_osd_client *osdc,
1114 struct ceph_osd *kickosd)
1116 struct ceph_osd_request *req;
1117 struct rb_node *p, *n;
1118 int needmap = 0;
1119 int err;
1121 dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
1122 if (kickosd) {
1123 err = __reset_osd(osdc, kickosd);
1124 if (err == -EAGAIN)
1125 return 1;
1126 } else {
1127 for (p = rb_first(&osdc->osds); p; p = n) {
1128 struct ceph_osd *osd =
1129 rb_entry(p, struct ceph_osd, o_node);
1131 n = rb_next(p);
1132 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1133 memcmp(&osd->o_con.peer_addr,
1134 ceph_osd_addr(osdc->osdmap,
1135 osd->o_osd),
1136 sizeof(struct ceph_entity_addr)) != 0)
1137 __reset_osd(osdc, osd);
1141 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
1142 req = rb_entry(p, struct ceph_osd_request, r_node);
1144 if (req->r_resend) {
1145 dout(" r_resend set on tid %llu\n", req->r_tid);
1146 __cancel_request(req);
1147 goto kick;
1149 if (req->r_osd && kickosd == req->r_osd) {
1150 __cancel_request(req);
1151 goto kick;
1154 err = __map_osds(osdc, req);
1155 if (err == 0)
1156 continue; /* no change */
1157 if (err < 0) {
1159 * FIXME: really, we should set the request
1160 * error and fail if this isn't a 'nofail'
1161 * request, but that's a fair bit more
1162 * complicated to do. So retry!
1164 dout(" setting r_resend on %llu\n", req->r_tid);
1165 req->r_resend = true;
1166 continue;
1168 if (req->r_osd == NULL) {
1169 dout("tid %llu maps to no valid osd\n", req->r_tid);
1170 needmap++; /* request a newer map */
1171 continue;
1174 kick:
1175 dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
1176 req->r_osd ? req->r_osd->o_osd : -1);
1177 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1178 err = __send_request(osdc, req);
1179 if (err) {
1180 dout(" setting r_resend on %llu\n", req->r_tid);
1181 req->r_resend = true;
1185 return needmap;
1189 * Resubmit osd requests whose osd or osd address has changed. Request
1190 * a new osd map if osds are down, or we are otherwise unable to determine
1191 * how to direct a request.
1193 * Close connections to down osds.
1195 * If @who is specified, resubmit requests for that specific osd.
1197 * Caller should hold map_sem for read and request_mutex.
1199 static void kick_requests(struct ceph_osd_client *osdc,
1200 struct ceph_osd *kickosd)
1202 int needmap;
1204 mutex_lock(&osdc->request_mutex);
1205 needmap = __kick_requests(osdc, kickosd);
1206 mutex_unlock(&osdc->request_mutex);
1208 if (needmap) {
1209 dout("%d requests for down osds, need new map\n", needmap);
1210 ceph_monc_request_next_osdmap(&osdc->client->monc);
1215 * Process updated osd map.
1217 * The message contains any number of incremental and full maps, normally
1218 * indicating some sort of topology change in the cluster. Kick requests
1219 * off to different OSDs as needed.
1221 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1223 void *p, *end, *next;
1224 u32 nr_maps, maplen;
1225 u32 epoch;
1226 struct ceph_osdmap *newmap = NULL, *oldmap;
1227 int err;
1228 struct ceph_fsid fsid;
1230 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1231 p = msg->front.iov_base;
1232 end = p + msg->front.iov_len;
1234 /* verify fsid */
1235 ceph_decode_need(&p, end, sizeof(fsid), bad);
1236 ceph_decode_copy(&p, &fsid, sizeof(fsid));
1237 if (ceph_check_fsid(osdc->client, &fsid) < 0)
1238 return;
1240 down_write(&osdc->map_sem);
1242 /* incremental maps */
1243 ceph_decode_32_safe(&p, end, nr_maps, bad);
1244 dout(" %d inc maps\n", nr_maps);
1245 while (nr_maps > 0) {
1246 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1247 epoch = ceph_decode_32(&p);
1248 maplen = ceph_decode_32(&p);
1249 ceph_decode_need(&p, end, maplen, bad);
1250 next = p + maplen;
1251 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1252 dout("applying incremental map %u len %d\n",
1253 epoch, maplen);
1254 newmap = osdmap_apply_incremental(&p, next,
1255 osdc->osdmap,
1256 osdc->client->msgr);
1257 if (IS_ERR(newmap)) {
1258 err = PTR_ERR(newmap);
1259 goto bad;
1261 BUG_ON(!newmap);
1262 if (newmap != osdc->osdmap) {
1263 ceph_osdmap_destroy(osdc->osdmap);
1264 osdc->osdmap = newmap;
1266 } else {
1267 dout("ignoring incremental map %u len %d\n",
1268 epoch, maplen);
1270 p = next;
1271 nr_maps--;
1273 if (newmap)
1274 goto done;
1276 /* full maps */
1277 ceph_decode_32_safe(&p, end, nr_maps, bad);
1278 dout(" %d full maps\n", nr_maps);
1279 while (nr_maps) {
1280 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1281 epoch = ceph_decode_32(&p);
1282 maplen = ceph_decode_32(&p);
1283 ceph_decode_need(&p, end, maplen, bad);
1284 if (nr_maps > 1) {
1285 dout("skipping non-latest full map %u len %d\n",
1286 epoch, maplen);
1287 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1288 dout("skipping full map %u len %d, "
1289 "older than our %u\n", epoch, maplen,
1290 osdc->osdmap->epoch);
1291 } else {
1292 dout("taking full map %u len %d\n", epoch, maplen);
1293 newmap = osdmap_decode(&p, p+maplen);
1294 if (IS_ERR(newmap)) {
1295 err = PTR_ERR(newmap);
1296 goto bad;
1298 BUG_ON(!newmap);
1299 oldmap = osdc->osdmap;
1300 osdc->osdmap = newmap;
1301 if (oldmap)
1302 ceph_osdmap_destroy(oldmap);
1304 p += maplen;
1305 nr_maps--;
1308 done:
1309 downgrade_write(&osdc->map_sem);
1310 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1311 if (newmap)
1312 kick_requests(osdc, NULL);
1313 up_read(&osdc->map_sem);
1314 wake_up_all(&osdc->client->auth_wq);
1315 return;
1317 bad:
1318 pr_err("osdc handle_map corrupt msg\n");
1319 ceph_msg_dump(msg);
1320 up_write(&osdc->map_sem);
1321 return;
1325 * Register request, send initial attempt.
1327 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1328 struct ceph_osd_request *req,
1329 bool nofail)
1331 int rc = 0;
1333 req->r_request->pages = req->r_pages;
1334 req->r_request->nr_pages = req->r_num_pages;
1335 #ifdef CONFIG_BLOCK
1336 req->r_request->bio = req->r_bio;
1337 #endif
1338 req->r_request->trail = req->r_trail;
1340 register_request(osdc, req);
1342 down_read(&osdc->map_sem);
1343 mutex_lock(&osdc->request_mutex);
1345 * a racing kick_requests() may have sent the message for us
1346 * while we dropped request_mutex above, so only send now if
1347 * the request still han't been touched yet.
1349 if (req->r_sent == 0) {
1350 rc = __send_request(osdc, req);
1351 if (rc) {
1352 if (nofail) {
1353 dout("osdc_start_request failed send, "
1354 " marking %lld\n", req->r_tid);
1355 req->r_resend = true;
1356 rc = 0;
1357 } else {
1358 __unregister_request(osdc, req);
1362 mutex_unlock(&osdc->request_mutex);
1363 up_read(&osdc->map_sem);
1364 return rc;
1366 EXPORT_SYMBOL(ceph_osdc_start_request);
1369 * wait for a request to complete
1371 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1372 struct ceph_osd_request *req)
1374 int rc;
1376 rc = wait_for_completion_interruptible(&req->r_completion);
1377 if (rc < 0) {
1378 mutex_lock(&osdc->request_mutex);
1379 __cancel_request(req);
1380 __unregister_request(osdc, req);
1381 mutex_unlock(&osdc->request_mutex);
1382 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1383 return rc;
1386 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1387 return req->r_result;
1389 EXPORT_SYMBOL(ceph_osdc_wait_request);
1392 * sync - wait for all in-flight requests to flush. avoid starvation.
1394 void ceph_osdc_sync(struct ceph_osd_client *osdc)
1396 struct ceph_osd_request *req;
1397 u64 last_tid, next_tid = 0;
1399 mutex_lock(&osdc->request_mutex);
1400 last_tid = osdc->last_tid;
1401 while (1) {
1402 req = __lookup_request_ge(osdc, next_tid);
1403 if (!req)
1404 break;
1405 if (req->r_tid > last_tid)
1406 break;
1408 next_tid = req->r_tid + 1;
1409 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1410 continue;
1412 ceph_osdc_get_request(req);
1413 mutex_unlock(&osdc->request_mutex);
1414 dout("sync waiting on tid %llu (last is %llu)\n",
1415 req->r_tid, last_tid);
1416 wait_for_completion(&req->r_safe_completion);
1417 mutex_lock(&osdc->request_mutex);
1418 ceph_osdc_put_request(req);
1420 mutex_unlock(&osdc->request_mutex);
1421 dout("sync done (thru tid %llu)\n", last_tid);
1423 EXPORT_SYMBOL(ceph_osdc_sync);
1426 * init, shutdown
1428 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1430 int err;
1432 dout("init\n");
1433 osdc->client = client;
1434 osdc->osdmap = NULL;
1435 init_rwsem(&osdc->map_sem);
1436 init_completion(&osdc->map_waiters);
1437 osdc->last_requested_map = 0;
1438 mutex_init(&osdc->request_mutex);
1439 osdc->last_tid = 0;
1440 osdc->osds = RB_ROOT;
1441 INIT_LIST_HEAD(&osdc->osd_lru);
1442 osdc->requests = RB_ROOT;
1443 INIT_LIST_HEAD(&osdc->req_lru);
1444 osdc->num_requests = 0;
1445 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1446 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1448 schedule_delayed_work(&osdc->osds_timeout_work,
1449 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
1451 err = -ENOMEM;
1452 osdc->req_mempool = mempool_create_kmalloc_pool(10,
1453 sizeof(struct ceph_osd_request));
1454 if (!osdc->req_mempool)
1455 goto out;
1457 err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
1458 "osd_op");
1459 if (err < 0)
1460 goto out_mempool;
1461 err = ceph_msgpool_init(&osdc->msgpool_op_reply,
1462 OSD_OPREPLY_FRONT_LEN, 10, true,
1463 "osd_op_reply");
1464 if (err < 0)
1465 goto out_msgpool;
1466 return 0;
1468 out_msgpool:
1469 ceph_msgpool_destroy(&osdc->msgpool_op);
1470 out_mempool:
1471 mempool_destroy(osdc->req_mempool);
1472 out:
1473 return err;
1475 EXPORT_SYMBOL(ceph_osdc_init);
1477 void ceph_osdc_stop(struct ceph_osd_client *osdc)
1479 cancel_delayed_work_sync(&osdc->timeout_work);
1480 cancel_delayed_work_sync(&osdc->osds_timeout_work);
1481 if (osdc->osdmap) {
1482 ceph_osdmap_destroy(osdc->osdmap);
1483 osdc->osdmap = NULL;
1485 remove_old_osds(osdc, 1);
1486 mempool_destroy(osdc->req_mempool);
1487 ceph_msgpool_destroy(&osdc->msgpool_op);
1488 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1490 EXPORT_SYMBOL(ceph_osdc_stop);
1493 * Read some contiguous pages. If we cross a stripe boundary, shorten
1494 * *plen. Return number of bytes read, or error.
1496 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1497 struct ceph_vino vino, struct ceph_file_layout *layout,
1498 u64 off, u64 *plen,
1499 u32 truncate_seq, u64 truncate_size,
1500 struct page **pages, int num_pages, int page_align)
1502 struct ceph_osd_request *req;
1503 int rc = 0;
1505 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1506 vino.snap, off, *plen);
1507 req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1508 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1509 NULL, 0, truncate_seq, truncate_size, NULL,
1510 false, 1, page_align);
1511 if (!req)
1512 return -ENOMEM;
1514 /* it may be a short read due to an object boundary */
1515 req->r_pages = pages;
1517 dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
1518 off, *plen, req->r_num_pages, page_align);
1520 rc = ceph_osdc_start_request(osdc, req, false);
1521 if (!rc)
1522 rc = ceph_osdc_wait_request(osdc, req);
1524 ceph_osdc_put_request(req);
1525 dout("readpages result %d\n", rc);
1526 return rc;
1528 EXPORT_SYMBOL(ceph_osdc_readpages);
1531 * do a synchronous write on N pages
1533 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1534 struct ceph_file_layout *layout,
1535 struct ceph_snap_context *snapc,
1536 u64 off, u64 len,
1537 u32 truncate_seq, u64 truncate_size,
1538 struct timespec *mtime,
1539 struct page **pages, int num_pages,
1540 int flags, int do_sync, bool nofail)
1542 struct ceph_osd_request *req;
1543 int rc = 0;
1544 int page_align = off & ~PAGE_MASK;
1546 BUG_ON(vino.snap != CEPH_NOSNAP);
1547 req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1548 CEPH_OSD_OP_WRITE,
1549 flags | CEPH_OSD_FLAG_ONDISK |
1550 CEPH_OSD_FLAG_WRITE,
1551 snapc, do_sync,
1552 truncate_seq, truncate_size, mtime,
1553 nofail, 1, page_align);
1554 if (!req)
1555 return -ENOMEM;
1557 /* it may be a short write due to an object boundary */
1558 req->r_pages = pages;
1559 dout("writepages %llu~%llu (%d pages)\n", off, len,
1560 req->r_num_pages);
1562 rc = ceph_osdc_start_request(osdc, req, nofail);
1563 if (!rc)
1564 rc = ceph_osdc_wait_request(osdc, req);
1566 ceph_osdc_put_request(req);
1567 if (rc == 0)
1568 rc = len;
1569 dout("writepages result %d\n", rc);
1570 return rc;
1572 EXPORT_SYMBOL(ceph_osdc_writepages);
1575 * handle incoming message
1577 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1579 struct ceph_osd *osd = con->private;
1580 struct ceph_osd_client *osdc;
1581 int type = le16_to_cpu(msg->hdr.type);
1583 if (!osd)
1584 goto out;
1585 osdc = osd->o_osdc;
1587 switch (type) {
1588 case CEPH_MSG_OSD_MAP:
1589 ceph_osdc_handle_map(osdc, msg);
1590 break;
1591 case CEPH_MSG_OSD_OPREPLY:
1592 handle_reply(osdc, msg, con);
1593 break;
1595 default:
1596 pr_err("received unknown message type %d %s\n", type,
1597 ceph_msg_type_name(type));
1599 out:
1600 ceph_msg_put(msg);
1604 * lookup and return message for incoming reply. set up reply message
1605 * pages.
1607 static struct ceph_msg *get_reply(struct ceph_connection *con,
1608 struct ceph_msg_header *hdr,
1609 int *skip)
1611 struct ceph_osd *osd = con->private;
1612 struct ceph_osd_client *osdc = osd->o_osdc;
1613 struct ceph_msg *m;
1614 struct ceph_osd_request *req;
1615 int front = le32_to_cpu(hdr->front_len);
1616 int data_len = le32_to_cpu(hdr->data_len);
1617 u64 tid;
1619 tid = le64_to_cpu(hdr->tid);
1620 mutex_lock(&osdc->request_mutex);
1621 req = __lookup_request(osdc, tid);
1622 if (!req) {
1623 *skip = 1;
1624 m = NULL;
1625 pr_info("get_reply unknown tid %llu from osd%d\n", tid,
1626 osd->o_osd);
1627 goto out;
1630 if (req->r_con_filling_msg) {
1631 dout("get_reply revoking msg %p from old con %p\n",
1632 req->r_reply, req->r_con_filling_msg);
1633 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
1634 ceph_con_put(req->r_con_filling_msg);
1635 req->r_con_filling_msg = NULL;
1638 if (front > req->r_reply->front.iov_len) {
1639 pr_warning("get_reply front %d > preallocated %d\n",
1640 front, (int)req->r_reply->front.iov_len);
1641 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
1642 if (!m)
1643 goto out;
1644 ceph_msg_put(req->r_reply);
1645 req->r_reply = m;
1647 m = ceph_msg_get(req->r_reply);
1649 if (data_len > 0) {
1650 int want = calc_pages_for(req->r_page_alignment, data_len);
1652 if (unlikely(req->r_num_pages < want)) {
1653 pr_warning("tid %lld reply %d > expected %d pages\n",
1654 tid, want, m->nr_pages);
1655 *skip = 1;
1656 ceph_msg_put(m);
1657 m = NULL;
1658 goto out;
1660 m->pages = req->r_pages;
1661 m->nr_pages = req->r_num_pages;
1662 m->page_alignment = req->r_page_alignment;
1663 #ifdef CONFIG_BLOCK
1664 m->bio = req->r_bio;
1665 #endif
1667 *skip = 0;
1668 req->r_con_filling_msg = ceph_con_get(con);
1669 dout("get_reply tid %lld %p\n", tid, m);
1671 out:
1672 mutex_unlock(&osdc->request_mutex);
1673 return m;
1677 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1678 struct ceph_msg_header *hdr,
1679 int *skip)
1681 struct ceph_osd *osd = con->private;
1682 int type = le16_to_cpu(hdr->type);
1683 int front = le32_to_cpu(hdr->front_len);
1685 switch (type) {
1686 case CEPH_MSG_OSD_MAP:
1687 return ceph_msg_new(type, front, GFP_NOFS);
1688 case CEPH_MSG_OSD_OPREPLY:
1689 return get_reply(con, hdr, skip);
1690 default:
1691 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
1692 osd->o_osd);
1693 *skip = 1;
1694 return NULL;
1699 * Wrappers to refcount containing ceph_osd struct
1701 static struct ceph_connection *get_osd_con(struct ceph_connection *con)
1703 struct ceph_osd *osd = con->private;
1704 if (get_osd(osd))
1705 return con;
1706 return NULL;
1709 static void put_osd_con(struct ceph_connection *con)
1711 struct ceph_osd *osd = con->private;
1712 put_osd(osd);
1716 * authentication
1718 static int get_authorizer(struct ceph_connection *con,
1719 void **buf, int *len, int *proto,
1720 void **reply_buf, int *reply_len, int force_new)
1722 struct ceph_osd *o = con->private;
1723 struct ceph_osd_client *osdc = o->o_osdc;
1724 struct ceph_auth_client *ac = osdc->client->monc.auth;
1725 int ret = 0;
1727 if (force_new && o->o_authorizer) {
1728 ac->ops->destroy_authorizer(ac, o->o_authorizer);
1729 o->o_authorizer = NULL;
1731 if (o->o_authorizer == NULL) {
1732 ret = ac->ops->create_authorizer(
1733 ac, CEPH_ENTITY_TYPE_OSD,
1734 &o->o_authorizer,
1735 &o->o_authorizer_buf,
1736 &o->o_authorizer_buf_len,
1737 &o->o_authorizer_reply_buf,
1738 &o->o_authorizer_reply_buf_len);
1739 if (ret)
1740 return ret;
1743 *proto = ac->protocol;
1744 *buf = o->o_authorizer_buf;
1745 *len = o->o_authorizer_buf_len;
1746 *reply_buf = o->o_authorizer_reply_buf;
1747 *reply_len = o->o_authorizer_reply_buf_len;
1748 return 0;
1752 static int verify_authorizer_reply(struct ceph_connection *con, int len)
1754 struct ceph_osd *o = con->private;
1755 struct ceph_osd_client *osdc = o->o_osdc;
1756 struct ceph_auth_client *ac = osdc->client->monc.auth;
1758 return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
1761 static int invalidate_authorizer(struct ceph_connection *con)
1763 struct ceph_osd *o = con->private;
1764 struct ceph_osd_client *osdc = o->o_osdc;
1765 struct ceph_auth_client *ac = osdc->client->monc.auth;
1767 if (ac->ops->invalidate_authorizer)
1768 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
1770 return ceph_monc_validate_auth(&osdc->client->monc);
1773 static const struct ceph_connection_operations osd_con_ops = {
1774 .get = get_osd_con,
1775 .put = put_osd_con,
1776 .dispatch = dispatch,
1777 .get_authorizer = get_authorizer,
1778 .verify_authorizer_reply = verify_authorizer_reply,
1779 .invalidate_authorizer = invalidate_authorizer,
1780 .alloc_msg = alloc_msg,
1781 .fault = osd_reset,