// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>

#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */
struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;
static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;

	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);

	*p = end;
	return 0;
bad:
	return -EIO;
}
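/*
 * Note: the ceph_decode_*_safe() helpers used above come from
 * <linux/ceph/decode.h>; they bounds-check the buffer and jump to the
 * given label ("bad" here) when fewer bytes remain than requested.
 * Every decoder in this file follows the same pattern: decode, and on
 * a short or malformed buffer fall through to "bad" and return -EIO.
 */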
/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_copy_safe(p, end, &info->dir_layout,
			      sizeof(info->dir_layout), bad);
	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features == (u64)-1) {
		/* inline data */
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
		/* quota */
		err = parse_reply_info_quota(p, end, info);
		if (err < 0)
			goto out_bad;
		/* pool namespace */
		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		if (info->pool_ns_len > 0) {
			ceph_decode_need(p, end, info->pool_ns_len, bad);
			info->pool_ns_data = *p;
			*p += info->pool_ns_len;
		}

		/* btime */
		ceph_decode_need(p, end, sizeof(info->btime), bad);
		ceph_decode_copy(p, &info->btime, sizeof(info->btime));

		/* change attribute */
		ceph_decode_64_safe(p, end, info->change_attr, bad);

		/* dir pin */
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, info->dir_pin, bad);
		} else {
			info->dir_pin = -ENODATA;
		}

		/* snapshot birth time, remains zero for v<=2 */
		if (struct_v >= 3) {
			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
			ceph_decode_copy(p, &info->snap_btime,
					 sizeof(info->snap_btime));
		} else {
			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
		}

		*p = end;
	} else {
		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
			ceph_decode_64_safe(p, end, info->inline_version, bad);
			ceph_decode_32_safe(p, end, info->inline_len, bad);
			ceph_decode_need(p, end, info->inline_len, bad);
			info->inline_data = *p;
			*p += info->inline_len;
		} else {
			info->inline_version = CEPH_INLINE_NONE;
		}

		if (features & CEPH_FEATURE_MDS_QUOTA) {
			err = parse_reply_info_quota(p, end, info);
			if (err < 0)
				goto out_bad;
		} else {
			info->max_bytes = 0;
			info->max_files = 0;
		}

		info->pool_ns_len = 0;
		info->pool_ns_data = NULL;
		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
			if (info->pool_ns_len > 0) {
				ceph_decode_need(p, end, info->pool_ns_len, bad);
				info->pool_ns_data = *p;
				*p += info->pool_ns_len;
			}
		}

		if (features & CEPH_FEATURE_FS_BTIME) {
			ceph_decode_need(p, end, sizeof(info->btime), bad);
			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
			ceph_decode_64_safe(p, end, info->change_attr, bad);
		}

		info->dir_pin = -ENODATA;
		/* info->snap_btime remains zero */
	}
	return 0;
bad:
	err = -EIO;
out_bad:
	return err;
}
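/*
 * Note on the two decode paths above: features == (u64)-1 is used when
 * the session advertises the newer self-describing reply encoding, in
 * which each inode record carries its own struct_v/struct_len framing;
 * otherwise the layout has to be inferred field-by-field from the
 * negotiated feature bits (CEPH_FEATURE_MDS_INLINE_DATA,
 * CEPH_FEATURE_MDS_QUOTA, CEPH_FEATURE_FS_FILE_LAYOUT_V2, ...).
 */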
static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_dirfrag **dirfrag,
				u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
	*dirfrag = *p;
	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
	if (unlikely(*p > end))
		goto bad;
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}
static int parse_reply_info_lease(void **p, void *end,
				  struct ceph_mds_reply_lease **lease,
				  u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**lease), bad);
	*lease = *p;
	*p += sizeof(**lease);
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}
/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	int err;

	if (info->head->is_dentry) {
		err = parse_reply_info_in(p, end, &info->diri, features);
		if (err < 0)
			goto out_bad;

		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
		if (err < 0)
			goto out_bad;

		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;

		err = parse_reply_info_lease(p, end, &info->dlease, features);
		if (err < 0)
			goto out_bad;
	}

	if (info->head->is_target) {
		err = parse_reply_info_in(p, end, &info->targeti, features);
		if (err < 0)
			goto out_bad;
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}
/*
 * parse readdir results
 */
static int parse_reply_info_readdir(void **p, void *end,
				    struct ceph_mds_reply_info_parsed *info,
				    u64 features)
{
	u32 num, i = 0;
	int err;

	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
	if (err < 0)
		goto out_bad;

	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	{
		u16 flags = ceph_decode_16(p);
		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
	}
	if (num == 0)
		goto done;

	BUG_ON(!info->dir_entries);
	if ((unsigned long)(info->dir_entries + num) >
	    (unsigned long)info->dir_entries + info->dir_buf_size) {
		pr_err("dir contents are larger than expected\n");
		WARN_ON(1);
		goto bad;
	}

	info->dir_nr = num;
	while (num) {
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
		/* dentry */
		ceph_decode_32_safe(p, end, rde->name_len, bad);
		ceph_decode_need(p, end, rde->name_len, bad);
		rde->name = *p;
		*p += rde->name_len;
		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);

		/* dentry lease */
		err = parse_reply_info_lease(p, end, &rde->lease, features);
		if (err)
			goto out_bad;
		/* inode */
		err = parse_reply_info_in(p, end, &rde->inode, features);
		if (err < 0)
			goto out_bad;
		/* ceph_readdir_prepopulate() will update it */
		rde->offset = 0;
		i++;
		num--;
	}

done:
	/* Skip over any unrecognized fields */
	*p = end;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing dir contents %d\n", err);
	return err;
}
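/*
 * The rde entries decoded above land in info->dir_entries, a buffer
 * sized up front by ceph_alloc_readdir_reply_buffer() (later in this
 * file); the BUG_ON and size check guard against the MDS returning
 * more entries than that buffer was provisioned for.
 */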
/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
				     struct ceph_mds_reply_info_parsed *info,
				     u64 features)
{
	if (*p + sizeof(*info->filelock_reply) > end)
		goto bad;

	info->filelock_reply = *p;

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}
/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
				   struct ceph_mds_reply_info_parsed *info,
				   u64 features)
{
	if (features == (u64)-1 ||
	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
		/* Malformed reply? */
		if (*p == end) {
			info->has_create_ino = false;
		} else {
			info->has_create_ino = true;
			ceph_decode_64_safe(p, end, info->ino, bad);
		}
	} else {
		if (*p != end)
			goto bad;
	}

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}
/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	u32 op = le32_to_cpu(info->head->op);

	if (op == CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_filelock(p, end, info, features);
	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
		return parse_reply_info_readdir(p, end, info, features);
	else if (op == CEPH_MDS_OP_CREATE)
		return parse_reply_info_create(p, end, info, features);
	else
		return -EIO;
}
/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_msg *msg,
			    struct ceph_mds_reply_info_parsed *info,
			    u64 features)
{
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_trace(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* extra */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_extra(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("mds parse_reply err %d\n", err);
	return err;
}
static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	if (!info->dir_entries)
		return;
	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}
const char *ceph_session_state_name(int s)
{
	switch (s) {
	case CEPH_MDS_SESSION_NEW: return "new";
	case CEPH_MDS_SESSION_OPENING: return "opening";
	case CEPH_MDS_SESSION_OPEN: return "open";
	case CEPH_MDS_SESSION_HUNG: return "hung";
	case CEPH_MDS_SESSION_CLOSING: return "closing";
	case CEPH_MDS_SESSION_CLOSED: return "closed";
	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
	case CEPH_MDS_SESSION_REJECTED: return "rejected";
	default: return "???";
	}
}
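/*
 * Rough session lifecycle, for orientation: a session starts NEW, moves
 * to OPENING once the open request is sent and to OPEN when the MDS
 * acks it; OPEN may degrade to HUNG if cap renewals stop being
 * answered.  CLOSING/CLOSED cover voluntary teardown,
 * RESTARTING/RECONNECTING are used across MDS failover, and REJECTED
 * means the MDS refused the session.
 */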
struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
	if (refcount_inc_not_zero(&s->s_ref)) {
		dout("mdsc get_session %p %d -> %d\n", s,
		     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
		return s;
	} else {
		dout("mdsc get_session %p 0 -- FAIL\n", s);
		return NULL;
	}
}
void ceph_put_mds_session(struct ceph_mds_session *s)
{
	dout("mdsc put_session %p %d -> %d\n", s,
	     refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
	if (refcount_dec_and_test(&s->s_ref)) {
		if (s->s_auth.authorizer)
			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
		kfree(s);
	}
}
/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
						   int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return NULL;
	return ceph_get_mds_session(mdsc->sessions[mds]);
}
static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return false;
	else
		return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
				       struct ceph_mds_session *s)
{
	if (s->s_mds >= mdsc->max_sessions ||
	    mdsc->sessions[s->s_mds] != s)
		return -ENOENT;
	return 0;
}
/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_mds_session *s;

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return ERR_PTR(-EINVAL);

	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);

	if (mds >= mdsc->max_sessions) {
		int newmax = 1 << get_count_order(mds + 1);
		struct ceph_mds_session **sa;

		dout("%s: realloc to %d\n", __func__, newmax);
		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
		if (!sa)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * sizeof(void *));
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}

	dout("%s: mds%d\n", __func__, mds);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	spin_lock_init(&s->s_gen_ttl_lock);
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	s->s_renew_requested = 0;
	INIT_LIST_HEAD(&s->s_caps);
	refcount_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	s->s_num_cap_releases = 0;
	s->s_cap_reconnect = 0;
	s->s_cap_iterator = NULL;
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

	INIT_LIST_HEAD(&s->s_cap_flushing);

	mdsc->sessions[mds] = s;
	atomic_inc(&mdsc->num_sessions);
	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}
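/*
 * Sizing note: the sessions[] array above grows in powers of two
 * (1 << get_count_order(mds + 1)), so registering rank N costs at most
 * one reallocation per doubling, and the existing pointers are copied
 * across before the old array is freed.
 */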
/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *s)
{
	dout("__unregister_session mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}
/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}
void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		/* avoid calling iput_final() in mds dispatch threads */
		ceph_async_iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		ceph_async_iput(req->r_parent);
	}
	ceph_async_iput(req->r_target_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		ceph_async_iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kfree(req);
}
DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
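/*
 * DEFINE_RB_FUNCS() (from <linux/ceph/libceph.h>) expands to the
 * lookup_request(), insert_request() and erase_request() helpers used
 * below, keyed on r_tid and threaded through r_node in the mdsc
 * request_tree rbtree.
 */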
/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}
/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	int ret = 0;

	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps) {
		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
					req->r_num_caps);
		if (ret < 0) {
			pr_err("__register_request %p "
			       "failed to reserve caps: %d\n", req, ret);
			/* set req->r_err to fail early from __do_request */
			req->r_err = ret;
			return;
		}
	}
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);
	insert_request(&mdsc->request_tree, req);

	req->r_uid = current_fsuid();
	req->r_gid = current_fsgid();

	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
		mdsc->oldest_tid = req->r_tid;

	if (dir) {
		ihold(dir);
		req->r_unsafe_dir = dir;
	}
}
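/*
 * oldest_tid tracks the lowest tid of any in-flight request other than
 * setfilelock requests; those can block for arbitrarily long and would
 * otherwise stall the "oldest pending tid" the client reports to the
 * MDS for trimming its completed-request list.
 */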
801 static void __unregister_request(struct ceph_mds_client
*mdsc
,
802 struct ceph_mds_request
*req
)
804 dout("__unregister_request %p tid %lld\n", req
, req
->r_tid
);
806 /* Never leave an unregistered request on an unsafe list! */
807 list_del_init(&req
->r_unsafe_item
);
809 if (req
->r_tid
== mdsc
->oldest_tid
) {
810 struct rb_node
*p
= rb_next(&req
->r_node
);
811 mdsc
->oldest_tid
= 0;
813 struct ceph_mds_request
*next_req
=
814 rb_entry(p
, struct ceph_mds_request
, r_node
);
815 if (next_req
->r_op
!= CEPH_MDS_OP_SETFILELOCK
) {
816 mdsc
->oldest_tid
= next_req
->r_tid
;
823 erase_request(&mdsc
->request_tree
, req
);
825 if (req
->r_unsafe_dir
&&
826 test_bit(CEPH_MDS_R_GOT_UNSAFE
, &req
->r_req_flags
)) {
827 struct ceph_inode_info
*ci
= ceph_inode(req
->r_unsafe_dir
);
828 spin_lock(&ci
->i_unsafe_lock
);
829 list_del_init(&req
->r_unsafe_dir_item
);
830 spin_unlock(&ci
->i_unsafe_lock
);
832 if (req
->r_target_inode
&&
833 test_bit(CEPH_MDS_R_GOT_UNSAFE
, &req
->r_req_flags
)) {
834 struct ceph_inode_info
*ci
= ceph_inode(req
->r_target_inode
);
835 spin_lock(&ci
->i_unsafe_lock
);
836 list_del_init(&req
->r_unsafe_target_item
);
837 spin_unlock(&ci
->i_unsafe_lock
);
840 if (req
->r_unsafe_dir
) {
841 /* avoid calling iput_final() in mds dispatch threads */
842 ceph_async_iput(req
->r_unsafe_dir
);
843 req
->r_unsafe_dir
= NULL
;
846 complete_all(&req
->r_safe_completion
);
848 ceph_mdsc_put_request(req
);
852 * Walk back up the dentry tree until we hit a dentry representing a
853 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
854 * when calling this) to ensure that the objects won't disappear while we're
855 * working with them. Once we hit a candidate dentry, we attempt to take a
856 * reference to it, and return that as the result.
858 static struct inode
*get_nonsnap_parent(struct dentry
*dentry
)
860 struct inode
*inode
= NULL
;
862 while (dentry
&& !IS_ROOT(dentry
)) {
863 inode
= d_inode_rcu(dentry
);
864 if (!inode
|| ceph_snap(inode
) == CEPH_NOSNAP
)
866 dentry
= dentry
->d_parent
;
869 inode
= igrab(inode
);
874 * Choose mds to send request to next. If there is a hint set in the
875 * request (e.g., due to a prior forward hint from the mds), use that.
876 * Otherwise, consult frag tree and/or caps to identify the
877 * appropriate mds. If all else fails, choose randomly.
879 * Called under mdsc->mutex.
881 static int __choose_mds(struct ceph_mds_client
*mdsc
,
882 struct ceph_mds_request
*req
,
886 struct ceph_inode_info
*ci
;
887 struct ceph_cap
*cap
;
888 int mode
= req
->r_direct_mode
;
890 u32 hash
= req
->r_direct_hash
;
891 bool is_hash
= test_bit(CEPH_MDS_R_DIRECT_IS_HASH
, &req
->r_req_flags
);
897 * is there a specific mds we should try? ignore hint if we have
898 * no session and the mds is not up (active or recovering).
900 if (req
->r_resend_mds
>= 0 &&
901 (__have_session(mdsc
, req
->r_resend_mds
) ||
902 ceph_mdsmap_get_state(mdsc
->mdsmap
, req
->r_resend_mds
) > 0)) {
903 dout("%s using resend_mds mds%d\n", __func__
,
905 return req
->r_resend_mds
;
908 if (mode
== USE_RANDOM_MDS
)
913 if (ceph_snap(req
->r_inode
) != CEPH_SNAPDIR
) {
914 inode
= req
->r_inode
;
917 /* req->r_dentry is non-null for LSSNAP request */
919 inode
= get_nonsnap_parent(req
->r_dentry
);
921 dout("%s using snapdir's parent %p\n", __func__
, inode
);
923 } else if (req
->r_dentry
) {
924 /* ignore race with rename; old or new d_parent is okay */
925 struct dentry
*parent
;
929 parent
= READ_ONCE(req
->r_dentry
->d_parent
);
930 dir
= req
->r_parent
? : d_inode_rcu(parent
);
932 if (!dir
|| dir
->i_sb
!= mdsc
->fsc
->sb
) {
933 /* not this fs or parent went negative */
934 inode
= d_inode(req
->r_dentry
);
937 } else if (ceph_snap(dir
) != CEPH_NOSNAP
) {
938 /* direct snapped/virtual snapdir requests
939 * based on parent dir inode */
940 inode
= get_nonsnap_parent(parent
);
941 dout("%s using nonsnap parent %p\n", __func__
, inode
);
944 inode
= d_inode(req
->r_dentry
);
945 if (!inode
|| mode
== USE_AUTH_MDS
) {
948 hash
= ceph_dentry_hash(dir
, req
->r_dentry
);
957 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__
, inode
, (int)is_hash
,
961 ci
= ceph_inode(inode
);
963 if (is_hash
&& S_ISDIR(inode
->i_mode
)) {
964 struct ceph_inode_frag frag
;
967 ceph_choose_frag(ci
, hash
, &frag
, &found
);
969 if (mode
== USE_ANY_MDS
&& frag
.ndist
> 0) {
972 /* choose a random replica */
973 get_random_bytes(&r
, 1);
976 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
977 __func__
, inode
, ceph_vinop(inode
),
978 frag
.frag
, mds
, (int)r
, frag
.ndist
);
979 if (ceph_mdsmap_get_state(mdsc
->mdsmap
, mds
) >=
980 CEPH_MDS_STATE_ACTIVE
&&
981 !ceph_mdsmap_is_laggy(mdsc
->mdsmap
, mds
))
985 /* since this file/dir wasn't known to be
986 * replicated, then we want to look for the
987 * authoritative mds. */
989 /* choose auth mds */
991 dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
992 __func__
, inode
, ceph_vinop(inode
),
994 if (ceph_mdsmap_get_state(mdsc
->mdsmap
, mds
) >=
995 CEPH_MDS_STATE_ACTIVE
) {
996 if (mode
== USE_ANY_MDS
&&
997 !ceph_mdsmap_is_laggy(mdsc
->mdsmap
,
1002 mode
= USE_AUTH_MDS
;
1006 spin_lock(&ci
->i_ceph_lock
);
1008 if (mode
== USE_AUTH_MDS
)
1009 cap
= ci
->i_auth_cap
;
1010 if (!cap
&& !RB_EMPTY_ROOT(&ci
->i_caps
))
1011 cap
= rb_entry(rb_first(&ci
->i_caps
), struct ceph_cap
, ci_node
);
1013 spin_unlock(&ci
->i_ceph_lock
);
1014 ceph_async_iput(inode
);
1017 mds
= cap
->session
->s_mds
;
1018 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__
,
1019 inode
, ceph_vinop(inode
), mds
,
1020 cap
== ci
->i_auth_cap
? "auth " : "", cap
);
1021 spin_unlock(&ci
->i_ceph_lock
);
1023 /* avoid calling iput_final() while holding mdsc->mutex or
1024 * in mds dispatch threads */
1025 ceph_async_iput(inode
);
1032 mds
= ceph_mdsmap_get_random_mds(mdsc
->mdsmap
);
1033 dout("%s chose random mds%d\n", __func__
, mds
);
1041 static struct ceph_msg
*create_session_msg(u32 op
, u64 seq
)
1043 struct ceph_msg
*msg
;
1044 struct ceph_mds_session_head
*h
;
1046 msg
= ceph_msg_new(CEPH_MSG_CLIENT_SESSION
, sizeof(*h
), GFP_NOFS
,
1049 pr_err("create_session_msg ENOMEM creating msg\n");
1052 h
= msg
->front
.iov_base
;
1053 h
->op
= cpu_to_le32(op
);
1054 h
->seq
= cpu_to_le64(seq
);
1059 static const unsigned char feature_bits
[] = CEPHFS_FEATURES_CLIENT_SUPPORTED
;
1060 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
1061 static void encode_supported_features(void **p
, void *end
)
1063 static const size_t count
= ARRAY_SIZE(feature_bits
);
1067 size_t size
= FEATURE_BYTES(count
);
1069 BUG_ON(*p
+ 4 + size
> end
);
1070 ceph_encode_32(p
, size
);
1071 memset(*p
, 0, size
);
1072 for (i
= 0; i
< count
; i
++)
1073 ((unsigned char*)(*p
))[i
/ 8] |= BIT(feature_bits
[i
] % 8);
1076 BUG_ON(*p
+ 4 > end
);
1077 ceph_encode_32(p
, 0);
1082 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1083 * to include additional client metadata fields.
1085 static struct ceph_msg
*create_session_open_msg(struct ceph_mds_client
*mdsc
, u64 seq
)
1087 struct ceph_msg
*msg
;
1088 struct ceph_mds_session_head
*h
;
1090 int extra_bytes
= 0;
1091 int metadata_key_count
= 0;
1092 struct ceph_options
*opt
= mdsc
->fsc
->client
->options
;
1093 struct ceph_mount_options
*fsopt
= mdsc
->fsc
->mount_options
;
1097 const char* metadata
[][2] = {
1098 {"hostname", mdsc
->nodename
},
1099 {"kernel_version", init_utsname()->release
},
1100 {"entity_id", opt
->name
? : ""},
1101 {"root", fsopt
->server_path
? : "/"},
1105 /* Calculate serialized length of metadata */
1106 extra_bytes
= 4; /* map length */
1107 for (i
= 0; metadata
[i
][0]; ++i
) {
1108 extra_bytes
+= 8 + strlen(metadata
[i
][0]) +
1109 strlen(metadata
[i
][1]);
1110 metadata_key_count
++;
1113 /* supported feature */
1115 count
= ARRAY_SIZE(feature_bits
);
1117 size
= FEATURE_BYTES(count
);
1118 extra_bytes
+= 4 + size
;
1120 /* Allocate the message */
1121 msg
= ceph_msg_new(CEPH_MSG_CLIENT_SESSION
, sizeof(*h
) + extra_bytes
,
1124 pr_err("create_session_msg ENOMEM creating msg\n");
1127 p
= msg
->front
.iov_base
;
1128 end
= p
+ msg
->front
.iov_len
;
1131 h
->op
= cpu_to_le32(CEPH_SESSION_REQUEST_OPEN
);
1132 h
->seq
= cpu_to_le64(seq
);
1135 * Serialize client metadata into waiting buffer space, using
1136 * the format that userspace expects for map<string, string>
1138 * ClientSession messages with metadata are v3
1140 msg
->hdr
.version
= cpu_to_le16(3);
1141 msg
->hdr
.compat_version
= cpu_to_le16(1);
1143 /* The write pointer, following the session_head structure */
1146 /* Number of entries in the map */
1147 ceph_encode_32(&p
, metadata_key_count
);
1149 /* Two length-prefixed strings for each entry in the map */
1150 for (i
= 0; metadata
[i
][0]; ++i
) {
1151 size_t const key_len
= strlen(metadata
[i
][0]);
1152 size_t const val_len
= strlen(metadata
[i
][1]);
1154 ceph_encode_32(&p
, key_len
);
1155 memcpy(p
, metadata
[i
][0], key_len
);
1157 ceph_encode_32(&p
, val_len
);
1158 memcpy(p
, metadata
[i
][1], val_len
);
1162 encode_supported_features(&p
, end
);
1163 msg
->front
.iov_len
= p
- msg
->front
.iov_base
;
1164 msg
->hdr
.front_len
= cpu_to_le32(msg
->front
.iov_len
);
1170 * send session open request.
1172 * called under mdsc->mutex
1174 static int __open_session(struct ceph_mds_client
*mdsc
,
1175 struct ceph_mds_session
*session
)
1177 struct ceph_msg
*msg
;
1179 int mds
= session
->s_mds
;
1181 /* wait for mds to go active? */
1182 mstate
= ceph_mdsmap_get_state(mdsc
->mdsmap
, mds
);
1183 dout("open_session to mds%d (%s)\n", mds
,
1184 ceph_mds_state_name(mstate
));
1185 session
->s_state
= CEPH_MDS_SESSION_OPENING
;
1186 session
->s_renew_requested
= jiffies
;
1188 /* send connect message */
1189 msg
= create_session_open_msg(mdsc
, session
->s_seq
);
1192 ceph_con_send(&session
->s_con
, msg
);
1197 * open sessions for any export targets for the given mds
1199 * called under mdsc->mutex
1201 static struct ceph_mds_session
*
1202 __open_export_target_session(struct ceph_mds_client
*mdsc
, int target
)
1204 struct ceph_mds_session
*session
;
1206 session
= __ceph_lookup_mds_session(mdsc
, target
);
1208 session
= register_session(mdsc
, target
);
1209 if (IS_ERR(session
))
1212 if (session
->s_state
== CEPH_MDS_SESSION_NEW
||
1213 session
->s_state
== CEPH_MDS_SESSION_CLOSING
)
1214 __open_session(mdsc
, session
);
1219 struct ceph_mds_session
*
1220 ceph_mdsc_open_export_target_session(struct ceph_mds_client
*mdsc
, int target
)
1222 struct ceph_mds_session
*session
;
1224 dout("open_export_target_session to mds%d\n", target
);
1226 mutex_lock(&mdsc
->mutex
);
1227 session
= __open_export_target_session(mdsc
, target
);
1228 mutex_unlock(&mdsc
->mutex
);
1233 static void __open_export_target_sessions(struct ceph_mds_client
*mdsc
,
1234 struct ceph_mds_session
*session
)
1236 struct ceph_mds_info
*mi
;
1237 struct ceph_mds_session
*ts
;
1238 int i
, mds
= session
->s_mds
;
1240 if (mds
>= mdsc
->mdsmap
->possible_max_rank
)
1243 mi
= &mdsc
->mdsmap
->m_info
[mds
];
1244 dout("open_export_target_sessions for mds%d (%d targets)\n",
1245 session
->s_mds
, mi
->num_export_targets
);
1247 for (i
= 0; i
< mi
->num_export_targets
; i
++) {
1248 ts
= __open_export_target_session(mdsc
, mi
->export_targets
[i
]);
1250 ceph_put_mds_session(ts
);
1254 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client
*mdsc
,
1255 struct ceph_mds_session
*session
)
1257 mutex_lock(&mdsc
->mutex
);
1258 __open_export_target_sessions(mdsc
, session
);
1259 mutex_unlock(&mdsc
->mutex
);
1266 static void detach_cap_releases(struct ceph_mds_session
*session
,
1267 struct list_head
*target
)
1269 lockdep_assert_held(&session
->s_cap_lock
);
1271 list_splice_init(&session
->s_cap_releases
, target
);
1272 session
->s_num_cap_releases
= 0;
1273 dout("dispose_cap_releases mds%d\n", session
->s_mds
);
1276 static void dispose_cap_releases(struct ceph_mds_client
*mdsc
,
1277 struct list_head
*dispose
)
1279 while (!list_empty(dispose
)) {
1280 struct ceph_cap
*cap
;
1281 /* zero out the in-progress message */
1282 cap
= list_first_entry(dispose
, struct ceph_cap
, session_caps
);
1283 list_del(&cap
->session_caps
);
1284 ceph_put_cap(mdsc
, cap
);
1288 static void cleanup_session_requests(struct ceph_mds_client
*mdsc
,
1289 struct ceph_mds_session
*session
)
1291 struct ceph_mds_request
*req
;
1293 struct ceph_inode_info
*ci
;
1295 dout("cleanup_session_requests mds%d\n", session
->s_mds
);
1296 mutex_lock(&mdsc
->mutex
);
1297 while (!list_empty(&session
->s_unsafe
)) {
1298 req
= list_first_entry(&session
->s_unsafe
,
1299 struct ceph_mds_request
, r_unsafe_item
);
1300 pr_warn_ratelimited(" dropping unsafe request %llu\n",
1302 if (req
->r_target_inode
) {
1303 /* dropping unsafe change of inode's attributes */
1304 ci
= ceph_inode(req
->r_target_inode
);
1305 errseq_set(&ci
->i_meta_err
, -EIO
);
1307 if (req
->r_unsafe_dir
) {
1308 /* dropping unsafe directory operation */
1309 ci
= ceph_inode(req
->r_unsafe_dir
);
1310 errseq_set(&ci
->i_meta_err
, -EIO
);
1312 __unregister_request(mdsc
, req
);
1314 /* zero r_attempts, so kick_requests() will re-send requests */
1315 p
= rb_first(&mdsc
->request_tree
);
1317 req
= rb_entry(p
, struct ceph_mds_request
, r_node
);
1319 if (req
->r_session
&&
1320 req
->r_session
->s_mds
== session
->s_mds
)
1321 req
->r_attempts
= 0;
1323 mutex_unlock(&mdsc
->mutex
);
1327 * Helper to safely iterate over all caps associated with a session, with
1328 * special care taken to handle a racing __ceph_remove_cap().
1330 * Caller must hold session s_mutex.
1332 int ceph_iterate_session_caps(struct ceph_mds_session
*session
,
1333 int (*cb
)(struct inode
*, struct ceph_cap
*,
1336 struct list_head
*p
;
1337 struct ceph_cap
*cap
;
1338 struct inode
*inode
, *last_inode
= NULL
;
1339 struct ceph_cap
*old_cap
= NULL
;
1342 dout("iterate_session_caps %p mds%d\n", session
, session
->s_mds
);
1343 spin_lock(&session
->s_cap_lock
);
1344 p
= session
->s_caps
.next
;
1345 while (p
!= &session
->s_caps
) {
1346 cap
= list_entry(p
, struct ceph_cap
, session_caps
);
1347 inode
= igrab(&cap
->ci
->vfs_inode
);
1352 session
->s_cap_iterator
= cap
;
1353 spin_unlock(&session
->s_cap_lock
);
1356 /* avoid calling iput_final() while holding
1357 * s_mutex or in mds dispatch threads */
1358 ceph_async_iput(last_inode
);
1362 ceph_put_cap(session
->s_mdsc
, old_cap
);
1366 ret
= cb(inode
, cap
, arg
);
1369 spin_lock(&session
->s_cap_lock
);
1372 dout("iterate_session_caps finishing cap %p removal\n",
1374 BUG_ON(cap
->session
!= session
);
1375 cap
->session
= NULL
;
1376 list_del_init(&cap
->session_caps
);
1377 session
->s_nr_caps
--;
1378 if (cap
->queue_release
)
1379 __ceph_queue_cap_release(session
, cap
);
1381 old_cap
= cap
; /* put_cap it w/o locks held */
1388 session
->s_cap_iterator
= NULL
;
1389 spin_unlock(&session
->s_cap_lock
);
1391 ceph_async_iput(last_inode
);
1393 ceph_put_cap(session
->s_mdsc
, old_cap
);
1398 static int remove_session_caps_cb(struct inode
*inode
, struct ceph_cap
*cap
,
1401 struct ceph_fs_client
*fsc
= (struct ceph_fs_client
*)arg
;
1402 struct ceph_inode_info
*ci
= ceph_inode(inode
);
1403 LIST_HEAD(to_remove
);
1404 bool dirty_dropped
= false;
1405 bool invalidate
= false;
1407 dout("removing cap %p, ci is %p, inode is %p\n",
1408 cap
, ci
, &ci
->vfs_inode
);
1409 spin_lock(&ci
->i_ceph_lock
);
1410 if (cap
->mds_wanted
| cap
->issued
)
1411 ci
->i_ceph_flags
|= CEPH_I_CAP_DROPPED
;
1412 __ceph_remove_cap(cap
, false);
1413 if (!ci
->i_auth_cap
) {
1414 struct ceph_cap_flush
*cf
;
1415 struct ceph_mds_client
*mdsc
= fsc
->mdsc
;
1417 if (READ_ONCE(fsc
->mount_state
) == CEPH_MOUNT_SHUTDOWN
) {
1418 if (inode
->i_data
.nrpages
> 0)
1420 if (ci
->i_wrbuffer_ref
> 0)
1421 mapping_set_error(&inode
->i_data
, -EIO
);
1424 while (!list_empty(&ci
->i_cap_flush_list
)) {
1425 cf
= list_first_entry(&ci
->i_cap_flush_list
,
1426 struct ceph_cap_flush
, i_list
);
1427 list_move(&cf
->i_list
, &to_remove
);
1430 spin_lock(&mdsc
->cap_dirty_lock
);
1432 list_for_each_entry(cf
, &to_remove
, i_list
)
1433 list_del(&cf
->g_list
);
1435 if (!list_empty(&ci
->i_dirty_item
)) {
1436 pr_warn_ratelimited(
1437 " dropping dirty %s state for %p %lld\n",
1438 ceph_cap_string(ci
->i_dirty_caps
),
1439 inode
, ceph_ino(inode
));
1440 ci
->i_dirty_caps
= 0;
1441 list_del_init(&ci
->i_dirty_item
);
1442 dirty_dropped
= true;
1444 if (!list_empty(&ci
->i_flushing_item
)) {
1445 pr_warn_ratelimited(
1446 " dropping dirty+flushing %s state for %p %lld\n",
1447 ceph_cap_string(ci
->i_flushing_caps
),
1448 inode
, ceph_ino(inode
));
1449 ci
->i_flushing_caps
= 0;
1450 list_del_init(&ci
->i_flushing_item
);
1451 mdsc
->num_cap_flushing
--;
1452 dirty_dropped
= true;
1454 spin_unlock(&mdsc
->cap_dirty_lock
);
1456 if (dirty_dropped
) {
1457 errseq_set(&ci
->i_meta_err
, -EIO
);
1459 if (ci
->i_wrbuffer_ref_head
== 0 &&
1460 ci
->i_wr_ref
== 0 &&
1461 ci
->i_dirty_caps
== 0 &&
1462 ci
->i_flushing_caps
== 0) {
1463 ceph_put_snap_context(ci
->i_head_snapc
);
1464 ci
->i_head_snapc
= NULL
;
1468 if (atomic_read(&ci
->i_filelock_ref
) > 0) {
1469 /* make further file lock syscall return -EIO */
1470 ci
->i_ceph_flags
|= CEPH_I_ERROR_FILELOCK
;
1471 pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1472 inode
, ceph_ino(inode
));
1475 if (!ci
->i_dirty_caps
&& ci
->i_prealloc_cap_flush
) {
1476 list_add(&ci
->i_prealloc_cap_flush
->i_list
, &to_remove
);
1477 ci
->i_prealloc_cap_flush
= NULL
;
1480 spin_unlock(&ci
->i_ceph_lock
);
1481 while (!list_empty(&to_remove
)) {
1482 struct ceph_cap_flush
*cf
;
1483 cf
= list_first_entry(&to_remove
,
1484 struct ceph_cap_flush
, i_list
);
1485 list_del(&cf
->i_list
);
1486 ceph_free_cap_flush(cf
);
1489 wake_up_all(&ci
->i_cap_wq
);
1491 ceph_queue_invalidate(inode
);
1498 * caller must hold session s_mutex
1500 static void remove_session_caps(struct ceph_mds_session
*session
)
1502 struct ceph_fs_client
*fsc
= session
->s_mdsc
->fsc
;
1503 struct super_block
*sb
= fsc
->sb
;
1506 dout("remove_session_caps on %p\n", session
);
1507 ceph_iterate_session_caps(session
, remove_session_caps_cb
, fsc
);
1509 wake_up_all(&fsc
->mdsc
->cap_flushing_wq
);
1511 spin_lock(&session
->s_cap_lock
);
1512 if (session
->s_nr_caps
> 0) {
1513 struct inode
*inode
;
1514 struct ceph_cap
*cap
, *prev
= NULL
;
1515 struct ceph_vino vino
;
1517 * iterate_session_caps() skips inodes that are being
1518 * deleted, we need to wait until deletions are complete.
1519 * __wait_on_freeing_inode() is designed for the job,
1520 * but it is not exported, so use lookup inode function
1523 while (!list_empty(&session
->s_caps
)) {
1524 cap
= list_entry(session
->s_caps
.next
,
1525 struct ceph_cap
, session_caps
);
1529 vino
= cap
->ci
->i_vino
;
1530 spin_unlock(&session
->s_cap_lock
);
1532 inode
= ceph_find_inode(sb
, vino
);
1533 /* avoid calling iput_final() while holding s_mutex */
1534 ceph_async_iput(inode
);
1536 spin_lock(&session
->s_cap_lock
);
1540 // drop cap expires and unlock s_cap_lock
1541 detach_cap_releases(session
, &dispose
);
1543 BUG_ON(session
->s_nr_caps
> 0);
1544 BUG_ON(!list_empty(&session
->s_cap_flushing
));
1545 spin_unlock(&session
->s_cap_lock
);
1546 dispose_cap_releases(session
->s_mdsc
, &dispose
);
1556 * wake up any threads waiting on this session's caps. if the cap is
1557 * old (didn't get renewed on the client reconnect), remove it now.
1559 * caller must hold s_mutex.
1561 static int wake_up_session_cb(struct inode
*inode
, struct ceph_cap
*cap
,
1564 struct ceph_inode_info
*ci
= ceph_inode(inode
);
1565 unsigned long ev
= (unsigned long)arg
;
1567 if (ev
== RECONNECT
) {
1568 spin_lock(&ci
->i_ceph_lock
);
1569 ci
->i_wanted_max_size
= 0;
1570 ci
->i_requested_max_size
= 0;
1571 spin_unlock(&ci
->i_ceph_lock
);
1572 } else if (ev
== RENEWCAPS
) {
1573 if (cap
->cap_gen
< cap
->session
->s_cap_gen
) {
1574 /* mds did not re-issue stale cap */
1575 spin_lock(&ci
->i_ceph_lock
);
1576 cap
->issued
= cap
->implemented
= CEPH_CAP_PIN
;
1577 /* make sure mds knows what we want */
1578 if (__ceph_caps_file_wanted(ci
) & ~cap
->mds_wanted
)
1579 ci
->i_ceph_flags
|= CEPH_I_CAP_DROPPED
;
1580 spin_unlock(&ci
->i_ceph_lock
);
1582 } else if (ev
== FORCE_RO
) {
1584 wake_up_all(&ci
->i_cap_wq
);
1588 static void wake_up_session_caps(struct ceph_mds_session
*session
, int ev
)
1590 dout("wake_up_session_caps %p mds%d\n", session
, session
->s_mds
);
1591 ceph_iterate_session_caps(session
, wake_up_session_cb
,
1592 (void *)(unsigned long)ev
);
1596 * Send periodic message to MDS renewing all currently held caps. The
1597 * ack will reset the expiration for all caps from this session.
1599 * caller holds s_mutex
1601 static int send_renew_caps(struct ceph_mds_client
*mdsc
,
1602 struct ceph_mds_session
*session
)
1604 struct ceph_msg
*msg
;
1607 if (time_after_eq(jiffies
, session
->s_cap_ttl
) &&
1608 time_after_eq(session
->s_cap_ttl
, session
->s_renew_requested
))
1609 pr_info("mds%d caps stale\n", session
->s_mds
);
1610 session
->s_renew_requested
= jiffies
;
1612 /* do not try to renew caps until a recovering mds has reconnected
1613 * with its clients. */
1614 state
= ceph_mdsmap_get_state(mdsc
->mdsmap
, session
->s_mds
);
1615 if (state
< CEPH_MDS_STATE_RECONNECT
) {
1616 dout("send_renew_caps ignoring mds%d (%s)\n",
1617 session
->s_mds
, ceph_mds_state_name(state
));
1621 dout("send_renew_caps to mds%d (%s)\n", session
->s_mds
,
1622 ceph_mds_state_name(state
));
1623 msg
= create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS
,
1624 ++session
->s_renew_seq
);
1627 ceph_con_send(&session
->s_con
, msg
);
1631 static int send_flushmsg_ack(struct ceph_mds_client
*mdsc
,
1632 struct ceph_mds_session
*session
, u64 seq
)
1634 struct ceph_msg
*msg
;
1636 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1637 session
->s_mds
, ceph_session_state_name(session
->s_state
), seq
);
1638 msg
= create_session_msg(CEPH_SESSION_FLUSHMSG_ACK
, seq
);
1641 ceph_con_send(&session
->s_con
, msg
);
1647 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1649 * Called under session->s_mutex
1651 static void renewed_caps(struct ceph_mds_client
*mdsc
,
1652 struct ceph_mds_session
*session
, int is_renew
)
1657 spin_lock(&session
->s_cap_lock
);
1658 was_stale
= is_renew
&& time_after_eq(jiffies
, session
->s_cap_ttl
);
1660 session
->s_cap_ttl
= session
->s_renew_requested
+
1661 mdsc
->mdsmap
->m_session_timeout
*HZ
;
1664 if (time_before(jiffies
, session
->s_cap_ttl
)) {
1665 pr_info("mds%d caps renewed\n", session
->s_mds
);
1668 pr_info("mds%d caps still stale\n", session
->s_mds
);
1671 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1672 session
->s_mds
, session
->s_cap_ttl
, was_stale
? "stale" : "fresh",
1673 time_before(jiffies
, session
->s_cap_ttl
) ? "stale" : "fresh");
1674 spin_unlock(&session
->s_cap_lock
);
1677 wake_up_session_caps(session
, RENEWCAPS
);
1681 * send a session close request
1683 static int request_close_session(struct ceph_mds_client
*mdsc
,
1684 struct ceph_mds_session
*session
)
1686 struct ceph_msg
*msg
;
1688 dout("request_close_session mds%d state %s seq %lld\n",
1689 session
->s_mds
, ceph_session_state_name(session
->s_state
),
1691 msg
= create_session_msg(CEPH_SESSION_REQUEST_CLOSE
, session
->s_seq
);
1694 ceph_con_send(&session
->s_con
, msg
);
1699 * Called with s_mutex held.
1701 static int __close_session(struct ceph_mds_client
*mdsc
,
1702 struct ceph_mds_session
*session
)
1704 if (session
->s_state
>= CEPH_MDS_SESSION_CLOSING
)
1706 session
->s_state
= CEPH_MDS_SESSION_CLOSING
;
1707 return request_close_session(mdsc
, session
);
1710 static bool drop_negative_children(struct dentry
*dentry
)
1712 struct dentry
*child
;
1713 bool all_negative
= true;
1715 if (!d_is_dir(dentry
))
1718 spin_lock(&dentry
->d_lock
);
1719 list_for_each_entry(child
, &dentry
->d_subdirs
, d_child
) {
1720 if (d_really_is_positive(child
)) {
1721 all_negative
= false;
1725 spin_unlock(&dentry
->d_lock
);
1728 shrink_dcache_parent(dentry
);
1730 return all_negative
;
1734 * Trim old(er) caps.
1736 * Because we can't cache an inode without one or more caps, we do
1737 * this indirectly: if a cap is unused, we prune its aliases, at which
1738 * point the inode will hopefully get dropped to.
1740 * Yes, this is a bit sloppy. Our only real goal here is to respond to
1741 * memory pressure from the MDS, though, so it needn't be perfect.
1743 static int trim_caps_cb(struct inode
*inode
, struct ceph_cap
*cap
, void *arg
)
1745 int *remaining
= arg
;
1746 struct ceph_inode_info
*ci
= ceph_inode(inode
);
1747 int used
, wanted
, oissued
, mine
;
1749 if (*remaining
<= 0)
1752 spin_lock(&ci
->i_ceph_lock
);
1753 mine
= cap
->issued
| cap
->implemented
;
1754 used
= __ceph_caps_used(ci
);
1755 wanted
= __ceph_caps_file_wanted(ci
);
1756 oissued
= __ceph_caps_issued_other(ci
, cap
);
1758 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1759 inode
, cap
, ceph_cap_string(mine
), ceph_cap_string(oissued
),
1760 ceph_cap_string(used
), ceph_cap_string(wanted
));
1761 if (cap
== ci
->i_auth_cap
) {
1762 if (ci
->i_dirty_caps
|| ci
->i_flushing_caps
||
1763 !list_empty(&ci
->i_cap_snaps
))
1765 if ((used
| wanted
) & CEPH_CAP_ANY_WR
)
1767 /* Note: it's possible that i_filelock_ref becomes non-zero
1768 * after dropping auth caps. It doesn't hurt because reply
1769 * of lock mds request will re-add auth caps. */
1770 if (atomic_read(&ci
->i_filelock_ref
) > 0)
1773 /* The inode has cached pages, but it's no longer used.
1774 * we can safely drop it */
1775 if (wanted
== 0 && used
== CEPH_CAP_FILE_CACHE
&&
1776 !(oissued
& CEPH_CAP_FILE_CACHE
)) {
1780 if ((used
| wanted
) & ~oissued
& mine
)
1781 goto out
; /* we need these caps */
1784 /* we aren't the only cap.. just remove us */
1785 __ceph_remove_cap(cap
, true);
1788 struct dentry
*dentry
;
1789 /* try dropping referring dentries */
1790 spin_unlock(&ci
->i_ceph_lock
);
1791 dentry
= d_find_any_alias(inode
);
1792 if (dentry
&& drop_negative_children(dentry
)) {
1795 d_prune_aliases(inode
);
1796 count
= atomic_read(&inode
->i_count
);
1799 dout("trim_caps_cb %p cap %p pruned, count now %d\n",
1808 spin_unlock(&ci
->i_ceph_lock
);
1813 * Trim session cap count down to some max number.
1815 int ceph_trim_caps(struct ceph_mds_client
*mdsc
,
1816 struct ceph_mds_session
*session
,
1819 int trim_caps
= session
->s_nr_caps
- max_caps
;
1821 dout("trim_caps mds%d start: %d / %d, trim %d\n",
1822 session
->s_mds
, session
->s_nr_caps
, max_caps
, trim_caps
);
1823 if (trim_caps
> 0) {
1824 int remaining
= trim_caps
;
1826 ceph_iterate_session_caps(session
, trim_caps_cb
, &remaining
);
1827 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1828 session
->s_mds
, session
->s_nr_caps
, max_caps
,
1829 trim_caps
- remaining
);
1832 ceph_flush_cap_releases(mdsc
, session
);
1836 static int check_caps_flush(struct ceph_mds_client
*mdsc
,
1841 spin_lock(&mdsc
->cap_dirty_lock
);
1842 if (!list_empty(&mdsc
->cap_flush_list
)) {
1843 struct ceph_cap_flush
*cf
=
1844 list_first_entry(&mdsc
->cap_flush_list
,
1845 struct ceph_cap_flush
, g_list
);
1846 if (cf
->tid
<= want_flush_tid
) {
1847 dout("check_caps_flush still flushing tid "
1848 "%llu <= %llu\n", cf
->tid
, want_flush_tid
);
1852 spin_unlock(&mdsc
->cap_dirty_lock
);
1857 * flush all dirty inode data to disk.
1859 * returns true if we've flushed through want_flush_tid
1861 static void wait_caps_flush(struct ceph_mds_client
*mdsc
,
1864 dout("check_caps_flush want %llu\n", want_flush_tid
);
1866 wait_event(mdsc
->cap_flushing_wq
,
1867 check_caps_flush(mdsc
, want_flush_tid
));
1869 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid
);
1873 * called under s_mutex
1875 static void ceph_send_cap_releases(struct ceph_mds_client
*mdsc
,
1876 struct ceph_mds_session
*session
)
1878 struct ceph_msg
*msg
= NULL
;
1879 struct ceph_mds_cap_release
*head
;
1880 struct ceph_mds_cap_item
*item
;
1881 struct ceph_osd_client
*osdc
= &mdsc
->fsc
->client
->osdc
;
1882 struct ceph_cap
*cap
;
1883 LIST_HEAD(tmp_list
);
1884 int num_cap_releases
;
1885 __le32 barrier
, *cap_barrier
;
1887 down_read(&osdc
->lock
);
1888 barrier
= cpu_to_le32(osdc
->epoch_barrier
);
1889 up_read(&osdc
->lock
);
1891 spin_lock(&session
->s_cap_lock
);
1893 list_splice_init(&session
->s_cap_releases
, &tmp_list
);
1894 num_cap_releases
= session
->s_num_cap_releases
;
1895 session
->s_num_cap_releases
= 0;
1896 spin_unlock(&session
->s_cap_lock
);
1898 while (!list_empty(&tmp_list
)) {
1900 msg
= ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE
,
1901 PAGE_SIZE
, GFP_NOFS
, false);
1904 head
= msg
->front
.iov_base
;
1905 head
->num
= cpu_to_le32(0);
1906 msg
->front
.iov_len
= sizeof(*head
);
1908 msg
->hdr
.version
= cpu_to_le16(2);
1909 msg
->hdr
.compat_version
= cpu_to_le16(1);
1912 cap
= list_first_entry(&tmp_list
, struct ceph_cap
,
1914 list_del(&cap
->session_caps
);
1917 head
= msg
->front
.iov_base
;
1918 put_unaligned_le32(get_unaligned_le32(&head
->num
) + 1,
1920 item
= msg
->front
.iov_base
+ msg
->front
.iov_len
;
1921 item
->ino
= cpu_to_le64(cap
->cap_ino
);
1922 item
->cap_id
= cpu_to_le64(cap
->cap_id
);
1923 item
->migrate_seq
= cpu_to_le32(cap
->mseq
);
1924 item
->seq
= cpu_to_le32(cap
->issue_seq
);
1925 msg
->front
.iov_len
+= sizeof(*item
);
1927 ceph_put_cap(mdsc
, cap
);
1929 if (le32_to_cpu(head
->num
) == CEPH_CAPS_PER_RELEASE
) {
1930 // Append cap_barrier field
1931 cap_barrier
= msg
->front
.iov_base
+ msg
->front
.iov_len
;
1932 *cap_barrier
= barrier
;
1933 msg
->front
.iov_len
+= sizeof(*cap_barrier
);
1935 msg
->hdr
.front_len
= cpu_to_le32(msg
->front
.iov_len
);
1936 dout("send_cap_releases mds%d %p\n", session
->s_mds
, msg
);
1937 ceph_con_send(&session
->s_con
, msg
);
1942 BUG_ON(num_cap_releases
!= 0);
1944 spin_lock(&session
->s_cap_lock
);
1945 if (!list_empty(&session
->s_cap_releases
))
1947 spin_unlock(&session
->s_cap_lock
);
1950 // Append cap_barrier field
1951 cap_barrier
= msg
->front
.iov_base
+ msg
->front
.iov_len
;
1952 *cap_barrier
= barrier
;
1953 msg
->front
.iov_len
+= sizeof(*cap_barrier
);
1955 msg
->hdr
.front_len
= cpu_to_le32(msg
->front
.iov_len
);
1956 dout("send_cap_releases mds%d %p\n", session
->s_mds
, msg
);
1957 ceph_con_send(&session
->s_con
, msg
);
1961 pr_err("send_cap_releases mds%d, failed to allocate message\n",
1963 spin_lock(&session
->s_cap_lock
);
1964 list_splice(&tmp_list
, &session
->s_cap_releases
);
1965 session
->s_num_cap_releases
+= num_cap_releases
;
1966 spin_unlock(&session
->s_cap_lock
);
1969 static void ceph_cap_release_work(struct work_struct
*work
)
1971 struct ceph_mds_session
*session
=
1972 container_of(work
, struct ceph_mds_session
, s_cap_release_work
);
1974 mutex_lock(&session
->s_mutex
);
1975 if (session
->s_state
== CEPH_MDS_SESSION_OPEN
||
1976 session
->s_state
== CEPH_MDS_SESSION_HUNG
)
1977 ceph_send_cap_releases(session
->s_mdsc
, session
);
1978 mutex_unlock(&session
->s_mutex
);
1979 ceph_put_mds_session(session
);
1982 void ceph_flush_cap_releases(struct ceph_mds_client
*mdsc
,
1983 struct ceph_mds_session
*session
)
1988 ceph_get_mds_session(session
);
1989 if (queue_work(mdsc
->fsc
->cap_wq
,
1990 &session
->s_cap_release_work
)) {
1991 dout("cap release work queued\n");
1993 ceph_put_mds_session(session
);
1994 dout("failed to queue cap release work\n");
1999 * caller holds session->s_cap_lock
2001 void __ceph_queue_cap_release(struct ceph_mds_session
*session
,
2002 struct ceph_cap
*cap
)
2004 list_add_tail(&cap
->session_caps
, &session
->s_cap_releases
);
2005 session
->s_num_cap_releases
++;
2007 if (!(session
->s_num_cap_releases
% CEPH_CAPS_PER_RELEASE
))
2008 ceph_flush_cap_releases(session
->s_mdsc
, session
);
2011 static void ceph_cap_reclaim_work(struct work_struct
*work
)
2013 struct ceph_mds_client
*mdsc
=
2014 container_of(work
, struct ceph_mds_client
, cap_reclaim_work
);
2015 int ret
= ceph_trim_dentries(mdsc
);
2017 ceph_queue_cap_reclaim_work(mdsc
);
2020 void ceph_queue_cap_reclaim_work(struct ceph_mds_client
*mdsc
)
2025 if (queue_work(mdsc
->fsc
->cap_wq
, &mdsc
->cap_reclaim_work
)) {
2026 dout("caps reclaim work queued\n");
2028 dout("failed to queue caps release work\n");
2032 void ceph_reclaim_caps_nr(struct ceph_mds_client
*mdsc
, int nr
)
2037 val
= atomic_add_return(nr
, &mdsc
->cap_reclaim_pending
);
2038 if ((val
% CEPH_CAPS_PER_RELEASE
) < nr
) {
2039 atomic_set(&mdsc
->cap_reclaim_pending
, 0);
2040 ceph_queue_cap_reclaim_work(mdsc
);
2048 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request
*req
,
2051 struct ceph_inode_info
*ci
= ceph_inode(dir
);
2052 struct ceph_mds_reply_info_parsed
*rinfo
= &req
->r_reply_info
;
2053 struct ceph_mount_options
*opt
= req
->r_mdsc
->fsc
->mount_options
;
2054 size_t size
= sizeof(struct ceph_mds_reply_dir_entry
);
2055 unsigned int num_entries
;
2058 spin_lock(&ci
->i_ceph_lock
);
2059 num_entries
= ci
->i_files
+ ci
->i_subdirs
;
2060 spin_unlock(&ci
->i_ceph_lock
);
2061 num_entries
= max(num_entries
, 1U);
2062 num_entries
= min(num_entries
, opt
->max_readdir
);
2064 order
= get_order(size
* num_entries
);
2065 while (order
>= 0) {
2066 rinfo
->dir_entries
= (void*)__get_free_pages(GFP_KERNEL
|
2069 if (rinfo
->dir_entries
)
2073 if (!rinfo
->dir_entries
)
2076 num_entries
= (PAGE_SIZE
<< order
) / size
;
2077 num_entries
= min(num_entries
, opt
->max_readdir
);
2079 rinfo
->dir_buf_size
= PAGE_SIZE
<< order
;
2080 req
->r_num_caps
= num_entries
+ 1;
2081 req
->r_args
.readdir
.max_entries
= cpu_to_le32(num_entries
);
2082 req
->r_args
.readdir
.max_bytes
= cpu_to_le32(opt
->max_readdir_bytes
);
2087 * Create an mds request.
2089 struct ceph_mds_request
*
2090 ceph_mdsc_create_request(struct ceph_mds_client
*mdsc
, int op
, int mode
)
2092 struct ceph_mds_request
*req
= kzalloc(sizeof(*req
), GFP_NOFS
);
2095 return ERR_PTR(-ENOMEM
);
2097 mutex_init(&req
->r_fill_mutex
);
2099 req
->r_started
= jiffies
;
2100 req
->r_resend_mds
= -1;
2101 INIT_LIST_HEAD(&req
->r_unsafe_dir_item
);
2102 INIT_LIST_HEAD(&req
->r_unsafe_target_item
);
2104 kref_init(&req
->r_kref
);
2105 RB_CLEAR_NODE(&req
->r_node
);
2106 INIT_LIST_HEAD(&req
->r_wait
);
2107 init_completion(&req
->r_completion
);
2108 init_completion(&req
->r_safe_completion
);
2109 INIT_LIST_HEAD(&req
->r_unsafe_item
);
2111 ktime_get_coarse_real_ts64(&req
->r_stamp
);
2114 req
->r_direct_mode
= mode
;
2119 * return oldest (lowest) request, tid in request tree, 0 if none.
2121 * called under mdsc->mutex.
2123 static struct ceph_mds_request
*__get_oldest_req(struct ceph_mds_client
*mdsc
)
2125 if (RB_EMPTY_ROOT(&mdsc
->request_tree
))
2127 return rb_entry(rb_first(&mdsc
->request_tree
),
2128 struct ceph_mds_request
, r_node
);
2131 static inline u64
__get_oldest_tid(struct ceph_mds_client
*mdsc
)
2133 return mdsc
->oldest_tid
;
2137 * Build a dentry's path. Allocate on heap; caller must kfree. Based
2138 * on build_path_from_dentry in fs/cifs/dir.c.
2140 * If @stop_on_nosnap, generate path relative to the first non-snapped
2143 * Encode hidden .snap dirs as a double /, i.e.
2144 * foo/.snap/bar -> foo//bar
2146 char *ceph_mdsc_build_path(struct dentry
*dentry
, int *plen
, u64
*pbase
,
2149 struct dentry
*temp
;
2156 return ERR_PTR(-EINVAL
);
2160 return ERR_PTR(-ENOMEM
);
2165 seq
= read_seqbegin(&rename_lock
);
2169 struct inode
*inode
;
2171 spin_lock(&temp
->d_lock
);
2172 inode
= d_inode(temp
);
2173 if (inode
&& ceph_snap(inode
) == CEPH_SNAPDIR
) {
2174 dout("build_path path+%d: %p SNAPDIR\n",
2176 } else if (stop_on_nosnap
&& inode
&& dentry
!= temp
&&
2177 ceph_snap(inode
) == CEPH_NOSNAP
) {
2178 spin_unlock(&temp
->d_lock
);
2179 pos
++; /* get rid of any prepended '/' */
2182 pos
-= temp
->d_name
.len
;
2184 spin_unlock(&temp
->d_lock
);
2187 memcpy(path
+ pos
, temp
->d_name
.name
, temp
->d_name
.len
);
2189 spin_unlock(&temp
->d_lock
);
2190 temp
= READ_ONCE(temp
->d_parent
);
2192 /* Are we at the root? */
2196 /* Are we out of buffer? */
2202 base
= ceph_ino(d_inode(temp
));
2205 if (read_seqretry(&rename_lock
, seq
))
2210 * A rename didn't occur, but somehow we didn't end up where
2211 * we thought we would. Throw a warning and try again.
2213 pr_warn("build_path did not end path lookup where "
2214 "expected, pos is %d\n", pos
);
2219 *plen
= PATH_MAX
- 1 - pos
;
2220 dout("build_path on %p %d built %llx '%.*s'\n",
2221 dentry
, d_count(dentry
), base
, *plen
, path
+ pos
);
2225 static int build_dentry_path(struct dentry
*dentry
, struct inode
*dir
,
2226 const char **ppath
, int *ppathlen
, u64
*pino
,
2227 bool *pfreepath
, bool parent_locked
)
2233 dir
= d_inode_rcu(dentry
->d_parent
);
2234 if (dir
&& parent_locked
&& ceph_snap(dir
) == CEPH_NOSNAP
) {
2235 *pino
= ceph_ino(dir
);
2237 *ppath
= dentry
->d_name
.name
;
2238 *ppathlen
= dentry
->d_name
.len
;
2242 path
= ceph_mdsc_build_path(dentry
, ppathlen
, pino
, 1);
2244 return PTR_ERR(path
);
2250 static int build_inode_path(struct inode
*inode
,
2251 const char **ppath
, int *ppathlen
, u64
*pino
,
2254 struct dentry
*dentry
;
2257 if (ceph_snap(inode
) == CEPH_NOSNAP
) {
2258 *pino
= ceph_ino(inode
);
2262 dentry
= d_find_alias(inode
);
2263 path
= ceph_mdsc_build_path(dentry
, ppathlen
, pino
, 1);
2266 return PTR_ERR(path
);
2273 * request arguments may be specified via an inode *, a dentry *, or
2274 * an explicit ino+path.
2276 static int set_request_path_attr(struct inode
*rinode
, struct dentry
*rdentry
,
2277 struct inode
*rdiri
, const char *rpath
,
2278 u64 rino
, const char **ppath
, int *pathlen
,
2279 u64
*ino
, bool *freepath
, bool parent_locked
)
2284 r
= build_inode_path(rinode
, ppath
, pathlen
, ino
, freepath
);
2285 dout(" inode %p %llx.%llx\n", rinode
, ceph_ino(rinode
),
2287 } else if (rdentry
) {
2288 r
= build_dentry_path(rdentry
, rdiri
, ppath
, pathlen
, ino
,
2289 freepath
, parent_locked
);
2290 dout(" dentry %p %llx/%.*s\n", rdentry
, *ino
, *pathlen
,
2292 } else if (rpath
|| rino
) {
2295 *pathlen
= rpath
? strlen(rpath
) : 0;
2296 dout(" path %.*s\n", *pathlen
, rpath
);
/*
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
					       struct ceph_mds_request *req,
					       int mds, bool drop_cap_releases)
	struct ceph_msg *msg;
	struct ceph_mds_request_head *head;
	const char *path1 = NULL;
	const char *path2 = NULL;
	u64 ino1 = 0, ino2 = 0;
	int pathlen1 = 0, pathlen2 = 0;
	bool freepath1 = false, freepath2 = false;
	ret = set_request_path_attr(req->r_inode, req->r_dentry,
			      req->r_parent, req->r_path1, req->r_ino1.ino,
			      &path1, &pathlen1, &ino1, &freepath1,
			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
				       &req->r_req_flags));
	/* If r_old_dentry is set, then assume that its parent is locked */
	ret = set_request_path_attr(NULL, req->r_old_dentry,
			      req->r_old_dentry_dir,
			      req->r_path2, req->r_ino2.ino,
			      &path2, &pathlen2, &ino2, &freepath2, true);
	len = sizeof(*head) +
		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
		sizeof(struct ceph_timespec);
	/* calculate (max) length for cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
	if (req->r_dentry_drop)
	if (req->r_old_dentry_drop)
	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
		msg = ERR_PTR(-ENOMEM);
	msg->hdr.version = cpu_to_le16(2);
	msg->hdr.tid = cpu_to_le64(req->r_tid);
	head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(*head);
	end = msg->front.iov_base + msg->front.iov_len;
	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	head->op = cpu_to_le32(req->r_op);
	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
	head->args = req->r_args;
	ceph_encode_filepath(&p, end, ino1, path1);
	ceph_encode_filepath(&p, end, ino2, path2);
	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
		      mds, req->r_inode_drop, req->r_inode_unless, 0);
	if (req->r_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_dentry,
				req->r_parent, mds, req->r_dentry_drop,
				req->r_dentry_unless);
	if (req->r_old_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
				req->r_old_dentry_dir, mds,
				req->r_old_dentry_drop,
				req->r_old_dentry_unless);
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      d_inode(req->r_old_dentry),
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
	if (drop_cap_releases) {
		p = msg->front.iov_base + req->r_request_release_offset;
	head->num_releases = cpu_to_le16(releases);
		struct ceph_timespec ts;
		ceph_encode_timespec64(&ts, &req->r_stamp);
		ceph_encode_copy(&p, &ts, sizeof(ts));
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
	if (req->r_pagelist) {
		struct ceph_pagelist *pagelist = req->r_pagelist;
		ceph_msg_data_add_pagelist(msg, pagelist);
		msg->hdr.data_len = cpu_to_le32(pagelist->length);
		msg->hdr.data_len = 0;
	msg->hdr.data_off = cpu_to_le16(0);
	ceph_mdsc_free_path((char *)path2, pathlen2);
	ceph_mdsc_free_path((char *)path1, pathlen1);
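/*
 * Rough sketch of the request front assembled above (a reading aid, not a
 * normative wire description): ceph_mds_request_head, then two filepaths
 * (ino1/path1 and ino2/path2, each encoded as ino + string), then up to
 * four ceph_mds_request_release entries, then a ceph_timespec stamp.
 * r_request_release_offset remembers where the releases start so a replay
 * can drop them and re-append only the timestamp.
 */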
/*
 * called under mdsc->mutex if error, under no mutex if
 * success.
 */
static void complete_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *req)
	if (req->r_callback)
		req->r_callback(mdsc, req);
	complete_all(&req->r_completion);
/*
 * called under mdsc->mutex
 */
static int __prepare_send_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req,
				  int mds, bool drop_cap_releases)
	struct ceph_mds_request_head *rhead;
	struct ceph_msg *msg;
		struct ceph_cap *cap =
			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
			req->r_sent_on_mseq = cap->mseq;
			req->r_sent_on_mseq = -1;
	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		 * Replay.  Do not regenerate message (and rebuild
		 * paths, etc.); just use the original message.
		 * Rebuilding paths will break for renames because
		 * d_move mangles the src name.
		msg = req->r_request;
		rhead = msg->front.iov_base;
		flags = le32_to_cpu(rhead->flags);
		flags |= CEPH_MDS_FLAG_REPLAY;
		rhead->flags = cpu_to_le32(flags);
		if (req->r_target_inode)
			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
		rhead->num_retry = req->r_attempts - 1;
		/* remove cap/dentry releases from message */
		rhead->num_releases = 0;
		p = msg->front.iov_base + req->r_request_release_offset;
			struct ceph_timespec ts;
			ceph_encode_timespec64(&ts, &req->r_stamp);
			ceph_encode_copy(&p, &ts, sizeof(ts));
		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
	if (req->r_request) {
		ceph_msg_put(req->r_request);
		req->r_request = NULL;
	msg = create_request_message(mdsc, req, mds, drop_cap_releases);
		req->r_err = PTR_ERR(msg);
		return PTR_ERR(msg);
	req->r_request = msg;
	rhead = msg->front.iov_base;
	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_REPLAY;
		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
	rhead->flags = cpu_to_le32(flags);
	rhead->num_fwd = req->r_num_fwd;
	rhead->num_retry = req->r_attempts - 1;
	dout(" r_parent = %p\n", req->r_parent);
/*
 * called under mdsc->mutex
 */
static int __send_request(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session,
			  struct ceph_mds_request *req,
			  bool drop_cap_releases)
	err = __prepare_send_request(mdsc, req, session->s_mds,
		ceph_msg_get(req->r_request);
		ceph_con_send(&session->s_con, req->r_request);
/*
 * send request, or put it on the appropriate wait list.
 */
static void __do_request(struct ceph_mds_client *mdsc,
			 struct ceph_mds_request *req)
	struct ceph_mds_session *session = NULL;
	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
			__unregister_request(mdsc, req);
	if (req->r_timeout &&
	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
		dout("do_request timed out\n");
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		dout("do_request forced umount\n");
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
		if (mdsc->mdsmap_err) {
			err = mdsc->mdsmap_err;
			dout("do_request mdsmap err %d\n", err);
		if (mdsc->mdsmap->m_epoch == 0) {
			dout("do_request no mdsmap, waiting for map\n");
			list_add(&req->r_wait, &mdsc->waiting_for_map);
		if (!(mdsc->fsc->mount_options->flags &
		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
			err = -EHOSTUNREACH;
	put_request_session(req);
	mds = __choose_mds(mdsc, req, &random);
	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
		dout("do_request no mds or not active, waiting for map\n");
		list_add(&req->r_wait, &mdsc->waiting_for_map);
	/* get, open session */
	session = __ceph_lookup_mds_session(mdsc, mds);
		session = register_session(mdsc, mds);
		if (IS_ERR(session)) {
			err = PTR_ERR(session);
	req->r_session = ceph_get_mds_session(session);
	dout("do_request mds%d session %p state %s\n", mds, session,
	     ceph_session_state_name(session->s_state));
	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
	    session->s_state != CEPH_MDS_SESSION_HUNG) {
		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
			__open_session(mdsc, session);
			/* retry the same mds later */
				req->r_resend_mds = mds;
		list_add(&req->r_wait, &session->s_waiting);
	req->r_resend_mds = -1;   /* forget any previous mds hint */
	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;
	err = __send_request(mdsc, session, req, false);
	ceph_put_mds_session(session);
	dout("__do_request early error %d\n", err);
	complete_request(mdsc, req);
	__unregister_request(mdsc, req);
/*
 * called under mdsc->mutex
 */
static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head)
	struct ceph_mds_request *req;
	LIST_HEAD(tmp_list);
	list_splice_init(head, &tmp_list);
	while (!list_empty(&tmp_list)) {
		req = list_entry(tmp_list.next,
				 struct ceph_mds_request, r_wait);
		list_del_init(&req->r_wait);
		dout(" wake request %p tid %llu\n", req, req->r_tid);
		__do_request(mdsc, req);
/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
	struct ceph_mds_request *req;
	struct rb_node *p = rb_first(&mdsc->request_tree);
	dout("kick_requests mds%d\n", mds);
		req = rb_entry(p, struct ceph_mds_request, r_node);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
		if (req->r_attempts > 0)
			continue; /* only new requests */
		if (req->r_session &&
		    req->r_session->s_mds == mds) {
			dout(" kicking tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__do_request(mdsc, req);
int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
			     struct ceph_mds_request *req)
	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_parent) {
		ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		ihold(req->r_parent);
	if (req->r_old_dentry_dir)
		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
	dout("submit_request on %p for inode %p\n", req, dir);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	__do_request(mdsc, req);
	mutex_unlock(&mdsc->mutex);
static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req)
	dout("do_request waiting\n");
	if (!req->r_timeout && req->r_wait_for_completion) {
		err = req->r_wait_for_completion(mdsc, req);
		long timeleft = wait_for_completion_killable_timeout(
					ceph_timeout_jiffies(req->r_timeout));
			err = -EIO; /* timed out */
			err = timeleft;  /* killed */
	dout("do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);
	/* only abort if we didn't race with a real reply */
	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		dout("aborted request %lld with %d\n", req->r_tid, err);
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
		mutex_lock(&req->r_fill_mutex);
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);
		if (req->r_parent &&
		    (req->r_op & CEPH_MDS_OP_WRITE))
			ceph_invalidate_dir_request(req);
	mutex_unlock(&mdsc->mutex);
/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, retry details.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct ceph_mds_request *req)
	dout("do_request on %p\n", req);
	err = ceph_mdsc_submit_request(mdsc, dir, req);
		err = ceph_mdsc_wait_request(mdsc, req);
	dout("do_request %p done, result %d\n", req, err);
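/*
 * Illustrative call sequence (a sketch of how callers elsewhere in ceph use
 * this, not code in this file): allocate a request, fill in the op and path
 * arguments, run it synchronously, then drop the reference:
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP, USE_ANY_MDS);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_dentry = dget(dentry);
 *	req->r_num_caps = 2;
 *	err = ceph_mdsc_do_request(mdsc, dir, req);
 *	ceph_mdsc_put_request(req);
 */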
/*
 * Invalidate dir's completeness, dentry lease state on an aborted MDS
 * namespace request.
 */
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
	struct inode *dir = req->r_parent;
	struct inode *old_dir = req->r_old_dentry_dir;
	dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
	ceph_dir_clear_complete(dir);
		ceph_dir_clear_complete(old_dir);
	ceph_invalidate_dentry_lease(req->r_dentry);
	if (req->r_old_dentry)
		ceph_invalidate_dentry_lease(req->r_old_dentry);
/*
 * We take the session mutex and parse and process the reply immediately.
 * This preserves the logical ordering of replies, capabilities, etc., sent
 * by the MDS as they are applied to our local cache.
 */
static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_mds_request *req;
	struct ceph_mds_reply_head *head = msg->front.iov_base;
	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
	struct ceph_snap_realm *realm;
	int mds = session->s_mds;
	if (msg->front.iov_len < sizeof(*head)) {
		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
	/* get request, session */
	tid = le64_to_cpu(msg->hdr.tid);
	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
		dout("handle_reply on unknown tid %llu\n", tid);
		mutex_unlock(&mdsc->mutex);
	dout("handle_reply %p\n", req);
	/* correct session? */
	if (req->r_session != session) {
		pr_err("mdsc_handle_reply got %llu on session mds%d"
		       " not mds%d\n", tid, session->s_mds,
		       req->r_session ? req->r_session->s_mds : -1);
		mutex_unlock(&mdsc->mutex);
	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
		pr_warn("got a dup %s reply on %llu from mds%d\n",
			head->safe ? "safe" : "unsafe", tid, mds);
		mutex_unlock(&mdsc->mutex);
	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
		pr_warn("got unsafe after safe on %llu from mds%d\n",
		mutex_unlock(&mdsc->mutex);
	result = le32_to_cpu(head->result);
	 * if we're not talking to the authority, send to them
	 * if the authority has changed while we weren't looking,
	 * send to new authority
	 * Otherwise we just have to return an ESTALE
	if (result == -ESTALE) {
		dout("got ESTALE on request %llu\n", req->r_tid);
		req->r_resend_mds = -1;
		if (req->r_direct_mode != USE_AUTH_MDS) {
			dout("not using auth, setting for that now\n");
			req->r_direct_mode = USE_AUTH_MDS;
			__do_request(mdsc, req);
			mutex_unlock(&mdsc->mutex);
			int mds = __choose_mds(mdsc, req, NULL);
			if (mds >= 0 && mds != req->r_session->s_mds) {
				dout("but auth changed, so resending\n");
				__do_request(mdsc, req);
				mutex_unlock(&mdsc->mutex);
		dout("have to return ESTALE on request %llu\n", req->r_tid);
		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
		__unregister_request(mdsc, req);
		/* last request during umount? */
		if (mdsc->stopping && !__get_oldest_req(mdsc))
			complete_all(&mdsc->safe_umount_waiters);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
			 * We already handled the unsafe response, now do the
			 * cleanup.  No need to examine the response; the MDS
			 * doesn't include any result info in the safe
			 * response.  And even if it did, there is nothing
			 * useful we could do with a revised return value.
			dout("got safe reply %llu, mds%d\n", tid, mds);
			mutex_unlock(&mdsc->mutex);
		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
		if (req->r_unsafe_dir) {
			struct ceph_inode_info *ci =
					ceph_inode(req->r_unsafe_dir);
			spin_lock(&ci->i_unsafe_lock);
			list_add_tail(&req->r_unsafe_dir_item,
				      &ci->i_unsafe_dirops);
			spin_unlock(&ci->i_unsafe_lock);
	dout("handle_reply tid %lld result %d\n", tid, result);
	rinfo = &req->r_reply_info;
	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
		err = parse_reply_info(msg, rinfo, (u64)-1);
		err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
	mutex_unlock(&mdsc->mutex);
	mutex_lock(&session->s_mutex);
		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
	if (rinfo->snapblob_len) {
		down_write(&mdsc->snap_rwsem);
		ceph_update_snap_trace(mdsc, rinfo->snapblob,
				rinfo->snapblob + rinfo->snapblob_len,
				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
		downgrade_write(&mdsc->snap_rwsem);
		down_read(&mdsc->snap_rwsem);
	/* insert trace into our cache */
	mutex_lock(&req->r_fill_mutex);
	current->journal_info = req;
	err = ceph_fill_trace(mdsc->fsc->sb, req);
		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
				    req->r_op == CEPH_MDS_OP_LSSNAP))
			ceph_readdir_prepopulate(req, req->r_session);
	current->journal_info = NULL;
	mutex_unlock(&req->r_fill_mutex);
	up_read(&mdsc->snap_rwsem);
		ceph_put_snap_realm(mdsc, realm);
	if (req->r_target_inode &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci =
			ceph_inode(req->r_target_inode);
		spin_lock(&ci->i_unsafe_lock);
		list_add_tail(&req->r_unsafe_target_item,
			      &ci->i_unsafe_iops);
		spin_unlock(&ci->i_unsafe_lock);
	ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
	mutex_lock(&mdsc->mutex);
	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		req->r_reply = ceph_msg_get(msg);
		set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
		dout("reply arrived after request %lld was aborted\n", tid);
	mutex_unlock(&mdsc->mutex);
	mutex_unlock(&session->s_mutex);
	/* kick calling process */
	complete_request(mdsc, req);
	ceph_mdsc_put_request(req);
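/*
 * Note on the flow above (summary, not new behaviour): an unsafe reply marks
 * the request CEPH_MDS_R_GOT_UNSAFE and queues it on the session/inode unsafe
 * lists; the later safe reply only unregisters the request and wakes
 * safe_umount_waiters, since the MDS repeats no result information in it.
 */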
/*
 * handle mds notification that our request has been forwarded.
 */
static void handle_forward(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session,
			   struct ceph_msg *msg)
	struct ceph_mds_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
	next_mds = ceph_decode_32(&p);
	fwd_seq = ceph_decode_32(&p);
	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
		goto out;  /* dup reply? */
	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		dout("forward tid %llu aborted, unregistering\n", tid);
		__unregister_request(mdsc, req);
	} else if (fwd_seq <= req->r_num_fwd) {
		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
		     tid, next_mds, req->r_num_fwd, fwd_seq);
		/* resend. forward race not possible; mds would drop */
		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
		req->r_attempts = 0;
		req->r_num_fwd = fwd_seq;
		req->r_resend_mds = next_mds;
		put_request_session(req);
		__do_request(mdsc, req);
	ceph_mdsc_put_request(req);
	mutex_unlock(&mdsc->mutex);
	pr_err("mdsc_handle_forward decode error err=%d\n", err);
static int __decode_session_metadata(void **p, void *end,
				     bool *blacklisted)
	/* map<string,string> */
	ceph_decode_32_safe(p, end, n, bad);
		ceph_decode_32_safe(p, end, len, bad);
		ceph_decode_need(p, end, len, bad);
		err_str = !strncmp(*p, "error_string", len);
		ceph_decode_32_safe(p, end, len, bad);
		ceph_decode_need(p, end, len, bad);
		if (err_str && strnstr(*p, "blacklisted", len))
			*blacklisted = true;
/*
 * handle a mds session control message
 */
static void handle_session(struct ceph_mds_session *session,
			   struct ceph_msg *msg)
	struct ceph_mds_client *mdsc = session->s_mdsc;
	int mds = session->s_mds;
	int msg_version = le16_to_cpu(msg->hdr.version);
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mds_session_head *h;
	unsigned long features = 0;
	bool blacklisted = false;
	ceph_decode_need(&p, end, sizeof(*h), bad);
	op = le32_to_cpu(h->op);
	seq = le64_to_cpu(h->seq);
	if (msg_version >= 3) {
		/* version >= 2, metadata */
		if (__decode_session_metadata(&p, end, &blacklisted) < 0)
		/* version >= 3, feature bits */
		ceph_decode_32_safe(&p, end, len, bad);
		ceph_decode_need(&p, end, len, bad);
		memcpy(&features, p, min_t(size_t, len, sizeof(features)));
	mutex_lock(&mdsc->mutex);
	if (op == CEPH_SESSION_CLOSE) {
		ceph_get_mds_session(session);
		__unregister_session(mdsc, session);
	/* FIXME: this ttl calculation is generous */
	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
	mutex_unlock(&mdsc->mutex);
	mutex_lock(&session->s_mutex);
	dout("handle_session mds%d %s %p state %s seq %llu\n",
	     mds, ceph_session_op_name(op), session,
	     ceph_session_state_name(session->s_state), seq);
	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
		session->s_state = CEPH_MDS_SESSION_OPEN;
		pr_info("mds%d came back\n", session->s_mds);
	case CEPH_SESSION_OPEN:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info("mds%d reconnect success\n", session->s_mds);
		session->s_state = CEPH_MDS_SESSION_OPEN;
		session->s_features = features;
		renewed_caps(mdsc, session, 0);
			__close_session(mdsc, session);
	case CEPH_SESSION_RENEWCAPS:
		if (session->s_renew_seq == seq)
			renewed_caps(mdsc, session, 1);
	case CEPH_SESSION_CLOSE:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info("mds%d reconnect denied\n", session->s_mds);
		session->s_state = CEPH_MDS_SESSION_CLOSED;
		cleanup_session_requests(mdsc, session);
		remove_session_caps(session);
		wake = 2; /* for good measure */
		wake_up_all(&mdsc->session_close_wq);
	case CEPH_SESSION_STALE:
		pr_info("mds%d caps went stale, renewing\n",
		spin_lock(&session->s_gen_ttl_lock);
		session->s_cap_gen++;
		session->s_cap_ttl = jiffies - 1;
		spin_unlock(&session->s_gen_ttl_lock);
		send_renew_caps(mdsc, session);
	case CEPH_SESSION_RECALL_STATE:
		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
	case CEPH_SESSION_FLUSHMSG:
		send_flushmsg_ack(mdsc, session, seq);
	case CEPH_SESSION_FORCE_RO:
		dout("force_session_readonly %p\n", session);
		spin_lock(&session->s_cap_lock);
		session->s_readonly = true;
		spin_unlock(&session->s_cap_lock);
		wake_up_session_caps(session, FORCE_RO);
	case CEPH_SESSION_REJECT:
		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
		pr_info("mds%d rejected session\n", session->s_mds);
		session->s_state = CEPH_MDS_SESSION_REJECTED;
		cleanup_session_requests(mdsc, session);
		remove_session_caps(session);
			mdsc->fsc->blacklisted = true;
		wake = 2; /* for good measure */
		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
	mutex_unlock(&session->s_mutex);
		mutex_lock(&mdsc->mutex);
		__wake_requests(mdsc, &session->s_waiting);
			kick_requests(mdsc, mds);
		mutex_unlock(&mdsc->mutex);
	if (op == CEPH_SESSION_CLOSE)
		ceph_put_mds_session(session);
	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
	       (int)msg->front.iov_len);
/*
 * called under session->mutex.
 */
static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
	struct ceph_mds_request *req, *nreq;
	dout("replay_unsafe_requests mds%d\n", session->s_mds);
	mutex_lock(&mdsc->mutex);
	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
		__send_request(mdsc, session, req, true);
	 * Also re-send old requests when the MDS enters the reconnect stage,
	 * so that it can process completed requests in its clientreplay stage.
	p = rb_first(&mdsc->request_tree);
		req = rb_entry(p, struct ceph_mds_request, r_node);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
		if (req->r_attempts == 0)
			continue; /* only old requests */
		if (req->r_session &&
		    req->r_session->s_mds == session->s_mds)
			__send_request(mdsc, session, req, true);
	mutex_unlock(&mdsc->mutex);
static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
	struct ceph_msg *reply;
	struct ceph_pagelist *_pagelist;
	if (!recon_state->allow_multi)
	/* can't handle message that contains both caps and realm */
	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
	/* pre-allocate new pagelist */
	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	/* placeholder for nr_caps */
	err = ceph_pagelist_encode_32(_pagelist, 0);
	if (recon_state->nr_caps) {
		/* currently encoding caps */
		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
		/* placeholder for nr_realms (currently encoding realms) */
		err = ceph_pagelist_encode_32(_pagelist, 0);
	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
	addr = kmap_atomic(page);
	if (recon_state->nr_caps) {
		/* currently encoding caps */
		*addr = cpu_to_le32(recon_state->nr_caps);
		/* currently encoding realms */
		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
	kunmap_atomic(addr);
	reply->hdr.version = cpu_to_le16(5);
	reply->hdr.compat_version = cpu_to_le16(4);
	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
	ceph_con_send(&recon_state->session->s_con, reply);
	ceph_pagelist_release(recon_state->pagelist);
	recon_state->pagelist = _pagelist;
	recon_state->nr_caps = 0;
	recon_state->nr_realms = 0;
	recon_state->msg_version = 5;
	ceph_msg_put(reply);
	ceph_pagelist_release(_pagelist);
/*
 * Encode information about a cap for a reconnect with the MDS.
 */
static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
	struct ceph_mds_cap_reconnect v2;
	struct ceph_mds_cap_reconnect_v1 v1;
	struct ceph_inode_info *ci = cap->ci;
	struct ceph_reconnect_state *recon_state = arg;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
	     inode, ceph_vinop(inode), cap, cap->cap_id,
	     ceph_cap_string(cap->issued));
	spin_lock(&ci->i_ceph_lock);
	cap->seq = 0;        /* reset cap seq */
	cap->issue_seq = 0;  /* and issue_seq */
	cap->mseq = 0;       /* and migrate_seq */
	cap->cap_gen = cap->session->s_cap_gen;
	if (recon_state->msg_version >= 2) {
		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v2.issued = cpu_to_le32(cap->issued);
		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v2.pathbase = 0;
		rec.v2.flock_len = (__force __le32)
			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v1.issued = cpu_to_le32(cap->issued);
		rec.v1.size = cpu_to_le64(inode->i_size);
		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v1.pathbase = 0;
	if (list_empty(&ci->i_cap_snaps)) {
		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
		struct ceph_cap_snap *capsnap =
			list_first_entry(&ci->i_cap_snaps,
					 struct ceph_cap_snap, ci_item);
		snap_follows = capsnap->follows;
	spin_unlock(&ci->i_ceph_lock);
	if (recon_state->msg_version >= 2) {
		int num_fcntl_locks, num_flock_locks;
		struct ceph_filelock *flocks = NULL;
		size_t struct_len, total_len = sizeof(u64);
		if (rec.v2.flock_len) {
			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
			num_fcntl_locks = 0;
			num_flock_locks = 0;
		if (num_fcntl_locks + num_flock_locks > 0) {
			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
					       sizeof(struct ceph_filelock),
			err = ceph_encode_locks_to_buffer(inode, flocks,
		if (recon_state->msg_version >= 3) {
			/* version, compat_version and struct_len */
			total_len += 2 * sizeof(u8) + sizeof(u32);
		 * number of encoded locks is stable, so copy to pagelist
		struct_len = 2 * sizeof(u32) +
			    (num_fcntl_locks + num_flock_locks) *
			    sizeof(struct ceph_filelock);
		rec.v2.flock_len = cpu_to_le32(struct_len);
		struct_len += sizeof(u32) + sizeof(rec.v2);
			struct_len += sizeof(u64); /* snap_follows */
		total_len += struct_len;
		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
			err = send_reconnect_partial(recon_state);
				goto out_freeflocks;
			pagelist = recon_state->pagelist;
		err = ceph_pagelist_reserve(pagelist, total_len);
			goto out_freeflocks;
		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		if (recon_state->msg_version >= 3) {
			ceph_pagelist_encode_8(pagelist, struct_v);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, struct_len);
		ceph_pagelist_encode_string(pagelist, NULL, 0);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
		ceph_locks_to_pagelist(flocks, pagelist,
				       num_fcntl_locks, num_flock_locks);
			ceph_pagelist_encode_64(pagelist, snap_follows);
		struct dentry *dentry;
		dentry = d_find_alias(inode);
			path = ceph_mdsc_build_path(dentry,
						    &pathlen, &pathbase, 0);
				err = PTR_ERR(path);
		rec.v1.pathbase = cpu_to_le64(pathbase);
		err = ceph_pagelist_reserve(pagelist,
					    sizeof(u64) + sizeof(u32) +
					    pathlen + sizeof(rec.v1));
		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		ceph_pagelist_encode_string(pagelist, path, pathlen);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
		ceph_mdsc_free_path(path, pathlen);
	recon_state->nr_caps++;
*mdsc
,
3537 struct ceph_reconnect_state
*recon_state
)
3540 struct ceph_pagelist
*pagelist
= recon_state
->pagelist
;
3543 if (recon_state
->msg_version
>= 4) {
3544 err
= ceph_pagelist_encode_32(pagelist
, mdsc
->num_snap_realms
);
3550 * snaprealms. we provide mds with the ino, seq (version), and
3551 * parent for all of our realms. If the mds has any newer info,
3554 for (p
= rb_first(&mdsc
->snap_realms
); p
; p
= rb_next(p
)) {
3555 struct ceph_snap_realm
*realm
=
3556 rb_entry(p
, struct ceph_snap_realm
, node
);
3557 struct ceph_mds_snaprealm_reconnect sr_rec
;
3559 if (recon_state
->msg_version
>= 4) {
3560 size_t need
= sizeof(u8
) * 2 + sizeof(u32
) +
3563 if (pagelist
->length
+ need
> RECONNECT_MAX_SIZE
) {
3564 err
= send_reconnect_partial(recon_state
);
3567 pagelist
= recon_state
->pagelist
;
3570 err
= ceph_pagelist_reserve(pagelist
, need
);
3574 ceph_pagelist_encode_8(pagelist
, 1);
3575 ceph_pagelist_encode_8(pagelist
, 1);
3576 ceph_pagelist_encode_32(pagelist
, sizeof(sr_rec
));
3579 dout(" adding snap realm %llx seq %lld parent %llx\n",
3580 realm
->ino
, realm
->seq
, realm
->parent_ino
);
3581 sr_rec
.ino
= cpu_to_le64(realm
->ino
);
3582 sr_rec
.seq
= cpu_to_le64(realm
->seq
);
3583 sr_rec
.parent
= cpu_to_le64(realm
->parent_ino
);
3585 err
= ceph_pagelist_append(pagelist
, &sr_rec
, sizeof(sr_rec
));
3589 recon_state
->nr_realms
++;
/*
 * If an MDS fails and recovers, clients need to reconnect in order to
 * reestablish shared state.  This includes all caps issued through
 * this session _and_ the snap_realm hierarchy.  Because it's not
 * clear which snap realms the mds cares about, we send everything we
 * know about.. that ensures we'll then get any new info the
 * recovering MDS might have.
 *
 * This is a relatively heavyweight operation, but it's rare.
 *
 * called with mdsc->mutex held.
 */
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *session)
	struct ceph_msg *reply;
	int mds = session->s_mds;
	struct ceph_reconnect_state recon_state = {
	pr_info("mds%d reconnect start\n", mds);
	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!recon_state.pagelist)
		goto fail_nopagelist;
	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	mutex_lock(&session->s_mutex);
	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
	dout("session %p state %s\n", session,
	     ceph_session_state_name(session->s_state));
	spin_lock(&session->s_gen_ttl_lock);
	session->s_cap_gen++;
	spin_unlock(&session->s_gen_ttl_lock);
	spin_lock(&session->s_cap_lock);
	/* don't know if session is readonly */
	session->s_readonly = 0;
	 * notify __ceph_remove_cap() that we are composing cap reconnect.
	 * If a cap gets released before being added to the cap reconnect,
	 * __ceph_remove_cap() should skip queuing cap release.
	session->s_cap_reconnect = 1;
	/* drop old cap expires; we're about to reestablish that state */
	detach_cap_releases(session, &dispose);
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(mdsc, &dispose);
	/* trim unused caps to reduce MDS's cache rejoin time */
	if (mdsc->fsc->sb->s_root)
		shrink_dcache_parent(mdsc->fsc->sb->s_root);
	ceph_con_close(&session->s_con);
	ceph_con_open(&session->s_con,
		      CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
	/* replay unsafe requests */
	replay_unsafe_requests(mdsc, session);
	ceph_early_kick_flushing_caps(mdsc, session);
	down_read(&mdsc->snap_rwsem);
	/* placeholder for nr_caps */
	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
		recon_state.msg_version = 3;
		recon_state.allow_multi = true;
	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
		recon_state.msg_version = 3;
		recon_state.msg_version = 2;
	/* traverse this session's caps */
	err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state);
	spin_lock(&session->s_cap_lock);
	session->s_cap_reconnect = 0;
	spin_unlock(&session->s_cap_lock);
	/* check if all realms can be encoded into current message */
	if (mdsc->num_snap_realms) {
			recon_state.pagelist->length +
			mdsc->num_snap_realms *
			sizeof(struct ceph_mds_snaprealm_reconnect);
		if (recon_state.msg_version >= 4) {
			/* number of realms */
			total_len += sizeof(u32);
			/* version, compat_version and struct_len */
			total_len += mdsc->num_snap_realms *
				     (2 * sizeof(u8) + sizeof(u32));
		if (total_len > RECONNECT_MAX_SIZE) {
			if (!recon_state.allow_multi) {
			if (recon_state.nr_caps) {
				err = send_reconnect_partial(&recon_state);
			recon_state.msg_version = 5;
	err = encode_snap_realms(mdsc, &recon_state);
	if (recon_state.msg_version >= 5) {
		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
	if (recon_state.nr_caps || recon_state.nr_realms) {
			list_first_entry(&recon_state.pagelist->head,
		__le32 *addr = kmap_atomic(page);
		if (recon_state.nr_caps) {
			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
			*addr = cpu_to_le32(recon_state.nr_caps);
		} else if (recon_state.msg_version >= 4) {
			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
		kunmap_atomic(addr);
	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
	if (recon_state.msg_version >= 4)
		reply->hdr.compat_version = cpu_to_le16(4);
	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
	ceph_con_send(&session->s_con, reply);
	mutex_unlock(&session->s_mutex);
	mutex_lock(&mdsc->mutex);
	__wake_requests(mdsc, &session->s_waiting);
	mutex_unlock(&mdsc->mutex);
	up_read(&mdsc->snap_rwsem);
	ceph_pagelist_release(recon_state.pagelist);
	ceph_msg_put(reply);
	up_read(&mdsc->snap_rwsem);
	mutex_unlock(&session->s_mutex);
	ceph_pagelist_release(recon_state.pagelist);
	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
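/*
 * Summary of the reconnect framing used above (derived from this file, not a
 * full protocol spec): the pagelist starts with a nr_caps placeholder that is
 * patched once all caps are encoded; when the MDS supports
 * CEPHFS_FEATURE_MULTI_RECONNECT the state may be split across several
 * messages via send_reconnect_partial(), and message versions 4/5 add the
 * realm count and per-record struct headers.
 */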
/*
 * compare old and new mdsmaps, kicking requests
 * and closing out old connections as necessary
 *
 * called under mdsc->mutex.
 */
static void check_new_map(struct ceph_mds_client *mdsc,
			  struct ceph_mdsmap *newmap,
			  struct ceph_mdsmap *oldmap)
	int oldstate, newstate;
	struct ceph_mds_session *s;
	dout("check_new_map new %u old %u\n",
	     newmap->m_epoch, oldmap->m_epoch);
	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
		if (!mdsc->sessions[i])
		s = mdsc->sessions[i];
		oldstate = ceph_mdsmap_get_state(oldmap, i);
		newstate = ceph_mdsmap_get_state(newmap, i);
		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
		     i, ceph_mds_state_name(oldstate),
		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
		     ceph_mds_state_name(newstate),
		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
		     ceph_session_state_name(s->s_state));
		if (i >= newmap->possible_max_rank) {
			/* force close session for stopped mds */
			ceph_get_mds_session(s);
			__unregister_session(mdsc, s);
			__wake_requests(mdsc, &s->s_waiting);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&s->s_mutex);
			cleanup_session_requests(mdsc, s);
			remove_session_caps(s);
			mutex_unlock(&s->s_mutex);
			ceph_put_mds_session(s);
			mutex_lock(&mdsc->mutex);
			kick_requests(mdsc, i);
		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
			   ceph_mdsmap_get_addr(newmap, i),
			   sizeof(struct ceph_entity_addr))) {
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&s->s_mutex);
			mutex_lock(&mdsc->mutex);
			ceph_con_close(&s->s_con);
			mutex_unlock(&s->s_mutex);
			s->s_state = CEPH_MDS_SESSION_RESTARTING;
		} else if (oldstate == newstate) {
			continue;  /* nothing new with this mds */
		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
		    newstate >= CEPH_MDS_STATE_RECONNECT) {
			mutex_unlock(&mdsc->mutex);
			send_mds_reconnect(mdsc, s);
			mutex_lock(&mdsc->mutex);
		 * kick request on any mds that has gone active.
		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
		    newstate >= CEPH_MDS_STATE_ACTIVE) {
			if (oldstate != CEPH_MDS_STATE_CREATING &&
			    oldstate != CEPH_MDS_STATE_STARTING)
				pr_info("mds%d recovery completed\n", s->s_mds);
			kick_requests(mdsc, i);
			ceph_kick_flushing_caps(mdsc, s);
			wake_up_session_caps(s, RECONNECT);
	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
		s = mdsc->sessions[i];
		if (!ceph_mdsmap_is_laggy(newmap, i))
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG ||
		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
			dout(" connecting to export targets of laggy mds%d\n",
			__open_export_target_sessions(mdsc, s);
/*
 * caller must hold session s_mutex, dentry->d_lock
 */
void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
	struct ceph_dentry_info *di = ceph_dentry(dentry);
	ceph_put_mds_session(di->lease_session);
	di->lease_session = NULL;
static void handle_lease(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session,
			 struct ceph_msg *msg)
	struct super_block *sb = mdsc->fsc->sb;
	struct inode *inode;
	struct dentry *parent, *dentry;
	struct ceph_dentry_info *di;
	int mds = session->s_mds;
	struct ceph_mds_lease *h = msg->front.iov_base;
	struct ceph_vino vino;
	dout("handle_lease from mds%d\n", mds);
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
	vino.ino = le64_to_cpu(h->ino);
	vino.snap = CEPH_NOSNAP;
	seq = le32_to_cpu(h->seq);
	dname.len = get_unaligned_le32(h + 1);
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
	dname.name = (void *)(h + 1) + sizeof(u32);
	inode = ceph_find_inode(sb, vino);
	dout("handle_lease %s, ino %llx %p %.*s\n",
	     ceph_lease_op_name(h->action), vino.ino, inode,
	     dname.len, dname.name);
	mutex_lock(&session->s_mutex);
		dout("handle_lease no inode %llx\n", vino.ino);
	parent = d_find_alias(inode);
		dout("no parent dentry on inode %p\n", inode);
		goto release;  /* hrm... */
	dname.hash = full_name_hash(parent, dname.name, dname.len);
	dentry = d_lookup(parent, &dname);
	spin_lock(&dentry->d_lock);
	di = ceph_dentry(dentry);
	switch (h->action) {
	case CEPH_MDS_LEASE_REVOKE:
		if (di->lease_session == session) {
			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
				h->seq = cpu_to_le32(di->lease_seq);
			__ceph_mdsc_drop_dentry_lease(dentry);
	case CEPH_MDS_LEASE_RENEW:
		if (di->lease_session == session &&
		    di->lease_gen == session->s_cap_gen &&
		    di->lease_renew_from &&
		    di->lease_renew_after == 0) {
			unsigned long duration =
				msecs_to_jiffies(le32_to_cpu(h->duration_ms));
			di->lease_seq = seq;
			di->time = di->lease_renew_from + duration;
			di->lease_renew_after = di->lease_renew_from +
			di->lease_renew_from = 0;
	spin_unlock(&dentry->d_lock);
	/* let's just reuse the same message */
	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
	ceph_con_send(&session->s_con, msg);
	mutex_unlock(&session->s_mutex);
	/* avoid calling iput_final() in mds dispatch threads */
	ceph_async_iput(inode);
	pr_err("corrupt lease message\n");
void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
			      struct dentry *dentry, char action,
	struct ceph_msg *msg;
	struct ceph_mds_lease *lease;
	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
	dout("lease_send_msg identry %p %s to mds%d\n",
	     dentry, ceph_lease_op_name(action), session->s_mds);
	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
	lease = msg->front.iov_base;
	lease->action = action;
	lease->seq = cpu_to_le32(seq);
	spin_lock(&dentry->d_lock);
	dir = d_inode(dentry->d_parent);
	lease->ino = cpu_to_le64(ceph_ino(dir));
	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
	put_unaligned_le32(dentry->d_name.len, lease + 1);
	memcpy((void *)(lease + 1) + 4,
	       dentry->d_name.name, dentry->d_name.len);
	spin_unlock(&dentry->d_lock);
	 * if this is a preemptive lease RELEASE, no need to
	 * flush request stream, since the actual request will
	msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
	ceph_con_send(&session->s_con, msg);
/*
 * lock/unlock each session, to wait for ongoing session activity to finish
 */
static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&s->s_mutex);
		mutex_unlock(&s->s_mutex);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	mutex_unlock(&mdsc->mutex);
static void maybe_recover_session(struct ceph_mds_client *mdsc)
	struct ceph_fs_client *fsc = mdsc->fsc;
	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
	if (!READ_ONCE(fsc->blacklisted))
	if (fsc->last_auto_reconnect &&
	    time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
	pr_info("auto reconnect after blacklisted\n");
	fsc->last_auto_reconnect = jiffies;
	ceph_force_reconnect(fsc->sb);
/*
 * delayed work -- periodically trim expired leases, renew caps with mds
 */
static void schedule_delayed(struct ceph_mds_client *mdsc)
	unsigned hz = round_jiffies_relative(HZ * delay);
	schedule_delayed_work(&mdsc->delayed_work, hz);
static void delayed_work(struct work_struct *work)
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, delayed_work.work);
	dout("mdsc delayed_work\n");
	mutex_lock(&mdsc->mutex);
	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
				   mdsc->last_renew_caps);
		mdsc->last_renew_caps = jiffies;
	for (i = 0; i < mdsc->max_sessions; i++) {
		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
		if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
			dout("resending session close request for mds%d\n",
			request_close_session(mdsc, s);
			ceph_put_mds_session(s);
		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
			if (s->s_state == CEPH_MDS_SESSION_OPEN) {
				s->s_state = CEPH_MDS_SESSION_HUNG;
				pr_info("mds%d hung\n", s->s_mds);
		if (s->s_state == CEPH_MDS_SESSION_NEW ||
		    s->s_state == CEPH_MDS_SESSION_RESTARTING ||
		    s->s_state == CEPH_MDS_SESSION_REJECTED) {
			/* this mds is failed or recovering, just wait */
			ceph_put_mds_session(s);
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&s->s_mutex);
			send_renew_caps(mdsc, s);
			ceph_con_keepalive(&s->s_con);
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG)
			ceph_send_cap_releases(mdsc, s);
		mutex_unlock(&s->s_mutex);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	mutex_unlock(&mdsc->mutex);
	ceph_check_delayed_caps(mdsc);
	ceph_queue_cap_reclaim_work(mdsc);
	ceph_trim_snapid_map(mdsc);
	maybe_recover_session(mdsc);
	schedule_delayed(mdsc);
int ceph_mdsc_init(struct ceph_fs_client *fsc)
	struct ceph_mds_client *mdsc;
	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
	mutex_init(&mdsc->mutex);
	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
	if (!mdsc->mdsmap) {
	init_completion(&mdsc->safe_umount_waiters);
	init_waitqueue_head(&mdsc->session_close_wq);
	INIT_LIST_HEAD(&mdsc->waiting_for_map);
	mdsc->sessions = NULL;
	atomic_set(&mdsc->num_sessions, 0);
	mdsc->max_sessions = 0;
	atomic64_set(&mdsc->quotarealms_count, 0);
	mdsc->quotarealms_inodes = RB_ROOT;
	mutex_init(&mdsc->quotarealms_inodes_mutex);
	mdsc->last_snap_seq = 0;
	init_rwsem(&mdsc->snap_rwsem);
	mdsc->snap_realms = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snap_empty);
	mdsc->num_snap_realms = 0;
	spin_lock_init(&mdsc->snap_empty_lock);
	mdsc->oldest_tid = 0;
	mdsc->request_tree = RB_ROOT;
	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
	mdsc->last_renew_caps = jiffies;
	INIT_LIST_HEAD(&mdsc->cap_delay_list);
	INIT_LIST_HEAD(&mdsc->cap_wait_list);
	spin_lock_init(&mdsc->cap_delay_lock);
	INIT_LIST_HEAD(&mdsc->snap_flush_list);
	spin_lock_init(&mdsc->snap_flush_lock);
	mdsc->last_cap_flush_tid = 1;
	INIT_LIST_HEAD(&mdsc->cap_flush_list);
	INIT_LIST_HEAD(&mdsc->cap_dirty);
	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
	mdsc->num_cap_flushing = 0;
	spin_lock_init(&mdsc->cap_dirty_lock);
	init_waitqueue_head(&mdsc->cap_flushing_wq);
	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
	atomic_set(&mdsc->cap_reclaim_pending, 0);
	spin_lock_init(&mdsc->dentry_list_lock);
	INIT_LIST_HEAD(&mdsc->dentry_leases);
	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
	ceph_caps_init(mdsc);
	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
	spin_lock_init(&mdsc->snapid_map_lock);
	mdsc->snapid_map_tree = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snapid_map_lru);
	init_rwsem(&mdsc->pool_perm_rwsem);
	mdsc->pool_perm_tree = RB_ROOT;
	strscpy(mdsc->nodename, utsname()->nodename,
		sizeof(mdsc->nodename));
/*
 * Wait for safe replies on open mds requests.  If we time out, drop
 * all requests from the tree to avoid dangling dentry refs.
 */
static void wait_requests(struct ceph_mds_client *mdsc)
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_request *req;
	mutex_lock(&mdsc->mutex);
	if (__get_oldest_req(mdsc)) {
		mutex_unlock(&mdsc->mutex);
		dout("wait_requests waiting for requests\n");
		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
				    ceph_timeout_jiffies(opts->mount_timeout));
		/* tear down remaining requests */
		mutex_lock(&mdsc->mutex);
		while ((req = __get_oldest_req(mdsc))) {
			dout("wait_requests timed out on tid %llu\n",
			list_del_init(&req->r_wait);
			__unregister_request(mdsc, req);
	mutex_unlock(&mdsc->mutex);
	dout("wait_requests done\n");
/*
 * called before mount is ro, and before dentries are torn down.
 * (hmm, does this still race with new lookups?)
 */
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
	dout("pre_umount\n");
	lock_unlock_sessions(mdsc);
	ceph_flush_dirty_caps(mdsc);
	wait_requests(mdsc);
	 * wait for reply handlers to drop their request refs and
	 * their inode/dcache refs
	ceph_cleanup_quotarealms_inodes(mdsc);
/*
 * wait for all write mds requests to flush.
 */
static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
	struct ceph_mds_request *req = NULL, *nextreq;
	mutex_lock(&mdsc->mutex);
	dout("wait_unsafe_requests want %lld\n", want_tid);
	req = __get_oldest_req(mdsc);
	while (req && req->r_tid <= want_tid) {
		/* find next request */
		n = rb_next(&req->r_node);
			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
		    (req->r_op & CEPH_MDS_OP_WRITE)) {
			ceph_mdsc_get_request(req);
				ceph_mdsc_get_request(nextreq);
			mutex_unlock(&mdsc->mutex);
			dout("wait_unsafe_requests wait on %llu (want %llu)\n",
			     req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);
			mutex_lock(&mdsc->mutex);
			ceph_mdsc_put_request(req);
				break; /* next dne before, so we're done! */
			if (RB_EMPTY_NODE(&nextreq->r_node)) {
				/* next request was removed from tree */
				ceph_mdsc_put_request(nextreq);
			ceph_mdsc_put_request(nextreq);  /* won't go away */
	mutex_unlock(&mdsc->mutex);
	dout("wait_unsafe_requests done\n");
void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
	u64 want_tid, want_flush;
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
	mutex_lock(&mdsc->mutex);
	want_tid = mdsc->last_tid;
	mutex_unlock(&mdsc->mutex);
	ceph_flush_dirty_caps(mdsc);
	spin_lock(&mdsc->cap_dirty_lock);
	want_flush = mdsc->last_cap_flush_tid;
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_last_entry(&mdsc->cap_flush_list,
					struct ceph_cap_flush, g_list);
	spin_unlock(&mdsc->cap_dirty_lock);
	dout("sync want tid %lld flush_seq %lld\n",
	     want_tid, want_flush);
	wait_unsafe_requests(mdsc, want_tid);
	wait_caps_flush(mdsc, want_flush);
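/*
 * Ordering note (restating the code above): ceph_mdsc_sync() first snapshots
 * last_tid and the newest cap flush tid, then waits for unsafe write requests
 * up to that tid and for the cap flushes to be acked, so a sync(2) only
 * returns once metadata visible at the time of the call is durable on the MDS.
 */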
/*
 * true if all sessions are closed, or we force unmount
 */
static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
	return atomic_read(&mdsc->num_sessions) <= skipped;
/*
 * called after sb is ro.
 */
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_session *session;
	dout("close_sessions\n");
	/* close sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		session = __ceph_lookup_mds_session(mdsc, i);
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&session->s_mutex);
		if (__close_session(mdsc, session) <= 0)
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		mutex_lock(&mdsc->mutex);
	mutex_unlock(&mdsc->mutex);
	dout("waiting for sessions to close\n");
	wait_event_timeout(mdsc->session_close_wq,
			   done_closing_sessions(mdsc, skipped),
			   ceph_timeout_jiffies(opts->mount_timeout));
	/* tear down remaining sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i]) {
			session = ceph_get_mds_session(mdsc->sessions[i]);
			__unregister_session(mdsc, session);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&session->s_mutex);
			remove_session_caps(session);
			mutex_unlock(&session->s_mutex);
			ceph_put_mds_session(session);
			mutex_lock(&mdsc->mutex);
	WARN_ON(!list_empty(&mdsc->cap_delay_list));
	mutex_unlock(&mdsc->mutex);
	ceph_cleanup_snapid_map(mdsc);
	ceph_cleanup_empty_realms(mdsc);
	cancel_work_sync(&mdsc->cap_reclaim_work);
	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
	struct ceph_mds_session *session;
	dout("force umount\n");
	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; mds++) {
		session = __ceph_lookup_mds_session(mdsc, mds);
		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
			__unregister_session(mdsc, session);
		__wake_requests(mdsc, &session->s_waiting);
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&session->s_mutex);
		__close_session(mdsc, session);
		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
			cleanup_session_requests(mdsc, session);
			remove_session_caps(session);
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		mutex_lock(&mdsc->mutex);
		kick_requests(mdsc, mds);
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
		ceph_mdsmap_destroy(mdsc->mdsmap);
	kfree(mdsc->sessions);
	ceph_caps_finalize(mdsc);
	ceph_pool_perm_destroy(mdsc);
void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
	struct ceph_mds_client *mdsc = fsc->mdsc;
	dout("mdsc_destroy %p\n", mdsc);
	/* flush out any connection work with references to us */
	ceph_mdsc_stop(mdsc);
	dout("mdsc_destroy %p done\n", mdsc);
void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
	struct ceph_fs_client *fsc = mdsc->fsc;
	const char *mds_namespace = fsc->mount_options->mds_namespace;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	u32 mount_fscid = (u32)-1;
	u8 struct_v, struct_cv;
	ceph_decode_need(&p, end, sizeof(u32), bad);
	epoch = ceph_decode_32(&p);
	dout("handle_fsmap epoch %u\n", epoch);
	ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
	struct_v = ceph_decode_8(&p);
	struct_cv = ceph_decode_8(&p);
	map_len = ceph_decode_32(&p);
	ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
	p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
	num_fs = ceph_decode_32(&p);
	while (num_fs-- > 0) {
		void *info_p, *info_end;
		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
		info_v = ceph_decode_8(&p);
		info_cv = ceph_decode_8(&p);
		info_len = ceph_decode_32(&p);
		ceph_decode_need(&p, end, info_len, bad);
		info_end = p + info_len;
		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
		fscid = ceph_decode_32(&info_p);
		namelen = ceph_decode_32(&info_p);
		ceph_decode_need(&info_p, info_end, namelen, bad);
		if (mds_namespace &&
		    strlen(mds_namespace) == namelen &&
		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
			mount_fscid = fscid;
	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
	if (mount_fscid != (u32)-1) {
		fsc->client->monc.fs_cluster_id = mount_fscid;
		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
		ceph_monc_renew_subs(&fsc->client->monc);
	pr_err("error decoding fsmap\n");
	mutex_lock(&mdsc->mutex);
	mdsc->mdsmap_err = err;
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
/*
 * handle mds map update.
 */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	u32 epoch;
	u32 maplen;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mdsmap *newmap, *oldmap;
	struct ceph_fsid fsid;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
		return;
	epoch = ceph_decode_32(&p);
	maplen = ceph_decode_32(&p);
	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);

	/* do we need it? */
	mutex_lock(&mdsc->mutex);
	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
		dout("handle_map epoch %u <= our %u\n",
		     epoch, mdsc->mdsmap->m_epoch);
		mutex_unlock(&mdsc->mutex);
		return;
	}

	newmap = ceph_mdsmap_decode(&p, end);
	if (IS_ERR(newmap)) {
		err = PTR_ERR(newmap);
		goto bad_unlock;
	}

	/* swap into place */
	if (mdsc->mdsmap) {
		oldmap = mdsc->mdsmap;
		mdsc->mdsmap = newmap;
		check_new_map(mdsc, newmap, oldmap);
		ceph_mdsmap_destroy(oldmap);
	} else {
		mdsc->mdsmap = newmap;  /* first mds map */
	}
	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
				       MAX_LFS_FILESIZE);

	__wake_requests(mdsc, &mdsc->waiting_for_map);
	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
			  mdsc->mdsmap->m_epoch);

	mutex_unlock(&mdsc->mutex);
	schedule_delayed(mdsc);
	return;

bad_unlock:
	mutex_unlock(&mdsc->mutex);
bad:
	pr_err("error decoding mdsmap %d\n", err);
	return;
}
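
/*
 * Messenger callbacks: connection reference counting is tied to the
 * lifetime of the owning MDS session.
 */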
static struct ceph_connection *con_get(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	if (ceph_get_mds_session(s))
		return con;
	return NULL;
}
static void con_put(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	ceph_put_mds_session(s);
}
/*
 * if the client is unresponsive for long enough, the mds will kill
 * the session entirely.
 */
static void peer_reset(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;

	pr_warn("mds%d closed our session\n", s->s_mds);
	send_mds_reconnect(mdsc, s);
}
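
/*
 * Dispatch an incoming message to the appropriate handler, dropping it
 * if the session is no longer registered, and release the message
 * reference when done.
 */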
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	int type = le16_to_cpu(msg->hdr.type);

	mutex_lock(&mdsc->mutex);
	if (__verify_registered_session(mdsc, s) < 0) {
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	mutex_unlock(&mdsc->mutex);

	switch (type) {
	case CEPH_MSG_MDS_MAP:
		ceph_mdsc_handle_mdsmap(mdsc, msg);
		break;
	case CEPH_MSG_FS_MAP_USER:
		ceph_mdsc_handle_fsmap(mdsc, msg);
		break;
	case CEPH_MSG_CLIENT_SESSION:
		handle_session(s, msg);
		break;
	case CEPH_MSG_CLIENT_REPLY:
		handle_reply(s, msg);
		break;
	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
		handle_forward(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_CAPS:
		ceph_handle_caps(s, msg);
		break;
	case CEPH_MSG_CLIENT_SNAP:
		ceph_handle_snap(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_LEASE:
		handle_lease(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_QUOTA:
		ceph_handle_quota(mdsc, s, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}
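
/*
 * authentication helpers: wrappers that plug the MDS session's
 * ceph_auth_handshake into the generic messenger auth callbacks.
 */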
/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
						  int *proto, int force_new)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	if (force_new && auth->authorizer) {
		ceph_auth_destroy_authorizer(auth->authorizer);
		auth->authorizer = NULL;
	}
	if (!auth->authorizer) {
		int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
						      auth);
		if (ret)
			return ERR_PTR(ret);
	} else {
		int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
						      auth);
		if (ret)
			return ERR_PTR(ret);
	}
	*proto = ac->protocol;

	return auth;
}
static int add_authorizer_challenge(struct ceph_connection *con,
				    void *challenge_buf, int challenge_buf_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
						  challenge_buf,
						  challenge_buf_len);
}
static int verify_authorizer_reply(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
}
static int invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}
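
/*
 * Allocate a message buffer for an incoming frame, sized from the
 * header's type and front length (reusing con->in_msg if one is
 * already set up).
 */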
static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr, int *skip)
{
	struct ceph_msg *msg;
	int type = (int) le16_to_cpu(hdr->type);
	int front_len = (int) le32_to_cpu(hdr->front_len);

	if (con->in_msg)
		return con->in_msg;

	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
	if (!msg) {
		pr_err("unable to allocate msg type %d len %d\n",
		       type, front_len);
		return NULL;
	}

	return msg;
}
static int mds_sign_message(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_sign_message(auth, msg);
}
static int mds_check_message_signature(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_check_message_signature(auth, msg);
}
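
/*
 * Connection operations table handed to the messenger for MDS sessions.
 */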
static const struct ceph_connection_operations mds_con_ops = {
	.get = con_get,
	.put = con_put,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.add_authorizer_challenge = add_authorizer_challenge,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.peer_reset = peer_reset,
	.alloc_msg = mds_alloc_msg,
	.sign_message = mds_sign_message,
	.check_message_signature = mds_check_message_signature,
};