1 // SPDX-License-Identifier: GPL-2.0
2 #include <linux/ceph/ceph_debug.h>
5 #include <linux/wait.h>
6 #include <linux/slab.h>
8 #include <linux/sched.h>
9 #include <linux/debugfs.h>
10 #include <linux/seq_file.h>
11 #include <linux/ratelimit.h>
14 #include "mds_client.h"
16 #include <linux/ceph/ceph_features.h>
17 #include <linux/ceph/messenger.h>
18 #include <linux/ceph/decode.h>
19 #include <linux/ceph/pagelist.h>
20 #include <linux/ceph/auth.h>
21 #include <linux/ceph/debugfs.h>
23 #define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
26 * A cluster of MDS (metadata server) daemons is responsible for
27 * managing the file system namespace (the directory hierarchy and
28 * inodes) and for coordinating shared access to storage. Metadata is
29 * partitioning hierarchically across a number of servers, and that
30 * partition varies over time as the cluster adjusts the distribution
31 * in order to balance load.
33 * The MDS client is primarily responsible to managing synchronous
34 * metadata requests for operations like open, unlink, and so forth.
35 * If there is a MDS failure, we find out about it when we (possibly
36 * request and) receive a new MDS map, and can resubmit affected
39 * For the most part, though, we take advantage of a lossless
40 * communications channel to the MDS, and do not need to worry about
41 * timing out or resubmitting requests.
43 * We maintain a stateful "session" with each MDS we interact with.
44 * Within each session, we sent periodic heartbeat messages to ensure
45 * any capabilities or leases we have been issues remain valid. If
46 * the session times out and goes stale, our leases and capabilities
47 * are no longer valid.
50 struct ceph_reconnect_state
{
51 struct ceph_mds_session
*session
;
52 int nr_caps
, nr_realms
;
53 struct ceph_pagelist
*pagelist
;
58 static void __wake_requests(struct ceph_mds_client
*mdsc
,
59 struct list_head
*head
);
60 static void ceph_cap_release_work(struct work_struct
*work
);
61 static void ceph_cap_reclaim_work(struct work_struct
*work
);
63 static const struct ceph_connection_operations mds_con_ops
;
70 static int parse_reply_info_quota(void **p
, void *end
,
71 struct ceph_mds_reply_info_in
*info
)
73 u8 struct_v
, struct_compat
;
76 ceph_decode_8_safe(p
, end
, struct_v
, bad
);
77 ceph_decode_8_safe(p
, end
, struct_compat
, bad
);
78 /* struct_v is expected to be >= 1. we only
79 * understand encoding with struct_compat == 1. */
80 if (!struct_v
|| struct_compat
!= 1)
82 ceph_decode_32_safe(p
, end
, struct_len
, bad
);
83 ceph_decode_need(p
, end
, struct_len
, bad
);
84 end
= *p
+ struct_len
;
85 ceph_decode_64_safe(p
, end
, info
->max_bytes
, bad
);
86 ceph_decode_64_safe(p
, end
, info
->max_files
, bad
);
94 * parse individual inode info
96 static int parse_reply_info_in(void **p
, void *end
,
97 struct ceph_mds_reply_info_in
*info
,
103 if (features
== (u64
)-1) {
106 ceph_decode_8_safe(p
, end
, struct_v
, bad
);
107 ceph_decode_8_safe(p
, end
, struct_compat
, bad
);
108 /* struct_v is expected to be >= 1. we only understand
109 * encoding with struct_compat == 1. */
110 if (!struct_v
|| struct_compat
!= 1)
112 ceph_decode_32_safe(p
, end
, struct_len
, bad
);
113 ceph_decode_need(p
, end
, struct_len
, bad
);
114 end
= *p
+ struct_len
;
117 ceph_decode_need(p
, end
, sizeof(struct ceph_mds_reply_inode
), bad
);
119 *p
+= sizeof(struct ceph_mds_reply_inode
) +
120 sizeof(*info
->in
->fragtree
.splits
) *
121 le32_to_cpu(info
->in
->fragtree
.nsplits
);
123 ceph_decode_32_safe(p
, end
, info
->symlink_len
, bad
);
124 ceph_decode_need(p
, end
, info
->symlink_len
, bad
);
126 *p
+= info
->symlink_len
;
128 ceph_decode_copy_safe(p
, end
, &info
->dir_layout
,
129 sizeof(info
->dir_layout
), bad
);
130 ceph_decode_32_safe(p
, end
, info
->xattr_len
, bad
);
131 ceph_decode_need(p
, end
, info
->xattr_len
, bad
);
132 info
->xattr_data
= *p
;
133 *p
+= info
->xattr_len
;
135 if (features
== (u64
)-1) {
137 ceph_decode_64_safe(p
, end
, info
->inline_version
, bad
);
138 ceph_decode_32_safe(p
, end
, info
->inline_len
, bad
);
139 ceph_decode_need(p
, end
, info
->inline_len
, bad
);
140 info
->inline_data
= *p
;
141 *p
+= info
->inline_len
;
143 err
= parse_reply_info_quota(p
, end
, info
);
147 ceph_decode_32_safe(p
, end
, info
->pool_ns_len
, bad
);
148 if (info
->pool_ns_len
> 0) {
149 ceph_decode_need(p
, end
, info
->pool_ns_len
, bad
);
150 info
->pool_ns_data
= *p
;
151 *p
+= info
->pool_ns_len
;
155 ceph_decode_need(p
, end
, sizeof(info
->btime
), bad
);
156 ceph_decode_copy(p
, &info
->btime
, sizeof(info
->btime
));
158 /* change attribute */
159 ceph_decode_64_safe(p
, end
, info
->change_attr
, bad
);
163 ceph_decode_32_safe(p
, end
, info
->dir_pin
, bad
);
165 info
->dir_pin
= -ENODATA
;
168 /* snapshot birth time, remains zero for v<=2 */
170 ceph_decode_need(p
, end
, sizeof(info
->snap_btime
), bad
);
171 ceph_decode_copy(p
, &info
->snap_btime
,
172 sizeof(info
->snap_btime
));
174 memset(&info
->snap_btime
, 0, sizeof(info
->snap_btime
));
179 if (features
& CEPH_FEATURE_MDS_INLINE_DATA
) {
180 ceph_decode_64_safe(p
, end
, info
->inline_version
, bad
);
181 ceph_decode_32_safe(p
, end
, info
->inline_len
, bad
);
182 ceph_decode_need(p
, end
, info
->inline_len
, bad
);
183 info
->inline_data
= *p
;
184 *p
+= info
->inline_len
;
186 info
->inline_version
= CEPH_INLINE_NONE
;
188 if (features
& CEPH_FEATURE_MDS_QUOTA
) {
189 err
= parse_reply_info_quota(p
, end
, info
);
197 info
->pool_ns_len
= 0;
198 info
->pool_ns_data
= NULL
;
199 if (features
& CEPH_FEATURE_FS_FILE_LAYOUT_V2
) {
200 ceph_decode_32_safe(p
, end
, info
->pool_ns_len
, bad
);
201 if (info
->pool_ns_len
> 0) {
202 ceph_decode_need(p
, end
, info
->pool_ns_len
, bad
);
203 info
->pool_ns_data
= *p
;
204 *p
+= info
->pool_ns_len
;
208 if (features
& CEPH_FEATURE_FS_BTIME
) {
209 ceph_decode_need(p
, end
, sizeof(info
->btime
), bad
);
210 ceph_decode_copy(p
, &info
->btime
, sizeof(info
->btime
));
211 ceph_decode_64_safe(p
, end
, info
->change_attr
, bad
);
214 info
->dir_pin
= -ENODATA
;
215 /* info->snap_btime remains zero */
224 static int parse_reply_info_dir(void **p
, void *end
,
225 struct ceph_mds_reply_dirfrag
**dirfrag
,
228 if (features
== (u64
)-1) {
229 u8 struct_v
, struct_compat
;
231 ceph_decode_8_safe(p
, end
, struct_v
, bad
);
232 ceph_decode_8_safe(p
, end
, struct_compat
, bad
);
233 /* struct_v is expected to be >= 1. we only understand
234 * encoding whose struct_compat == 1. */
235 if (!struct_v
|| struct_compat
!= 1)
237 ceph_decode_32_safe(p
, end
, struct_len
, bad
);
238 ceph_decode_need(p
, end
, struct_len
, bad
);
239 end
= *p
+ struct_len
;
242 ceph_decode_need(p
, end
, sizeof(**dirfrag
), bad
);
244 *p
+= sizeof(**dirfrag
) + sizeof(u32
) * le32_to_cpu((*dirfrag
)->ndist
);
245 if (unlikely(*p
> end
))
247 if (features
== (u64
)-1)
254 static int parse_reply_info_lease(void **p
, void *end
,
255 struct ceph_mds_reply_lease
**lease
,
258 if (features
== (u64
)-1) {
259 u8 struct_v
, struct_compat
;
261 ceph_decode_8_safe(p
, end
, struct_v
, bad
);
262 ceph_decode_8_safe(p
, end
, struct_compat
, bad
);
263 /* struct_v is expected to be >= 1. we only understand
264 * encoding whose struct_compat == 1. */
265 if (!struct_v
|| struct_compat
!= 1)
267 ceph_decode_32_safe(p
, end
, struct_len
, bad
);
268 ceph_decode_need(p
, end
, struct_len
, bad
);
269 end
= *p
+ struct_len
;
272 ceph_decode_need(p
, end
, sizeof(**lease
), bad
);
274 *p
+= sizeof(**lease
);
275 if (features
== (u64
)-1)
283 * parse a normal reply, which may contain a (dir+)dentry and/or a
286 static int parse_reply_info_trace(void **p
, void *end
,
287 struct ceph_mds_reply_info_parsed
*info
,
292 if (info
->head
->is_dentry
) {
293 err
= parse_reply_info_in(p
, end
, &info
->diri
, features
);
297 err
= parse_reply_info_dir(p
, end
, &info
->dirfrag
, features
);
301 ceph_decode_32_safe(p
, end
, info
->dname_len
, bad
);
302 ceph_decode_need(p
, end
, info
->dname_len
, bad
);
304 *p
+= info
->dname_len
;
306 err
= parse_reply_info_lease(p
, end
, &info
->dlease
, features
);
311 if (info
->head
->is_target
) {
312 err
= parse_reply_info_in(p
, end
, &info
->targeti
, features
);
317 if (unlikely(*p
!= end
))
324 pr_err("problem parsing mds trace %d\n", err
);
329 * parse readdir results
331 static int parse_reply_info_readdir(void **p
, void *end
,
332 struct ceph_mds_reply_info_parsed
*info
,
338 err
= parse_reply_info_dir(p
, end
, &info
->dir_dir
, features
);
342 ceph_decode_need(p
, end
, sizeof(num
) + 2, bad
);
343 num
= ceph_decode_32(p
);
345 u16 flags
= ceph_decode_16(p
);
346 info
->dir_end
= !!(flags
& CEPH_READDIR_FRAG_END
);
347 info
->dir_complete
= !!(flags
& CEPH_READDIR_FRAG_COMPLETE
);
348 info
->hash_order
= !!(flags
& CEPH_READDIR_HASH_ORDER
);
349 info
->offset_hash
= !!(flags
& CEPH_READDIR_OFFSET_HASH
);
354 BUG_ON(!info
->dir_entries
);
355 if ((unsigned long)(info
->dir_entries
+ num
) >
356 (unsigned long)info
->dir_entries
+ info
->dir_buf_size
) {
357 pr_err("dir contents are larger than expected\n");
364 struct ceph_mds_reply_dir_entry
*rde
= info
->dir_entries
+ i
;
366 ceph_decode_32_safe(p
, end
, rde
->name_len
, bad
);
367 ceph_decode_need(p
, end
, rde
->name_len
, bad
);
370 dout("parsed dir dname '%.*s'\n", rde
->name_len
, rde
->name
);
373 err
= parse_reply_info_lease(p
, end
, &rde
->lease
, features
);
377 err
= parse_reply_info_in(p
, end
, &rde
->inode
, features
);
380 /* ceph_readdir_prepopulate() will update it */
387 /* Skip over any unrecognized fields */
394 pr_err("problem parsing dir contents %d\n", err
);
399 * parse fcntl F_GETLK results
401 static int parse_reply_info_filelock(void **p
, void *end
,
402 struct ceph_mds_reply_info_parsed
*info
,
405 if (*p
+ sizeof(*info
->filelock_reply
) > end
)
408 info
->filelock_reply
= *p
;
410 /* Skip over any unrecognized fields */
418 * parse create results
420 static int parse_reply_info_create(void **p
, void *end
,
421 struct ceph_mds_reply_info_parsed
*info
,
424 if (features
== (u64
)-1 ||
425 (features
& CEPH_FEATURE_REPLY_CREATE_INODE
)) {
426 /* Malformed reply? */
428 info
->has_create_ino
= false;
430 info
->has_create_ino
= true;
431 ceph_decode_64_safe(p
, end
, info
->ino
, bad
);
438 /* Skip over any unrecognized fields */
446 * parse extra results
448 static int parse_reply_info_extra(void **p
, void *end
,
449 struct ceph_mds_reply_info_parsed
*info
,
452 u32 op
= le32_to_cpu(info
->head
->op
);
454 if (op
== CEPH_MDS_OP_GETFILELOCK
)
455 return parse_reply_info_filelock(p
, end
, info
, features
);
456 else if (op
== CEPH_MDS_OP_READDIR
|| op
== CEPH_MDS_OP_LSSNAP
)
457 return parse_reply_info_readdir(p
, end
, info
, features
);
458 else if (op
== CEPH_MDS_OP_CREATE
)
459 return parse_reply_info_create(p
, end
, info
, features
);
465 * parse entire mds reply
467 static int parse_reply_info(struct ceph_msg
*msg
,
468 struct ceph_mds_reply_info_parsed
*info
,
475 info
->head
= msg
->front
.iov_base
;
476 p
= msg
->front
.iov_base
+ sizeof(struct ceph_mds_reply_head
);
477 end
= p
+ msg
->front
.iov_len
- sizeof(struct ceph_mds_reply_head
);
480 ceph_decode_32_safe(&p
, end
, len
, bad
);
482 ceph_decode_need(&p
, end
, len
, bad
);
483 err
= parse_reply_info_trace(&p
, p
+len
, info
, features
);
489 ceph_decode_32_safe(&p
, end
, len
, bad
);
491 ceph_decode_need(&p
, end
, len
, bad
);
492 err
= parse_reply_info_extra(&p
, p
+len
, info
, features
);
498 ceph_decode_32_safe(&p
, end
, len
, bad
);
499 info
->snapblob_len
= len
;
510 pr_err("mds parse_reply err %d\n", err
);
514 static void destroy_reply_info(struct ceph_mds_reply_info_parsed
*info
)
516 if (!info
->dir_entries
)
518 free_pages((unsigned long)info
->dir_entries
, get_order(info
->dir_buf_size
));
525 const char *ceph_session_state_name(int s
)
528 case CEPH_MDS_SESSION_NEW
: return "new";
529 case CEPH_MDS_SESSION_OPENING
: return "opening";
530 case CEPH_MDS_SESSION_OPEN
: return "open";
531 case CEPH_MDS_SESSION_HUNG
: return "hung";
532 case CEPH_MDS_SESSION_CLOSING
: return "closing";
533 case CEPH_MDS_SESSION_RESTARTING
: return "restarting";
534 case CEPH_MDS_SESSION_RECONNECTING
: return "reconnecting";
535 case CEPH_MDS_SESSION_REJECTED
: return "rejected";
536 default: return "???";
540 static struct ceph_mds_session
*get_session(struct ceph_mds_session
*s
)
542 if (refcount_inc_not_zero(&s
->s_ref
)) {
543 dout("mdsc get_session %p %d -> %d\n", s
,
544 refcount_read(&s
->s_ref
)-1, refcount_read(&s
->s_ref
));
547 dout("mdsc get_session %p 0 -- FAIL\n", s
);
552 void ceph_put_mds_session(struct ceph_mds_session
*s
)
554 dout("mdsc put_session %p %d -> %d\n", s
,
555 refcount_read(&s
->s_ref
), refcount_read(&s
->s_ref
)-1);
556 if (refcount_dec_and_test(&s
->s_ref
)) {
557 if (s
->s_auth
.authorizer
)
558 ceph_auth_destroy_authorizer(s
->s_auth
.authorizer
);
564 * called under mdsc->mutex
566 struct ceph_mds_session
*__ceph_lookup_mds_session(struct ceph_mds_client
*mdsc
,
569 if (mds
>= mdsc
->max_sessions
|| !mdsc
->sessions
[mds
])
571 return get_session(mdsc
->sessions
[mds
]);
574 static bool __have_session(struct ceph_mds_client
*mdsc
, int mds
)
576 if (mds
>= mdsc
->max_sessions
|| !mdsc
->sessions
[mds
])
582 static int __verify_registered_session(struct ceph_mds_client
*mdsc
,
583 struct ceph_mds_session
*s
)
585 if (s
->s_mds
>= mdsc
->max_sessions
||
586 mdsc
->sessions
[s
->s_mds
] != s
)
592 * create+register a new session for given mds.
593 * called under mdsc->mutex.
595 static struct ceph_mds_session
*register_session(struct ceph_mds_client
*mdsc
,
598 struct ceph_mds_session
*s
;
600 if (mds
>= mdsc
->mdsmap
->m_num_mds
)
601 return ERR_PTR(-EINVAL
);
603 s
= kzalloc(sizeof(*s
), GFP_NOFS
);
605 return ERR_PTR(-ENOMEM
);
607 if (mds
>= mdsc
->max_sessions
) {
608 int newmax
= 1 << get_count_order(mds
+ 1);
609 struct ceph_mds_session
**sa
;
611 dout("%s: realloc to %d\n", __func__
, newmax
);
612 sa
= kcalloc(newmax
, sizeof(void *), GFP_NOFS
);
615 if (mdsc
->sessions
) {
616 memcpy(sa
, mdsc
->sessions
,
617 mdsc
->max_sessions
* sizeof(void *));
618 kfree(mdsc
->sessions
);
621 mdsc
->max_sessions
= newmax
;
624 dout("%s: mds%d\n", __func__
, mds
);
627 s
->s_state
= CEPH_MDS_SESSION_NEW
;
630 mutex_init(&s
->s_mutex
);
632 ceph_con_init(&s
->s_con
, s
, &mds_con_ops
, &mdsc
->fsc
->client
->msgr
);
634 spin_lock_init(&s
->s_gen_ttl_lock
);
636 s
->s_cap_ttl
= jiffies
- 1;
638 spin_lock_init(&s
->s_cap_lock
);
639 s
->s_renew_requested
= 0;
641 INIT_LIST_HEAD(&s
->s_caps
);
643 refcount_set(&s
->s_ref
, 1);
644 INIT_LIST_HEAD(&s
->s_waiting
);
645 INIT_LIST_HEAD(&s
->s_unsafe
);
646 s
->s_num_cap_releases
= 0;
647 s
->s_cap_reconnect
= 0;
648 s
->s_cap_iterator
= NULL
;
649 INIT_LIST_HEAD(&s
->s_cap_releases
);
650 INIT_WORK(&s
->s_cap_release_work
, ceph_cap_release_work
);
652 INIT_LIST_HEAD(&s
->s_cap_flushing
);
654 mdsc
->sessions
[mds
] = s
;
655 atomic_inc(&mdsc
->num_sessions
);
656 refcount_inc(&s
->s_ref
); /* one ref to sessions[], one to caller */
658 ceph_con_open(&s
->s_con
, CEPH_ENTITY_TYPE_MDS
, mds
,
659 ceph_mdsmap_get_addr(mdsc
->mdsmap
, mds
));
665 return ERR_PTR(-ENOMEM
);
669 * called under mdsc->mutex
671 static void __unregister_session(struct ceph_mds_client
*mdsc
,
672 struct ceph_mds_session
*s
)
674 dout("__unregister_session mds%d %p\n", s
->s_mds
, s
);
675 BUG_ON(mdsc
->sessions
[s
->s_mds
] != s
);
676 mdsc
->sessions
[s
->s_mds
] = NULL
;
678 ceph_con_close(&s
->s_con
);
679 ceph_put_mds_session(s
);
680 atomic_dec(&mdsc
->num_sessions
);
684 * drop session refs in request.
686 * should be last request ref, or hold mdsc->mutex
688 static void put_request_session(struct ceph_mds_request
*req
)
690 if (req
->r_session
) {
691 ceph_put_mds_session(req
->r_session
);
692 req
->r_session
= NULL
;
696 void ceph_mdsc_release_request(struct kref
*kref
)
698 struct ceph_mds_request
*req
= container_of(kref
,
699 struct ceph_mds_request
,
701 destroy_reply_info(&req
->r_reply_info
);
703 ceph_msg_put(req
->r_request
);
705 ceph_msg_put(req
->r_reply
);
707 ceph_put_cap_refs(ceph_inode(req
->r_inode
), CEPH_CAP_PIN
);
708 /* avoid calling iput_final() in mds dispatch threads */
709 ceph_async_iput(req
->r_inode
);
712 ceph_put_cap_refs(ceph_inode(req
->r_parent
), CEPH_CAP_PIN
);
713 ceph_async_iput(req
->r_parent
);
715 ceph_async_iput(req
->r_target_inode
);
718 if (req
->r_old_dentry
)
719 dput(req
->r_old_dentry
);
720 if (req
->r_old_dentry_dir
) {
722 * track (and drop pins for) r_old_dentry_dir
723 * separately, since r_old_dentry's d_parent may have
724 * changed between the dir mutex being dropped and
725 * this request being freed.
727 ceph_put_cap_refs(ceph_inode(req
->r_old_dentry_dir
),
729 ceph_async_iput(req
->r_old_dentry_dir
);
734 ceph_pagelist_release(req
->r_pagelist
);
735 put_request_session(req
);
736 ceph_unreserve_caps(req
->r_mdsc
, &req
->r_caps_reservation
);
737 WARN_ON_ONCE(!list_empty(&req
->r_wait
));
741 DEFINE_RB_FUNCS(request
, struct ceph_mds_request
, r_tid
, r_node
)
744 * lookup session, bump ref if found.
746 * called under mdsc->mutex.
748 static struct ceph_mds_request
*
749 lookup_get_request(struct ceph_mds_client
*mdsc
, u64 tid
)
751 struct ceph_mds_request
*req
;
753 req
= lookup_request(&mdsc
->request_tree
, tid
);
755 ceph_mdsc_get_request(req
);
761 * Register an in-flight request, and assign a tid. Link to directory
762 * are modifying (if any).
764 * Called under mdsc->mutex.
766 static void __register_request(struct ceph_mds_client
*mdsc
,
767 struct ceph_mds_request
*req
,
772 req
->r_tid
= ++mdsc
->last_tid
;
773 if (req
->r_num_caps
) {
774 ret
= ceph_reserve_caps(mdsc
, &req
->r_caps_reservation
,
777 pr_err("__register_request %p "
778 "failed to reserve caps: %d\n", req
, ret
);
779 /* set req->r_err to fail early from __do_request */
784 dout("__register_request %p tid %lld\n", req
, req
->r_tid
);
785 ceph_mdsc_get_request(req
);
786 insert_request(&mdsc
->request_tree
, req
);
788 req
->r_uid
= current_fsuid();
789 req
->r_gid
= current_fsgid();
791 if (mdsc
->oldest_tid
== 0 && req
->r_op
!= CEPH_MDS_OP_SETFILELOCK
)
792 mdsc
->oldest_tid
= req
->r_tid
;
796 req
->r_unsafe_dir
= dir
;
800 static void __unregister_request(struct ceph_mds_client
*mdsc
,
801 struct ceph_mds_request
*req
)
803 dout("__unregister_request %p tid %lld\n", req
, req
->r_tid
);
805 /* Never leave an unregistered request on an unsafe list! */
806 list_del_init(&req
->r_unsafe_item
);
808 if (req
->r_tid
== mdsc
->oldest_tid
) {
809 struct rb_node
*p
= rb_next(&req
->r_node
);
810 mdsc
->oldest_tid
= 0;
812 struct ceph_mds_request
*next_req
=
813 rb_entry(p
, struct ceph_mds_request
, r_node
);
814 if (next_req
->r_op
!= CEPH_MDS_OP_SETFILELOCK
) {
815 mdsc
->oldest_tid
= next_req
->r_tid
;
822 erase_request(&mdsc
->request_tree
, req
);
824 if (req
->r_unsafe_dir
&&
825 test_bit(CEPH_MDS_R_GOT_UNSAFE
, &req
->r_req_flags
)) {
826 struct ceph_inode_info
*ci
= ceph_inode(req
->r_unsafe_dir
);
827 spin_lock(&ci
->i_unsafe_lock
);
828 list_del_init(&req
->r_unsafe_dir_item
);
829 spin_unlock(&ci
->i_unsafe_lock
);
831 if (req
->r_target_inode
&&
832 test_bit(CEPH_MDS_R_GOT_UNSAFE
, &req
->r_req_flags
)) {
833 struct ceph_inode_info
*ci
= ceph_inode(req
->r_target_inode
);
834 spin_lock(&ci
->i_unsafe_lock
);
835 list_del_init(&req
->r_unsafe_target_item
);
836 spin_unlock(&ci
->i_unsafe_lock
);
839 if (req
->r_unsafe_dir
) {
840 /* avoid calling iput_final() in mds dispatch threads */
841 ceph_async_iput(req
->r_unsafe_dir
);
842 req
->r_unsafe_dir
= NULL
;
845 complete_all(&req
->r_safe_completion
);
847 ceph_mdsc_put_request(req
);
851 * Walk back up the dentry tree until we hit a dentry representing a
852 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
853 * when calling this) to ensure that the objects won't disappear while we're
854 * working with them. Once we hit a candidate dentry, we attempt to take a
855 * reference to it, and return that as the result.
857 static struct inode
*get_nonsnap_parent(struct dentry
*dentry
)
859 struct inode
*inode
= NULL
;
861 while (dentry
&& !IS_ROOT(dentry
)) {
862 inode
= d_inode_rcu(dentry
);
863 if (!inode
|| ceph_snap(inode
) == CEPH_NOSNAP
)
865 dentry
= dentry
->d_parent
;
868 inode
= igrab(inode
);
873 * Choose mds to send request to next. If there is a hint set in the
874 * request (e.g., due to a prior forward hint from the mds), use that.
875 * Otherwise, consult frag tree and/or caps to identify the
876 * appropriate mds. If all else fails, choose randomly.
878 * Called under mdsc->mutex.
880 static int __choose_mds(struct ceph_mds_client
*mdsc
,
881 struct ceph_mds_request
*req
)
884 struct ceph_inode_info
*ci
;
885 struct ceph_cap
*cap
;
886 int mode
= req
->r_direct_mode
;
888 u32 hash
= req
->r_direct_hash
;
889 bool is_hash
= test_bit(CEPH_MDS_R_DIRECT_IS_HASH
, &req
->r_req_flags
);
892 * is there a specific mds we should try? ignore hint if we have
893 * no session and the mds is not up (active or recovering).
895 if (req
->r_resend_mds
>= 0 &&
896 (__have_session(mdsc
, req
->r_resend_mds
) ||
897 ceph_mdsmap_get_state(mdsc
->mdsmap
, req
->r_resend_mds
) > 0)) {
898 dout("choose_mds using resend_mds mds%d\n",
900 return req
->r_resend_mds
;
903 if (mode
== USE_RANDOM_MDS
)
908 if (ceph_snap(req
->r_inode
) != CEPH_SNAPDIR
) {
909 inode
= req
->r_inode
;
912 /* req->r_dentry is non-null for LSSNAP request */
914 inode
= get_nonsnap_parent(req
->r_dentry
);
916 dout("__choose_mds using snapdir's parent %p\n", inode
);
918 } else if (req
->r_dentry
) {
919 /* ignore race with rename; old or new d_parent is okay */
920 struct dentry
*parent
;
924 parent
= READ_ONCE(req
->r_dentry
->d_parent
);
925 dir
= req
->r_parent
? : d_inode_rcu(parent
);
927 if (!dir
|| dir
->i_sb
!= mdsc
->fsc
->sb
) {
928 /* not this fs or parent went negative */
929 inode
= d_inode(req
->r_dentry
);
932 } else if (ceph_snap(dir
) != CEPH_NOSNAP
) {
933 /* direct snapped/virtual snapdir requests
934 * based on parent dir inode */
935 inode
= get_nonsnap_parent(parent
);
936 dout("__choose_mds using nonsnap parent %p\n", inode
);
939 inode
= d_inode(req
->r_dentry
);
940 if (!inode
|| mode
== USE_AUTH_MDS
) {
943 hash
= ceph_dentry_hash(dir
, req
->r_dentry
);
952 dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode
, (int)is_hash
,
956 ci
= ceph_inode(inode
);
958 if (is_hash
&& S_ISDIR(inode
->i_mode
)) {
959 struct ceph_inode_frag frag
;
962 ceph_choose_frag(ci
, hash
, &frag
, &found
);
964 if (mode
== USE_ANY_MDS
&& frag
.ndist
> 0) {
967 /* choose a random replica */
968 get_random_bytes(&r
, 1);
971 dout("choose_mds %p %llx.%llx "
972 "frag %u mds%d (%d/%d)\n",
973 inode
, ceph_vinop(inode
),
976 if (ceph_mdsmap_get_state(mdsc
->mdsmap
, mds
) >=
977 CEPH_MDS_STATE_ACTIVE
)
981 /* since this file/dir wasn't known to be
982 * replicated, then we want to look for the
983 * authoritative mds. */
986 /* choose auth mds */
988 dout("choose_mds %p %llx.%llx "
989 "frag %u mds%d (auth)\n",
990 inode
, ceph_vinop(inode
), frag
.frag
, mds
);
991 if (ceph_mdsmap_get_state(mdsc
->mdsmap
, mds
) >=
992 CEPH_MDS_STATE_ACTIVE
)
998 spin_lock(&ci
->i_ceph_lock
);
1000 if (mode
== USE_AUTH_MDS
)
1001 cap
= ci
->i_auth_cap
;
1002 if (!cap
&& !RB_EMPTY_ROOT(&ci
->i_caps
))
1003 cap
= rb_entry(rb_first(&ci
->i_caps
), struct ceph_cap
, ci_node
);
1005 spin_unlock(&ci
->i_ceph_lock
);
1006 ceph_async_iput(inode
);
1009 mds
= cap
->session
->s_mds
;
1010 dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
1011 inode
, ceph_vinop(inode
), mds
,
1012 cap
== ci
->i_auth_cap
? "auth " : "", cap
);
1013 spin_unlock(&ci
->i_ceph_lock
);
1015 /* avoid calling iput_final() while holding mdsc->mutex or
1016 * in mds dispatch threads */
1017 ceph_async_iput(inode
);
1021 mds
= ceph_mdsmap_get_random_mds(mdsc
->mdsmap
);
1022 dout("choose_mds chose random mds%d\n", mds
);
1030 static struct ceph_msg
*create_session_msg(u32 op
, u64 seq
)
1032 struct ceph_msg
*msg
;
1033 struct ceph_mds_session_head
*h
;
1035 msg
= ceph_msg_new(CEPH_MSG_CLIENT_SESSION
, sizeof(*h
), GFP_NOFS
,
1038 pr_err("create_session_msg ENOMEM creating msg\n");
1041 h
= msg
->front
.iov_base
;
1042 h
->op
= cpu_to_le32(op
);
1043 h
->seq
= cpu_to_le64(seq
);
1048 static void encode_supported_features(void **p
, void *end
)
1050 static const unsigned char bits
[] = CEPHFS_FEATURES_CLIENT_SUPPORTED
;
1051 static const size_t count
= ARRAY_SIZE(bits
);
1055 size_t size
= ((size_t)bits
[count
- 1] + 64) / 64 * 8;
1057 BUG_ON(*p
+ 4 + size
> end
);
1058 ceph_encode_32(p
, size
);
1059 memset(*p
, 0, size
);
1060 for (i
= 0; i
< count
; i
++)
1061 ((unsigned char*)(*p
))[i
/ 8] |= 1 << (bits
[i
] % 8);
1064 BUG_ON(*p
+ 4 > end
);
1065 ceph_encode_32(p
, 0);
1070 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1071 * to include additional client metadata fields.
1073 static struct ceph_msg
*create_session_open_msg(struct ceph_mds_client
*mdsc
, u64 seq
)
1075 struct ceph_msg
*msg
;
1076 struct ceph_mds_session_head
*h
;
1078 int extra_bytes
= 0;
1079 int metadata_key_count
= 0;
1080 struct ceph_options
*opt
= mdsc
->fsc
->client
->options
;
1081 struct ceph_mount_options
*fsopt
= mdsc
->fsc
->mount_options
;
1084 const char* metadata
[][2] = {
1085 {"hostname", mdsc
->nodename
},
1086 {"kernel_version", init_utsname()->release
},
1087 {"entity_id", opt
->name
? : ""},
1088 {"root", fsopt
->server_path
? : "/"},
1092 /* Calculate serialized length of metadata */
1093 extra_bytes
= 4; /* map length */
1094 for (i
= 0; metadata
[i
][0]; ++i
) {
1095 extra_bytes
+= 8 + strlen(metadata
[i
][0]) +
1096 strlen(metadata
[i
][1]);
1097 metadata_key_count
++;
1099 /* supported feature */
1100 extra_bytes
+= 4 + 8;
1102 /* Allocate the message */
1103 msg
= ceph_msg_new(CEPH_MSG_CLIENT_SESSION
, sizeof(*h
) + extra_bytes
,
1106 pr_err("create_session_msg ENOMEM creating msg\n");
1109 p
= msg
->front
.iov_base
;
1110 end
= p
+ msg
->front
.iov_len
;
1113 h
->op
= cpu_to_le32(CEPH_SESSION_REQUEST_OPEN
);
1114 h
->seq
= cpu_to_le64(seq
);
1117 * Serialize client metadata into waiting buffer space, using
1118 * the format that userspace expects for map<string, string>
1120 * ClientSession messages with metadata are v2
1122 msg
->hdr
.version
= cpu_to_le16(3);
1123 msg
->hdr
.compat_version
= cpu_to_le16(1);
1125 /* The write pointer, following the session_head structure */
1128 /* Number of entries in the map */
1129 ceph_encode_32(&p
, metadata_key_count
);
1131 /* Two length-prefixed strings for each entry in the map */
1132 for (i
= 0; metadata
[i
][0]; ++i
) {
1133 size_t const key_len
= strlen(metadata
[i
][0]);
1134 size_t const val_len
= strlen(metadata
[i
][1]);
1136 ceph_encode_32(&p
, key_len
);
1137 memcpy(p
, metadata
[i
][0], key_len
);
1139 ceph_encode_32(&p
, val_len
);
1140 memcpy(p
, metadata
[i
][1], val_len
);
1144 encode_supported_features(&p
, end
);
1145 msg
->front
.iov_len
= p
- msg
->front
.iov_base
;
1146 msg
->hdr
.front_len
= cpu_to_le32(msg
->front
.iov_len
);
1152 * send session open request.
1154 * called under mdsc->mutex
1156 static int __open_session(struct ceph_mds_client
*mdsc
,
1157 struct ceph_mds_session
*session
)
1159 struct ceph_msg
*msg
;
1161 int mds
= session
->s_mds
;
1163 /* wait for mds to go active? */
1164 mstate
= ceph_mdsmap_get_state(mdsc
->mdsmap
, mds
);
1165 dout("open_session to mds%d (%s)\n", mds
,
1166 ceph_mds_state_name(mstate
));
1167 session
->s_state
= CEPH_MDS_SESSION_OPENING
;
1168 session
->s_renew_requested
= jiffies
;
1170 /* send connect message */
1171 msg
= create_session_open_msg(mdsc
, session
->s_seq
);
1174 ceph_con_send(&session
->s_con
, msg
);
1179 * open sessions for any export targets for the given mds
1181 * called under mdsc->mutex
1183 static struct ceph_mds_session
*
1184 __open_export_target_session(struct ceph_mds_client
*mdsc
, int target
)
1186 struct ceph_mds_session
*session
;
1188 session
= __ceph_lookup_mds_session(mdsc
, target
);
1190 session
= register_session(mdsc
, target
);
1191 if (IS_ERR(session
))
1194 if (session
->s_state
== CEPH_MDS_SESSION_NEW
||
1195 session
->s_state
== CEPH_MDS_SESSION_CLOSING
)
1196 __open_session(mdsc
, session
);
1201 struct ceph_mds_session
*
1202 ceph_mdsc_open_export_target_session(struct ceph_mds_client
*mdsc
, int target
)
1204 struct ceph_mds_session
*session
;
1206 dout("open_export_target_session to mds%d\n", target
);
1208 mutex_lock(&mdsc
->mutex
);
1209 session
= __open_export_target_session(mdsc
, target
);
1210 mutex_unlock(&mdsc
->mutex
);
1215 static void __open_export_target_sessions(struct ceph_mds_client
*mdsc
,
1216 struct ceph_mds_session
*session
)
1218 struct ceph_mds_info
*mi
;
1219 struct ceph_mds_session
*ts
;
1220 int i
, mds
= session
->s_mds
;
1222 if (mds
>= mdsc
->mdsmap
->m_num_mds
)
1225 mi
= &mdsc
->mdsmap
->m_info
[mds
];
1226 dout("open_export_target_sessions for mds%d (%d targets)\n",
1227 session
->s_mds
, mi
->num_export_targets
);
1229 for (i
= 0; i
< mi
->num_export_targets
; i
++) {
1230 ts
= __open_export_target_session(mdsc
, mi
->export_targets
[i
]);
1232 ceph_put_mds_session(ts
);
1236 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client
*mdsc
,
1237 struct ceph_mds_session
*session
)
1239 mutex_lock(&mdsc
->mutex
);
1240 __open_export_target_sessions(mdsc
, session
);
1241 mutex_unlock(&mdsc
->mutex
);
1248 static void detach_cap_releases(struct ceph_mds_session
*session
,
1249 struct list_head
*target
)
1251 lockdep_assert_held(&session
->s_cap_lock
);
1253 list_splice_init(&session
->s_cap_releases
, target
);
1254 session
->s_num_cap_releases
= 0;
1255 dout("dispose_cap_releases mds%d\n", session
->s_mds
);
1258 static void dispose_cap_releases(struct ceph_mds_client
*mdsc
,
1259 struct list_head
*dispose
)
1261 while (!list_empty(dispose
)) {
1262 struct ceph_cap
*cap
;
1263 /* zero out the in-progress message */
1264 cap
= list_first_entry(dispose
, struct ceph_cap
, session_caps
);
1265 list_del(&cap
->session_caps
);
1266 ceph_put_cap(mdsc
, cap
);
1270 static void cleanup_session_requests(struct ceph_mds_client
*mdsc
,
1271 struct ceph_mds_session
*session
)
1273 struct ceph_mds_request
*req
;
1275 struct ceph_inode_info
*ci
;
1277 dout("cleanup_session_requests mds%d\n", session
->s_mds
);
1278 mutex_lock(&mdsc
->mutex
);
1279 while (!list_empty(&session
->s_unsafe
)) {
1280 req
= list_first_entry(&session
->s_unsafe
,
1281 struct ceph_mds_request
, r_unsafe_item
);
1282 pr_warn_ratelimited(" dropping unsafe request %llu\n",
1284 if (req
->r_target_inode
) {
1285 /* dropping unsafe change of inode's attributes */
1286 ci
= ceph_inode(req
->r_target_inode
);
1287 errseq_set(&ci
->i_meta_err
, -EIO
);
1289 if (req
->r_unsafe_dir
) {
1290 /* dropping unsafe directory operation */
1291 ci
= ceph_inode(req
->r_unsafe_dir
);
1292 errseq_set(&ci
->i_meta_err
, -EIO
);
1294 __unregister_request(mdsc
, req
);
1296 /* zero r_attempts, so kick_requests() will re-send requests */
1297 p
= rb_first(&mdsc
->request_tree
);
1299 req
= rb_entry(p
, struct ceph_mds_request
, r_node
);
1301 if (req
->r_session
&&
1302 req
->r_session
->s_mds
== session
->s_mds
)
1303 req
->r_attempts
= 0;
1305 mutex_unlock(&mdsc
->mutex
);
1309 * Helper to safely iterate over all caps associated with a session, with
1310 * special care taken to handle a racing __ceph_remove_cap().
1312 * Caller must hold session s_mutex.
1314 int ceph_iterate_session_caps(struct ceph_mds_session
*session
,
1315 int (*cb
)(struct inode
*, struct ceph_cap
*,
1318 struct list_head
*p
;
1319 struct ceph_cap
*cap
;
1320 struct inode
*inode
, *last_inode
= NULL
;
1321 struct ceph_cap
*old_cap
= NULL
;
1324 dout("iterate_session_caps %p mds%d\n", session
, session
->s_mds
);
1325 spin_lock(&session
->s_cap_lock
);
1326 p
= session
->s_caps
.next
;
1327 while (p
!= &session
->s_caps
) {
1328 cap
= list_entry(p
, struct ceph_cap
, session_caps
);
1329 inode
= igrab(&cap
->ci
->vfs_inode
);
1334 session
->s_cap_iterator
= cap
;
1335 spin_unlock(&session
->s_cap_lock
);
1338 /* avoid calling iput_final() while holding
1339 * s_mutex or in mds dispatch threads */
1340 ceph_async_iput(last_inode
);
1344 ceph_put_cap(session
->s_mdsc
, old_cap
);
1348 ret
= cb(inode
, cap
, arg
);
1351 spin_lock(&session
->s_cap_lock
);
1354 dout("iterate_session_caps finishing cap %p removal\n",
1356 BUG_ON(cap
->session
!= session
);
1357 cap
->session
= NULL
;
1358 list_del_init(&cap
->session_caps
);
1359 session
->s_nr_caps
--;
1360 if (cap
->queue_release
)
1361 __ceph_queue_cap_release(session
, cap
);
1363 old_cap
= cap
; /* put_cap it w/o locks held */
1370 session
->s_cap_iterator
= NULL
;
1371 spin_unlock(&session
->s_cap_lock
);
1373 ceph_async_iput(last_inode
);
1375 ceph_put_cap(session
->s_mdsc
, old_cap
);
1380 static int remove_session_caps_cb(struct inode
*inode
, struct ceph_cap
*cap
,
1383 struct ceph_fs_client
*fsc
= (struct ceph_fs_client
*)arg
;
1384 struct ceph_inode_info
*ci
= ceph_inode(inode
);
1385 LIST_HEAD(to_remove
);
1386 bool dirty_dropped
= false;
1387 bool invalidate
= false;
1389 dout("removing cap %p, ci is %p, inode is %p\n",
1390 cap
, ci
, &ci
->vfs_inode
);
1391 spin_lock(&ci
->i_ceph_lock
);
1392 if (cap
->mds_wanted
| cap
->issued
)
1393 ci
->i_ceph_flags
|= CEPH_I_CAP_DROPPED
;
1394 __ceph_remove_cap(cap
, false);
1395 if (!ci
->i_auth_cap
) {
1396 struct ceph_cap_flush
*cf
;
1397 struct ceph_mds_client
*mdsc
= fsc
->mdsc
;
1399 if (READ_ONCE(fsc
->mount_state
) == CEPH_MOUNT_SHUTDOWN
) {
1400 if (inode
->i_data
.nrpages
> 0)
1402 if (ci
->i_wrbuffer_ref
> 0)
1403 mapping_set_error(&inode
->i_data
, -EIO
);
1406 while (!list_empty(&ci
->i_cap_flush_list
)) {
1407 cf
= list_first_entry(&ci
->i_cap_flush_list
,
1408 struct ceph_cap_flush
, i_list
);
1409 list_move(&cf
->i_list
, &to_remove
);
1412 spin_lock(&mdsc
->cap_dirty_lock
);
1414 list_for_each_entry(cf
, &to_remove
, i_list
)
1415 list_del(&cf
->g_list
);
1417 if (!list_empty(&ci
->i_dirty_item
)) {
1418 pr_warn_ratelimited(
1419 " dropping dirty %s state for %p %lld\n",
1420 ceph_cap_string(ci
->i_dirty_caps
),
1421 inode
, ceph_ino(inode
));
1422 ci
->i_dirty_caps
= 0;
1423 list_del_init(&ci
->i_dirty_item
);
1424 dirty_dropped
= true;
1426 if (!list_empty(&ci
->i_flushing_item
)) {
1427 pr_warn_ratelimited(
1428 " dropping dirty+flushing %s state for %p %lld\n",
1429 ceph_cap_string(ci
->i_flushing_caps
),
1430 inode
, ceph_ino(inode
));
1431 ci
->i_flushing_caps
= 0;
1432 list_del_init(&ci
->i_flushing_item
);
1433 mdsc
->num_cap_flushing
--;
1434 dirty_dropped
= true;
1436 spin_unlock(&mdsc
->cap_dirty_lock
);
1438 if (dirty_dropped
) {
1439 errseq_set(&ci
->i_meta_err
, -EIO
);
1441 if (ci
->i_wrbuffer_ref_head
== 0 &&
1442 ci
->i_wr_ref
== 0 &&
1443 ci
->i_dirty_caps
== 0 &&
1444 ci
->i_flushing_caps
== 0) {
1445 ceph_put_snap_context(ci
->i_head_snapc
);
1446 ci
->i_head_snapc
= NULL
;
1450 if (atomic_read(&ci
->i_filelock_ref
) > 0) {
1451 /* make further file lock syscall return -EIO */
1452 ci
->i_ceph_flags
|= CEPH_I_ERROR_FILELOCK
;
1453 pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1454 inode
, ceph_ino(inode
));
1457 if (!ci
->i_dirty_caps
&& ci
->i_prealloc_cap_flush
) {
1458 list_add(&ci
->i_prealloc_cap_flush
->i_list
, &to_remove
);
1459 ci
->i_prealloc_cap_flush
= NULL
;
1462 spin_unlock(&ci
->i_ceph_lock
);
1463 while (!list_empty(&to_remove
)) {
1464 struct ceph_cap_flush
*cf
;
1465 cf
= list_first_entry(&to_remove
,
1466 struct ceph_cap_flush
, i_list
);
1467 list_del(&cf
->i_list
);
1468 ceph_free_cap_flush(cf
);
1471 wake_up_all(&ci
->i_cap_wq
);
1473 ceph_queue_invalidate(inode
);
1480 * caller must hold session s_mutex
1482 static void remove_session_caps(struct ceph_mds_session
*session
)
1484 struct ceph_fs_client
*fsc
= session
->s_mdsc
->fsc
;
1485 struct super_block
*sb
= fsc
->sb
;
1488 dout("remove_session_caps on %p\n", session
);
1489 ceph_iterate_session_caps(session
, remove_session_caps_cb
, fsc
);
1491 wake_up_all(&fsc
->mdsc
->cap_flushing_wq
);
1493 spin_lock(&session
->s_cap_lock
);
1494 if (session
->s_nr_caps
> 0) {
1495 struct inode
*inode
;
1496 struct ceph_cap
*cap
, *prev
= NULL
;
1497 struct ceph_vino vino
;
1499 * iterate_session_caps() skips inodes that are being
1500 * deleted, we need to wait until deletions are complete.
1501 * __wait_on_freeing_inode() is designed for the job,
1502 * but it is not exported, so use lookup inode function
1505 while (!list_empty(&session
->s_caps
)) {
1506 cap
= list_entry(session
->s_caps
.next
,
1507 struct ceph_cap
, session_caps
);
1511 vino
= cap
->ci
->i_vino
;
1512 spin_unlock(&session
->s_cap_lock
);
1514 inode
= ceph_find_inode(sb
, vino
);
1515 /* avoid calling iput_final() while holding s_mutex */
1516 ceph_async_iput(inode
);
1518 spin_lock(&session
->s_cap_lock
);
1522 // drop cap expires and unlock s_cap_lock
1523 detach_cap_releases(session
, &dispose
);
1525 BUG_ON(session
->s_nr_caps
> 0);
1526 BUG_ON(!list_empty(&session
->s_cap_flushing
));
1527 spin_unlock(&session
->s_cap_lock
);
1528 dispose_cap_releases(session
->s_mdsc
, &dispose
);
1538 * wake up any threads waiting on this session's caps. if the cap is
1539 * old (didn't get renewed on the client reconnect), remove it now.
1541 * caller must hold s_mutex.
1543 static int wake_up_session_cb(struct inode
*inode
, struct ceph_cap
*cap
,
1546 struct ceph_inode_info
*ci
= ceph_inode(inode
);
1547 unsigned long ev
= (unsigned long)arg
;
1549 if (ev
== RECONNECT
) {
1550 spin_lock(&ci
->i_ceph_lock
);
1551 ci
->i_wanted_max_size
= 0;
1552 ci
->i_requested_max_size
= 0;
1553 spin_unlock(&ci
->i_ceph_lock
);
1554 } else if (ev
== RENEWCAPS
) {
1555 if (cap
->cap_gen
< cap
->session
->s_cap_gen
) {
1556 /* mds did not re-issue stale cap */
1557 spin_lock(&ci
->i_ceph_lock
);
1558 cap
->issued
= cap
->implemented
= CEPH_CAP_PIN
;
1559 /* make sure mds knows what we want */
1560 if (__ceph_caps_file_wanted(ci
) & ~cap
->mds_wanted
)
1561 ci
->i_ceph_flags
|= CEPH_I_CAP_DROPPED
;
1562 spin_unlock(&ci
->i_ceph_lock
);
1564 } else if (ev
== FORCE_RO
) {
1566 wake_up_all(&ci
->i_cap_wq
);
1570 static void wake_up_session_caps(struct ceph_mds_session
*session
, int ev
)
1572 dout("wake_up_session_caps %p mds%d\n", session
, session
->s_mds
);
1573 ceph_iterate_session_caps(session
, wake_up_session_cb
,
1574 (void *)(unsigned long)ev
);
1578 * Send periodic message to MDS renewing all currently held caps. The
1579 * ack will reset the expiration for all caps from this session.
1581 * caller holds s_mutex
1583 static int send_renew_caps(struct ceph_mds_client
*mdsc
,
1584 struct ceph_mds_session
*session
)
1586 struct ceph_msg
*msg
;
1589 if (time_after_eq(jiffies
, session
->s_cap_ttl
) &&
1590 time_after_eq(session
->s_cap_ttl
, session
->s_renew_requested
))
1591 pr_info("mds%d caps stale\n", session
->s_mds
);
1592 session
->s_renew_requested
= jiffies
;
1594 /* do not try to renew caps until a recovering mds has reconnected
1595 * with its clients. */
1596 state
= ceph_mdsmap_get_state(mdsc
->mdsmap
, session
->s_mds
);
1597 if (state
< CEPH_MDS_STATE_RECONNECT
) {
1598 dout("send_renew_caps ignoring mds%d (%s)\n",
1599 session
->s_mds
, ceph_mds_state_name(state
));
1603 dout("send_renew_caps to mds%d (%s)\n", session
->s_mds
,
1604 ceph_mds_state_name(state
));
1605 msg
= create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS
,
1606 ++session
->s_renew_seq
);
1609 ceph_con_send(&session
->s_con
, msg
);
1613 static int send_flushmsg_ack(struct ceph_mds_client
*mdsc
,
1614 struct ceph_mds_session
*session
, u64 seq
)
1616 struct ceph_msg
*msg
;
1618 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1619 session
->s_mds
, ceph_session_state_name(session
->s_state
), seq
);
1620 msg
= create_session_msg(CEPH_SESSION_FLUSHMSG_ACK
, seq
);
1623 ceph_con_send(&session
->s_con
, msg
);
1629 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1631 * Called under session->s_mutex
1633 static void renewed_caps(struct ceph_mds_client
*mdsc
,
1634 struct ceph_mds_session
*session
, int is_renew
)
1639 spin_lock(&session
->s_cap_lock
);
1640 was_stale
= is_renew
&& time_after_eq(jiffies
, session
->s_cap_ttl
);
1642 session
->s_cap_ttl
= session
->s_renew_requested
+
1643 mdsc
->mdsmap
->m_session_timeout
*HZ
;
1646 if (time_before(jiffies
, session
->s_cap_ttl
)) {
1647 pr_info("mds%d caps renewed\n", session
->s_mds
);
1650 pr_info("mds%d caps still stale\n", session
->s_mds
);
1653 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1654 session
->s_mds
, session
->s_cap_ttl
, was_stale
? "stale" : "fresh",
1655 time_before(jiffies
, session
->s_cap_ttl
) ? "stale" : "fresh");
1656 spin_unlock(&session
->s_cap_lock
);
1659 wake_up_session_caps(session
, RENEWCAPS
);
1663 * send a session close request
1665 static int request_close_session(struct ceph_mds_client
*mdsc
,
1666 struct ceph_mds_session
*session
)
1668 struct ceph_msg
*msg
;
1670 dout("request_close_session mds%d state %s seq %lld\n",
1671 session
->s_mds
, ceph_session_state_name(session
->s_state
),
1673 msg
= create_session_msg(CEPH_SESSION_REQUEST_CLOSE
, session
->s_seq
);
1676 ceph_con_send(&session
->s_con
, msg
);
1681 * Called with s_mutex held.
1683 static int __close_session(struct ceph_mds_client
*mdsc
,
1684 struct ceph_mds_session
*session
)
1686 if (session
->s_state
>= CEPH_MDS_SESSION_CLOSING
)
1688 session
->s_state
= CEPH_MDS_SESSION_CLOSING
;
1689 return request_close_session(mdsc
, session
);
1692 static bool drop_negative_children(struct dentry
*dentry
)
1694 struct dentry
*child
;
1695 bool all_negative
= true;
1697 if (!d_is_dir(dentry
))
1700 spin_lock(&dentry
->d_lock
);
1701 list_for_each_entry(child
, &dentry
->d_subdirs
, d_child
) {
1702 if (d_really_is_positive(child
)) {
1703 all_negative
= false;
1707 spin_unlock(&dentry
->d_lock
);
1710 shrink_dcache_parent(dentry
);
1712 return all_negative
;
1716 * Trim old(er) caps.
1718 * Because we can't cache an inode without one or more caps, we do
1719 * this indirectly: if a cap is unused, we prune its aliases, at which
1720 * point the inode will hopefully get dropped to.
1722 * Yes, this is a bit sloppy. Our only real goal here is to respond to
1723 * memory pressure from the MDS, though, so it needn't be perfect.
1725 static int trim_caps_cb(struct inode
*inode
, struct ceph_cap
*cap
, void *arg
)
1727 int *remaining
= arg
;
1728 struct ceph_inode_info
*ci
= ceph_inode(inode
);
1729 int used
, wanted
, oissued
, mine
;
1731 if (*remaining
<= 0)
1734 spin_lock(&ci
->i_ceph_lock
);
1735 mine
= cap
->issued
| cap
->implemented
;
1736 used
= __ceph_caps_used(ci
);
1737 wanted
= __ceph_caps_file_wanted(ci
);
1738 oissued
= __ceph_caps_issued_other(ci
, cap
);
1740 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1741 inode
, cap
, ceph_cap_string(mine
), ceph_cap_string(oissued
),
1742 ceph_cap_string(used
), ceph_cap_string(wanted
));
1743 if (cap
== ci
->i_auth_cap
) {
1744 if (ci
->i_dirty_caps
|| ci
->i_flushing_caps
||
1745 !list_empty(&ci
->i_cap_snaps
))
1747 if ((used
| wanted
) & CEPH_CAP_ANY_WR
)
1749 /* Note: it's possible that i_filelock_ref becomes non-zero
1750 * after dropping auth caps. It doesn't hurt because reply
1751 * of lock mds request will re-add auth caps. */
1752 if (atomic_read(&ci
->i_filelock_ref
) > 0)
1755 /* The inode has cached pages, but it's no longer used.
1756 * we can safely drop it */
1757 if (wanted
== 0 && used
== CEPH_CAP_FILE_CACHE
&&
1758 !(oissued
& CEPH_CAP_FILE_CACHE
)) {
1762 if ((used
| wanted
) & ~oissued
& mine
)
1763 goto out
; /* we need these caps */
1766 /* we aren't the only cap.. just remove us */
1767 __ceph_remove_cap(cap
, true);
1770 struct dentry
*dentry
;
1771 /* try dropping referring dentries */
1772 spin_unlock(&ci
->i_ceph_lock
);
1773 dentry
= d_find_any_alias(inode
);
1774 if (dentry
&& drop_negative_children(dentry
)) {
1777 d_prune_aliases(inode
);
1778 count
= atomic_read(&inode
->i_count
);
1781 dout("trim_caps_cb %p cap %p pruned, count now %d\n",
1790 spin_unlock(&ci
->i_ceph_lock
);
1795 * Trim session cap count down to some max number.
1797 int ceph_trim_caps(struct ceph_mds_client
*mdsc
,
1798 struct ceph_mds_session
*session
,
1801 int trim_caps
= session
->s_nr_caps
- max_caps
;
1803 dout("trim_caps mds%d start: %d / %d, trim %d\n",
1804 session
->s_mds
, session
->s_nr_caps
, max_caps
, trim_caps
);
1805 if (trim_caps
> 0) {
1806 int remaining
= trim_caps
;
1808 ceph_iterate_session_caps(session
, trim_caps_cb
, &remaining
);
1809 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1810 session
->s_mds
, session
->s_nr_caps
, max_caps
,
1811 trim_caps
- remaining
);
1814 ceph_flush_cap_releases(mdsc
, session
);
1818 static int check_caps_flush(struct ceph_mds_client
*mdsc
,
1823 spin_lock(&mdsc
->cap_dirty_lock
);
1824 if (!list_empty(&mdsc
->cap_flush_list
)) {
1825 struct ceph_cap_flush
*cf
=
1826 list_first_entry(&mdsc
->cap_flush_list
,
1827 struct ceph_cap_flush
, g_list
);
1828 if (cf
->tid
<= want_flush_tid
) {
1829 dout("check_caps_flush still flushing tid "
1830 "%llu <= %llu\n", cf
->tid
, want_flush_tid
);
1834 spin_unlock(&mdsc
->cap_dirty_lock
);
1839 * flush all dirty inode data to disk.
1841 * returns true if we've flushed through want_flush_tid
1843 static void wait_caps_flush(struct ceph_mds_client
*mdsc
,
1846 dout("check_caps_flush want %llu\n", want_flush_tid
);
1848 wait_event(mdsc
->cap_flushing_wq
,
1849 check_caps_flush(mdsc
, want_flush_tid
));
1851 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid
);
1855 * called under s_mutex
1857 static void ceph_send_cap_releases(struct ceph_mds_client
*mdsc
,
1858 struct ceph_mds_session
*session
)
1860 struct ceph_msg
*msg
= NULL
;
1861 struct ceph_mds_cap_release
*head
;
1862 struct ceph_mds_cap_item
*item
;
1863 struct ceph_osd_client
*osdc
= &mdsc
->fsc
->client
->osdc
;
1864 struct ceph_cap
*cap
;
1865 LIST_HEAD(tmp_list
);
1866 int num_cap_releases
;
1867 __le32 barrier
, *cap_barrier
;
1869 down_read(&osdc
->lock
);
1870 barrier
= cpu_to_le32(osdc
->epoch_barrier
);
1871 up_read(&osdc
->lock
);
1873 spin_lock(&session
->s_cap_lock
);
1875 list_splice_init(&session
->s_cap_releases
, &tmp_list
);
1876 num_cap_releases
= session
->s_num_cap_releases
;
1877 session
->s_num_cap_releases
= 0;
1878 spin_unlock(&session
->s_cap_lock
);
1880 while (!list_empty(&tmp_list
)) {
1882 msg
= ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE
,
1883 PAGE_SIZE
, GFP_NOFS
, false);
1886 head
= msg
->front
.iov_base
;
1887 head
->num
= cpu_to_le32(0);
1888 msg
->front
.iov_len
= sizeof(*head
);
1890 msg
->hdr
.version
= cpu_to_le16(2);
1891 msg
->hdr
.compat_version
= cpu_to_le16(1);
1894 cap
= list_first_entry(&tmp_list
, struct ceph_cap
,
1896 list_del(&cap
->session_caps
);
1899 head
= msg
->front
.iov_base
;
1900 put_unaligned_le32(get_unaligned_le32(&head
->num
) + 1,
1902 item
= msg
->front
.iov_base
+ msg
->front
.iov_len
;
1903 item
->ino
= cpu_to_le64(cap
->cap_ino
);
1904 item
->cap_id
= cpu_to_le64(cap
->cap_id
);
1905 item
->migrate_seq
= cpu_to_le32(cap
->mseq
);
1906 item
->seq
= cpu_to_le32(cap
->issue_seq
);
1907 msg
->front
.iov_len
+= sizeof(*item
);
1909 ceph_put_cap(mdsc
, cap
);
1911 if (le32_to_cpu(head
->num
) == CEPH_CAPS_PER_RELEASE
) {
1912 // Append cap_barrier field
1913 cap_barrier
= msg
->front
.iov_base
+ msg
->front
.iov_len
;
1914 *cap_barrier
= barrier
;
1915 msg
->front
.iov_len
+= sizeof(*cap_barrier
);
1917 msg
->hdr
.front_len
= cpu_to_le32(msg
->front
.iov_len
);
1918 dout("send_cap_releases mds%d %p\n", session
->s_mds
, msg
);
1919 ceph_con_send(&session
->s_con
, msg
);
1924 BUG_ON(num_cap_releases
!= 0);
1926 spin_lock(&session
->s_cap_lock
);
1927 if (!list_empty(&session
->s_cap_releases
))
1929 spin_unlock(&session
->s_cap_lock
);
1932 // Append cap_barrier field
1933 cap_barrier
= msg
->front
.iov_base
+ msg
->front
.iov_len
;
1934 *cap_barrier
= barrier
;
1935 msg
->front
.iov_len
+= sizeof(*cap_barrier
);
1937 msg
->hdr
.front_len
= cpu_to_le32(msg
->front
.iov_len
);
1938 dout("send_cap_releases mds%d %p\n", session
->s_mds
, msg
);
1939 ceph_con_send(&session
->s_con
, msg
);
1943 pr_err("send_cap_releases mds%d, failed to allocate message\n",
1945 spin_lock(&session
->s_cap_lock
);
1946 list_splice(&tmp_list
, &session
->s_cap_releases
);
1947 session
->s_num_cap_releases
+= num_cap_releases
;
1948 spin_unlock(&session
->s_cap_lock
);
1951 static void ceph_cap_release_work(struct work_struct
*work
)
1953 struct ceph_mds_session
*session
=
1954 container_of(work
, struct ceph_mds_session
, s_cap_release_work
);
1956 mutex_lock(&session
->s_mutex
);
1957 if (session
->s_state
== CEPH_MDS_SESSION_OPEN
||
1958 session
->s_state
== CEPH_MDS_SESSION_HUNG
)
1959 ceph_send_cap_releases(session
->s_mdsc
, session
);
1960 mutex_unlock(&session
->s_mutex
);
1961 ceph_put_mds_session(session
);
1964 void ceph_flush_cap_releases(struct ceph_mds_client
*mdsc
,
1965 struct ceph_mds_session
*session
)
1970 get_session(session
);
1971 if (queue_work(mdsc
->fsc
->cap_wq
,
1972 &session
->s_cap_release_work
)) {
1973 dout("cap release work queued\n");
1975 ceph_put_mds_session(session
);
1976 dout("failed to queue cap release work\n");
1981 * caller holds session->s_cap_lock
1983 void __ceph_queue_cap_release(struct ceph_mds_session
*session
,
1984 struct ceph_cap
*cap
)
1986 list_add_tail(&cap
->session_caps
, &session
->s_cap_releases
);
1987 session
->s_num_cap_releases
++;
1989 if (!(session
->s_num_cap_releases
% CEPH_CAPS_PER_RELEASE
))
1990 ceph_flush_cap_releases(session
->s_mdsc
, session
);
1993 static void ceph_cap_reclaim_work(struct work_struct
*work
)
1995 struct ceph_mds_client
*mdsc
=
1996 container_of(work
, struct ceph_mds_client
, cap_reclaim_work
);
1997 int ret
= ceph_trim_dentries(mdsc
);
1999 ceph_queue_cap_reclaim_work(mdsc
);
2002 void ceph_queue_cap_reclaim_work(struct ceph_mds_client
*mdsc
)
2007 if (queue_work(mdsc
->fsc
->cap_wq
, &mdsc
->cap_reclaim_work
)) {
2008 dout("caps reclaim work queued\n");
2010 dout("failed to queue caps release work\n");
2014 void ceph_reclaim_caps_nr(struct ceph_mds_client
*mdsc
, int nr
)
2019 val
= atomic_add_return(nr
, &mdsc
->cap_reclaim_pending
);
2020 if ((val
% CEPH_CAPS_PER_RELEASE
) < nr
) {
2021 atomic_set(&mdsc
->cap_reclaim_pending
, 0);
2022 ceph_queue_cap_reclaim_work(mdsc
);
2030 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request
*req
,
2033 struct ceph_inode_info
*ci
= ceph_inode(dir
);
2034 struct ceph_mds_reply_info_parsed
*rinfo
= &req
->r_reply_info
;
2035 struct ceph_mount_options
*opt
= req
->r_mdsc
->fsc
->mount_options
;
2036 size_t size
= sizeof(struct ceph_mds_reply_dir_entry
);
2037 unsigned int num_entries
;
2040 spin_lock(&ci
->i_ceph_lock
);
2041 num_entries
= ci
->i_files
+ ci
->i_subdirs
;
2042 spin_unlock(&ci
->i_ceph_lock
);
2043 num_entries
= max(num_entries
, 1U);
2044 num_entries
= min(num_entries
, opt
->max_readdir
);
2046 order
= get_order(size
* num_entries
);
2047 while (order
>= 0) {
2048 rinfo
->dir_entries
= (void*)__get_free_pages(GFP_KERNEL
|
2051 if (rinfo
->dir_entries
)
2055 if (!rinfo
->dir_entries
)
2058 num_entries
= (PAGE_SIZE
<< order
) / size
;
2059 num_entries
= min(num_entries
, opt
->max_readdir
);
2061 rinfo
->dir_buf_size
= PAGE_SIZE
<< order
;
2062 req
->r_num_caps
= num_entries
+ 1;
2063 req
->r_args
.readdir
.max_entries
= cpu_to_le32(num_entries
);
2064 req
->r_args
.readdir
.max_bytes
= cpu_to_le32(opt
->max_readdir_bytes
);
2069 * Create an mds request.
2071 struct ceph_mds_request
*
2072 ceph_mdsc_create_request(struct ceph_mds_client
*mdsc
, int op
, int mode
)
2074 struct ceph_mds_request
*req
= kzalloc(sizeof(*req
), GFP_NOFS
);
2075 struct timespec64 ts
;
2078 return ERR_PTR(-ENOMEM
);
2080 mutex_init(&req
->r_fill_mutex
);
2082 req
->r_started
= jiffies
;
2083 req
->r_resend_mds
= -1;
2084 INIT_LIST_HEAD(&req
->r_unsafe_dir_item
);
2085 INIT_LIST_HEAD(&req
->r_unsafe_target_item
);
2087 kref_init(&req
->r_kref
);
2088 RB_CLEAR_NODE(&req
->r_node
);
2089 INIT_LIST_HEAD(&req
->r_wait
);
2090 init_completion(&req
->r_completion
);
2091 init_completion(&req
->r_safe_completion
);
2092 INIT_LIST_HEAD(&req
->r_unsafe_item
);
2094 ktime_get_coarse_real_ts64(&ts
);
2095 req
->r_stamp
= timespec64_trunc(ts
, mdsc
->fsc
->sb
->s_time_gran
);
2098 req
->r_direct_mode
= mode
;
2103 * return oldest (lowest) request, tid in request tree, 0 if none.
2105 * called under mdsc->mutex.
2107 static struct ceph_mds_request
*__get_oldest_req(struct ceph_mds_client
*mdsc
)
2109 if (RB_EMPTY_ROOT(&mdsc
->request_tree
))
2111 return rb_entry(rb_first(&mdsc
->request_tree
),
2112 struct ceph_mds_request
, r_node
);
2115 static inline u64
__get_oldest_tid(struct ceph_mds_client
*mdsc
)
2117 return mdsc
->oldest_tid
;
2121 * Build a dentry's path. Allocate on heap; caller must kfree. Based
2122 * on build_path_from_dentry in fs/cifs/dir.c.
2124 * If @stop_on_nosnap, generate path relative to the first non-snapped
2127 * Encode hidden .snap dirs as a double /, i.e.
2128 * foo/.snap/bar -> foo//bar
2130 char *ceph_mdsc_build_path(struct dentry
*dentry
, int *plen
, u64
*pbase
,
2133 struct dentry
*temp
;
2140 return ERR_PTR(-EINVAL
);
2144 return ERR_PTR(-ENOMEM
);
2149 seq
= read_seqbegin(&rename_lock
);
2153 struct inode
*inode
;
2155 spin_lock(&temp
->d_lock
);
2156 inode
= d_inode(temp
);
2157 if (inode
&& ceph_snap(inode
) == CEPH_SNAPDIR
) {
2158 dout("build_path path+%d: %p SNAPDIR\n",
2160 } else if (stop_on_nosnap
&& inode
&& dentry
!= temp
&&
2161 ceph_snap(inode
) == CEPH_NOSNAP
) {
2162 spin_unlock(&temp
->d_lock
);
2163 pos
++; /* get rid of any prepended '/' */
2166 pos
-= temp
->d_name
.len
;
2168 spin_unlock(&temp
->d_lock
);
2171 memcpy(path
+ pos
, temp
->d_name
.name
, temp
->d_name
.len
);
2173 spin_unlock(&temp
->d_lock
);
2174 temp
= READ_ONCE(temp
->d_parent
);
2176 /* Are we at the root? */
2180 /* Are we out of buffer? */
2186 base
= ceph_ino(d_inode(temp
));
2189 if (read_seqretry(&rename_lock
, seq
))
2194 * A rename didn't occur, but somehow we didn't end up where
2195 * we thought we would. Throw a warning and try again.
2197 pr_warn("build_path did not end path lookup where "
2198 "expected, pos is %d\n", pos
);
2203 *plen
= PATH_MAX
- 1 - pos
;
2204 dout("build_path on %p %d built %llx '%.*s'\n",
2205 dentry
, d_count(dentry
), base
, *plen
, path
+ pos
);
2209 static int build_dentry_path(struct dentry
*dentry
, struct inode
*dir
,
2210 const char **ppath
, int *ppathlen
, u64
*pino
,
2211 bool *pfreepath
, bool parent_locked
)
2217 dir
= d_inode_rcu(dentry
->d_parent
);
2218 if (dir
&& parent_locked
&& ceph_snap(dir
) == CEPH_NOSNAP
) {
2219 *pino
= ceph_ino(dir
);
2221 *ppath
= dentry
->d_name
.name
;
2222 *ppathlen
= dentry
->d_name
.len
;
2226 path
= ceph_mdsc_build_path(dentry
, ppathlen
, pino
, 1);
2228 return PTR_ERR(path
);
2234 static int build_inode_path(struct inode
*inode
,
2235 const char **ppath
, int *ppathlen
, u64
*pino
,
2238 struct dentry
*dentry
;
2241 if (ceph_snap(inode
) == CEPH_NOSNAP
) {
2242 *pino
= ceph_ino(inode
);
2246 dentry
= d_find_alias(inode
);
2247 path
= ceph_mdsc_build_path(dentry
, ppathlen
, pino
, 1);
2250 return PTR_ERR(path
);
2257 * request arguments may be specified via an inode *, a dentry *, or
2258 * an explicit ino+path.
2260 static int set_request_path_attr(struct inode
*rinode
, struct dentry
*rdentry
,
2261 struct inode
*rdiri
, const char *rpath
,
2262 u64 rino
, const char **ppath
, int *pathlen
,
2263 u64
*ino
, bool *freepath
, bool parent_locked
)
2268 r
= build_inode_path(rinode
, ppath
, pathlen
, ino
, freepath
);
2269 dout(" inode %p %llx.%llx\n", rinode
, ceph_ino(rinode
),
2271 } else if (rdentry
) {
2272 r
= build_dentry_path(rdentry
, rdiri
, ppath
, pathlen
, ino
,
2273 freepath
, parent_locked
);
2274 dout(" dentry %p %llx/%.*s\n", rdentry
, *ino
, *pathlen
,
2276 } else if (rpath
|| rino
) {
2279 *pathlen
= rpath
? strlen(rpath
) : 0;
2280 dout(" path %.*s\n", *pathlen
, rpath
);
/*
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
					       struct ceph_mds_request *req,
					       int mds, bool drop_cap_releases)
{
	struct ceph_msg *msg;
	struct ceph_mds_request_head *head;
	const char *path1 = NULL;
	const char *path2 = NULL;
	u64 ino1 = 0, ino2 = 0;
	int pathlen1 = 0, pathlen2 = 0;
	bool freepath1 = false, freepath2 = false;
	int len;
	u16 releases;
	void *p, *end;
	int ret;

	ret = set_request_path_attr(req->r_inode, req->r_dentry,
			      req->r_parent, req->r_path1, req->r_ino1.ino,
			      &path1, &pathlen1, &ino1, &freepath1,
			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
				       &req->r_req_flags));
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	/* If r_old_dentry is set, then assume that its parent is locked */
	ret = set_request_path_attr(NULL, req->r_old_dentry,
			      req->r_old_dentry_dir,
			      req->r_path2, req->r_ino2.ino,
			      &path2, &pathlen2, &ino2, &freepath2, true);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	len = sizeof(*head) +
		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
		sizeof(struct ceph_timespec);

	/* calculate (max) length for cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
	if (req->r_dentry_drop)
		len += pathlen1;
	if (req->r_old_dentry_drop)
		len += pathlen2;

	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.version = cpu_to_le16(2);
	msg->hdr.tid = cpu_to_le64(req->r_tid);

	head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(*head);
	end = msg->front.iov_base + msg->front.iov_len;

	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	head->op = cpu_to_le32(req->r_op);
	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));

	head->args = req->r_args;

	ceph_encode_filepath(&p, end, ino1, path1);
	ceph_encode_filepath(&p, end, ino2, path2);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
		      mds, req->r_inode_drop, req->r_inode_unless, 0);
	if (req->r_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_dentry,
				req->r_parent, mds, req->r_dentry_drop,
				req->r_dentry_unless);
	if (req->r_old_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
				req->r_old_dentry_dir, mds,
				req->r_old_dentry_drop,
				req->r_old_dentry_unless);
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      d_inode(req->r_old_dentry),
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);

	if (drop_cap_releases) {
		releases = 0;
		p = msg->front.iov_base + req->r_request_release_offset;
	}

	head->num_releases = cpu_to_le16(releases);

	/* time stamp */
	{
		struct ceph_timespec ts;
		ceph_encode_timespec64(&ts, &req->r_stamp);
		ceph_encode_copy(&p, &ts, sizeof(ts));
	}

	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_pagelist) {
		struct ceph_pagelist *pagelist = req->r_pagelist;
		ceph_msg_data_add_pagelist(msg, pagelist);
		msg->hdr.data_len = cpu_to_le32(pagelist->length);
	} else {
		msg->hdr.data_len = 0;
	}

	msg->hdr.data_off = cpu_to_le16(0);

out_free2:
	if (freepath2)
		ceph_mdsc_free_path((char *)path2, pathlen2);
out_free1:
	if (freepath1)
		ceph_mdsc_free_path((char *)path1, pathlen1);
out:
	return msg;
}
/*
 * called under mdsc->mutex if error, under no mutex if
 * success.
 */
static void complete_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *req)
{
	if (req->r_callback)
		req->r_callback(mdsc, req);
	complete_all(&req->r_completion);
}

/*
 * called under mdsc->mutex
 */
static int __prepare_send_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req,
				  int mds, bool drop_cap_releases)
{
	struct ceph_mds_request_head *rhead;
	struct ceph_msg *msg;
	int flags = 0;

	req->r_attempts++;
	if (req->r_inode) {
		struct ceph_cap *cap =
			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);

		if (cap)
			req->r_sent_on_mseq = cap->mseq;
		else
			req->r_sent_on_mseq = -1;
	}
	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);

	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		void *p;

		/*
		 * Replay.  Do not regenerate message (and rebuild
		 * paths, etc.); just use the original message.
		 * Rebuilding paths will break for renames because
		 * d_move mangles the src name.
		 */
		msg = req->r_request;
		rhead = msg->front.iov_base;

		flags = le32_to_cpu(rhead->flags);
		flags |= CEPH_MDS_FLAG_REPLAY;
		rhead->flags = cpu_to_le32(flags);

		if (req->r_target_inode)
			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));

		rhead->num_retry = req->r_attempts - 1;

		/* remove cap/dentry releases from message */
		rhead->num_releases = 0;

		/* time stamp */
		p = msg->front.iov_base + req->r_request_release_offset;
		{
			struct ceph_timespec ts;
			ceph_encode_timespec64(&ts, &req->r_stamp);
			ceph_encode_copy(&p, &ts, sizeof(ts));
		}

		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		return 0;
	}

	if (req->r_request) {
		ceph_msg_put(req->r_request);
		req->r_request = NULL;
	}
	msg = create_request_message(mdsc, req, mds, drop_cap_releases);
	if (IS_ERR(msg)) {
		req->r_err = PTR_ERR(msg);
		return PTR_ERR(msg);
	}
	req->r_request = msg;

	rhead = msg->front.iov_base;
	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_REPLAY;
	if (req->r_parent)
		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
	rhead->flags = cpu_to_le32(flags);
	rhead->num_fwd = req->r_num_fwd;
	rhead->num_retry = req->r_attempts - 1;

	dout(" r_parent = %p\n", req->r_parent);
	return 0;
}
/*
 * send request, or put it on the appropriate wait list.
 */
static void __do_request(struct ceph_mds_client *mdsc,
			 struct ceph_mds_request *req)
{
	struct ceph_mds_session *session = NULL;
	int mds = -1;
	int err = 0;

	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
			__unregister_request(mdsc, req);
		return;
	}

	if (req->r_timeout &&
	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
		dout("do_request timed out\n");
		err = -EIO;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		dout("do_request forced umount\n");
		err = -EIO;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
		if (mdsc->mdsmap_err) {
			err = mdsc->mdsmap_err;
			dout("do_request mdsmap err %d\n", err);
			goto finish;
		}
		if (mdsc->mdsmap->m_epoch == 0) {
			dout("do_request no mdsmap, waiting for map\n");
			list_add(&req->r_wait, &mdsc->waiting_for_map);
			return;
		}
		if (!(mdsc->fsc->mount_options->flags &
		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
			err = -EHOSTUNREACH;
			pr_info("probably no mds server is up\n");
			goto finish;
		}
	}

	put_request_session(req);

	mds = __choose_mds(mdsc, req);
	if (mds < 0 ||
	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
		dout("do_request no mds or not active, waiting for map\n");
		list_add(&req->r_wait, &mdsc->waiting_for_map);
		return;
	}

	/* get, open session */
	session = __ceph_lookup_mds_session(mdsc, mds);
	if (!session) {
		session = register_session(mdsc, mds);
		if (IS_ERR(session)) {
			err = PTR_ERR(session);
			goto finish;
		}
	}
	req->r_session = get_session(session);

	dout("do_request mds%d session %p state %s\n", mds, session,
	     ceph_session_state_name(session->s_state));
	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
	    session->s_state != CEPH_MDS_SESSION_HUNG) {
		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
			err = -EACCES;
			goto out_session;
		}
		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING)
			__open_session(mdsc, session);
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_resend_mds = -1;   /* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	err = __prepare_send_request(mdsc, req, mds, false);
	if (!err) {
		ceph_msg_get(req->r_request);
		ceph_con_send(&session->s_con, req->r_request);
	}

out_session:
	ceph_put_mds_session(session);
finish:
	if (err) {
		dout("__do_request early error %d\n", err);
		req->r_err = err;
		complete_request(mdsc, req);
		__unregister_request(mdsc, req);
	}
	return;
}

/*
 * called under mdsc->mutex
 */
static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head)
{
	struct ceph_mds_request *req;
	LIST_HEAD(tmp_list);

	list_splice_init(head, &tmp_list);

	while (!list_empty(&tmp_list)) {
		req = list_entry(tmp_list.next,
				 struct ceph_mds_request, r_wait);
		list_del_init(&req->r_wait);
		dout(" wake request %p tid %llu\n", req, req->r_tid);
		__do_request(mdsc, req);
	}
}
/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
	struct ceph_mds_request *req;
	struct rb_node *p = rb_first(&mdsc->request_tree);

	dout("kick_requests mds%d\n", mds);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts > 0)
			continue; /* only new requests */
		if (req->r_session &&
		    req->r_session->s_mds == mds) {
			dout(" kicking tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__do_request(mdsc, req);
		}
	}
}
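/*
 * Register a request and kick off processing.  CAP_PIN references are
 * taken on r_inode, r_parent and r_old_dentry_dir so the pinned caps
 * cannot go away while the request is in flight.
 */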
int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
			     struct ceph_mds_request *req)
{
	int err;

	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
	if (req->r_inode)
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_parent) {
		ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		ihold(req->r_parent);
	}
	if (req->r_old_dentry_dir)
		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);

	dout("submit_request on %p for inode %p\n", req, dir);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	__do_request(mdsc, req);
	err = req->r_err;
	mutex_unlock(&mdsc->mutex);
	return err;
}

static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req)
{
	int err;

	/* wait */
	dout("do_request waiting\n");
	if (!req->r_timeout && req->r_wait_for_completion) {
		err = req->r_wait_for_completion(mdsc, req);
	} else {
		long timeleft = wait_for_completion_killable_timeout(
					&req->r_completion,
					ceph_timeout_jiffies(req->r_timeout));
		if (timeleft > 0)
			err = 0;
		else if (!timeleft)
			err = -EIO;  /* timed out */
		else
			err = timeleft;  /* killed */
	}
	dout("do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);

	/* only abort if we didn't race with a real reply */
	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		dout("aborted request %lld with %d\n", req->r_tid, err);

		/*
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = err;
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);

		if (req->r_parent &&
		    (req->r_op & CEPH_MDS_OP_WRITE))
			ceph_invalidate_dir_request(req);
	} else {
		err = req->r_err;
	}

	mutex_unlock(&mdsc->mutex);
	return err;
}

/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, retry details.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	int err;

	dout("do_request on %p\n", req);

	/* issue */
	err = ceph_mdsc_submit_request(mdsc, dir, req);
	if (!err)
		err = ceph_mdsc_wait_request(mdsc, req);
	dout("do_request %p done, result %d\n", req, err);
	return err;
}
/*
 * Invalidate dir's completeness, dentry lease state on an aborted MDS
 * namespace request.
 */
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
{
	struct inode *dir = req->r_parent;
	struct inode *old_dir = req->r_old_dentry_dir;

	dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);

	ceph_dir_clear_complete(dir);
	if (old_dir)
		ceph_dir_clear_complete(old_dir);
	if (req->r_dentry)
		ceph_invalidate_dentry_lease(req->r_dentry);
	if (req->r_old_dentry)
		ceph_invalidate_dentry_lease(req->r_old_dentry);
}
/*
 * We take the session mutex and parse and process the reply immediately.
 * This preserves the logical ordering of replies, capabilities, etc., sent
 * by the MDS as they are applied to our local cache.
 */
static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_mds_request *req;
	struct ceph_mds_reply_head *head = msg->front.iov_base;
	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
	struct ceph_snap_realm *realm;
	u64 tid;
	int err, result;
	int mds = session->s_mds;

	if (msg->front.iov_len < sizeof(*head)) {
		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
		ceph_msg_dump(msg);
		return;
	}

	/* get request, session */
	tid = le64_to_cpu(msg->hdr.tid);
	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		dout("handle_reply on unknown tid %llu\n", tid);
		mutex_unlock(&mdsc->mutex);
		return;
	}
	dout("handle_reply %p\n", req);

	/* correct session? */
	if (req->r_session != session) {
		pr_err("mdsc_handle_reply got %llu on session mds%d not mds%d\n",
		       tid, session->s_mds,
		       req->r_session ? req->r_session->s_mds : -1);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	/* dup? */
	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
		pr_warn("got a dup %s reply on %llu from mds%d\n",
			head->safe ? "safe" : "unsafe", tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
		pr_warn("got unsafe after safe on %llu from mds%d\n",
			tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	result = le32_to_cpu(head->result);

	/*
	 * Handle an ESTALE
	 * if we're not talking to the authority, send to them
	 * if the authority has changed while we weren't looking,
	 * send to new authority
	 * Otherwise we just have to return an ESTALE
	 */
	if (result == -ESTALE) {
		dout("got ESTALE on request %llu\n", req->r_tid);
		req->r_resend_mds = -1;
		if (req->r_direct_mode != USE_AUTH_MDS) {
			dout("not using auth, setting for that now\n");
			req->r_direct_mode = USE_AUTH_MDS;
			__do_request(mdsc, req);
			mutex_unlock(&mdsc->mutex);
			goto out;
		} else {
			int mds = __choose_mds(mdsc, req);
			if (mds >= 0 && mds != req->r_session->s_mds) {
				dout("but auth changed, so resending\n");
				__do_request(mdsc, req);
				mutex_unlock(&mdsc->mutex);
				goto out;
			}
		}
		dout("have to return ESTALE on request %llu\n", req->r_tid);
	}

	if (head->safe) {
		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
		__unregister_request(mdsc, req);

		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
			/*
			 * We already handled the unsafe response, now do the
			 * cleanup.  No need to examine the response; the MDS
			 * doesn't include any result info in the safe
			 * response.  And even if it did, there is nothing
			 * useful we could do with a revised return value.
			 */
			dout("got safe reply %llu, mds%d\n", tid, mds);

			/* last unsafe request during umount? */
			if (mdsc->stopping && !__get_oldest_req(mdsc))
				complete_all(&mdsc->safe_umount_waiters);
			mutex_unlock(&mdsc->mutex);
			goto out;
		}
	} else {
		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
		if (req->r_unsafe_dir) {
			struct ceph_inode_info *ci =
					ceph_inode(req->r_unsafe_dir);
			spin_lock(&ci->i_unsafe_lock);
			list_add_tail(&req->r_unsafe_dir_item,
				      &ci->i_unsafe_dirops);
			spin_unlock(&ci->i_unsafe_lock);
		}
	}

	dout("handle_reply tid %lld result %d\n", tid, result);
	rinfo = &req->r_reply_info;
	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
		err = parse_reply_info(msg, rinfo, (u64)-1);
	else
		err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);
	if (err < 0) {
		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
		ceph_msg_dump(msg);
		goto out_err;
	}

	/* snap trace */
	realm = NULL;
	if (rinfo->snapblob_len) {
		down_write(&mdsc->snap_rwsem);
		ceph_update_snap_trace(mdsc, rinfo->snapblob,
				rinfo->snapblob + rinfo->snapblob_len,
				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
				&realm);
		downgrade_write(&mdsc->snap_rwsem);
	} else {
		down_read(&mdsc->snap_rwsem);
	}

	/* insert trace into our cache */
	mutex_lock(&req->r_fill_mutex);
	current->journal_info = req;
	err = ceph_fill_trace(mdsc->fsc->sb, req);
	if (err == 0) {
		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
				    req->r_op == CEPH_MDS_OP_LSSNAP))
			ceph_readdir_prepopulate(req, req->r_session);
	}
	current->journal_info = NULL;
	mutex_unlock(&req->r_fill_mutex);

	up_read(&mdsc->snap_rwsem);
	if (realm)
		ceph_put_snap_realm(mdsc, realm);

	if (err == 0) {
		if (req->r_target_inode &&
		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
			struct ceph_inode_info *ci =
				ceph_inode(req->r_target_inode);
			spin_lock(&ci->i_unsafe_lock);
			list_add_tail(&req->r_unsafe_target_item,
				      &ci->i_unsafe_iops);
			spin_unlock(&ci->i_unsafe_lock);
		}

		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
	}
out_err:
	mutex_lock(&mdsc->mutex);
	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		if (err) {
			req->r_err = err;
		} else {
			req->r_reply = ceph_msg_get(msg);
			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
		}
	} else {
		dout("reply arrived after request %lld was aborted\n", tid);
	}
	mutex_unlock(&mdsc->mutex);

	mutex_unlock(&session->s_mutex);

	/* kick calling process */
	complete_request(mdsc, req);
out:
	ceph_mdsc_put_request(req);
	return;
}
/*
 * handle mds notification that our request has been forwarded.
 */
static void handle_forward(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	u32 next_mds;
	u32 fwd_seq;
	int err = -EINVAL;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;

	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
	next_mds = ceph_decode_32(&p);
	fwd_seq = ceph_decode_32(&p);

	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
		goto out;  /* dup reply? */
	}

	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		dout("forward tid %llu aborted, unregistering\n", tid);
		__unregister_request(mdsc, req);
	} else if (fwd_seq <= req->r_num_fwd) {
		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
		     tid, next_mds, req->r_num_fwd, fwd_seq);
	} else {
		/* resend. forward race not possible; mds would drop */
		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
		req->r_attempts = 0;
		req->r_num_fwd = fwd_seq;
		req->r_resend_mds = next_mds;
		put_request_session(req);
		__do_request(mdsc, req);
	}
	ceph_mdsc_put_request(req);
out:
	mutex_unlock(&mdsc->mutex);
	return;

bad:
	pr_err("mdsc_handle_forward decode error err=%d\n", err);
}
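/*
 * Parse the session metadata (a map<string,string>) and note whether the
 * "error_string" entry mentions "blacklisted", so the caller can tell
 * that the MDS rejected this client because it is blacklisted.
 */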
static int __decode_session_metadata(void **p, void *end,
				     bool *blacklisted)
{
	/* map<string,string> */
	u32 n;
	bool err_str;

	ceph_decode_32_safe(p, end, n, bad);
	while (n-- > 0) {
		u32 len;
		ceph_decode_32_safe(p, end, len, bad);
		ceph_decode_need(p, end, len, bad);
		err_str = !strncmp(*p, "error_string", len);
		*p += len;
		ceph_decode_32_safe(p, end, len, bad);
		ceph_decode_need(p, end, len, bad);
		if (err_str && strnstr(*p, "blacklisted", len))
			*blacklisted = true;
		*p += len;
	}
	return 0;
bad:
	return -1;
}
/*
 * handle a mds session control message
 */
static void handle_session(struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	int mds = session->s_mds;
	int msg_version = le16_to_cpu(msg->hdr.version);
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mds_session_head *h;
	u32 op;
	u64 seq;
	unsigned long features = 0;
	int wake = 0;
	bool blacklisted = false;

	/* decode */
	ceph_decode_need(&p, end, sizeof(*h), bad);
	h = p;
	p += sizeof(*h);

	op = le32_to_cpu(h->op);
	seq = le64_to_cpu(h->seq);

	if (msg_version >= 3) {
		u32 len;
		/* version >= 2, metadata */
		if (__decode_session_metadata(&p, end, &blacklisted) < 0)
			goto bad;
		/* version >= 3, feature bits */
		ceph_decode_32_safe(&p, end, len, bad);
		ceph_decode_need(&p, end, len, bad);
		memcpy(&features, p, min_t(size_t, len, sizeof(features)));
		p += len;
	}

	mutex_lock(&mdsc->mutex);
	if (op == CEPH_SESSION_CLOSE) {
		get_session(session);
		__unregister_session(mdsc, session);
	}
	/* FIXME: this ttl calculation is generous */
	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);

	dout("handle_session mds%d %s %p state %s seq %llu\n",
	     mds, ceph_session_op_name(op), session,
	     ceph_session_state_name(session->s_state), seq);

	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
		session->s_state = CEPH_MDS_SESSION_OPEN;
		pr_info("mds%d came back\n", session->s_mds);
	}

	switch (op) {
	case CEPH_SESSION_OPEN:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info("mds%d reconnect success\n", session->s_mds);
		session->s_state = CEPH_MDS_SESSION_OPEN;
		session->s_features = features;
		renewed_caps(mdsc, session, 0);
		wake = 1;
		if (mdsc->stopping)
			__close_session(mdsc, session);
		break;

	case CEPH_SESSION_RENEWCAPS:
		if (session->s_renew_seq == seq)
			renewed_caps(mdsc, session, 1);
		break;

	case CEPH_SESSION_CLOSE:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info("mds%d reconnect denied\n", session->s_mds);
		cleanup_session_requests(mdsc, session);
		remove_session_caps(session);
		wake = 2; /* for good measure */
		wake_up_all(&mdsc->session_close_wq);
		break;

	case CEPH_SESSION_STALE:
		pr_info("mds%d caps went stale, renewing\n",
			session->s_mds);
		spin_lock(&session->s_gen_ttl_lock);
		session->s_cap_gen++;
		session->s_cap_ttl = jiffies - 1;
		spin_unlock(&session->s_gen_ttl_lock);
		send_renew_caps(mdsc, session);
		break;

	case CEPH_SESSION_RECALL_STATE:
		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
		break;

	case CEPH_SESSION_FLUSHMSG:
		send_flushmsg_ack(mdsc, session, seq);
		break;

	case CEPH_SESSION_FORCE_RO:
		dout("force_session_readonly %p\n", session);
		spin_lock(&session->s_cap_lock);
		session->s_readonly = true;
		spin_unlock(&session->s_cap_lock);
		wake_up_session_caps(session, FORCE_RO);
		break;

	case CEPH_SESSION_REJECT:
		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
		pr_info("mds%d rejected session\n", session->s_mds);
		session->s_state = CEPH_MDS_SESSION_REJECTED;
		cleanup_session_requests(mdsc, session);
		remove_session_caps(session);
		if (blacklisted)
			mdsc->fsc->blacklisted = true;
		wake = 2; /* for good measure */
		break;

	default:
		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
		WARN_ON(1);
	}

	mutex_unlock(&session->s_mutex);
	if (wake) {
		mutex_lock(&mdsc->mutex);
		__wake_requests(mdsc, &session->s_waiting);
		if (wake == 2)
			kick_requests(mdsc, mds);
		mutex_unlock(&mdsc->mutex);
	}
	if (op == CEPH_SESSION_CLOSE)
		ceph_put_mds_session(session);
	return;

bad:
	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
	       (int)msg->front.iov_len);
	ceph_msg_dump(msg);
	return;
}
/*
 * called under session->mutex.
 */
static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_mds_request *req, *nreq;
	struct rb_node *p;
	int err;

	dout("replay_unsafe_requests mds%d\n", session->s_mds);

	mutex_lock(&mdsc->mutex);
	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
		err = __prepare_send_request(mdsc, req, session->s_mds, true);
		if (!err) {
			ceph_msg_get(req->r_request);
			ceph_con_send(&session->s_con, req->r_request);
		}
	}

	/*
	 * Also re-send old requests when the MDS enters the reconnect stage,
	 * so that the MDS can process completed requests in its clientreplay
	 * stage.
	 */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts == 0)
			continue; /* only old requests */
		if (req->r_session &&
		    req->r_session->s_mds == session->s_mds) {
			err = __prepare_send_request(mdsc, req,
						     session->s_mds, true);
			if (!err) {
				ceph_msg_get(req->r_request);
				ceph_con_send(&session->s_con, req->r_request);
			}
		}
	}
	mutex_unlock(&mdsc->mutex);
}
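/*
 * Flush the partially built reconnect payload as its own message once it
 * would exceed RECONNECT_MAX_SIZE, then continue encoding into a fresh
 * pagelist.  This is only possible when the MDS advertises support for
 * multiple reconnect messages (recon_state->allow_multi).
 */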
static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
{
	struct ceph_msg *reply;
	struct ceph_pagelist *_pagelist;
	struct page *page;
	__le32 *addr;
	int err = -ENOMEM;

	if (!recon_state->allow_multi)
		return -ENOSPC;

	/* can't handle message that contains both caps and realm */
	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);

	/* pre-allocate new pagelist */
	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!_pagelist)
		return -ENOMEM;

	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	if (!reply)
		goto fail_msg;

	/* placeholder for nr_caps */
	err = ceph_pagelist_encode_32(_pagelist, 0);
	if (err < 0)
		goto fail;

	if (recon_state->nr_caps) {
		/* currently encoding caps */
		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
		if (err)
			goto fail;
	} else {
		/* placeholder for nr_realms (currently encoding realms) */
		err = ceph_pagelist_encode_32(_pagelist, 0);
		if (err < 0)
			goto fail;
	}

	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
	if (err)
		goto fail;

	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
	addr = kmap_atomic(page);
	if (recon_state->nr_caps) {
		/* currently encoding caps */
		*addr = cpu_to_le32(recon_state->nr_caps);
	} else {
		/* currently encoding realms */
		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
	}
	kunmap_atomic(addr);

	reply->hdr.version = cpu_to_le16(5);
	reply->hdr.compat_version = cpu_to_le16(4);

	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);

	ceph_con_send(&recon_state->session->s_con, reply);
	ceph_pagelist_release(recon_state->pagelist);

	recon_state->pagelist = _pagelist;
	recon_state->nr_caps = 0;
	recon_state->nr_realms = 0;
	recon_state->msg_version = 5;
	return 0;
fail:
	ceph_msg_put(reply);
fail_msg:
	ceph_pagelist_release(_pagelist);
	return err;
}
/*
 * Encode information about a cap for a reconnect with the MDS.
 */
static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
			  void *arg)
{
	union {
		struct ceph_mds_cap_reconnect v2;
		struct ceph_mds_cap_reconnect_v1 v1;
	} rec;
	struct ceph_inode_info *ci = cap->ci;
	struct ceph_reconnect_state *recon_state = arg;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	int err;
	u64 snap_follows;

	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
	     inode, ceph_vinop(inode), cap, cap->cap_id,
	     ceph_cap_string(cap->issued));

	spin_lock(&ci->i_ceph_lock);
	cap->seq = 0;        /* reset cap seq */
	cap->issue_seq = 0;  /* and issue_seq */
	cap->mseq = 0;       /* and migrate_seq */
	cap->cap_gen = cap->session->s_cap_gen;

	if (recon_state->msg_version >= 2) {
		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v2.issued = cpu_to_le32(cap->issued);
		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v2.pathbase = 0;
		rec.v2.flock_len = (__force __le32)
			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
	} else {
		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v1.issued = cpu_to_le32(cap->issued);
		rec.v1.size = cpu_to_le64(inode->i_size);
		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v1.pathbase = 0;
	}

	if (list_empty(&ci->i_cap_snaps)) {
		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
	} else {
		struct ceph_cap_snap *capsnap =
			list_first_entry(&ci->i_cap_snaps,
					 struct ceph_cap_snap, ci_item);
		snap_follows = capsnap->follows;
	}
	spin_unlock(&ci->i_ceph_lock);

	if (recon_state->msg_version >= 2) {
		int num_fcntl_locks, num_flock_locks;
		struct ceph_filelock *flocks = NULL;
		size_t struct_len, total_len = sizeof(u64);
		u8 struct_v = 0;

encode_again:
		if (rec.v2.flock_len) {
			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
		} else {
			num_fcntl_locks = 0;
			num_flock_locks = 0;
		}
		if (num_fcntl_locks + num_flock_locks > 0) {
			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
					       sizeof(struct ceph_filelock),
					       GFP_NOFS);
			if (!flocks) {
				err = -ENOMEM;
				goto out_err;
			}
			err = ceph_encode_locks_to_buffer(inode, flocks,
							  num_fcntl_locks,
							  num_flock_locks);
			if (err) {
				kfree(flocks);
				flocks = NULL;
				if (err == -ENOSPC)
					goto encode_again;
				goto out_err;
			}
		} else {
			kfree(flocks);
			flocks = NULL;
		}

		if (recon_state->msg_version >= 3) {
			/* version, compat_version and struct_len */
			total_len += 2 * sizeof(u8) + sizeof(u32);
			struct_v = 2;
		}
		/*
		 * number of encoded locks is stable, so copy to pagelist
		 */
		struct_len = 2 * sizeof(u32) +
			     (num_fcntl_locks + num_flock_locks) *
			     sizeof(struct ceph_filelock);
		rec.v2.flock_len = cpu_to_le32(struct_len);

		struct_len += sizeof(u32) + sizeof(rec.v2);

		if (struct_v >= 2)
			struct_len += sizeof(u64); /* snap_follows */

		total_len += struct_len;

		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
			err = send_reconnect_partial(recon_state);
			if (err)
				goto out_freeflocks;
			pagelist = recon_state->pagelist;
		}

		err = ceph_pagelist_reserve(pagelist, total_len);
		if (err)
			goto out_freeflocks;

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		if (recon_state->msg_version >= 3) {
			ceph_pagelist_encode_8(pagelist, struct_v);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, struct_len);
		}
		ceph_pagelist_encode_string(pagelist, NULL, 0);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
		ceph_locks_to_pagelist(flocks, pagelist,
				       num_fcntl_locks, num_flock_locks);
		if (struct_v >= 2)
			ceph_pagelist_encode_64(pagelist, snap_follows);
out_freeflocks:
		kfree(flocks);
	} else {
		u64 pathbase = 0;
		int pathlen = 0;
		char *path = NULL;
		struct dentry *dentry;

		dentry = d_find_alias(inode);
		if (dentry) {
			path = ceph_mdsc_build_path(dentry,
						&pathlen, &pathbase, 0);
			dput(dentry);
			if (IS_ERR(path)) {
				err = PTR_ERR(path);
				goto out_err;
			}
			rec.v1.pathbase = cpu_to_le64(pathbase);
		}

		err = ceph_pagelist_reserve(pagelist,
					    sizeof(u64) + sizeof(u32) +
					    pathlen + sizeof(rec.v1));
		if (err)
			goto out_freepath;

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		ceph_pagelist_encode_string(pagelist, path, pathlen);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
out_freepath:
		ceph_mdsc_free_path(path, pathlen);
	}

out_err:
	if (err >= 0)
		recon_state->nr_caps++;
	return err;
}
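/*
 * Encode one ceph_mds_snaprealm_reconnect record per known snap realm
 * into the reconnect pagelist, splitting the message if it grows too
 * large (v4+ encodings only).
 */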
static int encode_snap_realms(struct ceph_mds_client *mdsc,
			      struct ceph_reconnect_state *recon_state)
{
	struct rb_node *p;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	int err = 0;

	if (recon_state->msg_version >= 4) {
		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
		if (err < 0)
			goto fail;
	}

	/*
	 * snaprealms.  we provide mds with the ino, seq (version), and
	 * parent for all of our realms.  If the mds has any newer info,
	 * it will tell us.
	 */
	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
		struct ceph_snap_realm *realm =
		       rb_entry(p, struct ceph_snap_realm, node);
		struct ceph_mds_snaprealm_reconnect sr_rec;

		if (recon_state->msg_version >= 4) {
			size_t need = sizeof(u8) * 2 + sizeof(u32) +
				      sizeof(sr_rec);

			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
				err = send_reconnect_partial(recon_state);
				if (err)
					goto fail;
				pagelist = recon_state->pagelist;
			}

			err = ceph_pagelist_reserve(pagelist, need);
			if (err)
				goto fail;

			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
		}

		dout(" adding snap realm %llx seq %lld parent %llx\n",
		     realm->ino, realm->seq, realm->parent_ino);
		sr_rec.ino = cpu_to_le64(realm->ino);
		sr_rec.seq = cpu_to_le64(realm->seq);
		sr_rec.parent = cpu_to_le64(realm->parent_ino);

		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
		if (err)
			goto fail;

		recon_state->nr_realms++;
	}
fail:
	return err;
}
/*
 * If an MDS fails and recovers, clients need to reconnect in order to
 * reestablish shared state.  This includes all caps issued through
 * this session _and_ the snap_realm hierarchy.  Because it's not
 * clear which snap realms the mds cares about, we send everything we
 * know about.. that ensures we'll then get any new info the
 * recovering MDS might have.
 *
 * This is a relatively heavyweight operation, but it's rare.
 *
 * called with mdsc->mutex held.
 */
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *session)
{
	struct ceph_msg *reply;
	int mds = session->s_mds;
	int err = -ENOMEM;
	struct ceph_reconnect_state recon_state = {
		.session = session,
	};
	LIST_HEAD(dispose);

	pr_info("mds%d reconnect start\n", mds);

	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!recon_state.pagelist)
		goto fail_nopagelist;

	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	if (!reply)
		goto fail_nomsg;

	mutex_lock(&session->s_mutex);
	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
	session->s_seq = 0;

	dout("session %p state %s\n", session,
	     ceph_session_state_name(session->s_state));

	spin_lock(&session->s_gen_ttl_lock);
	session->s_cap_gen++;
	spin_unlock(&session->s_gen_ttl_lock);

	spin_lock(&session->s_cap_lock);
	/* don't know if session is readonly */
	session->s_readonly = 0;
	/*
	 * notify __ceph_remove_cap() that we are composing cap reconnect.
	 * If a cap get released before being added to the cap reconnect,
	 * __ceph_remove_cap() should skip queuing cap release.
	 */
	session->s_cap_reconnect = 1;
	/* drop old cap expires; we're about to reestablish that state */
	detach_cap_releases(session, &dispose);
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(mdsc, &dispose);

	/* trim unused caps to reduce MDS's cache rejoin time */
	if (mdsc->fsc->sb->s_root)
		shrink_dcache_parent(mdsc->fsc->sb->s_root);

	ceph_con_close(&session->s_con);
	ceph_con_open(&session->s_con,
		      CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	/* replay unsafe requests */
	replay_unsafe_requests(mdsc, session);

	ceph_early_kick_flushing_caps(mdsc, session);

	down_read(&mdsc->snap_rwsem);

	/* placeholder for nr_caps */
	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
	if (err)
		goto fail;

	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
		recon_state.msg_version = 3;
		recon_state.allow_multi = true;
	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
		recon_state.msg_version = 3;
	} else {
		recon_state.msg_version = 2;
	}
	/* traverse this session's caps */
	err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state);

	spin_lock(&session->s_cap_lock);
	session->s_cap_reconnect = 0;
	spin_unlock(&session->s_cap_lock);

	if (err < 0)
		goto fail;

	/* check if all realms can be encoded into current message */
	if (mdsc->num_snap_realms) {
		size_t total_len =
			recon_state.pagelist->length +
			mdsc->num_snap_realms *
			sizeof(struct ceph_mds_snaprealm_reconnect);
		if (recon_state.msg_version >= 4) {
			/* number of realms */
			total_len += sizeof(u32);
			/* version, compat_version and struct_len */
			total_len += mdsc->num_snap_realms *
				     (2 * sizeof(u8) + sizeof(u32));
		}
		if (total_len > RECONNECT_MAX_SIZE) {
			if (!recon_state.allow_multi) {
				err = -ENOSPC;
				goto fail;
			}
			if (recon_state.nr_caps) {
				err = send_reconnect_partial(&recon_state);
				if (err)
					goto fail;
			}
			recon_state.msg_version = 5;
		}
	}

	err = encode_snap_realms(mdsc, &recon_state);
	if (err < 0)
		goto fail;

	if (recon_state.msg_version >= 5) {
		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
		if (err < 0)
			goto fail;
	}

	if (recon_state.nr_caps || recon_state.nr_realms) {
		struct page *page =
			list_first_entry(&recon_state.pagelist->head,
					 struct page, lru);
		__le32 *addr = kmap_atomic(page);
		if (recon_state.nr_caps) {
			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
			*addr = cpu_to_le32(recon_state.nr_caps);
		} else if (recon_state.msg_version >= 4) {
			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
		}
		kunmap_atomic(addr);
	}

	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
	if (recon_state.msg_version >= 4)
		reply->hdr.compat_version = cpu_to_le16(4);

	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);

	ceph_con_send(&session->s_con, reply);

	mutex_unlock(&session->s_mutex);

	mutex_lock(&mdsc->mutex);
	__wake_requests(mdsc, &session->s_waiting);
	mutex_unlock(&mdsc->mutex);

	up_read(&mdsc->snap_rwsem);
	ceph_pagelist_release(recon_state.pagelist);
	return;

fail:
	ceph_msg_put(reply);
	up_read(&mdsc->snap_rwsem);
	mutex_unlock(&session->s_mutex);
fail_nomsg:
	ceph_pagelist_release(recon_state.pagelist);
fail_nopagelist:
	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
	return;
}
/*
 * compare old and new mdsmaps, kicking requests
 * and closing out old connections as necessary
 *
 * called under mdsc->mutex.
 */
static void check_new_map(struct ceph_mds_client *mdsc,
			  struct ceph_mdsmap *newmap,
			  struct ceph_mdsmap *oldmap)
{
	int i;
	int oldstate, newstate;
	struct ceph_mds_session *s;

	dout("check_new_map new %u old %u\n",
	     newmap->m_epoch, oldmap->m_epoch);

	for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
		if (!mdsc->sessions[i])
			continue;
		s = mdsc->sessions[i];
		oldstate = ceph_mdsmap_get_state(oldmap, i);
		newstate = ceph_mdsmap_get_state(newmap, i);

		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
		     i, ceph_mds_state_name(oldstate),
		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
		     ceph_mds_state_name(newstate),
		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
		     ceph_session_state_name(s->s_state));

		if (i >= newmap->m_num_mds) {
			/* force close session for stopped mds */
			get_session(s);
			__unregister_session(mdsc, s);
			__wake_requests(mdsc, &s->s_waiting);
			mutex_unlock(&mdsc->mutex);

			mutex_lock(&s->s_mutex);
			cleanup_session_requests(mdsc, s);
			remove_session_caps(s);
			mutex_unlock(&s->s_mutex);

			ceph_put_mds_session(s);

			mutex_lock(&mdsc->mutex);
			kick_requests(mdsc, i);
			continue;
		}

		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
			   ceph_mdsmap_get_addr(newmap, i),
			   sizeof(struct ceph_entity_addr))) {
			/* just close it */
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&s->s_mutex);
			mutex_lock(&mdsc->mutex);
			ceph_con_close(&s->s_con);
			mutex_unlock(&s->s_mutex);
			s->s_state = CEPH_MDS_SESSION_RESTARTING;
		} else if (oldstate == newstate) {
			continue;  /* nothing new with this mds */
		}

		/*
		 * send reconnect?
		 */
		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
		    newstate >= CEPH_MDS_STATE_RECONNECT) {
			mutex_unlock(&mdsc->mutex);
			send_mds_reconnect(mdsc, s);
			mutex_lock(&mdsc->mutex);
		}

		/*
		 * kick request on any mds that has gone active.
		 */
		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
		    newstate >= CEPH_MDS_STATE_ACTIVE) {
			if (oldstate != CEPH_MDS_STATE_CREATING &&
			    oldstate != CEPH_MDS_STATE_STARTING)
				pr_info("mds%d recovery completed\n", s->s_mds);
			kick_requests(mdsc, i);
			ceph_kick_flushing_caps(mdsc, s);
			wake_up_session_caps(s, RECONNECT);
		}
	}

	for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
		s = mdsc->sessions[i];
		if (!s)
			continue;
		if (!ceph_mdsmap_is_laggy(newmap, i))
			continue;
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG ||
		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
			dout(" connecting to export targets of laggy mds%d\n",
			     i);
			__open_export_target_sessions(mdsc, s);
		}
	}
}
/*
 * caller must hold session s_mutex, dentry->d_lock
 */
void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
{
	struct ceph_dentry_info *di = ceph_dentry(dentry);

	ceph_put_mds_session(di->lease_session);
	di->lease_session = NULL;
}

static void handle_lease(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session,
			 struct ceph_msg *msg)
{
	struct super_block *sb = mdsc->fsc->sb;
	struct inode *inode;
	struct dentry *parent, *dentry;
	struct ceph_dentry_info *di;
	int mds = session->s_mds;
	struct ceph_mds_lease *h = msg->front.iov_base;
	u32 seq;
	struct ceph_vino vino;
	struct qstr dname;
	int release = 0;

	dout("handle_lease from mds%d\n", mds);

	/* decode */
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
		goto bad;
	vino.ino = le64_to_cpu(h->ino);
	vino.snap = CEPH_NOSNAP;
	seq = le32_to_cpu(h->seq);
	dname.len = get_unaligned_le32(h + 1);
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
		goto bad;
	dname.name = (void *)(h + 1) + sizeof(u32);

	/* lookup inode */
	inode = ceph_find_inode(sb, vino);
	dout("handle_lease %s, ino %llx %p %.*s\n",
	     ceph_lease_op_name(h->action), vino.ino, inode,
	     dname.len, dname.name);

	mutex_lock(&session->s_mutex);
	session->s_seq++;

	if (!inode) {
		dout("handle_lease no inode %llx\n", vino.ino);
		goto release;
	}

	/* dentry */
	parent = d_find_alias(inode);
	if (!parent) {
		dout("no parent dentry on inode %p\n", inode);
		WARN_ON(1);
		goto release;  /* hrm... */
	}
	dname.hash = full_name_hash(parent, dname.name, dname.len);
	dentry = d_lookup(parent, &dname);
	dput(parent);
	if (!dentry)
		goto release;

	spin_lock(&dentry->d_lock);
	di = ceph_dentry(dentry);
	switch (h->action) {
	case CEPH_MDS_LEASE_REVOKE:
		if (di->lease_session == session) {
			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
				h->seq = cpu_to_le32(di->lease_seq);
			__ceph_mdsc_drop_dentry_lease(dentry);
		}
		release = 1;
		break;

	case CEPH_MDS_LEASE_RENEW:
		if (di->lease_session == session &&
		    di->lease_gen == session->s_cap_gen &&
		    di->lease_renew_from &&
		    di->lease_renew_after == 0) {
			unsigned long duration =
				msecs_to_jiffies(le32_to_cpu(h->duration_ms));

			di->lease_seq = seq;
			di->time = di->lease_renew_from + duration;
			di->lease_renew_after = di->lease_renew_from +
						(duration >> 1);
			di->lease_renew_from = 0;
		}
		break;
	}
	spin_unlock(&dentry->d_lock);
	dput(dentry);

	if (!release)
		goto out;

release:
	/* let's just reuse the same message */
	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
	ceph_msg_get(msg);
	ceph_con_send(&session->s_con, msg);

out:
	mutex_unlock(&session->s_mutex);
	/* avoid calling iput_final() in mds dispatch threads */
	ceph_async_iput(inode);
	return;

bad:
	pr_err("corrupt lease message\n");
	ceph_msg_dump(msg);
}
void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
			      struct dentry *dentry, char action,
			      u32 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_lease *lease;
	struct inode *dir;
	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;

	dout("lease_send_msg identry %p %s to mds%d\n",
	     dentry, ceph_lease_op_name(action), session->s_mds);

	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
	if (!msg)
		return;
	lease = msg->front.iov_base;
	lease->action = action;
	lease->seq = cpu_to_le32(seq);

	spin_lock(&dentry->d_lock);
	dir = d_inode(dentry->d_parent);
	lease->ino = cpu_to_le64(ceph_ino(dir));
	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));

	put_unaligned_le32(dentry->d_name.len, lease + 1);
	memcpy((void *)(lease + 1) + 4,
	       dentry->d_name.name, dentry->d_name.len);
	spin_unlock(&dentry->d_lock);
	/*
	 * if this is a preemptive lease RELEASE, no need to
	 * flush request stream, since the actual request will
	 * soon follow.
	 */
	msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);

	ceph_con_send(&session->s_con, msg);
}
/*
 * lock and unlock each session, to wait for any ongoing session activity
 */
static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
{
	int i;

	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
		if (!s)
			continue;
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&s->s_mutex);
		mutex_unlock(&s->s_mutex);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);
}
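/*
 * If this client has been blacklisted and the CLEANRECOVER mount flag is
 * set, force a reconnect automatically, but at most once every 30 minutes.
 */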
static void maybe_recover_session(struct ceph_mds_client *mdsc)
{
	struct ceph_fs_client *fsc = mdsc->fsc;

	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
		return;

	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
		return;

	if (!READ_ONCE(fsc->blacklisted))
		return;

	if (fsc->last_auto_reconnect &&
	    time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
		return;

	pr_info("auto reconnect after blacklisted\n");
	fsc->last_auto_reconnect = jiffies;
	ceph_force_reconnect(fsc->sb);
}

/*
 * delayed work -- periodically trim expired leases, renew caps with mds
 */
static void schedule_delayed(struct ceph_mds_client *mdsc)
{
	int delay = 5;
	unsigned hz = round_jiffies_relative(HZ * delay);
	schedule_delayed_work(&mdsc->delayed_work, hz);
}
static void delayed_work(struct work_struct *work)
{
	int i;
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, delayed_work.work);
	int renew_interval;
	int renew_caps;

	dout("mdsc delayed_work\n");

	mutex_lock(&mdsc->mutex);
	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
				   mdsc->last_renew_caps);
	if (renew_caps)
		mdsc->last_renew_caps = jiffies;

	for (i = 0; i < mdsc->max_sessions; i++) {
		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
		if (!s)
			continue;
		if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
			dout("resending session close request for mds%d\n",
			     s->s_mds);
			request_close_session(mdsc, s);
			ceph_put_mds_session(s);
			continue;
		}
		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
			if (s->s_state == CEPH_MDS_SESSION_OPEN) {
				s->s_state = CEPH_MDS_SESSION_HUNG;
				pr_info("mds%d hung\n", s->s_mds);
			}
		}
		if (s->s_state == CEPH_MDS_SESSION_NEW ||
		    s->s_state == CEPH_MDS_SESSION_RESTARTING ||
		    s->s_state == CEPH_MDS_SESSION_REJECTED) {
			/* this mds is failed or recovering, just wait */
			ceph_put_mds_session(s);
			continue;
		}
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&s->s_mutex);
		if (renew_caps)
			send_renew_caps(mdsc, s);
		else
			ceph_con_keepalive(&s->s_con);
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG)
			ceph_send_cap_releases(mdsc, s);
		mutex_unlock(&s->s_mutex);
		ceph_put_mds_session(s);

		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	ceph_check_delayed_caps(mdsc);

	ceph_queue_cap_reclaim_work(mdsc);

	ceph_trim_snapid_map(mdsc);

	maybe_recover_session(mdsc);

	schedule_delayed(mdsc);
}
int ceph_mdsc_init(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc;
	int err;

	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
	if (!mdsc)
		return -ENOMEM;
	mdsc->fsc = fsc;
	mutex_init(&mdsc->mutex);
	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
	if (!mdsc->mdsmap) {
		err = -ENOMEM;
		goto err_mdsc;
	}

	fsc->mdsc = mdsc;
	init_completion(&mdsc->safe_umount_waiters);
	init_waitqueue_head(&mdsc->session_close_wq);
	INIT_LIST_HEAD(&mdsc->waiting_for_map);
	mdsc->sessions = NULL;
	atomic_set(&mdsc->num_sessions, 0);
	mdsc->max_sessions = 0;
	mdsc->stopping = 0;
	atomic64_set(&mdsc->quotarealms_count, 0);
	mdsc->quotarealms_inodes = RB_ROOT;
	mutex_init(&mdsc->quotarealms_inodes_mutex);
	mdsc->last_snap_seq = 0;
	init_rwsem(&mdsc->snap_rwsem);
	mdsc->snap_realms = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snap_empty);
	mdsc->num_snap_realms = 0;
	spin_lock_init(&mdsc->snap_empty_lock);
	mdsc->last_tid = 0;
	mdsc->oldest_tid = 0;
	mdsc->request_tree = RB_ROOT;
	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
	mdsc->last_renew_caps = jiffies;
	INIT_LIST_HEAD(&mdsc->cap_delay_list);
	INIT_LIST_HEAD(&mdsc->cap_wait_list);
	spin_lock_init(&mdsc->cap_delay_lock);
	INIT_LIST_HEAD(&mdsc->snap_flush_list);
	spin_lock_init(&mdsc->snap_flush_lock);
	mdsc->last_cap_flush_tid = 1;
	INIT_LIST_HEAD(&mdsc->cap_flush_list);
	INIT_LIST_HEAD(&mdsc->cap_dirty);
	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
	mdsc->num_cap_flushing = 0;
	spin_lock_init(&mdsc->cap_dirty_lock);
	init_waitqueue_head(&mdsc->cap_flushing_wq);
	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
	atomic_set(&mdsc->cap_reclaim_pending, 0);

	spin_lock_init(&mdsc->dentry_list_lock);
	INIT_LIST_HEAD(&mdsc->dentry_leases);
	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);

	ceph_caps_init(mdsc);
	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);

	spin_lock_init(&mdsc->snapid_map_lock);
	mdsc->snapid_map_tree = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snapid_map_lru);

	init_rwsem(&mdsc->pool_perm_rwsem);
	mdsc->pool_perm_tree = RB_ROOT;

	strscpy(mdsc->nodename, utsname()->nodename,
		sizeof(mdsc->nodename));
	return 0;

err_mdsc:
	kfree(mdsc);
	return err;
}
/*
 * Wait for safe replies on open mds requests.  If we time out, drop
 * all requests from the tree to avoid dangling dentry refs.
 */
static void wait_requests(struct ceph_mds_client *mdsc)
{
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_request *req;

	mutex_lock(&mdsc->mutex);
	if (__get_oldest_req(mdsc)) {
		mutex_unlock(&mdsc->mutex);

		dout("wait_requests waiting for requests\n");
		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
				    ceph_timeout_jiffies(opts->mount_timeout));

		/* tear down remaining requests */
		mutex_lock(&mdsc->mutex);
		while ((req = __get_oldest_req(mdsc))) {
			dout("wait_requests timed out on tid %llu\n",
			     req->r_tid);
			list_del_init(&req->r_wait);
			__unregister_request(mdsc, req);
		}
	}
	mutex_unlock(&mdsc->mutex);
	dout("wait_requests done\n");
}

/*
 * called before mount is ro, and before dentries are torn down.
 * (hmm, does this still race with new lookups?)
 */
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
	dout("pre_umount\n");
	mdsc->stopping = 1;

	lock_unlock_sessions(mdsc);
	ceph_flush_dirty_caps(mdsc);
	wait_requests(mdsc);

	/*
	 * wait for reply handlers to drop their request refs and
	 * their inode/dcache refs
	 */
	ceph_msgr_flush();

	ceph_cleanup_quotarealms_inodes(mdsc);
}
/*
 * wait for all write mds requests to flush.
 */
static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
{
	struct ceph_mds_request *req = NULL, *nextreq;
	struct rb_node *n;

	mutex_lock(&mdsc->mutex);
	dout("wait_unsafe_requests want %lld\n", want_tid);
restart:
	req = __get_oldest_req(mdsc);
	while (req && req->r_tid <= want_tid) {
		/* find next request */
		n = rb_next(&req->r_node);
		if (n)
			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
		else
			nextreq = NULL;
		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
		    (req->r_op & CEPH_MDS_OP_WRITE)) {
			/* write op */
			ceph_mdsc_get_request(req);
			if (nextreq)
				ceph_mdsc_get_request(nextreq);
			mutex_unlock(&mdsc->mutex);
			dout("wait_unsafe_requests wait on %llu (want %llu)\n",
			     req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);
			mutex_lock(&mdsc->mutex);
			ceph_mdsc_put_request(req);
			if (!nextreq)
				break;  /* next dne before, so we're done! */
			if (RB_EMPTY_NODE(&nextreq->r_node)) {
				/* next request was removed from tree */
				ceph_mdsc_put_request(nextreq);
				goto restart;
			}
			ceph_mdsc_put_request(nextreq);  /* won't go away */
		}
		req = nextreq;
	}
	mutex_unlock(&mdsc->mutex);
	dout("wait_unsafe_requests done\n");
}

void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
	u64 want_tid, want_flush;

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
		return;

	dout("sync\n");
	mutex_lock(&mdsc->mutex);
	want_tid = mdsc->last_tid;
	mutex_unlock(&mdsc->mutex);

	ceph_flush_dirty_caps(mdsc);
	spin_lock(&mdsc->cap_dirty_lock);
	want_flush = mdsc->last_cap_flush_tid;
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_last_entry(&mdsc->cap_flush_list,
					struct ceph_cap_flush, g_list);
		cf->wake = true;
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	dout("sync want tid %lld flush_seq %lld\n",
	     want_tid, want_flush);

	wait_unsafe_requests(mdsc, want_tid);
	wait_caps_flush(mdsc, want_flush);
}
/*
 * true if all sessions are closed, or we force unmount
 */
static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
{
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
		return true;
	return atomic_read(&mdsc->num_sessions) <= skipped;
}

/*
 * called after sb is ro.
 */
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_session *session;
	int i;
	int skipped = 0;

	dout("close_sessions\n");

	/* close sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		session = __ceph_lookup_mds_session(mdsc, i);
		if (!session)
			continue;
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&session->s_mutex);
		if (__close_session(mdsc, session) <= 0)
			skipped++;
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	dout("waiting for sessions to close\n");
	wait_event_timeout(mdsc->session_close_wq,
			   done_closing_sessions(mdsc, skipped),
			   ceph_timeout_jiffies(opts->mount_timeout));

	/* tear down remaining sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i]) {
			session = get_session(mdsc->sessions[i]);
			__unregister_session(mdsc, session);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&session->s_mutex);
			remove_session_caps(session);
			mutex_unlock(&session->s_mutex);
			ceph_put_mds_session(session);
			mutex_lock(&mdsc->mutex);
		}
	}
	WARN_ON(!list_empty(&mdsc->cap_delay_list));
	mutex_unlock(&mdsc->mutex);

	ceph_cleanup_snapid_map(mdsc);
	ceph_cleanup_empty_realms(mdsc);

	cancel_work_sync(&mdsc->cap_reclaim_work);
	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */

	dout("stopped\n");
}
void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_session *session;
	int mds;

	dout("force umount\n");

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; mds++) {
		session = __ceph_lookup_mds_session(mdsc, mds);
		if (!session)
			continue;

		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
			__unregister_session(mdsc, session);
		__wake_requests(mdsc, &session->s_waiting);
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&session->s_mutex);
		__close_session(mdsc, session);
		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
			cleanup_session_requests(mdsc, session);
			remove_session_caps(session);
		}
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);

		mutex_lock(&mdsc->mutex);
		kick_requests(mdsc, mds);
	}
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}

static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
	dout("stop\n");
	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
	if (mdsc->mdsmap)
		ceph_mdsmap_destroy(mdsc->mdsmap);
	kfree(mdsc->sessions);
	ceph_caps_finalize(mdsc);
	ceph_pool_perm_destroy(mdsc);
}
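/*
 * Tear down the mds client on unmount: flush any messenger work that may
 * still hold references to us, then stop the mdsc and free it.
 */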
void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;
	dout("mdsc_destroy %p\n", mdsc);

	if (!mdsc)
		return;

	/* flush out any connection work with references to us */
	ceph_msgr_flush();

	ceph_mdsc_stop(mdsc);

	fsc->mdsc = NULL;
	kfree(mdsc);
	dout("mdsc_destroy %p done\n", mdsc);
}
void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_fs_client *fsc = mdsc->fsc;
	const char *mds_namespace = fsc->mount_options->mds_namespace;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	u32 epoch;
	u32 map_len;
	u32 num_fs;
	u32 mount_fscid = (u32)-1;
	u8 struct_v, struct_cv;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(u32), bad);
	epoch = ceph_decode_32(&p);

	dout("handle_fsmap epoch %u\n", epoch);

	ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
	struct_v = ceph_decode_8(&p);
	struct_cv = ceph_decode_8(&p);
	map_len = ceph_decode_32(&p);

	ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
	p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */

	num_fs = ceph_decode_32(&p);
	while (num_fs-- > 0) {
		void *info_p, *info_end;
		u32 info_len;
		u8 info_v, info_cv;
		u32 fscid, namelen;

		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
		info_v = ceph_decode_8(&p);
		info_cv = ceph_decode_8(&p);
		info_len = ceph_decode_32(&p);
		ceph_decode_need(&p, end, info_len, bad);
		info_p = p;
		info_end = p + info_len;
		p = info_end;

		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
		fscid = ceph_decode_32(&info_p);
		namelen = ceph_decode_32(&info_p);
		ceph_decode_need(&info_p, info_end, namelen, bad);

		if (mds_namespace &&
		    strlen(mds_namespace) == namelen &&
		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
			mount_fscid = fscid;
			break;
		}
	}

	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
	if (mount_fscid != (u32)-1) {
		fsc->client->monc.fs_cluster_id = mount_fscid;
		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
				   0, true);
		ceph_monc_renew_subs(&fsc->client->monc);
	} else {
		err = -ENOENT;
		goto err_out;
	}
	return;

bad:
	pr_err("error decoding fsmap\n");
err_out:
	mutex_lock(&mdsc->mutex);
	mdsc->mdsmap_err = err;
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}
/*
 * handle mds map update.
 */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	u32 epoch;
	u32 maplen;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mdsmap *newmap, *oldmap;
	struct ceph_fsid fsid;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(fsid) + 2 * sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
		return;
	epoch = ceph_decode_32(&p);
	maplen = ceph_decode_32(&p);
	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);

	/* do we need it? */
	mutex_lock(&mdsc->mutex);
	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
		dout("handle_map epoch %u <= our %u\n",
		     epoch, mdsc->mdsmap->m_epoch);
		mutex_unlock(&mdsc->mutex);
		return;
	}

	newmap = ceph_mdsmap_decode(&p, end);
	if (IS_ERR(newmap)) {
		err = PTR_ERR(newmap);
		goto bad_unlock;
	}

	/* swap into place */
	if (mdsc->mdsmap) {
		oldmap = mdsc->mdsmap;
		mdsc->mdsmap = newmap;
		check_new_map(mdsc, newmap, oldmap);
		ceph_mdsmap_destroy(oldmap);
	} else {
		mdsc->mdsmap = newmap;  /* first mds map */
	}
	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
				       MAX_LFS_FILESIZE);

	__wake_requests(mdsc, &mdsc->waiting_for_map);
	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
			  mdsc->mdsmap->m_epoch);

	mutex_unlock(&mdsc->mutex);
	schedule_delayed(mdsc);
	return;

bad_unlock:
	mutex_unlock(&mdsc->mutex);
bad:
	pr_err("error decoding mdsmap %d\n", err);
	return;
}
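/*
 * Messenger connection reference counting: each ceph_connection pins the
 * mds session stored in con->private while the connection is in use.
 */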
static struct ceph_connection *con_get(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	if (get_session(s)) {
		dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
		return con;
	}
	dout("mdsc con_get %p FAIL\n", s);
	return NULL;
}
static void con_put(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
	ceph_put_mds_session(s);
}
/*
 * if the client is unresponsive for long enough, the mds will kill
 * the session entirely.
 */
static void peer_reset(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;

	pr_warn("mds%d closed our session\n", s->s_mds);
	send_mds_reconnect(mdsc, s);
}
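/*
 * Dispatch an incoming message from the MDS to the appropriate handler,
 * after verifying that the session is still registered.
 */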
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	int type = le16_to_cpu(msg->hdr.type);

	mutex_lock(&mdsc->mutex);
	if (__verify_registered_session(mdsc, s) < 0) {
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	mutex_unlock(&mdsc->mutex);

	switch (type) {
	case CEPH_MSG_MDS_MAP:
		ceph_mdsc_handle_mdsmap(mdsc, msg);
		break;
	case CEPH_MSG_FS_MAP_USER:
		ceph_mdsc_handle_fsmap(mdsc, msg);
		break;
	case CEPH_MSG_CLIENT_SESSION:
		handle_session(s, msg);
		break;
	case CEPH_MSG_CLIENT_REPLY:
		handle_reply(s, msg);
		break;
	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
		handle_forward(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_CAPS:
		ceph_handle_caps(s, msg);
		break;
	case CEPH_MSG_CLIENT_SNAP:
		ceph_handle_snap(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_LEASE:
		handle_lease(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_QUOTA:
		ceph_handle_quota(mdsc, s, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}
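/*
 * authentication
 */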
/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
					int *proto, int force_new)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	if (force_new && auth->authorizer) {
		ceph_auth_destroy_authorizer(auth->authorizer);
		auth->authorizer = NULL;
	}
	if (!auth->authorizer) {
		int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
						      auth);
		if (ret)
			return ERR_PTR(ret);
	} else {
		int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
						      auth);
		if (ret)
			return ERR_PTR(ret);
	}
	*proto = ac->protocol;

	return auth;
}
static int add_authorizer_challenge(struct ceph_connection *con,
				    void *challenge_buf, int challenge_buf_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
					    challenge_buf, challenge_buf_len);
}
static int verify_authorizer_reply(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
}
static int invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}
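/*
 * Allocate the front buffer for an incoming message based on the sizes
 * advertised in its header.
 */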
static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
				struct ceph_msg_header *hdr, int *skip)
{
	struct ceph_msg *msg;
	int type = (int) le16_to_cpu(hdr->type);
	int front_len = (int) le32_to_cpu(hdr->front_len);

	if (con->in_msg)
		return con->in_msg;

	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
	if (!msg) {
		pr_err("unable to allocate msg type %d len %d\n",
		       type, front_len);
		return NULL;
	}

	return msg;
}
static int mds_sign_message(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_sign_message(auth, msg);
}
static int mds_check_message_signature(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_check_message_signature(auth, msg);
}
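/*
 * Connection callbacks registered with the messenger for MDS sessions.
 */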
static const struct ceph_connection_operations mds_con_ops = {
	.get = con_get,
	.put = con_put,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.add_authorizer_challenge = add_authorizer_challenge,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.peer_reset = peer_reset,
	.alloc_msg = mds_alloc_msg,
	.sign_message = mds_sign_message,
	.check_message_signature = mds_check_message_signature,
};